summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-11 14:09:56 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commitd13933bc62fce71b5a58118020e0dd3776e79aa9 (patch)
tree6586d5b9cc637dbf8acae4b32d24da9c8e046014 /src/xrep.cpp
parentee1f1af0091d9bdffa0e5ce1783da925b3cd7e56 (diff)
I/O object hierarchy implemented
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp25
1 files changed, 18 insertions, 7 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 73d7856..c1cfb00 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) :
prefetched (false),
more_in (false),
current_out (NULL),
- more_out (false)
+ more_out (false),
+ terminating (false)
{
options.requires_in = true;
options.requires_out = true;
@@ -62,16 +63,27 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
inpipe_t inpipe = {inpipe_, peer_identity_, true};
inpipes.push_back (inpipe);
+
+ if (terminating) {
+ register_term_acks (1);
+ inpipe_->terminate ();
+ }
}
-void zmq::xrep_t::xterm_pipes ()
+void zmq::xrep_t::process_term ()
{
+ terminating = true;
+
+ register_term_acks (inpipes.size () + outpipes.size ());
+
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++)
it->reader->terminate ();
for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
it++)
it->second.writer->terminate ();
+
+ socket_base_t::process_term ();
}
void zmq::xrep_t::terminated (reader_t *pipe_)
@@ -80,6 +92,8 @@ void zmq::xrep_t::terminated (reader_t *pipe_)
it++) {
if (it->reader == pipe_) {
inpipes.erase (it);
+ if (terminating)
+ unregister_term_ack ();
return;
}
}
@@ -94,17 +108,14 @@ void zmq::xrep_t::terminated (writer_t *pipe_)
outpipes.erase (it);
if (pipe_ == current_out)
current_out = NULL;
+ if (terminating)
+ unregister_term_ack ();
return;
}
}
zmq_assert (false);
}
-bool zmq::xrep_t::xhas_pipes ()
-{
- return !inpipes.empty () || !outpipes.empty ();
-}
-
void zmq::xrep_t::activated (reader_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();