From b15f695976d21300beabc3e0ecef87c1a0b4dc4c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 21 Sep 2009 17:20:13 +0200 Subject: different fixes to req/rep --- src/dispatcher.cpp | 8 ++++---- src/fd_signaler.cpp | 22 +++++++++++----------- src/options.cpp | 4 +++- src/options.hpp | 5 +++++ src/p2p.cpp | 14 +++----------- src/p2p.hpp | 2 -- src/pub.cpp | 14 +++----------- src/pub.hpp | 2 -- src/rep.cpp | 14 +++----------- src/rep.hpp | 2 -- src/req.cpp | 14 +++----------- src/req.hpp | 2 -- src/session.cpp | 36 ++++++++++++++++++++++++------------ src/socket_base.cpp | 40 +++++++++++++--------------------------- src/socket_base.hpp | 13 ++++--------- src/sub.cpp | 14 +++----------- src/sub.hpp | 2 -- 17 files changed, 79 insertions(+), 129 deletions(-) (limited to 'src') diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 5e7ea46..ef2f7c1 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -88,10 +88,6 @@ int zmq::dispatcher_t::term () zmq::dispatcher_t::~dispatcher_t () { - // Close all application theads, sockets, io_objects etc. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i]; - // Ask I/O threads to terminate. If stop signal wasn't sent to I/O // thread subsequent invocation of destructor would hang-up. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -101,6 +97,10 @@ zmq::dispatcher_t::~dispatcher_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; + // Close all application theads, sockets, io_objects etc. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + delete app_threads [i]; + // Deallocate all the orphaned pipes. while (!pipes.empty ()) delete *pipes.begin (); diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 8c71356..b67b27b 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -222,22 +222,22 @@ void zmq::fd_signaler_t::signal (int signal_) uint64_t zmq::fd_signaler_t::poll () { - // If there are signals available, return straight away. - uint64_t signals = check (); - if (signals) - return signals; + unsigned char buffer [64]; + ssize_t nbytes = recv (r, buffer, 64, 0); + zmq_assert (nbytes != -1); - // If there are no signals, wait until at least one signal arrives. - unsigned char sig; - ssize_t nbytes = recv (r, &sig, 1, 0); - errno_assert (nbytes != -1); - return uint64_t (1) << sig; + uint64_t signals = 0; + for (int pos = 0; pos != nbytes; pos ++) { + zmq_assert (buffer [pos] < 64); + signals |= (uint64_t (1) << (buffer [pos])); + } + return signals; } uint64_t zmq::fd_signaler_t::check () { - unsigned char buffer [32]; - ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT); + unsigned char buffer [64]; + ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT); if (nbytes == -1 && errno == EAGAIN) return 0; zmq_assert (nbytes != -1); diff --git a/src/options.cpp b/src/options.cpp index b0e6e6e..8d3de45 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -29,7 +29,9 @@ zmq::options_t::options_t () : affinity (0), rate (100), recovery_ivl (10), - use_multicast_loop (false) + use_multicast_loop (false), + requires_in (false), + requires_out (false) { } diff --git a/src/options.hpp b/src/options.hpp index cde144c..a52fdeb 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -48,6 +48,11 @@ namespace zmq // Enable multicast loopback. Default disabled (false). bool use_multicast_loop; + + // These options are never set by the user directly. Instead they are + // provided by the specific socket type. + bool requires_in; + bool requires_out; }; } diff --git a/src/p2p.cpp b/src/p2p.cpp index 537f3ce..8ef27a7 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -23,24 +23,16 @@ #include "err.hpp" zmq::p2p_t::p2p_t (class app_thread_t *parent_) : - socket_base_t (parent_, ZMQ_P2P) + socket_base_t (parent_) { + options.requires_in = true; + options.requires_out = true; } zmq::p2p_t::~p2p_t () { } -bool zmq::p2p_t::xrequires_in () -{ - return true; -} - -bool zmq::p2p_t::xrequires_out () -{ - return true; -} - void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { diff --git a/src/p2p.hpp b/src/p2p.hpp index 84790a1..a3dae31 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -33,8 +33,6 @@ namespace zmq ~p2p_t (); // Overloads of functions from socket_base_t. - bool xrequires_in (); - bool xrequires_out (); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); diff --git a/src/pub.cpp b/src/pub.cpp index 020d789..faaa9aa 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -25,8 +25,10 @@ #include "pipe.hpp" zmq::pub_t::pub_t (class app_thread_t *parent_) : - socket_base_t (parent_, ZMQ_PUB) + socket_base_t (parent_) { + options.requires_in = false; + options.requires_out = true; } zmq::pub_t::~pub_t () @@ -36,16 +38,6 @@ zmq::pub_t::~pub_t () out_pipes.clear (); } -bool zmq::pub_t::xrequires_in () -{ - return false; -} - -bool zmq::pub_t::xrequires_out () -{ - return true; -} - void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { diff --git a/src/pub.hpp b/src/pub.hpp index 8255c6f..c5eeac1 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -34,8 +34,6 @@ namespace zmq ~pub_t (); // Overloads of functions from socket_base_t. - bool xrequires_in (); - bool xrequires_out (); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); diff --git a/src/rep.cpp b/src/rep.cpp index 2fbb66c..d586988 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -24,28 +24,20 @@ #include "pipe.hpp" zmq::rep_t::rep_t (class app_thread_t *parent_) : - socket_base_t (parent_, ZMQ_REP), + socket_base_t (parent_), active (0), current (0), waiting_for_reply (false), reply_pipe (NULL) { + options.requires_in = true; + options.requires_out = true; } zmq::rep_t::~rep_t () { } -bool zmq::rep_t::xrequires_in () -{ - return true; -} - -bool zmq::rep_t::xrequires_out () -{ - return true; -} - void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { diff --git a/src/rep.hpp b/src/rep.hpp index 6e55f47..07bdaf4 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -34,8 +34,6 @@ namespace zmq ~rep_t (); // Overloads of functions from socket_base_t. - bool xrequires_in (); - bool xrequires_out (); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); diff --git a/src/req.cpp b/src/req.cpp index 05629df..9b4dffa 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -24,28 +24,20 @@ #include "pipe.hpp" zmq::req_t::req_t (class app_thread_t *parent_) : - socket_base_t (parent_, ZMQ_REQ), + socket_base_t (parent_), current (0), waiting_for_reply (false), reply_pipe_active (false), reply_pipe (NULL) { + options.requires_in = true; + options.requires_out = true; } zmq::req_t::~req_t () { } -bool zmq::req_t::xrequires_in () -{ - return true; -} - -bool zmq::req_t::xrequires_out () -{ - return true; -} - void zmq::req_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { diff --git a/src/req.hpp b/src/req.hpp index 9158fbe..95ddf0d 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -34,8 +34,6 @@ namespace zmq ~req_t (); // Overloads of functions from socket_base_t. - bool xrequires_in (); - bool xrequires_out (); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); diff --git a/src/session.cpp b/src/session.cpp index b829ae9..eb0a963 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -43,7 +43,7 @@ zmq::session_t::~session_t () bool zmq::session_t::read (::zmq_msg_t *msg_) { - if (!active) + if (!in_pipe || !active) return false; return in_pipe->read (msg_); @@ -51,8 +51,9 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - if (!out_pipe) - return true; + // The communication is unidirectional. + // We don't expect any message to arrive. + zmq_assert (out_pipe); if (out_pipe->write (msg_)) { zmq_msg_init (msg_); @@ -136,15 +137,26 @@ void zmq::session_t::process_plug () // already. Otherwise, it's being created by the listener and the pipes // are yet to be created. if (!in_pipe && !out_pipe) { - pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm); - zmq_assert (inbound); - in_pipe = &inbound->reader; - in_pipe->set_endpoint (this); - pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); - zmq_assert (outbound); - out_pipe = &outbound->writer; - out_pipe->set_endpoint (this); - send_bind (owner, this, &outbound->reader, &inbound->writer); + + pipe_t *inbound = NULL; + pipe_t *outbound = NULL; + + if (options.requires_out) { + inbound = new pipe_t (this, owner, options.hwm, options.lwm); + zmq_assert (inbound); + in_pipe = &inbound->reader; + in_pipe->set_endpoint (this); + } + + if (options.requires_in) { + outbound = new pipe_t (owner, this, options.hwm, options.lwm); + zmq_assert (outbound); + out_pipe = &outbound->writer; + out_pipe->set_endpoint (this); + } + + send_bind (owner, this, outbound ? &outbound->reader : NULL, + inbound ? &inbound->writer : NULL); } owned_t::process_plug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c669e04..0452993 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -38,9 +38,8 @@ #include "pgm_sender.hpp" #include "pgm_receiver.hpp" -zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : +zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), - type (type_), pending_term_acks (0), ticks (0), app_thread (parent_), @@ -137,14 +136,14 @@ int zmq::socket_base_t::connect (const char *addr_) pipe_t *out_pipe = NULL; // Create inbound pipe, if required. - if (xrequires_in ()) { + if (options.requires_in) { in_pipe = new pipe_t (this, session, options.hwm, options.lwm); zmq_assert (in_pipe); } // Create outbound pipe, if required. - if (xrequires_out ()) { + if (options.requires_out) { out_pipe = new pipe_t (session, this, options.hwm, options.lwm); zmq_assert (out_pipe); } @@ -163,8 +162,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "tcp") { - // Create the connecter object. Supply it with the session name so that - // it can bind the new connection to the session once it is established. + // Create the connecter object. Supply it with the session name + // so that it can bind the new connection to the session once + // it is established. zmq_connecter_t *connecter = new zmq_connecter_t ( choose_io_thread (options.affinity), this, options, session_name.c_str (), false); @@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_) // If the socket type requires bi-directional communication // multicast is not an option (it is uni-directional). - if (xrequires_in () && xrequires_out ()) { + if (options.requires_in && options.requires_out) { errno = EFAULT; return -1; } @@ -194,11 +194,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "udp") udp_encapsulation = true; - switch (type) { + if (options.requires_out) { - // PGM sender. - case ZMQ_PUB: - { + // PGM sender. pgm_sender_t *pgm_sender = new pgm_sender_t (choose_io_thread (options.affinity), options, session_name.c_str ()); @@ -212,15 +210,10 @@ int zmq::socket_base_t::connect (const char *addr_) // Reserve a sequence number for following 'attach' command. session->inc_seqnum (); send_attach (session, pgm_sender); - - pgm_sender = NULL; - - break; } + else if (options.requires_in) { - // PGM receiver. - case ZMQ_SUB: - { + // PGM receiver. pgm_receiver_t *pgm_receiver = new pgm_receiver_t (choose_io_thread (options.affinity), options, session_name.c_str ()); @@ -234,16 +227,9 @@ int zmq::socket_base_t::connect (const char *addr_) // Reserve a sequence number for following 'attach' command. session->inc_seqnum (); send_attach (session, pgm_receiver); - - pgm_receiver = NULL; - - break; - } - - default: - errno = EINVAL; - return -1; } + else + zmq_assert (false); return 0; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 120c932..bba27c3 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -40,7 +40,7 @@ namespace zmq { public: - socket_base_t (class app_thread_t *parent_, int type_); + socket_base_t (class app_thread_t *parent_); // Interface for communication with the API layer. int setsockopt (int option_, const void *optval_, @@ -73,8 +73,6 @@ namespace zmq virtual ~socket_base_t (); // Pipe management is done by individual socket types. - virtual bool xrequires_in () = 0; - virtual bool xrequires_out () = 0; virtual void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) = 0; virtual void xdetach_inpipe (class reader_t *pipe_) = 0; @@ -89,6 +87,9 @@ namespace zmq virtual int xflush () = 0; virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0; + // Socket options. + options_t options; + private: // Handlers for incoming commands. @@ -98,9 +99,6 @@ namespace zmq void process_term_req (class owned_t *object_); void process_term_ack (); - // Type of the socket. - int type; - // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. typedef std::set io_objects_t; @@ -116,9 +114,6 @@ namespace zmq // Application thread the socket lives in. class app_thread_t *app_thread; - // Socket options. - options_t options; - // If true, socket is already shutting down. No new work should be // started. bool shutting_down; diff --git a/src/sub.cpp b/src/sub.cpp index 73510c6..66aa8fd 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -24,11 +24,13 @@ #include "pipe.hpp" zmq::sub_t::sub_t (class app_thread_t *parent_) : - socket_base_t (parent_, ZMQ_SUB), + socket_base_t (parent_), active (0), current (0), all_count (0) { + options.requires_in = true; + options.requires_out = false; } zmq::sub_t::~sub_t () @@ -38,16 +40,6 @@ zmq::sub_t::~sub_t () in_pipes.clear (); } -bool zmq::sub_t::xrequires_in () -{ - return true; -} - -bool zmq::sub_t::xrequires_out () -{ - return false; -} - void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { diff --git a/src/sub.hpp b/src/sub.hpp index 29da27a..180391d 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -39,8 +39,6 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - bool xrequires_in (); - bool xrequires_out (); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); -- cgit v1.2.3