From b15f695976d21300beabc3e0ecef87c1a0b4dc4c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 21 Sep 2009 17:20:13 +0200 Subject: different fixes to req/rep --- src/socket_base.cpp | 40 +++++++++++++--------------------------- 1 file changed, 13 insertions(+), 27 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c669e04..0452993 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -38,9 +38,8 @@ #include "pgm_sender.hpp" #include "pgm_receiver.hpp" -zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : +zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), - type (type_), pending_term_acks (0), ticks (0), app_thread (parent_), @@ -137,14 +136,14 @@ int zmq::socket_base_t::connect (const char *addr_) pipe_t *out_pipe = NULL; // Create inbound pipe, if required. - if (xrequires_in ()) { + if (options.requires_in) { in_pipe = new pipe_t (this, session, options.hwm, options.lwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. - if (xrequires_out ()) { + if (options.requires_out) { out_pipe = new pipe_t (session, this, options.hwm, options.lwm); zmq_assert (out_pipe); } @@ -163,8 +162,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "tcp") { - // Create the connecter object. Supply it with the session name so that - // it can bind the new connection to the session once it is established. + // Create the connecter object. Supply it with the session name + // so that it can bind the new connection to the session once + // it is established. zmq_connecter_t *connecter = new zmq_connecter_t ( choose_io_thread (options.affinity), this, options, session_name.c_str (), false); @@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_) // If the socket type requires bi-directional communication // multicast is not an option (it is uni-directional). - if (xrequires_in () && xrequires_out ()) { + if (options.requires_in && options.requires_out) { errno = EFAULT; return -1; } @@ -194,11 +194,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "udp") udp_encapsulation = true; - switch (type) { + if (options.requires_out) { - // PGM sender. - case ZMQ_PUB: - { + // PGM sender. pgm_sender_t *pgm_sender = new pgm_sender_t (choose_io_thread (options.affinity), options, session_name.c_str ()); @@ -212,15 +210,10 @@ int zmq::socket_base_t::connect (const char *addr_) // Reserve a sequence number for following 'attach' command. session->inc_seqnum (); send_attach (session, pgm_sender); - - pgm_sender = NULL; - - break; } + else if (options.requires_in) { - // PGM receiver. - case ZMQ_SUB: - { + // PGM receiver. pgm_receiver_t *pgm_receiver = new pgm_receiver_t (choose_io_thread (options.affinity), options, session_name.c_str ()); @@ -234,16 +227,9 @@ int zmq::socket_base_t::connect (const char *addr_) // Reserve a sequence number for following 'attach' command. session->inc_seqnum (); send_attach (session, pgm_receiver); - - pgm_receiver = NULL; - - break; - } - - default: - errno = EINVAL; - return -1; } + else + zmq_assert (false); return 0; } -- cgit v1.2.3