summaryrefslogtreecommitdiff
path: root/src/pair.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
commitacf0b0e515515e51ad32ba7a2d147ce703579478 (patch)
treed2032009cf46c23aa0f677c2216914f718ab968a /src/pair.cpp
parent9e6b39925603f9e64db08c469bd628d7ef9465de (diff)
Introduces bi-directional pipes
So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/pair.cpp')
-rw-r--r--src/pair.cpp98
1 files changed, 28 insertions, 70 deletions
diff --git a/src/pair.cpp b/src/pair.cpp
index d877b54..93a4327 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -25,111 +25,72 @@
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- inpipe (NULL),
- outpipe (NULL),
- inpipe_alive (false),
- outpipe_alive (false),
+ pipe (NULL),
terminating (false)
{
options.type = ZMQ_PAIR;
- options.requires_in = true;
- options.requires_out = true;
}
zmq::pair_t::~pair_t ()
{
- zmq_assert (!inpipe);
- zmq_assert (!outpipe);
+ zmq_assert (!pipe);
}
-void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
- const blob_t &peer_identity_)
+void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
- zmq_assert (!inpipe && !outpipe);
+ zmq_assert (!pipe);
- inpipe = inpipe_;
- inpipe_alive = true;
- inpipe->set_event_sink (this);
-
- outpipe = outpipe_;
- outpipe_alive = true;
- outpipe->set_event_sink (this);
+ pipe = pipe_;
+ pipe->set_event_sink (this);
if (terminating) {
- register_term_acks (2);
- inpipe_->terminate ();
- outpipe_->terminate ();
+ register_term_acks (1);
+ pipe_->terminate ();
}
}
-void zmq::pair_t::terminated (reader_t *pipe_)
-{
- zmq_assert (pipe_ == inpipe);
- inpipe = NULL;
- inpipe_alive = false;
-
- if (terminating)
- unregister_term_ack ();
-}
-
-void zmq::pair_t::terminated (writer_t *pipe_)
+void zmq::pair_t::terminated (pipe_t *pipe_)
{
- zmq_assert (pipe_ == outpipe);
- outpipe = NULL;
- outpipe_alive = false;
+ zmq_assert (pipe_ == pipe);
+ pipe = NULL;
if (terminating)
unregister_term_ack ();
}
-void zmq::pair_t::delimited (reader_t *pipe_)
-{
-}
-
void zmq::pair_t::process_term (int linger_)
{
terminating = true;
- if (inpipe) {
+ if (pipe) {
register_term_acks (1);
- inpipe->terminate ();
- }
-
- if (outpipe) {
- register_term_acks (1);
- outpipe->terminate ();
+ pipe->terminate ();
}
socket_base_t::process_term (linger_);
}
-void zmq::pair_t::activated (class reader_t *pipe_)
+void zmq::pair_t::read_activated (pipe_t *pipe_)
{
- zmq_assert (!inpipe_alive);
- inpipe_alive = true;
+ // There's just one pipe. No lists of active and inactive pipes.
+ // There's nothing to do here.
}
-void zmq::pair_t::activated (class writer_t *pipe_)
+void zmq::pair_t::write_activated (pipe_t *pipe_)
{
- zmq_assert (!outpipe_alive);
- outpipe_alive = true;
+ // There's just one pipe. No lists of active and inactive pipes.
+ // There's nothing to do here.
}
int zmq::pair_t::xsend (msg_t *msg_, int flags_)
{
- if (outpipe == NULL || !outpipe_alive) {
- errno = EAGAIN;
- return -1;
- }
-
- if (!outpipe->write (msg_)) {
- outpipe_alive = false;
+ if (!pipe || !pipe->write (msg_)) {
errno = EAGAIN;
return -1;
}
if (!(flags_ & ZMQ_SNDMORE))
- outpipe->flush ();
+ pipe->flush ();
// Detach the original message from the data buffer.
int rc = msg_->init ();
@@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->close ();
errno_assert (rc == 0);
- if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
-
- // No message is available.
- inpipe_alive = false;
+ if (!pipe || !pipe->read (msg_)) {
// Initialise the output parameter to be a 0-byte message.
rc = msg_->init ();
errno_assert (rc == 0);
+
errno = EAGAIN;
return -1;
}
@@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
bool zmq::pair_t::xhas_in ()
{
- if (!inpipe || !inpipe_alive)
+ if (!pipe)
return false;
- inpipe_alive = inpipe->check_read ();
- return inpipe_alive;
+ return pipe->check_read ();
}
bool zmq::pair_t::xhas_out ()
{
- if (!outpipe || !outpipe_alive)
+ if (!pipe)
return false;
msg_t msg;
int rc = msg.init ();
errno_assert (rc == 0);
- outpipe_alive = outpipe->check_write (&msg);
+ bool result = pipe->check_write (&msg);
rc = msg.close ();
errno_assert (rc == 0);
- return outpipe_alive;
+ return result;
}