summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
commitacf0b0e515515e51ad32ba7a2d147ce703579478 (patch)
treed2032009cf46c23aa0f677c2216914f718ab968a /src/xrep.cpp
parent9e6b39925603f9e64db08c469bd628d7ef9465de (diff)
Introduces bi-directional pipes
So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp90
1 files changed, 34 insertions, 56 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 2650f4e..d82890d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -32,8 +32,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
terminating (false)
{
options.type = ZMQ_XREP;
- options.requires_in = true;
- options.requires_out = true;
// On connect, pipes are created only after initial handshaking.
// That way we are aware of the peer's identity when binding to the pipes.
@@ -46,36 +44,26 @@ zmq::xrep_t::~xrep_t ()
zmq_assert (outpipes.empty ());
}
-void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
- const blob_t &peer_identity_)
+void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
- if (outpipe_) {
-
- outpipe_->set_event_sink (this);
-
- // TODO: What if new connection has same peer identity as the old one?
- outpipe_t outpipe = {outpipe_, true};
- bool ok = outpipes.insert (outpipes_t::value_type (
- peer_identity_, outpipe)).second;
- zmq_assert (ok);
-
- if (terminating) {
- register_term_acks (1);
- outpipe_->terminate ();
- }
- }
-
- if (inpipe_) {
-
- inpipe_->set_event_sink (this);
-
- inpipe_t inpipe = {inpipe_, peer_identity_, true};
- inpipes.push_back (inpipe);
-
- if (terminating) {
- register_term_acks (1);
- inpipe_->terminate ();
- }
+ zmq_assert (pipe_);
+ pipe_->set_event_sink (this);
+
+ // Add the pipe to the map out outbound pipes.
+ // TODO: What if new connection has same peer identity as the old one?
+ outpipe_t outpipe = {pipe_, true};
+ bool ok = outpipes.insert (outpipes_t::value_type (
+ peer_identity_, outpipe)).second;
+ zmq_assert (ok);
+
+ // Add the pipe to the list of inbound pipes.
+ inpipe_t inpipe = {pipe_, peer_identity_, true};
+ inpipes.push_back (inpipe);
+
+ // In case we are already terminating, ask this pipe to terminate as well.
+ if (terminating) {
+ register_term_acks (1);
+ pipe_->terminate ();
}
}
@@ -85,21 +73,17 @@ void zmq::xrep_t::process_term (int linger_)
register_term_acks ((int) (inpipes.size () + outpipes.size ()));
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
- ++it)
- it->reader->terminate ();
- for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
- ++it)
- it->second.writer->terminate ();
+ for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it)
+ it->pipe->terminate ();
socket_base_t::process_term (linger_);
}
-void zmq::xrep_t::terminated (reader_t *pipe_)
+void zmq::xrep_t::terminated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
- if (it->reader == pipe_) {
+ if (it->pipe == pipe_) {
if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in)
current_in--;
inpipes.erase (it);
@@ -107,17 +91,15 @@ void zmq::xrep_t::terminated (reader_t *pipe_)
current_in = 0;
if (terminating)
unregister_term_ack ();
- return;
+ goto clean_outpipes;
}
}
zmq_assert (false);
-}
-void zmq::xrep_t::terminated (writer_t *pipe_)
-{
+clean_outpipes:
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
- if (it->second.writer == pipe_) {
+ if (it->second.pipe == pipe_) {
outpipes.erase (it);
if (pipe_ == current_out)
current_out = NULL;
@@ -129,15 +111,11 @@ void zmq::xrep_t::terminated (writer_t *pipe_)
zmq_assert (false);
}
-void zmq::xrep_t::delimited (reader_t *pipe_)
-{
-}
-
-void zmq::xrep_t::activated (reader_t *pipe_)
+void zmq::xrep_t::read_activated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
- if (it->reader == pipe_) {
+ if (it->pipe == pipe_) {
zmq_assert (!it->active);
it->active = true;
return;
@@ -146,11 +124,11 @@ void zmq::xrep_t::activated (reader_t *pipe_)
zmq_assert (false);
}
-void zmq::xrep_t::activated (writer_t *pipe_)
+void zmq::xrep_t::write_activated (pipe_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
- if (it->second.writer == pipe_) {
+ if (it->second.pipe == pipe_) {
zmq_assert (!it->second.active);
it->second.active = true;
return;
@@ -178,7 +156,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) {
- current_out = it->second.writer;
+ current_out = it->second.pipe;
msg_t empty;
int rc = empty.init ();
errno_assert (rc == 0);
@@ -245,7 +223,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we are in the middle of reading a message, just grab next part of it.
if (more_in) {
zmq_assert (inpipes [current_in].active);
- bool fetched = inpipes [current_in].reader->read (msg_);
+ bool fetched = inpipes [current_in].pipe->read (msg_);
zmq_assert (fetched);
more_in = msg_->flags () & msg_t::more;
if (!more_in) {
@@ -261,7 +239,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// Try to fetch new message.
if (inpipes [current_in].active)
- prefetched = inpipes [current_in].reader->read (&prefetched_msg);
+ prefetched = inpipes [current_in].pipe->read (&prefetched_msg);
// If we have a message, create a prefix and return it to the caller.
if (prefetched) {
@@ -311,7 +289,7 @@ bool zmq::xrep_t::xhas_in ()
// pipe holding messages, skipping only pipes with no messages available.
for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
if (inpipes [current_in].active &&
- inpipes [current_in].reader->check_read ())
+ inpipes [current_in].pipe->check_read ())
return true;
// If me don't have a message, mark the pipe as passive and