summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-21 17:20:13 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-21 17:20:13 +0200
commitb15f695976d21300beabc3e0ecef87c1a0b4dc4c (patch)
tree09513a17251be5bc8d132b8d00cbf2b893bcf57a /src/socket_base.cpp
parentcb1b6fe32cbf3c7cf5961bb4156f2de743693a3a (diff)
different fixes to req/rep
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp40
1 files changed, 13 insertions, 27 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c669e04..0452993 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -38,9 +38,8 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
-zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
+zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
- type (type_),
pending_term_acks (0),
ticks (0),
app_thread (parent_),
@@ -137,14 +136,14 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *out_pipe = NULL;
// Create inbound pipe, if required.
- if (xrequires_in ()) {
+ if (options.requires_in) {
in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
- if (xrequires_out ()) {
+ if (options.requires_out) {
out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
}
@@ -163,8 +162,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp") {
- // Create the connecter object. Supply it with the session name so that
- // it can bind the new connection to the session once it is established.
+ // Create the connecter object. Supply it with the session name
+ // so that it can bind the new connection to the session once
+ // it is established.
zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
session_name.c_str (), false);
@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// If the socket type requires bi-directional communication
// multicast is not an option (it is uni-directional).
- if (xrequires_in () && xrequires_out ()) {
+ if (options.requires_in && options.requires_out) {
errno = EFAULT;
return -1;
}
@@ -194,11 +194,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp")
udp_encapsulation = true;
- switch (type) {
+ if (options.requires_out) {
- // PGM sender.
- case ZMQ_PUB:
- {
+ // PGM sender.
pgm_sender_t *pgm_sender =
new pgm_sender_t (choose_io_thread (options.affinity), options,
session_name.c_str ());
@@ -212,15 +210,10 @@ int zmq::socket_base_t::connect (const char *addr_)
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_sender);
-
- pgm_sender = NULL;
-
- break;
}
+ else if (options.requires_in) {
- // PGM receiver.
- case ZMQ_SUB:
- {
+ // PGM receiver.
pgm_receiver_t *pgm_receiver =
new pgm_receiver_t (choose_io_thread (options.affinity), options,
session_name.c_str ());
@@ -234,16 +227,9 @@ int zmq::socket_base_t::connect (const char *addr_)
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_receiver);
-
- pgm_receiver = NULL;
-
- break;
- }
-
- default:
- errno = EINVAL;
- return -1;
}
+ else
+ zmq_assert (false);
return 0;
}