summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-23 20:30:01 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-23 20:30:01 +0200
commit0f6f7276e32c01ccfe86fb76741a52ac6ffc87af (patch)
tree3f2cec589f6243742da7e79028633d35f8b362db /src/xrep.cpp
parentacf0b0e515515e51ad32ba7a2d147ce703579478 (diff)
Move the pipe termination code to socket_base_t
So far, the pipe termination code was spread among socket type classes, fair queuer, load balancer, etc. This patch moves all the associated logic to a single place. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp33
1 files changed, 5 insertions, 28 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index d82890d..920be8d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -28,8 +28,7 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
prefetched (false),
more_in (false),
current_out (NULL),
- more_out (false),
- terminating (false)
+ more_out (false)
{
options.type = ZMQ_XREP;
@@ -47,7 +46,6 @@ zmq::xrep_t::~xrep_t ()
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
- pipe_->set_event_sink (this);
// Add the pipe to the map out outbound pipes.
// TODO: What if new connection has same peer identity as the old one?
@@ -59,27 +57,9 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
// Add the pipe to the list of inbound pipes.
inpipe_t inpipe = {pipe_, peer_identity_, true};
inpipes.push_back (inpipe);
-
- // In case we are already terminating, ask this pipe to terminate as well.
- if (terminating) {
- register_term_acks (1);
- pipe_->terminate ();
- }
}
-void zmq::xrep_t::process_term (int linger_)
-{
- terminating = true;
-
- register_term_acks ((int) (inpipes.size () + outpipes.size ()));
-
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it)
- it->pipe->terminate ();
-
- socket_base_t::process_term (linger_);
-}
-
-void zmq::xrep_t::terminated (pipe_t *pipe_)
+void zmq::xrep_t::xterminated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
@@ -89,8 +69,6 @@ void zmq::xrep_t::terminated (pipe_t *pipe_)
inpipes.erase (it);
if (current_in >= inpipes.size ())
current_in = 0;
- if (terminating)
- unregister_term_ack ();
goto clean_outpipes;
}
}
@@ -103,15 +81,13 @@ clean_outpipes:
outpipes.erase (it);
if (pipe_ == current_out)
current_out = NULL;
- if (terminating)
- unregister_term_ack ();
return;
}
}
zmq_assert (false);
}
-void zmq::xrep_t::read_activated (pipe_t *pipe_)
+void zmq::xrep_t::xread_activated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
@@ -124,7 +100,7 @@ void zmq::xrep_t::read_activated (pipe_t *pipe_)
zmq_assert (false);
}
-void zmq::xrep_t::write_activated (pipe_t *pipe_)
+void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
@@ -312,3 +288,4 @@ bool zmq::xrep_t::xhas_out ()
}
+