diff options
author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:35 +0100 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:35 +0100 |
commit | e645fc2693acc796304498909786b7b47005b429 (patch) | |
tree | 4118cd4c7b9eba3ba1d6892800c79669ea94c4e9 /src/socket_base.cpp | |
parent | 2c416a793ea781273a5da6742211f5f01af13a2b (diff) |
Imported Upstream version 2.1.3upstream/2.1.3
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 814 |
1 files changed, 452 insertions, 362 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c933954..4cefb6f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1,19 +1,20 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ @@ -23,45 +24,203 @@ #include "../include/zmq.h" -#include "socket_base.hpp" -#include "app_thread.hpp" +#include "platform.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#if defined _MSC_VER +#include <intrin.h> +#endif +#else +#include <unistd.h> +#endif +#include "socket_base.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "connect_session.hpp" #include "config.hpp" -#include "owned.hpp" +#include "clock.hpp" #include "pipe.hpp" #include "err.hpp" #include "ctx.hpp" #include "platform.hpp" -#include "pgm_sender.hpp" -#include "pgm_receiver.hpp" #include "likely.hpp" #include "uuid.hpp" -zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : - object_t (parent_), - pending_term_acks (0), +#include "pair.hpp" +#include "pub.hpp" +#include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" +#include "pull.hpp" +#include "push.hpp" +#include "xreq.hpp" +#include "xrep.hpp" +#include "xpub.hpp" +#include "xsub.hpp" + +zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, + uint32_t tid_) +{ + socket_base_t *s = NULL; + switch (type_) { + + case ZMQ_PAIR: + s = new (std::nothrow) pair_t (parent_, tid_); + break; + case ZMQ_PUB: + s = new (std::nothrow) pub_t (parent_, tid_); + break; + case ZMQ_SUB: + s = new (std::nothrow) sub_t (parent_, tid_); + break; + case ZMQ_REQ: + s = new (std::nothrow) req_t (parent_, tid_); + break; + case ZMQ_REP: + s = new (std::nothrow) rep_t (parent_, tid_); + break; + case ZMQ_XREQ: + s = new (std::nothrow) xreq_t (parent_, tid_); + break; + case ZMQ_XREP: + s = new (std::nothrow) xrep_t (parent_, tid_); + break; + case ZMQ_PULL: + s = new (std::nothrow) pull_t (parent_, tid_); + break; + case ZMQ_PUSH: + s = new (std::nothrow) push_t (parent_, tid_); + break; + case ZMQ_XPUB: + s = new (std::nothrow) xpub_t (parent_, tid_); + break; + case ZMQ_XSUB: + s = new (std::nothrow) xsub_t (parent_, tid_); + break; + default: + errno = EINVAL; + return NULL; + } + alloc_assert (s); + return s; +} + +zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : + own_t (parent_, tid_), + ctx_terminated (false), + destroyed (false), + last_tsc (0), ticks (0), - rcvmore (false), - app_thread (parent_), - shutting_down (false), - sent_seqnum (0), - processed_seqnum (0), - next_ordinal (1) + rcvmore (false) { } zmq::socket_base_t::~socket_base_t () { + zmq_assert (destroyed); + + // Check whether there are no session leaks. + sessions_sync.lock (); + zmq_assert (sessions.empty ()); + sessions_sync.unlock (); +} + +zmq::mailbox_t *zmq::socket_base_t::get_mailbox () +{ + return &mailbox; +} + +void zmq::socket_base_t::stop () +{ + // Called by ctx when it is terminated (zmq_term). + // 'stop' command is sent from the threads that called zmq_term to + // the thread owning the socket. This way, blocking call in the + // owner thread can be interrupted. + send_stop (); +} + +int zmq::socket_base_t::parse_uri (const char *uri_, + std::string &protocol_, std::string &address_) +{ + zmq_assert (uri_ != NULL); + + std::string uri (uri_); + std::string::size_type pos = uri.find ("://"); + if (pos == std::string::npos) { + errno = EINVAL; + return -1; + } + protocol_ = uri.substr (0, pos); + address_ = uri.substr (pos + 3); + if (protocol_.empty () || address_.empty ()) { + errno = EINVAL; + return -1; + } + return 0; +} + +int zmq::socket_base_t::check_protocol (const std::string &protocol_) +{ + // First check out whether the protcol is something we are aware of. + if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && + protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") { + errno = EPROTONOSUPPORT; + return -1; + } + + // If 0MQ is not compiled with OpenPGM, pgm and epgm transports + // are not avaialble. +#if !defined ZMQ_HAVE_OPENPGM + if (protocol_ == "pgm" || protocol_ == "epgm") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif + + // IPC transport is not available on Windows and OpenVMS. +#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + if (protocol_ == "ipc") { + // Unknown protocol. + errno = EPROTONOSUPPORT; + return -1; + } +#endif + + // Check whether socket type and transport protocol match. + // Specifically, multicast protocols can't be combined with + // bi-directional messaging patterns (socket types). + if ((protocol_ == "pgm" || protocol_ == "epgm") && + options.type != ZMQ_PUB && options.type != ZMQ_SUB && + options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) { + errno = ENOCOMPATPROTO; + return -1; + } + + // Protocol is available. + return 0; +} + +void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + // If the peer haven't specified it's identity, let's generate one. + if (peer_identity_.size ()) { + xattach_pipes (inpipe_, outpipe_, peer_identity_); + } + else { + blob_t identity (1, 0); + identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); + xattach_pipes (inpipe_, outpipe_, identity); + } } int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } @@ -79,7 +238,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } @@ -94,271 +253,227 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return 0; } + if (option_ == ZMQ_FD) { + if (*optvallen_ < sizeof (fd_t)) { + errno = EINVAL; + return -1; + } + *((fd_t*) optval_) = mailbox.get_fd (); + *optvallen_ = sizeof (fd_t); + return 0; + } + + if (option_ == ZMQ_EVENTS) { + if (*optvallen_ < sizeof (uint32_t)) { + errno = EINVAL; + return -1; + } + int rc = process_commands (false, false); + if (rc != 0 && (errno == EINTR || errno == ETERM)) + return -1; + errno_assert (rc == 0); + *((uint32_t*) optval_) = 0; + if (has_out ()) + *((uint32_t*) optval_) |= ZMQ_POLLOUT; + if (has_in ()) + *((uint32_t*) optval_) |= ZMQ_POLLIN; + *optvallen_ = sizeof (uint32_t); + return 0; + } + return options.getsockopt (option_, optval_, optvallen_); } int zmq::socket_base_t::bind (const char *addr_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } // Parse addr_ string. - std::string addr_type; - std::string addr_args; - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - - if (pos == std::string::npos) { - errno = EINVAL; + std::string protocol; + std::string address; + int rc = parse_uri (addr_, protocol, address); + if (rc != 0) return -1; - } - addr_type = addr.substr (0, pos); - addr_args = addr.substr (pos + 3); + rc = check_protocol (protocol); + if (rc != 0) + return -1; - if (addr_type == "inproc") - return register_endpoint (addr_args.c_str (), this); + if (protocol == "inproc" || protocol == "sys") { + endpoint_t endpoint = {this, options}; + return register_endpoint (addr_, endpoint); + } - if (addr_type == "tcp" || addr_type == "ipc") { + if (protocol == "tcp" || protocol == "ipc") { -#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS - if (addr_type == "ipc") { - errno = EPROTONOSUPPORT; + // Choose I/O thread to run the listerner in. + io_thread_t *io_thread = choose_io_thread (options.affinity); + if (!io_thread) { + errno = EMTHREAD; return -1; } -#endif + // Create and run the listener. zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( - choose_io_thread (options.affinity), this, options); - zmq_assert (listener); - int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ()); + io_thread, this, options); + alloc_assert (listener); + int rc = listener->set_address (protocol.c_str(), address.c_str ()); if (rc != 0) { delete listener; return -1; } + launch_child (listener); - send_plug (listener); - send_own (this, listener); return 0; } -#if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm" || addr_type == "epgm") { - // In the case of PGM bind behaves the same like connect. + if (protocol == "pgm" || protocol == "epgm") { + + // For convenience's sake, bind can be used interchageable with + // connect for PGM and EPGM transports. return connect (addr_); } -#endif - // Unknown protocol. - errno = EPROTONOSUPPORT; + zmq_assert (false); return -1; } int zmq::socket_base_t::connect (const char *addr_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } // Parse addr_ string. - std::string addr_type; - std::string addr_args; - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - - if (pos == std::string::npos) { - errno = EINVAL; + std::string protocol; + std::string address; + int rc = parse_uri (addr_, protocol, address); + if (rc != 0) return -1; - } - addr_type = addr.substr (0, pos); - addr_args = addr.substr (pos + 3); + rc = check_protocol (protocol); + if (rc != 0) + return -1; - if (addr_type == "inproc") { + if (protocol == "inproc" || protocol == "sys") { // TODO: inproc connect is specific with respect to creating pipes // as there's no 'reconnect' functionality implemented. Once that // is in place we should follow generic pipe creation algorithm. - // Find the peer socket. - socket_base_t *peer = find_endpoint (addr_args.c_str ()); - if (!peer) + // Find the peer endpoint. + endpoint_t peer = find_endpoint (addr_); + if (!peer.socket) return -1; - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + reader_t *inpipe_reader = NULL; + writer_t *inpipe_writer = NULL; + reader_t *outpipe_reader = NULL; + writer_t *outpipe_writer = NULL; + + // The total HWM for an inproc connection should be the sum of + // the binder's HWM and the connector's HWM. (Similarly for the + // SWAP.) + int64_t hwm; + if (options.hwm == 0 || peer.options.hwm == 0) + hwm = 0; + else + hwm = options.hwm + peer.options.hwm; + int64_t swap; + if (options.swap == 0 && peer.options.swap == 0) + swap = 0; + else + swap = options.swap + peer.options.swap; // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap); - zmq_assert (in_pipe); - } + if (options.requires_in) + create_pipe (this, peer.socket, hwm, swap, + &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap); - zmq_assert (out_pipe); - } + if (options.requires_out) + create_pipe (peer.socket, this, hwm, swap, + &outpipe_reader, &outpipe_writer); // Attach the pipes to this socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL, blob_t ()); + attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity); // Attach the pipes to the peer socket. Note that peer's seqnum - // was incremented in find_endpoint function. The callee is notified - // about the fact via the last parameter. - send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, options.identity, false); + // was incremented in find_endpoint function. We don't need it + // increased here. + send_bind (peer.socket, outpipe_reader, inpipe_writer, + options.identity, false); return 0; } - // Create unnamed session. + // Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new (std::nothrow) session_t (io_thread, - this, options); - zmq_assert (session); + if (!io_thread) { + errno = EMTHREAD; + return -1; + } + + // Create session. + connect_session_t *session = new (std::nothrow) connect_session_t ( + io_thread, this, options, protocol.c_str (), address.c_str ()); + alloc_assert (session); - // If 'immediate connect' feature is required, we'll created the pipes + // If 'immediate connect' feature is required, we'll create the pipes // to the session straight away. Otherwise, they'll be created by the // session once the connection is established. if (options.immediate_connect) { - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + reader_t *inpipe_reader = NULL; + writer_t *inpipe_writer = NULL; + reader_t *outpipe_reader = NULL; + writer_t *outpipe_writer = NULL; // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap); - zmq_assert (in_pipe); - - } + if (options.requires_in) + create_pipe (this, session, options.hwm, options.swap, + &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap); - zmq_assert (out_pipe); - } + if (options.requires_out) + create_pipe (session, this, options.hwm, options.swap, + &outpipe_reader, &outpipe_writer); // Attach the pipes to the socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL, blob_t ()); + attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); // Attach the pipes to the session object. - session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, blob_t ()); + session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); } - // Activate the session. - send_plug (session); - send_own (this, session); - - if (addr_type == "tcp" || addr_type == "ipc") { - -#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS - // Windows named pipes are not compatible with Winsock API. - // There's no UNIX domain socket implementation on OpenVMS. - if (addr_type == "ipc") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif - - // Create the connecter object. Supply it with the session name - // so that it can bind the new connection to the session once - // it is established. - zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( - choose_io_thread (options.affinity), this, options, - session->get_ordinal (), false); - zmq_assert (connecter); - int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ()); - if (rc != 0) { - delete connecter; - return -1; - } - send_plug (connecter); - send_own (this, connecter); + // Activate the session. Make it a child of this socket. + launch_child (session); - return 0; - } - -#if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm" || addr_type == "epgm") { - - // If the socket type requires bi-directional communication - // multicast is not an option (it is uni-directional). - if (options.requires_in && options.requires_out) { - errno = ENOCOMPATPROTO; - return -1; - } - - // For epgm, pgm transport with UDP encapsulation is used. - bool udp_encapsulation = (addr_type == "epgm"); - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.requires_out) { - - // PGM sender. - pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - choose_io_thread (options.affinity), options); - zmq_assert (pgm_sender); - - int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_sender; - return -1; - } - - send_attach (session, pgm_sender, blob_t ()); - } - else if (options.requires_in) { - - // PGM receiver. - pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - choose_io_thread (options.affinity), options); - zmq_assert (pgm_receiver); - - int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_receiver; - return -1; - } - - send_attach (session, pgm_receiver, blob_t ()); - } - else - zmq_assert (false); - - return 0; - } -#endif - - // Unknown protoco. - errno = EPROTONOSUPPORT; - return -1; + return 0; } int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { - // Process pending commands, if any. - if (unlikely (!app_thread->process_commands (false, true))) { + if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } + // Process pending commands, if any. + int rc = process_commands (false, true); + if (unlikely (rc != 0)) + return -1; + // At this point we impose the MORE flag on the message. if (flags_ & ZMQ_SNDMORE) msg_->flags |= ZMQ_MSG_MORE; // Try to send the message. - int rc = xsend (msg_, flags_); + rc = xsend (msg_, flags_); if (rc == 0) return 0; @@ -372,10 +487,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - if (unlikely (!app_thread->process_commands (true, false))) { - errno = ETERM; + if (unlikely (process_commands (true, false) != 0)) return -1; - } rc = xsend (msg_, flags_); } return 0; @@ -383,6 +496,11 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { + if (unlikely (ctx_terminated)) { + errno = ETERM; + return -1; + } + // Get the message. int rc = xrecv (msg_, flags_); int err = errno; @@ -394,12 +512,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // // Note that 'recv' uses different command throttling algorithm (the one // described above) from the one used by 'send'. This is because counting - // ticks is more efficient than doing rdtsc all the time. + // ticks is more efficient than doing RDTSC all the time. if (++ticks == inbound_poll_rate) { - if (unlikely (!app_thread->process_commands (false, false))) { - errno = ETERM; + if (unlikely (process_commands (false, false) != 0)) return -1; - } ticks = 0; } @@ -415,15 +531,14 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) errno = err; // If the message cannot be fetched immediately, there are two scenarios. - // For non-blocking recv, commands are processed in case there's a revive - // command already waiting int a command pipe. If it's not, return EAGAIN. + // For non-blocking recv, commands are processed in case there's an + // activate_reader command already waiting int a command pipe. + // If it's not, return EAGAIN. if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1; - if (unlikely (!app_thread->process_commands (false, false))) { - errno = ETERM; + if (unlikely (process_commands (false, false) != 0)) return -1; - } ticks = 0; rc = xrecv (msg_, flags_); @@ -437,15 +552,15 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // In blocking scenario, commands are processed over and over again until // we are able to fetch a message. + bool block = (ticks != 0); while (rc != 0) { if (errno != EAGAIN) return -1; - if (unlikely (!app_thread->process_commands (true, false))) { - errno = ETERM; + if (unlikely (process_commands (block, false) != 0)) return -1; - } rc = xrecv (msg_, flags_); ticks = 0; + block = true; } rcvmore = msg_->flags & ZMQ_MSG_MORE; @@ -456,74 +571,14 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { - shutting_down = true; - - // Let the thread know that the socket is no longer available. - app_thread->remove_socket (this); - - // Pointer to the context must be retrieved before the socket is - // deallocated. Afterwards it is not available. - ctx_t *ctx = get_ctx (); - - // Unregister all inproc endpoints associated with this socket. - // From this point we are sure that inc_seqnum won't be called again - // on this object. - ctx->unregister_endpoints (this); - - // Wait till all undelivered commands are delivered. This should happen - // very quickly. There's no way to wait here for extensive period of time. - while (processed_seqnum != sent_seqnum.get ()) - app_thread->process_commands (true, false); - - while (true) { - - // On third pass of the loop there should be no more I/O objects - // because all connecters and listerners were destroyed during - // the first pass and all engines delivered by delayed 'own' commands - // are destroyed during the second pass. - if (io_objects.empty () && !pending_term_acks) - break; - - // Send termination request to all associated I/O objects. - for (io_objects_t::iterator it = io_objects.begin (); - it != io_objects.end (); it++) - send_term (*it); - - // Move the objects to the list of pending term acks. - pending_term_acks += io_objects.size (); - io_objects.clear (); - - // Process commands till we get all the termination acknowledgements. - while (pending_term_acks) - app_thread->process_commands (true, false); - } - - // Check whether there are no session leaks. - sessions_sync.lock (); - zmq_assert (named_sessions.empty ()); - zmq_assert (unnamed_sessions.empty ()); - sessions_sync.unlock (); - - delete this; - - // This function must be called after the socket is completely deallocated - // as it may cause termination of the whole 0MQ infrastructure. - ctx->destroy_socket (); + // Transfer the ownership of the socket from this application thread + // to the reaper thread which will take care of the rest of shutdown + // process. + send_reap (this); return 0; } -void zmq::socket_base_t::inc_seqnum () -{ - // NB: This function may be called from a different thread! - sent_seqnum.add (1); -} - -zmq::app_thread_t *zmq::socket_base_t::get_thread () -{ - return app_thread; -} - bool zmq::socket_base_t::has_in () { return xhas_in (); @@ -534,30 +589,30 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, +bool zmq::socket_base_t::register_session (const blob_t &name_, session_t *session_) { sessions_sync.lock (); - bool registered = named_sessions.insert ( - std::make_pair (peer_identity_, session_)).second; + bool registered = sessions.insert ( + sessions_t::value_type (name_, session_)).second; sessions_sync.unlock (); return registered; } -void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) +void zmq::socket_base_t::unregister_session (const blob_t &name_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (peer_identity_); - zmq_assert (it != named_sessions.end ()); - named_sessions.erase (it); + sessions_t::iterator it = sessions.find (name_); + zmq_assert (it != sessions.end ()); + sessions.erase (it); sessions_sync.unlock (); } -zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) +zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (peer_identity_); - if (it == named_sessions.end ()) { + sessions_t::iterator it = sessions.find (name_); + if (it == sessions.end ()) { sessions_sync.unlock (); return NULL; } @@ -570,129 +625,164 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) return session; } -uint64_t zmq::socket_base_t::register_session (session_t *session_) +void zmq::socket_base_t::start_reaping (poller_t *poller_) { - sessions_sync.lock (); - uint64_t ordinal = next_ordinal; - next_ordinal++; - unnamed_sessions.insert (std::make_pair (ordinal, session_)); - sessions_sync.unlock (); - return ordinal; + poller = poller_; + handle = poller->add_fd (mailbox.get_fd (), this); + poller->set_pollin (handle); } -void zmq::socket_base_t::unregister_session (uint64_t ordinal_) +int zmq::socket_base_t::process_commands (bool block_, bool throttle_) { - sessions_sync.lock (); - unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); - zmq_assert (it != unnamed_sessions.end ()); - unnamed_sessions.erase (it); - sessions_sync.unlock (); -} + int rc; + command_t cmd; + if (block_) { + rc = mailbox.recv (&cmd, true); + if (rc == -1 && errno == EINTR) + return -1; + errno_assert (rc == 0); + } + else { -zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_) -{ - sessions_sync.lock (); + // Get the CPU's tick counter. If 0, the counter is not available. + uint64_t tsc = zmq::clock_t::rdtsc (); + + // Optimised version of command processing - it doesn't have to check + // for incoming commands each time. It does so only if certain time + // elapsed since last command processing. Command delay varies + // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU + // etc. The optimisation makes sense only on platforms where getting + // a timestamp is a very cheap operation (tens of nanoseconds). + if (tsc && throttle_) { + + // Check whether TSC haven't jumped backwards (in case of migration + // between CPU cores) and whether certain time have elapsed since + // last command processing. If it didn't do nothing. + if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay) + return 0; + last_tsc = tsc; + } - unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); - if (it == unnamed_sessions.end ()) { - sessions_sync.unlock (); - return NULL; + // Check whether there are any commands pending for this thread. + rc = mailbox.recv (&cmd, false); } - session_t *session = it->second; - // Prepare the session for subsequent attach command. - session->inc_seqnum (); + // Process all the commands available at the moment. + while (true) { + if (rc == -1 && errno == EAGAIN) + break; + if (rc == -1 && errno == EINTR) + return -1; + errno_assert (rc == 0); + cmd.destination->process_command (cmd); + rc = mailbox.recv (&cmd, false); + } - sessions_sync.unlock (); - return session; + if (ctx_terminated) { + errno = ETERM; + return -1; + } + + return 0; } -void zmq::socket_base_t::kill (reader_t *pipe_) +void zmq::socket_base_t::process_stop () { - xkill (pipe_); + // Here, someone have called zmq_term while the socket was still alive. + // We'll remember the fact so that any blocking call is interrupted and any + // further attempt to use the socket will return ETERM. The user is still + // responsible for calling zmq_close on the socket though! + ctx_terminated = true; } -void zmq::socket_base_t::revive (reader_t *pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { - xrevive (pipe_); + attach_pipes (in_pipe_, out_pipe_, peer_identity_); } -void zmq::socket_base_t::revive (writer_t *pipe_) +void zmq::socket_base_t::process_unplug () { - xrevive (pipe_); } -void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::socket_base_t::process_term (int linger_) { - if (inpipe_) - inpipe_->set_endpoint (this); - if (outpipe_) - outpipe_->set_endpoint (this); + // Unregister all inproc endpoints associated with this socket. + // Doing this we make sure that no new pipes from other sockets (inproc) + // will be initiated. + unregister_endpoints (this); - // If the peer haven't specified it's identity, let's generate one. - if (peer_identity_.size ()) { - xattach_pipes (inpipe_, outpipe_, peer_identity_); - } - else { - blob_t identity (1, 0); - identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); - xattach_pipes (inpipe_, outpipe_, identity); - } + // Continue the termination process immediately. + own_t::process_term (linger_); } -void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) +void zmq::socket_base_t::process_destroy () { - xdetach_inpipe (pipe_); - pipe_->set_endpoint (NULL); // ? + destroyed = true; } -void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) +int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) { - xdetach_outpipe (pipe_); - pipe_->set_endpoint (NULL); // ? + errno = EINVAL; + return -1; } -void zmq::socket_base_t::process_own (owned_t *object_) +bool zmq::socket_base_t::xhas_out () { - io_objects.insert (object_); + return false; } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, - const blob_t &peer_identity_) +int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_) { - attach_pipes (in_pipe_, out_pipe_, peer_identity_); + errno = ENOTSUP; + return -1; } -void zmq::socket_base_t::process_term_req (owned_t *object_) +bool zmq::socket_base_t::xhas_in () { - // When shutting down we can ignore termination requests from owned - // objects. They are going to be terminated anyway. - if (shutting_down) - return; - - // If I/O object is well and alive ask it to terminate. - io_objects_t::iterator it = std::find (io_objects.begin (), - io_objects.end (), object_); + return false; +} - // If not found, we assume that termination request was already sent to - // the object so we can sagely ignore the request. - if (it == io_objects.end ()) - return; +int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_) +{ + errno = ENOTSUP; + return -1; +} - pending_term_acks++; - io_objects.erase (it); - send_term (object_); +void zmq::socket_base_t::in_event () +{ + // Process any commands from other threads/sockets that may be available + // at the moment. Ultimately, socket will be destroyed. + process_commands (false, false); + check_destroy (); } -void zmq::socket_base_t::process_term_ack () +void zmq::socket_base_t::out_event () { - zmq_assert (pending_term_acks); - pending_term_acks--; + zmq_assert (false); } -void zmq::socket_base_t::process_seqnum () +void zmq::socket_base_t::timer_event (int id_) { - processed_seqnum++; + zmq_assert (false); } +void zmq::socket_base_t::check_destroy () +{ + // If the object was already marked as destroyed, finish the deallocation. + if (destroyed) { + + // Remove the socket from the reaper's poller. + poller->rm_fd (handle); + + // Remove the socket from the context. + destroy_socket (this); + + // Notify the reaper about the fact. + send_reaped (); + + // Deallocate. + own_t::process_destroy (); + } +} |