diff options
| author | Martin Sustrik <sustrik@250bpm.com> | 2010-08-11 14:09:56 +0200 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-25 15:39:20 +0200 | 
| commit | d13933bc62fce71b5a58118020e0dd3776e79aa9 (patch) | |
| tree | 6586d5b9cc637dbf8acae4b32d24da9c8e046014 /src/pub.cpp | |
| parent | ee1f1af0091d9bdffa0e5ce1783da925b3cd7e56 (diff) | |
I/O object hierarchy implemented
Diffstat (limited to 'src/pub.cpp')
| -rw-r--r-- | src/pub.cpp | 26 | 
1 files changed, 20 insertions, 6 deletions
diff --git a/src/pub.cpp b/src/pub.cpp index d1d1c72..2d0dea2 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -26,7 +26,8 @@  zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) :      socket_base_t (parent_, slot_), -    active (0) +    active (0), +    terminating (false)  {      options.requires_in = false;      options.requires_out = true; @@ -40,6 +41,7 @@ zmq::pub_t::~pub_t ()  void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_, const blob_t &peer_identity_)  { +    zmq_assert (!terminating);      zmq_assert (!inpipe_);      outpipe_->set_event_sink (this); @@ -47,18 +49,26 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,      pipes.push_back (outpipe_);      pipes.swap (active, pipes.size () - 1);      active++; + +    if (terminating) { +        register_term_acks (1); +        outpipe_->terminate (); +    }  } -void zmq::pub_t::xterm_pipes () +void zmq::pub_t::process_term ()  { +    terminating = true; +      //  Start shutdown process for all the pipes.      for (pipes_t::size_type i = 0; i != pipes.size (); i++)          pipes [i]->terminate (); -} -bool zmq::pub_t::xhas_pipes () -{ -    return !pipes.empty (); +    //  Wait for pipes to terminate before terminating yourself. +    register_term_acks (pipes.size ()); + +    //  Continue with the termination immediately. +    socket_base_t::process_term ();  }  void zmq::pub_t::activated (writer_t *pipe_) @@ -75,6 +85,10 @@ void zmq::pub_t::terminated (writer_t *pipe_)      if (pipes.index (pipe_) < active)          active--;      pipes.erase (pipe_); + +    //  If we are already terminating, wait for one term ack less. +    if (terminating) +        unregister_term_ack ();  }  int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)  | 
