diff options
| -rw-r--r-- | include/zmq.h | 2 | ||||
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/app_thread.cpp | 26 | ||||
| -rw-r--r-- | src/app_thread.hpp | 4 | ||||
| -rw-r--r-- | src/command.hpp | 11 | ||||
| -rw-r--r-- | src/config.hpp | 8 | ||||
| -rw-r--r-- | src/i_endpoint.hpp | 33 | ||||
| -rw-r--r-- | src/object.cpp | 30 | ||||
| -rw-r--r-- | src/object.hpp | 9 | ||||
| -rw-r--r-- | src/pipe.cpp | 112 | ||||
| -rw-r--r-- | src/pipe.hpp | 102 | ||||
| -rw-r--r-- | src/session.cpp | 56 | ||||
| -rw-r--r-- | src/session.hpp | 24 | ||||
| -rw-r--r-- | src/socket_base.cpp | 227 | ||||
| -rw-r--r-- | src/socket_base.hpp | 42 | ||||
| -rw-r--r-- | src/ypipe.hpp | 22 | ||||
| -rw-r--r-- | src/yqueue.hpp | 2 | ||||
| -rw-r--r-- | src/zmq_encoder.cpp | 4 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 7 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 4 | ||||
| -rw-r--r-- | src/zmq_listener_init.cpp | 3 | 
21 files changed, 680 insertions, 50 deletions
| diff --git a/include/zmq.h b/include/zmq.h index 34ce80c..fad51ca 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -67,7 +67,7 @@ extern "C" {  //  single accept. There's no message routing or message filtering involved.  #define ZMQ_P2P 0 -//  Socket to distribute data. Recv fuction is not implemeted for this socket +//  Socket to distribute data. Recv fuction is not implemented for this socket  //  type. Messages are distributed in fanout fashion to all peers.  #define ZMQ_PUB 1 diff --git a/src/Makefile.am b/src/Makefile.am index 396e3a3..b6a4540 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -19,6 +19,7 @@ libzmq_la_SOURCES = \      io_object.hpp \      io_thread.hpp \      ip.hpp \ +    i_endpoint.hpp \      i_poller.hpp \      i_poll_events.hpp \      i_signaler.hpp \ @@ -66,6 +67,7 @@ libzmq_la_SOURCES = \      object.cpp \      options.cpp \      owned.cpp \ +    pipe.cpp \      poll.cpp \      select.cpp \      session.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 74ba357..db73ec1 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -77,7 +77,7 @@ bool zmq::app_thread_t::make_current ()      return true;  } -void zmq::app_thread_t::process_commands (bool block_) +void zmq::app_thread_t::process_commands (bool block_, bool throttle_)  {      ypollset_t::signals_t signals;      if (block_) @@ -91,24 +91,26 @@ void zmq::app_thread_t::process_commands (bool block_)          //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU          //  etc. The optimisation makes sense only on platforms where getting          //  a timestamp is a very cheap operation (tens of nanoseconds). +        if (throttle_) { -        //  Get timestamp counter. +            //  Get timestamp counter.  #if defined __GNUC__ -        uint32_t low; -        uint32_t high; -        __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); -        uint64_t current_time = (uint64_t) high << 32 | low; +            uint32_t low; +            uint32_t high; +            __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); +            uint64_t current_time = (uint64_t) high << 32 | low;  #elif defined _MSC_VER -        uint64_t current_time = __rdtsc (); +            uint64_t current_time = __rdtsc ();  #else  #error  #endif -        //  Check whether certain time have elapsed since last command -        //  processing. -        if (current_time - last_processing_time <= max_command_delay) -            return; -        last_processing_time = current_time; +            //  Check whether certain time have elapsed since last command +            //  processing. +            if (current_time - last_processing_time <= max_command_delay) +                return; +            last_processing_time = current_time; +        }  #endif          //  Check whether there are any commands pending for this thread. diff --git a/src/app_thread.hpp b/src/app_thread.hpp index e7bbf70..e45b1b2 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -53,7 +53,9 @@ namespace zmq          //  Processes commands sent to this thread (if any). If 'block' is          //  set to true, returns only after at least one command was processed. -        void process_commands (bool block_); +        //  If throttle argument is true, commands are processed at most once +        //  in a predefined time period. +        void process_commands (bool block_, bool throttle_);          //  Create a socket of a specified type.          class socket_base_t *create_socket (int type_); diff --git a/src/command.hpp b/src/command.hpp index 41c7d6c..d3bad79 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -39,6 +39,7 @@ namespace zmq              own,              attach,              bind, +            revive,              term_req,              term,              term_ack @@ -65,10 +66,18 @@ namespace zmq                  class zmq_engine_t *engine;              } attach; -            //  Sent between objects to establish pipe(s) between them. +            //  Sent from session to socket to establish pipe(s) between them.              struct { +                class owned_t *session; +                class reader_t *in_pipe; +                class writer_t *out_pipe;              } bind; +            //  Sent by pipe writer to inform dormant pipe reader that there +            //  are messages in the pipe. +            struct { +            } revive; +              //  Sent by I/O object ot the socket to request the shutdown of              //  the I/O object.              struct { diff --git a/src/config.hpp b/src/config.hpp index 88b93d7..17e67b9 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -38,6 +38,14 @@ namespace zmq          //  footprint of dispatcher.          command_pipe_granularity = 4, +        //  Determines how often does socket poll for new commands when it +        //  still has unprocessed messages to handle. Thus, if it is set to 100, +        //  socket will process 100 inbound messages before doing the poll. +        //  If there are no unprocessed messages available, poll is done +        //  immediately. Decreasing the value trades overall latency for more +        //  real-time behaviour (less latency peaks). +        inbound_poll_rate = 100, +          //  Maximal batching size for engines with receiving functionality.          //  So, if there are 10 messages that fit into the batch size, all of          //  them may be read by a single 'recv' system call, thus avoiding diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp new file mode 100644 index 0000000..bb7409e --- /dev/null +++ b/src/i_endpoint.hpp @@ -0,0 +1,33 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__ +#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ + +namespace zmq +{ + +    struct i_endpoint +    { +        virtual void revive (class reader_t *pipe_) = 0; +    }; + +} + +#endif diff --git a/src/object.cpp b/src/object.cpp index 0a25750..4d54ebf 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -20,6 +20,7 @@  #include "object.hpp"  #include "dispatcher.hpp"  #include "err.hpp" +#include "pipe.hpp"  #include "io_thread.hpp"  #include "simple_semaphore.hpp"  #include "owned.hpp" @@ -57,6 +58,10 @@ void zmq::object_t::process_command (command_t &cmd_)  {      switch (cmd_.type) { +    case command_t::revive: +        process_revive (); +        break; +      case command_t::stop:          process_stop ();          break; @@ -74,7 +79,8 @@ void zmq::object_t::process_command (command_t &cmd_)          return;      case command_t::bind: -        process_bind (); +        process_bind (cmd_.args.bind.session, +            cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);          return;      case command_t::term_req: @@ -140,11 +146,23 @@ void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)      send_command (cmd);  } -void zmq::object_t::send_bind (object_t *destination_) +void zmq::object_t::send_bind (object_t *destination_, owned_t *session_, +    reader_t *in_pipe_, writer_t *out_pipe_)  {      command_t cmd;      cmd.destination = destination_;      cmd.type = command_t::bind; +    cmd.args.bind.session = session_; +    cmd.args.bind.in_pipe = in_pipe_; +    cmd.args.bind.out_pipe = out_pipe_; +    send_command (cmd); +} + +void zmq::object_t::send_revive (object_t *destination_) +{ +    command_t cmd; +    cmd.destination = destination_; +    cmd.type = command_t::revive;      send_command (cmd);  } @@ -194,7 +212,13 @@ void zmq::object_t::process_attach (zmq_engine_t *engine_)      zmq_assert (false);  } -void zmq::object_t::process_bind () +void zmq::object_t::process_bind (owned_t *session_, +    reader_t *in_pipe_, writer_t *out_pipe_) +{ +    zmq_assert (false); +} + +void zmq::object_t::process_revive ()  {      zmq_assert (false);  } diff --git a/src/object.hpp b/src/object.hpp index 31c8c40..0dbac24 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -24,7 +24,6 @@  namespace zmq  { -      //  Base class for all objects that participate in inter-thread      //  communication. @@ -58,7 +57,9 @@ namespace zmq              class owned_t *object_);          void send_attach (class session_t *destination_,              class zmq_engine_t *engine_); -        void send_bind (object_t *destination_); +        void send_bind (object_t *destination_, class owned_t *session_, +            class reader_t *in_pipe_, class writer_t *out_pipe_); +        void send_revive (class object_t *destination_);          void send_term_req (class socket_base_t *destination_,              class owned_t *object_);          void send_term (class owned_t *destination_); @@ -70,7 +71,9 @@ namespace zmq          virtual void process_plug ();          virtual void process_own (class owned_t *object_);          virtual void process_attach (class zmq_engine_t *engine_); -        virtual void process_bind (); +        virtual void process_bind (class owned_t *session_, +            class reader_t *in_pipe_, class writer_t *out_pipe_); +        virtual void process_revive ();          virtual void process_term_req (class owned_t *object_);          virtual void process_term ();          virtual void process_term_ack (); diff --git a/src/pipe.cpp b/src/pipe.cpp new file mode 100644 index 0000000..5016631 --- /dev/null +++ b/src/pipe.cpp @@ -0,0 +1,112 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <pthread.h> + +#include "pipe.hpp" + +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, +      uint64_t hwm_, uint64_t lwm_) : +    object_t (parent_), +    pipe (pipe_), +    peer (&pipe_->writer), +    hwm (hwm_), +    lwm (lwm_), +    index (-1), +    endpoint (NULL) +{ +} + +zmq::reader_t::~reader_t () +{ +} + +bool zmq::reader_t::read (zmq_msg_t *msg_) +{ +    return pipe->read (msg_); + +    //  TODO: Adjust the size of the pipe. +} + +void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) +{ +    endpoint = endpoint_; +} + +void zmq::reader_t::set_index (int index_) +{ +    index = index_; +} + +int zmq::reader_t::get_index () +{ +    return index; +} + +void zmq::reader_t::process_revive () +{ +    endpoint->revive (this); +} + +zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, +      uint64_t hwm_, uint64_t lwm_) : +    object_t (parent_), +    pipe (pipe_), +    peer (&pipe_->reader), +    hwm (hwm_), +    lwm (lwm_) +{ +} + +zmq::writer_t::~writer_t () +{ +} + +bool zmq::writer_t::check_write (uint64_t size_) +{ +    //  TODO: Check whether hwm is exceeded. + +    return true; +} + +bool zmq::writer_t::write (struct zmq_msg_t *msg_) +{ +    pipe->write (*msg_); +    return true; + +    //  TODO: Adjust size of the pipe. +} + +void zmq::writer_t::flush () +{ +    if (!pipe->flush ()) +        send_revive (peer); +} + +zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, +      uint64_t hwm_, uint64_t lwm_) : +    reader (reader_parent_, this, hwm_, lwm_), +    writer (writer_parent_, this, hwm_, lwm_) +{ +} + +zmq::pipe_t::~pipe_t () +{ +} + diff --git a/src/pipe.hpp b/src/pipe.hpp index 28e4b4d..d48fc47 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -22,15 +22,117 @@  #include "../include/zmq.h" +#include "stdint.hpp" +#include "i_endpoint.hpp"  #include "ypipe.hpp"  #include "config.hpp" +#include "object.hpp"  namespace zmq  { +    class reader_t : public object_t +    { +    public: + +        reader_t (class object_t *parent_, class pipe_t *pipe_, +            uint64_t hwm_, uint64_t lwm_); +        ~reader_t (); + +        //  Reads a message to the underlying pipe. +        bool read (struct zmq_msg_t *msg_); + +        //  Mnaipulation of index of the pipe. +        void set_endpoint (i_endpoint *endpoint_); +        void set_index (int index_); +        int get_index (); + +    private: + +        //  Command handlers. +        void process_revive (); + +        //  The underlying pipe. +        class pipe_t *pipe; + +        //  Pipe writer associated with the other side of the pipe. +        class object_t *peer; + +        //  High and low watermarks for in-memory storage (in bytes). +        uint64_t hwm; +        uint64_t lwm; + +        //  Positions of head and tail of the pipe (in bytes). +        uint64_t head; +        uint64_t tail; +        uint64_t last_sent_head; + +        //  Index of the pipe in the socket's list of inbound pipes. +        int index; + +        //  Endpoint (either session or socket) the pipe is attached to. +        i_endpoint *endpoint; + +        reader_t (const reader_t&); +        void operator = (const reader_t&); +    }; + +    class writer_t : public object_t +    { +    public: + +        writer_t (class object_t *parent_, class pipe_t *pipe_, +            uint64_t hwm_, uint64_t lwm_); +        ~writer_t (); + +        //  Checks whether message with specified size can be written to the +        //  pipe. If writing the message would cause high watermark to be +        //  exceeded, the function returns false. +        bool check_write (uint64_t size_); + +        //  Writes a message to the underlying pipe. Returns false if the +        //  message cannot be written because high watermark was reached. +        bool write (struct zmq_msg_t *msg_); + +        //  Flush the messages downsteam. +        void flush (); + +    private: + +        //  The underlying pipe. +        class pipe_t *pipe; + +        //  Pipe reader associated with the other side of the pipe. +        class object_t *peer; + +        //  High and low watermarks for in-memory storage (in bytes). +        uint64_t hwm; +        uint64_t lwm; + +        //  Positions of head and tail of the pipe (in bytes). +        uint64_t head; +        uint64_t tail; + +        writer_t (const writer_t&); +        void operator = (const writer_t&); +    }; +      //  Message pipe.      class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity>      { +    public: + +        pipe_t (object_t *reader_parent_, object_t *writer_parent_, +            uint64_t hwm_, uint64_t lwm_); +        ~pipe_t (); + +        reader_t reader; +        writer_t writer; + +    private: + +        pipe_t (const pipe_t&); +        void operator = (const pipe_t&);      };  } diff --git a/src/session.cpp b/src/session.cpp index fc1f858..115fb85 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -20,12 +20,17 @@  #include "session.hpp"  #include "zmq_engine.hpp"  #include "err.hpp" +#include "pipe.hpp"  zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, -      const char *name_) : +      const char *name_, const options_t &options_) :      owned_t (parent_, owner_), +    in_pipe (NULL), +    active (false), +    out_pipe (NULL),      engine (NULL), -    name (name_) +    name (name_), +    options (options_)  {  } @@ -33,18 +38,48 @@ zmq::session_t::~session_t ()  {  } +void zmq::session_t::set_inbound_pipe (reader_t *pipe_) +{ +    zmq_assert (!in_pipe); +    in_pipe = pipe_; +    active = true; +    in_pipe->set_endpoint (this); +} +void zmq::session_t::set_outbound_pipe (writer_t *pipe_) +{ +    zmq_assert (!out_pipe); +    out_pipe = pipe_; +} + +  bool zmq::session_t::read (::zmq_msg_t *msg_)  { -    return false; +    if (!active) +        return false; + +    bool fetched = in_pipe->read (msg_); +    if (!fetched) +        active = false; + +    return fetched;  }  bool zmq::session_t::write (::zmq_msg_t *msg_)  { -    return false; +    return out_pipe->write (msg_);  }  void zmq::session_t::flush ()  { +    out_pipe->flush (); +} + +void zmq::session_t::revive (reader_t *pipe_) +{ +    zmq_assert (in_pipe == pipe_); +    active = true; +    if (engine) +        engine->revive ();  }  void zmq::session_t::process_plug () @@ -56,6 +91,19 @@ void zmq::session_t::process_plug ()      //  We should syslog it and drop the session. TODO      zmq_assert (ok); +    //  If session is created by 'connect' function, it has the pipes set +    //  already. Otherwise, it's being created by the listener and the pipes +    //  are yet to be created. +    if (!in_pipe && !out_pipe) { +        pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm); +        zmq_assert (inbound); +        in_pipe = &inbound->reader; +        pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); +        zmq_assert (outbound); +        out_pipe = &outbound->writer; +        send_bind (owner, this, &outbound->reader, &inbound->writer); +    } +      owned_t::process_plug ();  } diff --git a/src/session.hpp b/src/session.hpp index 6d6bcf7..b79fb4b 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -23,17 +23,22 @@  #include <string>  #include "i_inout.hpp" +#include "i_endpoint.hpp"  #include "owned.hpp"  #include "options.hpp"  namespace zmq  { -    class session_t : public owned_t, public i_inout +    class session_t : public owned_t, public i_inout, public i_endpoint      {      public: -        session_t (object_t *parent_, socket_base_t *owner_, const char *name_); +        session_t (object_t *parent_, socket_base_t *owner_, const char *name_, +            const options_t &options_); + +        void set_inbound_pipe (class reader_t *pipe_); +        void set_outbound_pipe (class writer_t *pipe_);      private: @@ -44,17 +49,32 @@ namespace zmq          bool write (::zmq_msg_t *msg_);          void flush (); +        //  i_endpoint interface implementation. +        void revive (class reader_t *pipe_); +          //  Handlers for incoming commands.          void process_plug ();          void process_unplug ();          void process_attach (class zmq_engine_t *engine_); +        //  Inbound pipe, i.e. one the session is getting messages from. +        class reader_t *in_pipe; + +        //  If true, in_pipe is active. Otherwise there are no messages to get. +        bool active; + +        //  Outbound pipe, i.e. one the socket is sending messages to. +        class writer_t *out_pipe; +          class zmq_engine_t *engine;          //  The name of the session. One that is used to register it with          //  socket-level repository of sessions.          std::string name; +        //  Inherited socket options. +        options_t options; +          session_t (const session_t&);          void operator = (const session_t&);      }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fb7bdcf..68fc82b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -27,18 +27,23 @@  #include "err.hpp"  #include "zmq_listener.hpp"  #include "zmq_connecter.hpp" +#include "msg_content.hpp"  #include "io_thread.hpp"  #include "session.hpp"  #include "config.hpp"  #include "owned.hpp"  #include "uuid.hpp" +#include "pipe.hpp"  zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :      object_t (parent_), +    current (0), +    active (0),      pending_term_acks (0), +    ticks (0),      app_thread (parent_),      shutting_down (false) -{     +{  }  zmq::socket_base_t::~socket_base_t () @@ -65,7 +70,7 @@ zmq::socket_base_t::~socket_base_t ()          //  Process commands till we get all the termination acknowledgements.          while (pending_term_acks) -            app_thread->process_commands (true); +            app_thread->process_commands (true, false);      }      //  Check whether there are no session leaks. @@ -150,8 +155,28 @@ int zmq::socket_base_t::connect (const char *addr_)      //  Create the session.      io_thread_t *io_thread = choose_io_thread (options.affinity); -    session_t *session = new session_t (io_thread, this, session_name.c_str ()); +    session_t *session = new session_t (io_thread, this, session_name.c_str (), +        options);      zmq_assert (session); + +    //  Create inbound pipe. +    pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); +    zmq_assert (in_pipe); +    in_pipe->reader.set_endpoint (this); +    session->set_outbound_pipe (&in_pipe->writer); +    in_pipes.push_back (std::make_pair (&in_pipe->reader, session)); +    in_pipes.back ().first->set_index (active); +    in_pipes [active].first->set_index (in_pipes.size () - 1); +    std::swap (in_pipes.back (), in_pipes [active]); +    active++; + +    //  Create outbound pipe. +    pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); +    zmq_assert (out_pipe); +    session->set_inbound_pipe (&out_pipe->reader); +    out_pipes.push_back (std::make_pair (&out_pipe->writer, session)); + +    //  Activate the session.      send_plug (session);      send_own (this, session); @@ -173,17 +198,79 @@ int zmq::socket_base_t::connect (const char *addr_)  int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); +    //  Process pending commands, if any. +    app_thread->process_commands (false, true); + +    //  Try to send the message. +    bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); + +    if (!(flags_ & ZMQ_NOBLOCK)) { + +        //  Oops, we couldn't send the message. Wait for the next +        //  command, process it and try to send the message again. +        while (!sent) { +            app_thread->process_commands (true, false); +            sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); +        } +    } +    else if (!sent) { +        errno = EAGAIN; +        return -1; +    } + +    return 0;  }  int zmq::socket_base_t::flush ()  { -    zmq_assert (false); +    for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); +          it++) +        it->first->flush (); + +    return 0;  }  int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)  { -    zmq_assert (false); +    //  If the message cannot be fetched immediately, there are two scenarios. +    //  For non-blocking recv, commands are processed in case there's a message +    //  already waiting we don't know about. If it's not, return EAGAIN. +    //  In blocking scenario, commands are processed over and over again until +    //  we are able to fetch a message. +    bool fetched = fetch (msg_); +    if (!fetched) { +        if (flags_ & ZMQ_NOBLOCK) { +            app_thread->process_commands (false, false); +            fetched = fetch (msg_); +        } +        else  { +            while (!fetched) { +                app_thread->process_commands (true, false); +                ticks = 0; +                fetched = fetch (msg_); +            } +        } +    } + +    //  Once every inbound_poll_rate messages check for signals and process +    //  incoming commands. This happens only if we are not polling altogether +    //  because there are messages available all the time. If poll occurs, +    //  ticks is set to zero and thus we avoid this code. +    // +    //  Note that 'recv' uses different command throttling algorithm (the one +    //  described above) from the one used by 'send'. This is because counting +    //  ticks is more efficient than doing rdtsc all the time. +    if (++ticks == inbound_poll_rate) { +        app_thread->process_commands (false, false); +        ticks = 0; +    } + +    if (!fetched) { +        errno = EAGAIN; +        return -1; +    } + +    return 0;  }  int zmq::socket_base_t::close () @@ -229,11 +316,35 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)      return it->second;      } +void zmq::socket_base_t::revive (reader_t *pipe_) +{ +    //  Move the pipe to the list of active pipes. +    in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); +    in_pipes [index].first->set_index (active); +    in_pipes [active].first->set_index (index);     +    std::swap (in_pipes [index], in_pipes [active]); +    active++; +} +  void zmq::socket_base_t::process_own (owned_t *object_)  {      io_objects.insert (object_);  } +void zmq::socket_base_t::process_bind (owned_t *session_, +    reader_t *in_pipe_, writer_t *out_pipe_) +{ +    zmq_assert (in_pipe_); +    in_pipe_->set_endpoint (this); +    in_pipes.push_back (std::make_pair (in_pipe_, session_)); +    in_pipes.back ().first->set_index (active); +    in_pipes [active].first->set_index (in_pipes.size () - 1); +    std::swap (in_pipes.back (), in_pipes [active]); +    active++; +    zmq_assert (out_pipe_); +    out_pipes.push_back (std::make_pair (out_pipe_, session_)); +} +  void zmq::socket_base_t::process_term_req (owned_t *object_)  {      //  When shutting down we can ignore termination requests from owned @@ -260,3 +371,107 @@ void zmq::socket_base_t::process_term_ack ()      zmq_assert (pending_term_acks);      pending_term_acks--;  } + +bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) +{ +    int pipes_count = out_pipes.size (); + +    //  If there are no pipes available, simply drop the message. +    if (pipes_count == 0) { +        int rc = zmq_msg_close (msg_); +        zmq_assert (rc == 0); +        rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return true; +    } + +    //  First check whether all pipes are available for writing. +    for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); +          it++) +        if (!it->first->check_write (zmq_msg_size (msg_))) +            return false; + +    msg_content_t *content = (msg_content_t*) msg_->content; + +    //  For VSMs the copying is straighforward. +    if (content == (msg_content_t*) ZMQ_VSM) { +        for (out_pipes_t::iterator it = out_pipes.begin (); +              it != out_pipes.end (); it++) { +            it->first->write (msg_); +            if (flush_) +                it->first->flush (); +        } +        int rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return true; +    } + +    //  Optimisation for the case when there's only a single pipe +    //  to send the message to - no refcount adjustment i.e. no atomic +    //  operations are needed. +    if (pipes_count == 1) { +        out_pipes.begin ()->first->write (msg_); +        if (flush_) +            out_pipes.begin ()->first->flush (); +        int rc = zmq_msg_init (msg_); +        zmq_assert (rc == 0); +        return true; +    } + +    //  There are at least 2 destinations for the message. That means we have +    //  to deal with reference counting. First add N-1 references to +    //  the content (we are holding one reference anyway, that's why -1). +    if (msg_->shared) +        content->refcnt.add (pipes_count - 1); +    else { +        content->refcnt.set (pipes_count); +        msg_->shared = true; +    } + +    //  Push the message to all destinations. +    for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); +          it++) { +        it->first->write (msg_); +        if (flush_) +            it->first->flush (); +    } | 
