From acf0b0e515515e51ad32ba7a2d147ce703579478 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 May 2011 17:26:53 +0200 Subject: Introduces bi-directional pipes So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik --- src/pair.cpp | 98 +++++++++++++++++------------------------------------------- 1 file changed, 28 insertions(+), 70 deletions(-) (limited to 'src/pair.cpp') diff --git a/src/pair.cpp b/src/pair.cpp index d877b54..93a4327 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -25,111 +25,72 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - inpipe (NULL), - outpipe (NULL), - inpipe_alive (false), - outpipe_alive (false), + pipe (NULL), terminating (false) { options.type = ZMQ_PAIR; - options.requires_in = true; - options.requires_out = true; } zmq::pair_t::~pair_t () { - zmq_assert (!inpipe); - zmq_assert (!outpipe); + zmq_assert (!pipe); } -void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, - const blob_t &peer_identity_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (!inpipe && !outpipe); + zmq_assert (!pipe); - inpipe = inpipe_; - inpipe_alive = true; - inpipe->set_event_sink (this); - - outpipe = outpipe_; - outpipe_alive = true; - outpipe->set_event_sink (this); + pipe = pipe_; + pipe->set_event_sink (this); if (terminating) { - register_term_acks (2); - inpipe_->terminate (); - outpipe_->terminate (); + register_term_acks (1); + pipe_->terminate (); } } -void zmq::pair_t::terminated (reader_t *pipe_) -{ - zmq_assert (pipe_ == inpipe); - inpipe = NULL; - inpipe_alive = false; - - if (terminating) - unregister_term_ack (); -} - -void zmq::pair_t::terminated (writer_t *pipe_) +void zmq::pair_t::terminated (pipe_t *pipe_) { - zmq_assert (pipe_ == outpipe); - outpipe = NULL; - outpipe_alive = false; + zmq_assert (pipe_ == pipe); + pipe = NULL; if (terminating) unregister_term_ack (); } -void zmq::pair_t::delimited (reader_t *pipe_) -{ -} - void zmq::pair_t::process_term (int linger_) { terminating = true; - if (inpipe) { + if (pipe) { register_term_acks (1); - inpipe->terminate (); - } - - if (outpipe) { - register_term_acks (1); - outpipe->terminate (); + pipe->terminate (); } socket_base_t::process_term (linger_); } -void zmq::pair_t::activated (class reader_t *pipe_) +void zmq::pair_t::read_activated (pipe_t *pipe_) { - zmq_assert (!inpipe_alive); - inpipe_alive = true; + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. } -void zmq::pair_t::activated (class writer_t *pipe_) +void zmq::pair_t::write_activated (pipe_t *pipe_) { - zmq_assert (!outpipe_alive); - outpipe_alive = true; + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. } int zmq::pair_t::xsend (msg_t *msg_, int flags_) { - if (outpipe == NULL || !outpipe_alive) { - errno = EAGAIN; - return -1; - } - - if (!outpipe->write (msg_)) { - outpipe_alive = false; + if (!pipe || !pipe->write (msg_)) { errno = EAGAIN; return -1; } if (!(flags_ & ZMQ_SNDMORE)) - outpipe->flush (); + pipe->flush (); // Detach the original message from the data buffer. int rc = msg_->init (); @@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->close (); errno_assert (rc == 0); - if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { - - // No message is available. - inpipe_alive = false; + if (!pipe || !pipe->read (msg_)) { // Initialise the output parameter to be a 0-byte message. rc = msg_->init (); errno_assert (rc == 0); + errno = EAGAIN; return -1; } @@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) bool zmq::pair_t::xhas_in () { - if (!inpipe || !inpipe_alive) + if (!pipe) return false; - inpipe_alive = inpipe->check_read (); - return inpipe_alive; + return pipe->check_read (); } bool zmq::pair_t::xhas_out () { - if (!outpipe || !outpipe_alive) + if (!pipe) return false; msg_t msg; int rc = msg.init (); errno_assert (rc == 0); - outpipe_alive = outpipe->check_write (&msg); + bool result = pipe->check_write (&msg); rc = msg.close (); errno_assert (rc == 0); - return outpipe_alive; + return result; } -- cgit v1.2.3