diff options
Diffstat (limited to 'src/pub.cpp')
-rw-r--r-- | src/pub.cpp | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/src/pub.cpp b/src/pub.cpp index d1d1c72..2d0dea2 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -26,7 +26,8 @@ zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) : socket_base_t (parent_, slot_), - active (0) + active (0), + terminating (false) { options.requires_in = false; options.requires_out = true; @@ -40,6 +41,7 @@ zmq::pub_t::~pub_t () void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { + zmq_assert (!terminating); zmq_assert (!inpipe_); outpipe_->set_event_sink (this); @@ -47,18 +49,26 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, pipes.push_back (outpipe_); pipes.swap (active, pipes.size () - 1); active++; + + if (terminating) { + register_term_acks (1); + outpipe_->terminate (); + } } -void zmq::pub_t::xterm_pipes () +void zmq::pub_t::process_term () { + terminating = true; + // Start shutdown process for all the pipes. for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); -} -bool zmq::pub_t::xhas_pipes () -{ - return !pipes.empty (); + // Wait for pipes to terminate before terminating yourself. + register_term_acks (pipes.size ()); + + // Continue with the termination immediately. + socket_base_t::process_term (); } void zmq::pub_t::activated (writer_t *pipe_) @@ -75,6 +85,10 @@ void zmq::pub_t::terminated (writer_t *pipe_) if (pipes.index (pipe_) < active) active--; pipes.erase (pipe_); + + // If we are already terminating, wait for one term ack less. + if (terminating) + unregister_term_ack (); } int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) |