From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/pair.cpp | 72 +++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 42 insertions(+), 30 deletions(-) (limited to 'src/pair.cpp') diff --git a/src/pair.cpp b/src/pair.cpp index 3872b28..1ff2e1a 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -23,11 +23,12 @@ #include "err.hpp" #include "pipe.hpp" -zmq::pair_t::pair_t (class app_thread_t *parent_) : - socket_base_t (parent_), +zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_), inpipe (NULL), outpipe (NULL), - alive (true) + inpipe_alive (false), + outpipe_alive (false) { options.requires_in = true; options.requires_out = true; @@ -35,56 +36,61 @@ zmq::pair_t::pair_t (class app_thread_t *parent_) : zmq::pair_t::~pair_t () { - if (inpipe) - inpipe->term (); - if (outpipe) - outpipe->term (); + zmq_assert (!inpipe); + zmq_assert (!outpipe); } void zmq::pair_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe && !outpipe); + inpipe = inpipe_; + inpipe_alive = true; + inpipe->set_event_sink (this); + outpipe = outpipe_; outpipe_alive = true; + outpipe->set_event_sink (this); } -void zmq::pair_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::pair_t::terminated (class reader_t *pipe_) { zmq_assert (pipe_ == inpipe); inpipe = NULL; + inpipe_alive = false; } -void zmq::pair_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::pair_t::terminated (class writer_t *pipe_) { zmq_assert (pipe_ == outpipe); outpipe = NULL; + outpipe_alive = false; } -void zmq::pair_t::xkill (class reader_t *pipe_) +void zmq::pair_t::xterm_pipes () { - zmq_assert (alive); - alive = false; + if (inpipe) + inpipe->terminate (); + if (outpipe) + outpipe->terminate (); } -void zmq::pair_t::xrevive (class reader_t *pipe_) +bool zmq::pair_t::xhas_pipes () { - zmq_assert (!alive); - alive = true; + return inpipe != NULL || outpipe != NULL; } -void zmq::pair_t::xrevive (class writer_t *pipe_) +void zmq::pair_t::activated (class reader_t *pipe_) { - zmq_assert (!outpipe_alive); - outpipe_alive = true; + zmq_assert (!inpipe_alive); + inpipe_alive = true; } -int zmq::pair_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) +void zmq::pair_t::activated (class writer_t *pipe_) { - errno = EINVAL; - return -1; + zmq_assert (!outpipe_alive); + outpipe_alive = true; } int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) @@ -100,7 +106,8 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } - outpipe->flush (); + if (!(flags_ & ZMQ_SNDMORE)) + outpipe->flush (); // Detach the original message from the data buffer. int rc = zmq_msg_init (msg_); @@ -114,9 +121,12 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); - if (!alive || !inpipe || !inpipe->read (msg_)) { - // No message is available. Initialise the output parameter - // to be a 0-byte message. + if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { + + // No message is available. + inpipe_alive = false; + + // Initialise the output parameter to be a 0-byte message. zmq_msg_init (msg_); errno = EAGAIN; return -1; @@ -126,14 +136,16 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) bool zmq::pair_t::xhas_in () { - if (alive && inpipe && inpipe->check_read ()) - return true; - return false; + if (!inpipe || !inpipe_alive) + return false; + + inpipe_alive = inpipe->check_read (); + return inpipe_alive; } bool zmq::pair_t::xhas_out () { - if (outpipe == NULL || !outpipe_alive) + if (!outpipe || !outpipe_alive) return false; outpipe_alive = outpipe->check_write (); -- cgit v1.2.3