summaryrefslogtreecommitdiff
path: root/src/pub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-06 17:49:37 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commit05d908492dc382941fc633ad7082b5bd86e84e67 (patch)
treeae10e49766152e42521a6c100e622dc616998143 /src/pub.cpp
parentb7e0fa972f45d21e45cacb93a1a92d38fdc11f40 (diff)
WIP: Socket migration between threads, new zmq_close() semantics
Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented.
Diffstat (limited to 'src/pub.cpp')
-rw-r--r--src/pub.cpp62
1 files changed, 21 insertions, 41 deletions
diff --git a/src/pub.cpp b/src/pub.cpp
index 4e73b19..d1d1c72 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -24,8 +24,8 @@
#include "msg_content.hpp"
#include "pipe.hpp"
-zmq::pub_t::pub_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
+zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
active (0)
{
options.requires_in = false;
@@ -34,56 +34,47 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) :
zmq::pub_t::~pub_t ()
{
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->term ();
- pipes.clear ();
+ zmq_assert (pipes.empty ());
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
+
+ outpipe_->set_event_sink (this);
+
pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
-void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
-{
- zmq_assert (false);
-}
-
-void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
-{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
- if (pipes.index (pipe_) < active)
- active--;
- pipes.erase (pipe_);
-}
-
-void zmq::pub_t::xkill (class reader_t *pipe_)
+void zmq::pub_t::xterm_pipes ()
{
- zmq_assert (false);
+ // Start shutdown process for all the pipes.
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->terminate ();
}
-void zmq::pub_t::xrevive (class reader_t *pipe_)
+bool zmq::pub_t::xhas_pipes ()
{
- zmq_assert (false);
+ return !pipes.empty ();
}
-void zmq::pub_t::xrevive (class writer_t *pipe_)
+void zmq::pub_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
-int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
+void zmq::pub_t::terminated (writer_t *pipe_)
{
- errno = EINVAL;
- return -1;
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ if (pipes.index (pipe_) < active)
+ active--;
+ pipes.erase (pipe_);
}
int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
@@ -101,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i != active;)
+ for (pipes_t::size_type i = 0; i < active;)
if (write (pipes [i], msg_))
i++;
int rc = zmq_msg_init (msg_);
@@ -133,7 +124,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Push the message to all destinations.
- for (pipes_t::size_type i = 0; i != active;) {
+ for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
content->refcnt.sub (1);
else
@@ -147,17 +138,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
-{
- errno = ENOTSUP;
- return -1;
-}
-
-bool zmq::pub_t::xhas_in ()
-{
- return false;
-}
-
bool zmq::pub_t::xhas_out ()
{
return true;