diff options
| -rw-r--r-- | src/command.hpp | 11 | ||||
| -rw-r--r-- | src/dispatcher.cpp | 20 | ||||
| -rw-r--r-- | src/dispatcher.hpp | 18 | ||||
| -rw-r--r-- | src/i_endpoint.hpp | 2 | ||||
| -rw-r--r-- | src/object.cpp | 44 | ||||
| -rw-r--r-- | src/object.hpp | 8 | ||||
| -rw-r--r-- | src/pipe.cpp | 81 | ||||
| -rw-r--r-- | src/pipe.hpp | 30 | ||||
| -rw-r--r-- | src/session.cpp | 18 | ||||
| -rw-r--r-- | src/session.hpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 80 | ||||
| -rw-r--r-- | src/socket_base.hpp | 13 | 
12 files changed, 290 insertions, 37 deletions
diff --git a/src/command.hpp b/src/command.hpp index d3bad79..d16d4fa 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -40,6 +40,8 @@ namespace zmq              attach,              bind,              revive, +            pipe_term, +            pipe_term_ack,              term_req,              term,              term_ack @@ -78,6 +80,15 @@ namespace zmq              struct {              } revive; +            //  Sent by pipe reader to pipe writer to ask it to terminate +            //  its end of the pipe. +            struct { +            } pipe_term; + +            //  Pipe writer acknowledges pipe_term command. +            struct { +            } pipe_term_ack; +              //  Sent by I/O object ot the socket to request the shutdown of              //  the I/O object.              struct { diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index c0f4541..71e20df 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -83,6 +83,10 @@ zmq::dispatcher_t::~dispatcher_t ()      for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)          delete io_threads [i]; +    //  Deallocate all the orphaned pipes. +    for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) +        delete *it; +      delete [] command_pipes;  #ifdef ZMQ_HAVE_WINDOWS @@ -146,3 +150,19 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)      return io_threads [result];  } + +void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_) +{ +    pipes_sync.lock (); +    bool inserted = pipes.insert (pipe_).second; +    zmq_assert (inserted); +    pipes_sync.unlock (); +} + +void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) +{ +    pipes_sync.lock (); +    pipes_t::size_type erased = pipes.erase (pipe_); +    zmq_assert (erased == 1); +    pipes_sync.unlock (); +} diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 08596cb..cb445ef 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -21,6 +21,7 @@  #define __ZMQ_DISPATCHER_HPP_INCLUDED__  #include <vector> +#include <set>  #include <map>  #include <string> @@ -85,6 +86,11 @@ namespace zmq          //  Taskset specifies which I/O threads are eligible (0 = all).          class io_thread_t *choose_io_thread (uint64_t taskset_); +        //  All pipes are registered with the dispatcher so that even the +        //  orphaned pipes can be deallocated on the terminal shutdown. +        void register_pipe (class pipe_t *pipe_); +        void unregister_pipe (class pipe_t *pipe_); +      private:          //  Returns the app thread associated with the current thread. @@ -112,6 +118,18 @@ namespace zmq          //  Synchronisation of accesses to shared thread data.          mutex_t threads_sync; +        //  As pipes may reside in orphaned state in particular moments +        //  of the pipe shutdown process, i.e. neither pipe reader nor +        //  pipe writer hold reference to the pipe, we have to hold references +        //  to all pipes in dispatcher so that we can deallocate them +        //  during terminal shutdown even though it conincides with the +        //  pipe being in the orphaned state. +        typedef std::set <class pipe_t*> pipes_t; +        pipes_t pipes; + +        // Synchronisation of access to the pipes repository. +        mutex_t pipes_sync; +          dispatcher_t (const dispatcher_t&);          void operator = (const dispatcher_t&);      }; diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index bb7409e..14a479e 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -26,6 +26,8 @@ namespace zmq      struct i_endpoint      {          virtual void revive (class reader_t *pipe_) = 0; +        virtual void detach_inpipe (class reader_t *pipe_) = 0; +        virtual void detach_outpipe (class writer_t *pipe_) = 0;      };  } diff --git a/src/object.cpp b/src/object.cpp index 4d54ebf..b3cf898 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,6 +83,14 @@ void zmq::object_t::process_command (command_t &cmd_)              cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);          return; +    case command_t::pipe_term: +        process_pipe_term (); +        return; + +    case command_t::pipe_term_ack: +        process_pipe_term_ack (); +        return; +      case command_t::term_req:          process_term_req (cmd_.args.term_req.object);          return; @@ -100,6 +108,16 @@ void zmq::object_t::process_command (command_t &cmd_)      }  } +void zmq::object_t::register_pipe (class pipe_t *pipe_) +{ +    dispatcher->register_pipe (pipe_); +} + +void zmq::object_t::unregister_pipe (class pipe_t *pipe_) +{ +    dispatcher->unregister_pipe (pipe_); +} +  zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)  {      return dispatcher->choose_io_thread (taskset_); @@ -166,6 +184,22 @@ void zmq::object_t::send_revive (object_t *destination_)      send_command (cmd);  } +void zmq::object_t::send_pipe_term (writer_t *destination_) +{ +    command_t cmd; +    cmd.destination = destination_; +    cmd.type = command_t::pipe_term; +    send_command (cmd); +} + +void zmq::object_t::send_pipe_term_ack (reader_t *destination_) +{ +    command_t cmd; +    cmd.destination = destination_; +    cmd.type = command_t::pipe_term_ack; +    send_command (cmd); +} +  void zmq::object_t::send_term_req (socket_base_t *destination_,      owned_t *object_)  { @@ -223,6 +257,16 @@ void zmq::object_t::process_revive ()      zmq_assert (false);  } +void zmq::object_t::process_pipe_term () +{ +    zmq_assert (false); +} + +void zmq::object_t::process_pipe_term_ack () +{ +    zmq_assert (false); +} +  void zmq::object_t::process_term_req (owned_t *object_)  {      zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 0dbac24..8ce569e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -42,6 +42,10 @@ namespace zmq          int get_thread_slot ();          void process_command (struct command_t &cmd_); +        //  Allow pipe to access corresponding dispatcher functions. +        void register_pipe (class pipe_t *pipe_); +        void unregister_pipe (class pipe_t *pipe_); +      protected:          //  Derived object can use following functions to interact with @@ -60,6 +64,8 @@ namespace zmq          void send_bind (object_t *destination_, class owned_t *session_,              class reader_t *in_pipe_, class writer_t *out_pipe_);          void send_revive (class object_t *destination_); +        void send_pipe_term (class writer_t *destination_); +        void send_pipe_term_ack (class reader_t *destination_);          void send_term_req (class socket_base_t *destination_,              class owned_t *object_);          void send_term (class owned_t *destination_); @@ -74,6 +80,8 @@ namespace zmq          virtual void process_bind (class owned_t *session_,              class reader_t *in_pipe_, class writer_t *out_pipe_);          virtual void process_revive (); +        virtual void process_pipe_term (); +        virtual void process_pipe_term_ack ();          virtual void process_term_req (class owned_t *object_);          virtual void process_term ();          virtual void process_term_ack (); diff --git a/src/pipe.cpp b/src/pipe.cpp index 5016631..3748ae9 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -19,6 +19,8 @@  #include <pthread.h> +#include <../include/zmq.h> +  #include "pipe.hpp"  zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, @@ -39,9 +41,21 @@ zmq::reader_t::~reader_t ()  bool zmq::reader_t::read (zmq_msg_t *msg_)  { -    return pipe->read (msg_); +    if (!pipe->read (msg_)) +        return false; + +    //  If delimiter was read, start termination process of the pipe. +    unsigned char *offset = 0; +    if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { +        if (endpoint) +            endpoint->detach_inpipe (this); +        term (); +        return false; +    }      //  TODO: Adjust the size of the pipe. + +    return true;  }  void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) @@ -59,19 +73,48 @@ int zmq::reader_t::get_index ()      return index;  } +void zmq::reader_t::term () +{ +    endpoint = NULL; +    send_pipe_term (peer); +} +  void zmq::reader_t::process_revive ()  {      endpoint->revive (this);  } +void zmq::reader_t::process_pipe_term_ack () +{ +    peer = NULL; +    delete pipe; +} +  zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,        uint64_t hwm_, uint64_t lwm_) :      object_t (parent_),      pipe (pipe_),      peer (&pipe_->reader),      hwm (hwm_), -    lwm (lwm_) +    lwm (lwm_), +    index (-1), +    endpoint (NULL) +{ +} + +void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) +{ +    endpoint = endpoint_; +} + +void zmq::writer_t::set_index (int index_) +{ +    index = index_; +} + +int zmq::writer_t::get_index ()  { +    return index;  }  zmq::writer_t::~writer_t () @@ -99,14 +142,46 @@ void zmq::writer_t::flush ()          send_revive (peer);  } +void zmq::writer_t::term () +{ +    endpoint = NULL; + +    //  Push delimiter into the pipe. +    //  Trick the compiler to belive that the tag is a valid pointer. +    zmq_msg_t msg; +    const unsigned char *offset = 0; +    msg.content = (void*) (offset + ZMQ_DELIMITER); +    msg.shared = false; +    pipe->write (msg); +    pipe->flush (); +} + +void zmq::writer_t::process_pipe_term () +{ +    if (endpoint) +        endpoint->detach_outpipe (this); + +    reader_t *p = peer; +    peer = NULL; +    send_pipe_term_ack (p); +} +  zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,        uint64_t hwm_, uint64_t lwm_) :      reader (reader_parent_, this, hwm_, lwm_),      writer (writer_parent_, this, hwm_, lwm_)  { +    reader.register_pipe (this);  }  zmq::pipe_t::~pipe_t ()  { +    //  Deallocate all the unread messages in the pipe. We have to do it by +    //  hand because zmq_msg_t is a POD, not a class, so there's no associated +    //  destructor. +    zmq_msg_t msg; +    while (read (&msg)) +       zmq_msg_close (&msg); + +    reader.unregister_pipe (this);  } - diff --git a/src/pipe.hpp b/src/pipe.hpp index b7593c7..b4e592a 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -39,25 +39,29 @@ namespace zmq              uint64_t hwm_, uint64_t lwm_);          ~reader_t (); +        void set_endpoint (i_endpoint *endpoint_); +          //  Reads a message to the underlying pipe.          bool read (struct zmq_msg_t *msg_); -        void set_endpoint (i_endpoint *endpoint_); -          //  Mnaipulation of index of the pipe.          void set_index (int index_);          int get_index (); +        //  Ask pipe to terminate. +        void term (); +      private:          //  Command handlers.          void process_revive (); +        void process_pipe_term_ack ();          //  The underlying pipe.          class pipe_t *pipe;          //  Pipe writer associated with the other side of the pipe. -        class object_t *peer; +        class writer_t *peer;          //  High and low watermarks for in-memory storage (in bytes).          uint64_t hwm; @@ -86,6 +90,8 @@ namespace zmq              uint64_t hwm_, uint64_t lwm_);          ~writer_t (); +        void set_endpoint (i_endpoint *endpoint_); +          //  Checks whether message with specified size can be written to the          //  pipe. If writing the message would cause high watermark to be          //  exceeded, the function returns false. @@ -98,13 +104,23 @@ namespace zmq          //  Flush the messages downsteam.          void flush (); +        //  Mnaipulation of index of the pipe. +        void set_index (int index_); +        int get_index (); + +        //  Ask pipe to terminate. +        void term (); +      private: +        //  Command handlers. +        void process_pipe_term (); +          //  The underlying pipe.          class pipe_t *pipe;          //  Pipe reader associated with the other side of the pipe. -        class object_t *peer; +        class reader_t *peer;          //  High and low watermarks for in-memory storage (in bytes).          uint64_t hwm; @@ -114,6 +130,12 @@ namespace zmq          uint64_t head;          uint64_t tail; +        //  Index of the pipe in the socket's list of outbound pipes. +        int index; + +        //  Endpoint (either session or socket) the pipe is attached to. +        i_endpoint *endpoint; +          writer_t (const writer_t&);          void operator = (const writer_t&);      }; diff --git a/src/session.cpp b/src/session.cpp index 0b1b947..d667851 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -36,6 +36,11 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,  zmq::session_t::~session_t ()  { +    //  Ask associated pipes to terminate. +    if (in_pipe) +        in_pipe->term (); +    if (out_pipe) +        out_pipe->term ();  }  void zmq::session_t::set_inbound_pipe (reader_t *pipe_) @@ -49,6 +54,7 @@ void zmq::session_t::set_outbound_pipe (writer_t *pipe_)  {      zmq_assert (!out_pipe);      out_pipe = pipe_; +    out_pipe->set_endpoint (this);  } @@ -92,6 +98,17 @@ void zmq::session_t::revive (reader_t *pipe_)          engine->revive ();  } +void zmq::session_t::detach_inpipe (reader_t *pipe_) +{ +    active = false; +    in_pipe = NULL; +} + +void zmq::session_t::detach_outpipe (writer_t *pipe_) +{ +    out_pipe = NULL; +} +  void zmq::session_t::process_plug ()  {      //  Register the session with the socket. @@ -112,6 +129,7 @@ void zmq::session_t::process_plug ()          pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);          zmq_assert (outbound);          out_pipe = &outbound->writer; +        out_pipe->set_endpoint (this);          send_bind (owner, this, &outbound->reader, &inbound->writer);      } diff --git a/src/session.hpp b/src/session.hpp index 4a0882b..ba5bcdd 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -52,6 +52,8 @@ namespace zmq          //  i_endpoint interface implementation.          void revive (class reader_t *pipe_); +        void detach_inpipe (class reader_t *pipe_); +        void detach_outpipe (class writer_t *pipe_);          //  Handlers for incoming commands.          void process_plug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 68fc82b..e14065b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -50,6 +50,16 @@ zmq::socket_base_t::~socket_base_t ()  {      shutting_down = true; +    //  Ask all pipes to terminate. +    for (in_pipes_t::iterator it = in_pipes.begin (); +          it != in_pipes.end (); it++) +        (*it)->term (); +    in_pipes.clear (); +    for (out_pipes_t::iterator it = out_pipes.begin (); +          it != out_pipes.end (); it++) +        (*it)->term (); +    out_pipes.clear (); +      while (true) {          //  On third pass of the loop there should be no more I/O objects @@ -164,17 +174,18 @@ int zmq::socket_base_t::connect (const char *addr_)      zmq_assert (in_pipe);      in_pipe->reader.set_endpoint (this);      session->set_outbound_pipe (&in_pipe->writer); -    in_pipes.push_back (std::make_pair (&in_pipe->reader, session)); -    in_pipes.back ().first->set_index (active); -    in_pipes [active].first->set_index (in_pipes.size () - 1); +    in_pipes.push_back (&in_pipe->reader); +    in_pipes.back ()->set_index (active); +    in_pipes [active]->set_index (in_pipes.size () - 1);      std::swap (in_pipes.back (), in_pipes [active]);      active++;      //  Create outbound pipe.      pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);      zmq_assert (out_pipe); +    out_pipe->writer.set_endpoint (this);      session->set_inbound_pipe (&out_pipe->reader); -    out_pipes.push_back (std::make_pair (&out_pipe->writer, session)); +    out_pipes.push_back (&out_pipe->writer);      //  Activate the session.      send_plug (session); @@ -225,7 +236,7 @@ int zmq::socket_base_t::flush ()  {      for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();            it++) -        it->first->flush (); +        (*it)->flush ();      return 0;  } @@ -320,12 +331,38 @@ void zmq::socket_base_t::revive (reader_t *pipe_)  {      //  Move the pipe to the list of active pipes.      in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); -    in_pipes [index].first->set_index (active); -    in_pipes [active].first->set_index (index);     +    in_pipes [index]->set_index (active); +    in_pipes [active]->set_index (index);          std::swap (in_pipes [index], in_pipes [active]);      active++;  } +void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) +{ +    //  Remove the pipe from the list of inbound pipes. +    in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); +    if (index < active) { +        in_pipes [index]->set_index (active - 1); +        in_pipes [active - 1]->set_index (index); +        std::swap (in_pipes [index], in_pipes [active - 1]); +        active--; +        index = active; +    } +    in_pipes [index]->set_index (in_pipes.size () - 1); +    in_pipes [in_pipes.size () - 1]->set_index (index); +    std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]); +    in_pipes.pop_back (); +} + +void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) +{ +    out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index (); +    out_pipes [index]->set_index (out_pipes.size () - 1); +    out_pipes [out_pipes.size () - 1]->set_index (index); +    std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]); +    out_pipes.pop_back (); +} +  void zmq::socket_base_t::process_own (owned_t *object_)  {      io_objects.insert (object_); @@ -336,13 +373,14 @@ void zmq::socket_base_t::process_bind (owned_t *session_,  {      zmq_assert (in_pipe_);      in_pipe_->set_endpoint (this); -    in_pipes.push_back (std::make_pair (in_pipe_, session_)); -    in_pipes.back ().first->set_index (active); -    in_pipes [active].first->set_index (in_pipes.size () - 1); +    in_pipes.push_back (in_pipe_); +    in_pipes.back ()->set_index (active); +    in_pipes [active]->set_index (in_pipes.size () - 1);      std::swap (in_pipes.back (), in_pipes [active]);      active++;      zmq_assert (out_pipe_); -    out_pipes.push_back (std::make_pair (out_pipe_, session_)); +    out_pipe_->set_endpoint (this); +    out_pipes.push_back (out_pipe_);  }  void zmq::socket_base_t::process_term_req (owned_t *object_) @@ -388,7 +426,7 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)      //  First check whether all pipes are available for writing.      for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();            it++) -        if (!it->first->check_write (zmq_msg_size (msg_))) +        if (!(*it)->check_write (zmq_msg_size (msg_)))              return false;      msg_content_t *content = (msg_content_t*) msg_->content; @@ -397,9 +435,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)      if (content == (msg_content_t*) ZMQ_VSM) {          for (out_pipes_t::iterator it = out_pipes.begin ();                it != out_pipes.end (); it++) { -            it->first->write (msg_); +            (*it)->write (msg_);              if (flush_) -                it->first->flush (); +                (*it)->flush ();          }          int rc = zmq_msg_init (msg_);          zmq_assert (rc == 0); @@ -410,9 +448,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)      //  to send the message to - no refcount adjustment i.e. no atomic      //  operations are needed.      if (pipes_count == 1) { -        out_pipes.begin ()->first->write (msg_); +        (*out_pipes.begin ())->write (msg_);          if (flush_) -            out_pipes.begin ()->first->flush (); +            (*out_pipes.begin ())->flush ();          int rc = zmq_msg_init (msg_);          zmq_assert (rc == 0);          return true; @@ -431,9 +469,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)      //  Push the message to all destinations.      for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();            it++) { -        it->first->write (msg_); +        (*it)->write (msg_);          if (flush_) -            it->first->flush (); +            (*it)->flush ();      }      //  Detach the original message from the data buffer. @@ -451,13 +489,13 @@ bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)      //  Round-robin over the pipes to get next message.      for (int count = active; count != 0; count--) { -        bool fetched = in_pipes [current].first->read (msg_); +        bool fetched = in_pipes [current]->read (msg_);          //  If there's no message in the pipe, move it to the list of          //  non-active pipes.          if (!fetched) { -            in_pipes [current].first->set_index (active - 1); -            in_pipes [active - 1].first->set_index (current); +            in_pipes [current]->set_index (active - 1); +            in_pipes [active - 1]->set_index (current);              std::swap (in_pipes [current], in_pipes [active - 1]);              active--;          } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1f04bda..490c09a 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -24,7 +24,6 @@  #include <map>  #include <vector>  #include <string> -#include <utility>  #include "i_endpoint.hpp"  #include "object.hpp" @@ -62,6 +61,8 @@ namespace zmq          //  i_endpoint interface implementation.          void revive (class reader_t *pipe_); +        void detach_inpipe (class reader_t *pipe_); +        void detach_outpipe (class writer_t *pipe_);      private: @@ -86,10 +87,7 @@ namespace zmq          io_objects_t io_objects;          //  Inbound pipes, i.e. those the socket is getting messages from. -        //  The second member in the pair indicates the object on the other -        //  side of the pipe. -        typedef std::vector <std::pair <class reader_t*, owned_t*> > -            in_pipes_t; +        typedef std::vector <class reader_t*> in_pipes_t;          in_pipes_t in_pipes;          //  Index of the next inbound pipe to read messages from. @@ -100,10 +98,7 @@ namespace zmq          in_pipes_t::size_type active;          //  Outbound pipes, i.e. those the socket is sending messages to. -        //  The second member in the pair indicates the object on the other -        //  side of the pipe. -        typedef std::vector <std::pair <class writer_t*, owned_t*> > -            out_pipes_t; +        typedef std::vector <class writer_t*> out_pipes_t;          out_pipes_t out_pipes;          //  Number of I/O objects that were already asked to terminate  | 
