From 0f6f7276e32c01ccfe86fb76741a52ac6ffc87af Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 23 May 2011 20:30:01 +0200 Subject: Move the pipe termination code to socket_base_t So far, the pipe termination code was spread among socket type classes, fair queuer, load balancer, etc. This patch moves all the associated logic to a single place. Signed-off-by: Martin Sustrik --- src/xrep.cpp | 33 +++++---------------------------- 1 file changed, 5 insertions(+), 28 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index d82890d..920be8d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -28,8 +28,7 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : prefetched (false), more_in (false), current_out (NULL), - more_out (false), - terminating (false) + more_out (false) { options.type = ZMQ_XREP; @@ -47,7 +46,6 @@ zmq::xrep_t::~xrep_t () void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); - pipe_->set_event_sink (this); // Add the pipe to the map out outbound pipes. // TODO: What if new connection has same peer identity as the old one? @@ -59,27 +57,9 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) // Add the pipe to the list of inbound pipes. inpipe_t inpipe = {pipe_, peer_identity_, true}; inpipes.push_back (inpipe); - - // In case we are already terminating, ask this pipe to terminate as well. - if (terminating) { - register_term_acks (1); - pipe_->terminate (); - } } -void zmq::xrep_t::process_term (int linger_) -{ - terminating = true; - - register_term_acks ((int) (inpipes.size () + outpipes.size ())); - - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) - it->pipe->terminate (); - - socket_base_t::process_term (linger_); -} - -void zmq::xrep_t::terminated (pipe_t *pipe_) +void zmq::xrep_t::xterminated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { @@ -89,8 +69,6 @@ void zmq::xrep_t::terminated (pipe_t *pipe_) inpipes.erase (it); if (current_in >= inpipes.size ()) current_in = 0; - if (terminating) - unregister_term_ack (); goto clean_outpipes; } } @@ -103,15 +81,13 @@ clean_outpipes: outpipes.erase (it); if (pipe_ == current_out) current_out = NULL; - if (terminating) - unregister_term_ack (); return; } } zmq_assert (false); } -void zmq::xrep_t::read_activated (pipe_t *pipe_) +void zmq::xrep_t::xread_activated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { @@ -124,7 +100,7 @@ void zmq::xrep_t::read_activated (pipe_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::write_activated (pipe_t *pipe_) +void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) { for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { @@ -312,3 +288,4 @@ bool zmq::xrep_t::xhas_out () } + -- cgit v1.2.3