diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index baa4bd2..fae55f2 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -211,10 +211,15 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_, +void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - // If the peer haven't specified it's identity, let's generate one. + // First, register the pipe so that we can terminate it later on. + pipe_->set_event_sink (this); + pipes.push_back (pipe_); + + // Then, pass the pipe to the specific socket type. + // If the peer haven't specified it's identity, let's generate one. if (peer_identity_.size ()) { xattach_pipe (pipe_, peer_identity_); } @@ -223,6 +228,13 @@ void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_, generate_uuid ((unsigned char*) identity.data () + 1); xattach_pipe (pipe_, identity); } + + // If the socket is already being closed, ask any new pipes to terminate + // straight away. + if (is_terminating ()) { + register_term_acks (1); + pipe_->terminate (); + } } int zmq::socket_base_t::setsockopt (int option_, const void *optval_, @@ -635,9 +647,15 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) void zmq::socket_base_t::start_reaping (poller_t *poller_) { + // Plug the socket to the reaper thread. poller = poller_; handle = poller->add_fd (mailbox.get_fd (), this); poller->set_pollin (handle); + + // Initialise the termination and check whether it can be deallocated + // immediately. + terminate (); + check_destroy (); } int zmq::socket_base_t::process_commands (bool block_, bool throttle_) @@ -720,6 +738,11 @@ void zmq::socket_base_t::process_term (int linger_) // will be initiated. unregister_endpoints (this); + // Ask all attached pipes to terminate. + for (pipes_t::size_type i = 0; i != pipes.size (); ++i) + pipes [i]->terminate (); + register_term_acks (pipes.size ()); + // Continue the termination process immediately. own_t::process_term (linger_); } @@ -758,6 +781,15 @@ int zmq::socket_base_t::xrecv (msg_t *msg_, int options_) return -1; } +void zmq::socket_base_t::xread_activated (pipe_t *pipe_) +{ + zmq_assert (false); +} +void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_) +{ + zmq_assert (false); +} + void zmq::socket_base_t::in_event () { // Process any commands from other threads/sockets that may be available @@ -794,3 +826,26 @@ void zmq::socket_base_t::check_destroy () own_t::process_destroy (); } } + +void zmq::socket_base_t::read_activated (pipe_t *pipe_) +{ + xread_activated (pipe_); +} + +void zmq::socket_base_t::write_activated (pipe_t *pipe_) +{ + xwrite_activated (pipe_); +} + +void zmq::socket_base_t::terminated (pipe_t *pipe_) +{ + // Notify the specific socket type about the pipe termination. + xterminated (pipe_); + + // Remove the pipe from the list of attached pipes and confirm its + // termination if we are already shutting down. + pipes.erase (pipe_); + if (is_terminating ()) + unregister_term_ack (); +} + |