From d13933bc62fce71b5a58118020e0dd3776e79aa9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 11 Aug 2010 14:09:56 +0200 Subject: I/O object hierarchy implemented --- src/socket_base.cpp | 435 +++++++++++++++------------------------------------- 1 file changed, 124 insertions(+), 311 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 5d3175a..903e781 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -38,9 +38,8 @@ #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "connect_session.hpp" #include "config.hpp" -#include "owned.hpp" #include "pipe.hpp" #include "err.hpp" #include "ctx.hpp" @@ -109,20 +108,20 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, } zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : - object_t (parent_, slot_), + own_t (parent_, slot_), zombie (false), last_processing_time (0), - pending_term_acks (0), ticks (0), - rcvmore (false), - sent_seqnum (0), - processed_seqnum (0), - next_ordinal (1) + rcvmore (false) { } zmq::socket_base_t::~socket_base_t () { + // Check whether there are no session leaks. + sessions_sync.lock (); + zmq_assert (sessions.empty ()); + sessions_sync.unlock (); } zmq::signaler_t *zmq::socket_base_t::get_signaler () @@ -139,6 +138,46 @@ void zmq::socket_base_t::stop () send_stop (); } +int zmq::socket_base_t::check_protocol (const std::string &protocol_) +{ + // First check out whether the protcol is something we are aware of. + if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && + protocol_ != "pgm" && protocol_ != "epgm") { + errno = EPROTONOSUPPORT; + return -1; + } + + // If 0MQ is not compiled with OpenPGM, pgm and epgm transports + // are not avaialble. +#if !defined ZMQ_HAVE_OPENPGM + if (protocol_ == "pgm" || protocol_ == "epgm") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif + + // IPC transport is not available on Windows and OpenVMS. +#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + if (protocol_ != "ipc") { + // Unknown protocol. + errno = EPROTONOSUPPORT; + return -1; + } +#endif + + // Check whether socket type and transport protocol match. + // Specifically, multicast protocols can't be combined with + // bi-directional messaging patterns (socket types). + if ((protocol_ == "pgm" || protocol_ == "epgm") && + options.requires_in && options.requires_out) { + errno = ENOCOMPATPROTO; + return -1; + } + + // Protocol is available. + return 0; +} + void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { @@ -225,56 +264,48 @@ int zmq::socket_base_t::bind (const char *addr_) } // Parse addr_ string. - std::string addr_type; - std::string addr_args; - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - - if (pos == std::string::npos) { - errno = EINVAL; - return -1; + std::string protocol; + std::string address; + { + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + if (pos == std::string::npos) { + errno = EINVAL; + return -1; + } + protocol = addr.substr (0, pos); + address = addr.substr (pos + 3); } - addr_type = addr.substr (0, pos); - addr_args = addr.substr (pos + 3); - - if (addr_type == "inproc") - return register_endpoint (addr_args.c_str (), this); - - if (addr_type == "tcp" || addr_type == "ipc") { + int rc = check_protocol (protocol); + if (rc != 0) + return -1; -#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS - if (addr_type == "ipc") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif + if (protocol == "inproc") + return register_endpoint (address.c_str (), this); + if (protocol == "tcp" || protocol == "ipc") { zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( choose_io_thread (options.affinity), this, options); zmq_assert (listener); - int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ()); + int rc = listener->set_address (protocol.c_str(), address.c_str ()); if (rc != 0) { delete listener; return -1; } + launch_child (listener); - send_plug (listener); - send_own (this, listener); return 0; } -#if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm" || addr_type == "epgm") { - // In the case of PGM bind behaves the same like connect. + if (protocol == "pgm" || protocol == "epgm") { + + // For convenience's sake, bind can be used interchageable with + // connect for PGM and EPGM transports. return connect (addr_); } -#endif - // Unknown protocol. - errno = EPROTONOSUPPORT; - return -1; + zmq_assert (false); } int zmq::socket_base_t::connect (const char *addr_) @@ -285,28 +316,31 @@ int zmq::socket_base_t::connect (const char *addr_) } // Parse addr_ string. - std::string addr_type; - std::string addr_args; - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - - if (pos == std::string::npos) { - errno = EINVAL; - return -1; + std::string protocol; + std::string address; + { + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + if (pos == std::string::npos) { + errno = EINVAL; + return -1; + } + protocol = addr.substr (0, pos); + address = addr.substr (pos + 3); } - addr_type = addr.substr (0, pos); - addr_args = addr.substr (pos + 3); + int rc = check_protocol (protocol); + if (rc != 0) + return -1; - if (addr_type == "inproc") { + if (protocol == "inproc") { // TODO: inproc connect is specific with respect to creating pipes // as there's no 'reconnect' functionality implemented. Once that // is in place we should follow generic pipe creation algorithm. // Find the peer socket. - socket_base_t *peer = find_endpoint (addr_args.c_str ()); + socket_base_t *peer = find_endpoint (address.c_str ()); if (!peer) return -1; @@ -329,18 +363,18 @@ int zmq::socket_base_t::connect (const char *addr_) attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); // Attach the pipes to the peer socket. Note that peer's seqnum - // was incremented in find_endpoint function. The callee is notified - // about the fact via the last parameter. + // was incremented in find_endpoint function. We don't need it + // increased here. send_bind (peer, outpipe_reader, inpipe_writer, options.identity, false); return 0; } - // Create unnamed session. - io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new (std::nothrow) session_t (io_thread, - this, options); + // Create session. + connect_session_t *session = new (std::nothrow) connect_session_t ( + choose_io_thread (options.affinity), this, options, + protocol.c_str (), address.c_str ()); zmq_assert (session); // If 'immediate connect' feature is required, we'll create the pipes @@ -370,95 +404,10 @@ int zmq::socket_base_t::connect (const char *addr_) session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); } - // Activate the session. - send_plug (session); - send_own (this, session); - - if (addr_type == "tcp" || addr_type == "ipc") { - -#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS - // Windows named pipes are not compatible with Winsock API. - // There's no UNIX domain socket implementation on OpenVMS. - if (addr_type == "ipc") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif - - // Create the connecter object. Supply it with the session name - // so that it can bind the new connection to the session once - // it is established. - zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( - choose_io_thread (options.affinity), this, options, - session->get_ordinal (), false); - zmq_assert (connecter); - int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ()); - if (rc != 0) { - delete connecter; - return -1; - } - send_plug (connecter); - send_own (this, connecter); - - return 0; - } - -#if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm" || addr_type == "epgm") { - - // If the socket type requires bi-directional communication - // multicast is not an option (it is uni-directional). - if (options.requires_in && options.requires_out) { - errno = ENOCOMPATPROTO; - return -1; - } - - // For epgm, pgm transport with UDP encapsulation is used. - bool udp_encapsulation = (addr_type == "epgm"); - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.requires_out) { - - // PGM sender. - pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - choose_io_thread (options.affinity), options); - zmq_assert (pgm_sender); + // Activate the session. Make it a child of this socket. + launch_child (session); - int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_sender; - return -1; - } - - send_attach (session, pgm_sender, blob_t ()); - } - else if (options.requires_in) { - - // PGM receiver. - pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - choose_io_thread (options.affinity), options); - zmq_assert (pgm_receiver); - - int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_receiver; - return -1; - } - - send_attach (session, pgm_receiver, blob_t ()); - } - else - zmq_assert (false); - - return 0; - } -#endif - - // Unknown protoco. - errno = EPROTONOSUPPORT; - return -1; + return 0; } int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) @@ -587,72 +536,23 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { + zmq_assert (!zombie); + // Socket becomes a zombie. From now on all new arrived pipes (bind - // command) and I/O objects (own command) are immediately terminated. - // Also, any further requests form I/O object termination are ignored - // (we are going to shut them down anyway -- this way we assure that - // we do so once only). + // command) are immediately terminated. zombie = true; - // Unregister all inproc endpoints associated with this socket. - // Doing this we make sure that no new pipes from other sockets (inproc) - // will be initiated. However, there may be some inproc pipes already - // on the fly, but not yet received by this socket. To get finished - // with them we'll do the subsequent waiting from on-the-fly commands. - // This should happen very quickly. There's no way to block here for - // extensive period of time. - unregister_endpoints (this); - while (processed_seqnum != sent_seqnum.get ()) - process_commands (true, false); - // TODO: My feeling is that the above has to be done in the dezombification - // loop, otherwise we may end up with number of i/o object dropping to zero - // even though there are more i/o objects on the way. - - // The above process ensures that only pipes that will arrive from now on - // are those initiated by sessions. These in turn have a nice property of - // not arriving totally asynchronously. When a session -- being an I/O - // object -- acknowledges its termination we are 100% sure that we'll get - // no new pipe from it. - - // Start termination of all the pipes presently associated with the socket. - xterm_pipes (); - - // Send termination request to all associated I/O objects. - // Start waiting for the acks. Note that the actual waiting is not done - // in this function. Rather it is done in delayed manner as socket is - // being dezombified. The reason is that I/O object shutdown can take - // considerable amount of time in case there's still a lot of data to - // push to the network. - for (io_objects_t::iterator it = io_objects.begin (); - it != io_objects.end (); it++) - send_term (*it); - pending_term_acks += io_objects.size (); - io_objects.clear (); - - // Note that new I/O objects may arrive even in zombie state (say new - // session initiated by a listener object), however, in such case number - // of pending acks never drops to zero. Here's the scenario: We have an - // pending ack for the listener object. Then 'own' commands arrives from - // the listener notifying the socket about new session. It immediately - // triggers termination request and number of of pending acks if - // incremented. Then term_acks arrives from the listener. Number of pending - // acks is decremented. Later on, the session itself will ack its - // termination. During the process, number of pending acks never dropped - // to zero and thus the socket remains safely in the zombie state. - - // Transfer the ownership of the socket from this application thread + // Start termination of associated I/O object hierarchy. + terminate (); + + // Ask context to zombify this socket. In other words, transfer + // the ownership of the socket from this application thread // to the context which will take care of the rest of shutdown process. - zombify (this); + zombify_socket (this); return 0; } -void zmq::socket_base_t::inc_seqnum () -{ - // Be aware: This function may be called from a different thread! - sent_seqnum.add (1); -} - bool zmq::socket_base_t::has_in () { return xhas_in (); @@ -667,7 +567,7 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, session_t *session_) { sessions_sync.lock (); - bool registered = named_sessions.insert ( + bool registered = sessions.insert ( std::make_pair (peer_identity_, session_)).second; sessions_sync.unlock (); return registered; @@ -676,17 +576,17 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (peer_identity_); - zmq_assert (it != named_sessions.end ()); - named_sessions.erase (it); + sessions_t::iterator it = sessions.find (peer_identity_); + zmq_assert (it != sessions.end ()); + sessions.erase (it); sessions_sync.unlock (); } zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (peer_identity_); - if (it == named_sessions.end ()) { + sessions_t::iterator it = sessions.find (peer_identity_); + if (it == sessions.end ()) { sessions_sync.unlock (); return NULL; } @@ -699,74 +599,16 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) return session; } -uint64_t zmq::socket_base_t::register_session (session_t *session_) -{ - sessions_sync.lock (); - uint64_t ordinal = next_ordinal; - next_ordinal++; - unnamed_sessions.insert (std::make_pair (ordinal, session_)); - sessions_sync.unlock (); - return ordinal; -} - -void zmq::socket_base_t::unregister_session (uint64_t ordinal_) -{ - sessions_sync.lock (); - unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); - zmq_assert (it != unnamed_sessions.end ()); - unnamed_sessions.erase (it); - sessions_sync.unlock (); -} - -zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_) -{ - sessions_sync.lock (); - - unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); - if (it == unnamed_sessions.end ()) { - sessions_sync.unlock (); - return NULL; - } - session_t *session = it->second; - - // Prepare the session for subsequent attach command. - session->inc_seqnum (); - - sessions_sync.unlock (); - return session; -} - bool zmq::socket_base_t::dezombify () { zmq_assert (zombie); // Process any commands from other threads/sockets that may be available - // at the moment. + // at the moment. Ultimately, socket will be destroyed. process_commands (false, false); - // If there are no more pipes attached and there are no more I/O objects - // owned by the socket, we can kill the zombie. - if (!pending_term_acks && !xhas_pipes ()) { - - // If all objects have acknowledged their termination there should - // definitely be no I/O object remaining in the list. - zmq_assert (io_objects.empty ()); - - // Check whether there are no session leaks. - sessions_sync.lock (); - zmq_assert (named_sessions.empty ()); - zmq_assert (unnamed_sessions.empty ()); - sessions_sync.unlock (); - - // Deallocate all the resources tied to this socket. - delete this; - - // Notify the caller about the fact that the zombie is finally dead. - return true; - } - - // The zombie remains undead. - return false; +// TODO: ??? + return true; } void zmq::socket_base_t::process_commands (bool block_, bool throttle_) @@ -828,19 +670,6 @@ void zmq::socket_base_t::process_stop () zombie = true; } -void zmq::socket_base_t::process_own (owned_t *object_) -{ - // If the socket is already being shut down, new owned objects are - // immediately asked to terminate. - if (zombie) { - send_term (object_); - pending_term_acks++; - return; - } - - io_objects.insert (object_); -} - void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_) { @@ -857,37 +686,21 @@ void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, attach_pipes (in_pipe_, out_pipe_, peer_identity_); } -void zmq::socket_base_t::process_term_req (owned_t *object_) +void zmq::socket_base_t::process_unplug () { - // When shutting down we can ignore termination requests from owned - // objects. It means the termination request was already sent to - // the object. - if (zombie) - return; - - // If I/O object is well and alive ask it to terminate. - io_objects_t::iterator it = std::find (io_objects.begin (), - io_objects.end (), object_); - - // If not found, we assume that termination request was already sent to - // the object so we can safely ignore the request. - if (it == io_objects.end ()) - return; - - pending_term_acks++; - io_objects.erase (it); - send_term (object_); } -void zmq::socket_base_t::process_term_ack () +void zmq::socket_base_t::process_term () { - zmq_assert (pending_term_acks); - pending_term_acks--; -} + zmq_assert (zombie); -void zmq::socket_base_t::process_seqnum () -{ - processed_seqnum++; + // Unregister all inproc endpoints associated with this socket. + // Doing this we make sure that no new pipes from other sockets (inproc) + // will be initiated. + unregister_endpoints (this); + + // Continue the termination process immediately. + own_t::process_term (); } int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, -- cgit v1.2.3