summaryrefslogtreecommitdiff
path: root/src/pair.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pair.cpp')
-rw-r--r--src/pair.cpp112
1 files changed, 75 insertions, 37 deletions
diff --git a/src/pair.cpp b/src/pair.cpp
index 3872b28..1acc60f 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -1,19 +1,20 @@
/*
- Copyright (c) 2007-2010 iMatix Corporation
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
+ the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
+ GNU Lesser General Public License for more details.
- You should have received a copy of the Lesser GNU General Public License
+ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
@@ -23,68 +24,96 @@
#include "err.hpp"
#include "pipe.hpp"
-zmq::pair_t::pair_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
+ socket_base_t (parent_, tid_),
inpipe (NULL),
outpipe (NULL),
- alive (true)
+ inpipe_alive (false),
+ outpipe_alive (false),
+ terminating (false)
{
+ options.type = ZMQ_PAIR;
options.requires_in = true;
options.requires_out = true;
}
zmq::pair_t::~pair_t ()
{
- if (inpipe)
- inpipe->term ();
- if (outpipe)
- outpipe->term ();
+ zmq_assert (!inpipe);
+ zmq_assert (!outpipe);
}
-void zmq::pair_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
+
inpipe = inpipe_;
+ inpipe_alive = true;
+ inpipe->set_event_sink (this);
+
outpipe = outpipe_;
outpipe_alive = true;
+ outpipe->set_event_sink (this);
+
+ if (terminating) {
+ register_term_acks (2);
+ inpipe_->terminate ();
+ outpipe_->terminate ();
+ }
}
-void zmq::pair_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::pair_t::terminated (reader_t *pipe_)
{
zmq_assert (pipe_ == inpipe);
inpipe = NULL;
+ inpipe_alive = false;
+
+ if (terminating)
+ unregister_term_ack ();
}
-void zmq::pair_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::pair_t::terminated (writer_t *pipe_)
{
zmq_assert (pipe_ == outpipe);
outpipe = NULL;
+ outpipe_alive = false;
+
+ if (terminating)
+ unregister_term_ack ();
}
-void zmq::pair_t::xkill (class reader_t *pipe_)
+void zmq::pair_t::delimited (reader_t *pipe_)
{
- zmq_assert (alive);
- alive = false;
}
-void zmq::pair_t::xrevive (class reader_t *pipe_)
+void zmq::pair_t::process_term (int linger_)
{
- zmq_assert (!alive);
- alive = true;
+ terminating = true;
+
+ if (inpipe) {
+ register_term_acks (1);
+ inpipe->terminate ();
+ }
+
+ if (outpipe) {
+ register_term_acks (1);
+ outpipe->terminate ();
+ }
+
+ socket_base_t::process_term (linger_);
}
-void zmq::pair_t::xrevive (class writer_t *pipe_)
+void zmq::pair_t::activated (class reader_t *pipe_)
{
- zmq_assert (!outpipe_alive);
- outpipe_alive = true;
+ zmq_assert (!inpipe_alive);
+ inpipe_alive = true;
}
-int zmq::pair_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
+void zmq::pair_t::activated (class writer_t *pipe_)
{
- errno = EINVAL;
- return -1;
+ zmq_assert (!outpipe_alive);
+ outpipe_alive = true;
}
int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
@@ -100,7 +129,8 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- outpipe->flush ();
+ if (!(flags_ & ZMQ_SNDMORE))
+ outpipe->flush ();
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
@@ -114,9 +144,12 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
zmq_msg_close (msg_);
- if (!alive || !inpipe || !inpipe->read (msg_)) {
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
+ if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
+
+ // No message is available.
+ inpipe_alive = false;
+
+ // Initialise the output parameter to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
@@ -126,17 +159,22 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::pair_t::xhas_in ()
{
- if (alive && inpipe && inpipe->check_read ())
- return true;
- return false;
+ if (!inpipe || !inpipe_alive)
+ return false;
+
+ inpipe_alive = inpipe->check_read ();
+ return inpipe_alive;
}
bool zmq::pair_t::xhas_out ()
{
- if (outpipe == NULL || !outpipe_alive)
+ if (!outpipe || !outpipe_alive)
return false;
- outpipe_alive = outpipe->check_write ();
+ zmq_msg_t msg;
+ zmq_msg_init (&msg);
+ outpipe_alive = outpipe->check_write (&msg);
+ zmq_msg_close (&msg);
return outpipe_alive;
}