diff options
Diffstat (limited to 'src/socket_base.cpp')
| -rw-r--r-- | src/socket_base.cpp | 495 | 
1 files changed, 359 insertions, 136 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c933954..5d3175a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -23,9 +23,18 @@  #include "../include/zmq.h" -#include "socket_base.hpp" -#include "app_thread.hpp" +#include "platform.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#if defined _MSC_VER +#include <intrin.h> +#endif +#else +#include <unistd.h> +#endif +#include "socket_base.hpp"  #include "zmq_listener.hpp"  #include "zmq_connecter.hpp"  #include "io_thread.hpp" @@ -39,15 +48,73 @@  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp"  #include "likely.hpp" +#include "pair.hpp" +#include "pub.hpp" +#include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" +#include "pull.hpp" +#include "push.hpp" +#include "xreq.hpp" +#include "xrep.hpp"  #include "uuid.hpp" -zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : -    object_t (parent_), +//  If the RDTSC is available we use it to prevent excessive +//  polling for commands. The nice thing here is that it will work on any +//  system with x86 architecture and gcc or MSVC compiler. +#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ +    (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) +#define ZMQ_DELAY_COMMANDS +#endif + +zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, +    uint32_t slot_) +{ +    socket_base_t *s = NULL; +    switch (type_) { + +    case ZMQ_PAIR: +        s = new (std::nothrow) pair_t (parent_, slot_); +        break; +    case ZMQ_PUB: +        s = new (std::nothrow) pub_t (parent_, slot_); +        break; +    case ZMQ_SUB: +        s = new (std::nothrow) sub_t (parent_, slot_); +        break; +    case ZMQ_REQ: +        s = new (std::nothrow) req_t (parent_, slot_); +        break; +    case ZMQ_REP: +        s = new (std::nothrow) rep_t (parent_, slot_); +        break; +    case ZMQ_XREQ: +        s = new (std::nothrow) xreq_t (parent_, slot_); +        break; +    case ZMQ_XREP: +        s = new (std::nothrow) xrep_t (parent_, slot_); +        break;      +    case ZMQ_PULL: +        s = new (std::nothrow) pull_t (parent_, slot_); +        break; +    case ZMQ_PUSH: +        s = new (std::nothrow) push_t (parent_, slot_); +        break; +    default: +        errno = EINVAL; +        return NULL; +    } +    zmq_assert (s); +    return s; +} + +zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : +    object_t (parent_, slot_), +    zombie (false), +    last_processing_time (0),      pending_term_acks (0),      ticks (0),      rcvmore (false), -    app_thread (parent_), -    shutting_down (false),      sent_seqnum (0),      processed_seqnum (0),      next_ordinal (1) @@ -58,10 +125,38 @@ zmq::socket_base_t::~socket_base_t ()  {  } +zmq::signaler_t *zmq::socket_base_t::get_signaler () +{ +    return &signaler; +} + +void zmq::socket_base_t::stop () +{ +    //  Called by ctx when it is terminated (zmq_term). +    //  'stop' command is sent from the threads that called zmq_term to +    //  the thread owning the socket. This way, blocking call in the +    //  owner thread can be interrupted. +    send_stop (); +} + +void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, +    class writer_t *outpipe_, const blob_t &peer_identity_) +{ +    // If the peer haven't specified it's identity, let's generate one. +    if (peer_identity_.size ()) { +        xattach_pipes (inpipe_, outpipe_, peer_identity_); +    } +    else { +        blob_t identity (1, 0); +        identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); +        xattach_pipes (inpipe_, outpipe_, identity); +    } +} +  int zmq::socket_base_t::setsockopt (int option_, const void *optval_,      size_t optvallen_)  { -    if (unlikely (app_thread->is_terminated ())) { +    if (unlikely (zombie)) {          errno = ETERM;          return -1;      } @@ -79,7 +174,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,  int zmq::socket_base_t::getsockopt (int option_, void *optval_,      size_t *optvallen_)  { -    if (unlikely (app_thread->is_terminated ())) { +    if (unlikely (zombie)) {          errno = ETERM;          return -1;      } @@ -94,12 +189,37 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,          return 0;      } +    if (option_ == ZMQ_FD) { +        if (*optvallen_ < sizeof (fd_t)) { +            errno = EINVAL; +            return -1; +        } +        *((fd_t*) optval_) = signaler.get_fd (); +        *optvallen_ = sizeof (fd_t); +        return 0; +    } + +    if (option_ == ZMQ_EVENTS) { +        if (*optvallen_ < sizeof (uint32_t)) { +            errno = EINVAL; +            return -1; +        } +        process_commands(false, false); +        *((uint32_t*) optval_) = 0; +        if (has_out ()) +            *((uint32_t*) optval_) |= ZMQ_POLLOUT; +        if (has_in ()) +            *((uint32_t*) optval_) |= ZMQ_POLLIN; +        *optvallen_ = sizeof (uint32_t); +        return 0; +    } +      return options.getsockopt (option_, optval_, optvallen_);  }  int zmq::socket_base_t::bind (const char *addr_)  { -    if (unlikely (app_thread->is_terminated ())) { +    if (unlikely (zombie)) {          errno = ETERM;          return -1;      } @@ -159,7 +279,7 @@ int zmq::socket_base_t::bind (const char *addr_)  int zmq::socket_base_t::connect (const char *addr_)  { -    if (unlikely (app_thread->is_terminated ())) { +    if (unlikely (zombie)) {          errno = ETERM;          return -1;      } @@ -190,30 +310,29 @@ int zmq::socket_base_t::connect (const char *addr_)          if (!peer)              return -1; -        pipe_t *in_pipe = NULL; -        pipe_t *out_pipe = NULL; - +        reader_t *inpipe_reader = NULL; +        writer_t *inpipe_writer = NULL; +        reader_t *outpipe_reader = NULL; +        writer_t *outpipe_writer = NULL; +           //  Create inbound pipe, if required. -        if (options.requires_in) { -            in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap); -            zmq_assert (in_pipe); -        } +        if (options.requires_in) +            create_pipe (this, peer, options.hwm, options.swap, +                &inpipe_reader, &inpipe_writer);          //  Create outbound pipe, if required. -        if (options.requires_out) { -            out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap); -            zmq_assert (out_pipe); -        } +        if (options.requires_out) +            create_pipe (peer, this, options.hwm, options.swap, +                &outpipe_reader, &outpipe_writer);          //  Attach the pipes to this socket object. -        attach_pipes (in_pipe ? &in_pipe->reader : NULL, -            out_pipe ? &out_pipe->writer : NULL, blob_t ()); +        attach_pipes (inpipe_reader, outpipe_writer, blob_t ());          //  Attach the pipes to the peer socket. Note that peer's seqnum          //  was incremented in find_endpoint function. The callee is notified          //  about the fact via the last parameter. -        send_bind (peer, out_pipe ? &out_pipe->reader : NULL, -            in_pipe ? &in_pipe->writer : NULL, options.identity, false); +        send_bind (peer, outpipe_reader, inpipe_writer, +            options.identity, false);          return 0;      } @@ -224,34 +343,31 @@ int zmq::socket_base_t::connect (const char *addr_)          this, options);      zmq_assert (session); -    //  If 'immediate connect' feature is required, we'll created the pipes +    //  If 'immediate connect' feature is required, we'll create the pipes      //  to the session straight away. Otherwise, they'll be created by the      //  session once the connection is established.      if (options.immediate_connect) { -        pipe_t *in_pipe = NULL; -        pipe_t *out_pipe = NULL; +        reader_t *inpipe_reader = NULL; +        writer_t *inpipe_writer = NULL; +        reader_t *outpipe_reader = NULL; +        writer_t *outpipe_writer = NULL;          //  Create inbound pipe, if required. -        if (options.requires_in) { -            in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap); -            zmq_assert (in_pipe); - -        } +        if (options.requires_in) +            create_pipe (this, session, options.hwm, options.swap, +                &inpipe_reader, &inpipe_writer);          //  Create outbound pipe, if required. -        if (options.requires_out) { -            out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap); -            zmq_assert (out_pipe); -        } +        if (options.requires_out) +            create_pipe (session, this, options.hwm, options.swap, +                &outpipe_reader, &outpipe_writer);          //  Attach the pipes to the socket object. -        attach_pipes (in_pipe ? &in_pipe->reader : NULL, -            out_pipe ? &out_pipe->writer : NULL, blob_t ()); +        attach_pipes (inpipe_reader, outpipe_writer, blob_t ());          //  Attach the pipes to the session object. -        session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, -            in_pipe ? &in_pipe->writer : NULL, blob_t ()); +        session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());      }      //  Activate the session. @@ -347,8 +463,14 @@ int zmq::socket_base_t::connect (const char *addr_)  int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)  { +    if (unlikely (zombie)) { +        errno = ETERM; +        return -1; +    } +      //  Process pending commands, if any. -    if (unlikely (!app_thread->process_commands (false, true))) { +    process_commands (false, true); +    if (unlikely (zombie)) {          errno = ETERM;          return -1;      } @@ -372,7 +494,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)      while (rc != 0) {          if (errno != EAGAIN)              return -1; -        if (unlikely (!app_thread->process_commands (true, false))) { +        process_commands (true, false); +        if (unlikely (zombie)) {              errno = ETERM;              return -1;          } @@ -383,6 +506,11 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)  int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)  { +    if (unlikely (zombie)) { +        errno = ETERM; +        return -1; +    } +      //  Get the message.      int rc = xrecv (msg_, flags_);      int err = errno; @@ -396,7 +524,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)      //  described above) from the one used by 'send'. This is because counting      //  ticks is more efficient than doing rdtsc all the time.      if (++ticks == inbound_poll_rate) { -        if (unlikely (!app_thread->process_commands (false, false))) { +        process_commands (false, false); +        if (unlikely (zombie)) {              errno = ETERM;              return -1;          } @@ -420,7 +549,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)      if (flags_ & ZMQ_NOBLOCK) {          if (errno != EAGAIN)              return -1; -        if (unlikely (!app_thread->process_commands (false, false))) { +        process_commands (false, false); +        if (unlikely (zombie)) {              errno = ETERM;              return -1;          } @@ -440,7 +570,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)      while (rc != 0) {          if (errno != EAGAIN)              return -1; -        if (unlikely (!app_thread->process_commands (true, false))) { +        process_commands (true, false); +        if (unlikely (zombie)) {              errno = ETERM;              return -1;          } @@ -456,74 +587,72 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)  int zmq::socket_base_t::close ()  { -    shutting_down = true; - -    //  Let the thread know that the socket is no longer available. -    app_thread->remove_socket (this); - -    //  Pointer to the context must be retrieved before the socket is -    //  deallocated. Afterwards it is not available. -    ctx_t *ctx = get_ctx (); +    //  Socket becomes a zombie. From now on all new arrived pipes (bind +    //  command) and I/O objects (own command) are immediately terminated. +    //  Also, any further requests form I/O object termination are ignored +    //  (we are going to shut them down anyway -- this way we assure that +    //  we do so once only). +    zombie = true;      //  Unregister all inproc endpoints associated with this socket. -    //  From this point we are sure that inc_seqnum won't be called again -    //  on this object. -    ctx->unregister_endpoints (this); - -    //  Wait till all undelivered commands are delivered. This should happen -    //  very quickly. There's no way to wait here for extensive period of time. +    //  Doing this we make sure that no new pipes from other sockets (inproc) +    //  will be initiated. However, there may be some inproc pipes already +    //  on the fly, but not yet received by this socket. To get finished +    //  with them we'll do the subsequent waiting from on-the-fly commands. +    //  This should happen very quickly. There's no way to block here for +    //  extensive period of time. +    unregister_endpoints (this);      while (processed_seqnum != sent_seqnum.get ()) -        app_thread->process_commands (true, false); - -    while (true) { - -        //  On third pass of the loop there should be no more I/O objects -        //  because all connecters and listerners were destroyed during -        //  the first pass and all engines delivered by delayed 'own' commands -        //  are destroyed during the second pass. -        if (io_objects.empty () && !pending_term_acks) -            break; - -        //  Send termination request to all associated I/O objects. -        for (io_objects_t::iterator it = io_objects.begin (); -              it != io_objects.end (); it++) -            send_term (*it); - -        //  Move the objects to the list of pending term acks. -        pending_term_acks += io_objects.size (); -        io_objects.clear (); - -        //  Process commands till we get all the termination acknowledgements. -        while (pending_term_acks) -            app_thread->process_commands (true, false); -    } - -    //  Check whether there are no session leaks. -    sessions_sync.lock (); -    zmq_assert (named_sessions.empty ()); -    zmq_assert (unnamed_sessions.empty ()); -    sessions_sync.unlock (); - -    delete this; - -    //  This function must be called after the socket is completely deallocated -    //  as it may cause termination of the whole 0MQ infrastructure. -    ctx->destroy_socket (); +        process_commands (true, false); +    //  TODO: My feeling is that the above has to be done in the dezombification +    //  loop, otherwise we may end up with number of i/o object dropping to zero +    //  even though there are more i/o objects on the way. + +    //  The above process ensures that only pipes that will arrive from now on +    //  are those initiated by sessions. These in turn have a nice property of +    //  not arriving totally asynchronously. When a session -- being an I/O +    //  object -- acknowledges its termination we are 100% sure that we'll get +    //  no new pipe from it. + +    //  Start termination of all the pipes presently associated with the socket. +    xterm_pipes (); + +    //  Send termination request to all associated I/O objects. +    //  Start waiting for the acks. Note that the actual waiting is not done +    //  in this function. Rather it is done in delayed manner as socket is +    //  being dezombified. The reason is that I/O object shutdown can take +    //  considerable amount of time in case there's still a lot of data to +    //  push to the network. +    for (io_objects_t::iterator it = io_objects.begin (); +          it != io_objects.end (); it++) +        send_term (*it); +    pending_term_acks += io_objects.size (); +    io_objects.clear (); + +    //  Note that new I/O objects may arrive even in zombie state (say new +    //  session initiated by a listener object), however, in such case number +    //  of pending acks never drops to zero. Here's the scenario: We have an +    //  pending ack for the listener object. Then 'own' commands arrives from +    //  the listener notifying the socket about new session. It immediately +    //  triggers termination request and number of of pending acks if +    //  incremented. Then term_acks arrives from the listener. Number of pending +    //  acks is decremented. Later on, the session itself will ack its +    //  termination. During the process, number of pending acks never dropped +    //  to zero and thus the socket remains safely in the zombie state. + +    //  Transfer the ownership of the socket from this application thread +    //  to the context which will take care of the rest of shutdown process. +    zombify (this);      return 0;  }  void zmq::socket_base_t::inc_seqnum ()  { -    //  NB: This function may be called from a different thread! +    //  Be aware: This function may be called from a different thread!      sent_seqnum.add (1);  } -zmq::app_thread_t *zmq::socket_base_t::get_thread () -{ -    return app_thread; -} -  bool zmq::socket_base_t::has_in ()  {      return xhas_in (); @@ -607,68 +736,133 @@ zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)      return session;   } -void zmq::socket_base_t::kill (reader_t *pipe_) +bool zmq::socket_base_t::dezombify ()  { -    xkill (pipe_); -} +    zmq_assert (zombie); -void zmq::socket_base_t::revive (reader_t *pipe_) -{ -    xrevive (pipe_); -} +    //  Process any commands from other threads/sockets that may be available +    //  at the moment. +    process_commands (false, false); -void zmq::socket_base_t::revive (writer_t *pipe_) -{ -    xrevive (pipe_); +    //  If there are no more pipes attached and there are no more I/O objects +    //  owned by the socket, we can kill the zombie. +    if (!pending_term_acks && !xhas_pipes ()) { + +        //  If all objects have acknowledged their termination there should +        //  definitely be no I/O object remaining in the list. +        zmq_assert (io_objects.empty ()); + +        //  Check whether there are no session leaks. +        sessions_sync.lock (); +        zmq_assert (named_sessions.empty ()); +        zmq_assert (unnamed_sessions.empty ()); +        sessions_sync.unlock (); + +        //  Deallocate all the resources tied to this socket. +        delete this; + +        //  Notify the caller about the fact that the zombie is finally dead. +        return true; +    } + +    //  The zombie remains undead. +    return false;  } -void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::socket_base_t::process_commands (bool block_, bool throttle_)  { -    if (inpipe_) -        inpipe_->set_endpoint (this); -    if (outpipe_) -        outpipe_->set_endpoint (this); - -    //  If the peer haven't specified it's identity, let's generate one. -    if (peer_identity_.size ()) { -        xattach_pipes (inpipe_, outpipe_, peer_identity_); +    bool received; +    command_t cmd; +    if (block_) { +        received = signaler.recv (&cmd, true); +        zmq_assert (received);      }      else { -        blob_t identity (1, 0); -        identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); -        xattach_pipes (inpipe_, outpipe_, identity); + +#if defined ZMQ_DELAY_COMMANDS +        //  Optimised version of command processing - it doesn't have to check +        //  for incoming commands each time. It does so only if certain time +        //  elapsed since last command processing. Command delay varies +        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU +        //  etc. The optimisation makes sense only on platforms where getting +        //  a timestamp is a very cheap operation (tens of nanoseconds). +        if (throttle_) { + +            //  Get timestamp counter. +#if defined __GNUC__ +            uint32_t low; +            uint32_t high; +            __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); +            uint64_t current_time = (uint64_t) high << 32 | low; +#elif defined _MSC_VER +            uint64_t current_time = __rdtsc (); +#else +#error +#endif + +            //  Check whether certain time have elapsed since last command +            //  processing. +            if (current_time - last_processing_time <= max_command_delay) +                return; +            last_processing_time = current_time; +        } +#endif + +        //  Check whether there are any commands pending for this thread. +        received = signaler.recv (&cmd, false);      } -} -void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) -{ -    xdetach_inpipe (pipe_); -    pipe_->set_endpoint (NULL); // ? +    //  Process all the commands available at the moment. +    while (received) { +        cmd.destination->process_command (cmd); +        received = signaler.recv (&cmd, false); +    }  } -void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) +void zmq::socket_base_t::process_stop ()  { -    xdetach_outpipe (pipe_); -    pipe_->set_endpoint (NULL); // ? +    //  Here, someone have called zmq_term while the socket was still alive. +    //  We'll zombify it so that any blocking call is interrupted and any +    //  further attempt to use the socket will return ETERM. The user is still +    //  responsible for calling zmq_close on the socket though! +    zombie = true;  }  void zmq::socket_base_t::process_own (owned_t *object_)  { +    //  If the socket is already being shut down, new owned objects are +    //  immediately asked to terminate. +    if (zombie) { +        send_term (object_); +        pending_term_acks++; +        return; +    } +      io_objects.insert (object_);  }  void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,      const blob_t &peer_identity_)  { +    //  If the socket is already being shut down, the termination process on +    //  the new pipes is started immediately. However, they are still attached +    //  as to let the process finish in a decent manner. +    if (unlikely (zombie)) { +        if (in_pipe_) +            in_pipe_->terminate (); +        if (out_pipe_) +            out_pipe_->terminate (); +    } +      attach_pipes (in_pipe_, out_pipe_, peer_identity_);  }  void zmq::socket_base_t::process_term_req (owned_t *object_)  {      //  When shutting down we can ignore termination requests from owned -    //  objects. They are going to be terminated anyway. -    if (shutting_down) +    //  objects. It means the termination request was already sent to +    //  the object. +    if (zombie)          return;      //  If I/O object is well and alive ask it to terminate. @@ -676,7 +870,7 @@ void zmq::socket_base_t::process_term_req (owned_t *object_)          io_objects.end (), object_);      //  If not found, we assume that termination request was already sent to -    //  the object so we can sagely ignore the request. +    //  the object so we can safely ignore the request.      if (it == io_objects.end ())          return; @@ -696,3 +890,32 @@ void zmq::socket_base_t::process_seqnum ()      processed_seqnum++;  } +int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    errno = EINVAL; +    return -1; +} + +bool zmq::socket_base_t::xhas_out () +{ +    return false; +} + +int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_) +{ +    errno = ENOTSUP; +    return -1; +} + +bool zmq::socket_base_t::xhas_in () +{ +    return false; +} + +int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_) +{ +    errno = ENOTSUP; +    return -1; +} +  | 
