summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-23 20:30:01 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-23 20:30:01 +0200
commit0f6f7276e32c01ccfe86fb76741a52ac6ffc87af (patch)
tree3f2cec589f6243742da7e79028633d35f8b362db /src/socket_base.cpp
parentacf0b0e515515e51ad32ba7a2d147ce703579478 (diff)
Move the pipe termination code to socket_base_t
So far, the pipe termination code was spread among socket type classes, fair queuer, load balancer, etc. This patch moves all the associated logic to a single place. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp59
1 files changed, 57 insertions, 2 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index baa4bd2..fae55f2 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -211,10 +211,15 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
-void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
+void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
const blob_t &peer_identity_)
{
- // If the peer haven't specified it's identity, let's generate one.
+ // First, register the pipe so that we can terminate it later on.
+ pipe_->set_event_sink (this);
+ pipes.push_back (pipe_);
+
+ // Then, pass the pipe to the specific socket type.
+ // If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) {
xattach_pipe (pipe_, peer_identity_);
}
@@ -223,6 +228,13 @@ void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
generate_uuid ((unsigned char*) identity.data () + 1);
xattach_pipe (pipe_, identity);
}
+
+ // If the socket is already being closed, ask any new pipes to terminate
+ // straight away.
+ if (is_terminating ()) {
+ register_term_acks (1);
+ pipe_->terminate ();
+ }
}
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
@@ -635,9 +647,15 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
+ // Plug the socket to the reaper thread.
poller = poller_;
handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (handle);
+
+ // Initialise the termination and check whether it can be deallocated
+ // immediately.
+ terminate ();
+ check_destroy ();
}
int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
@@ -720,6 +738,11 @@ void zmq::socket_base_t::process_term (int linger_)
// will be initiated.
unregister_endpoints (this);
+ // Ask all attached pipes to terminate.
+ for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
+ pipes [i]->terminate ();
+ register_term_acks (pipes.size ());
+
// Continue the termination process immediately.
own_t::process_term (linger_);
}
@@ -758,6 +781,15 @@ int zmq::socket_base_t::xrecv (msg_t *msg_, int options_)
return -1;
}
+void zmq::socket_base_t::xread_activated (pipe_t *pipe_)
+{
+ zmq_assert (false);
+}
+void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_)
+{
+ zmq_assert (false);
+}
+
void zmq::socket_base_t::in_event ()
{
// Process any commands from other threads/sockets that may be available
@@ -794,3 +826,26 @@ void zmq::socket_base_t::check_destroy ()
own_t::process_destroy ();
}
}
+
+void zmq::socket_base_t::read_activated (pipe_t *pipe_)
+{
+ xread_activated (pipe_);
+}
+
+void zmq::socket_base_t::write_activated (pipe_t *pipe_)
+{
+ xwrite_activated (pipe_);
+}
+
+void zmq::socket_base_t::terminated (pipe_t *pipe_)
+{
+ // Notify the specific socket type about the pipe termination.
+ xterminated (pipe_);
+
+ // Remove the pipe from the list of attached pipes and confirm its
+ // termination if we are already shutting down.
+ pipes.erase (pipe_);
+ if (is_terminating ())
+ unregister_term_ack ();
+}
+