summaryrefslogtreecommitdiff
path: root/src/pub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pub.cpp')
-rw-r--r--src/pub.cpp78
1 files changed, 36 insertions, 42 deletions
diff --git a/src/pub.cpp b/src/pub.cpp
index 4e73b19..b1a1239 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -24,66 +24,71 @@
#include "msg_content.hpp"
#include "pipe.hpp"
-zmq::pub_t::pub_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
- active (0)
+zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) :
+ socket_base_t (parent_, slot_),
+ active (0),
+ terminating (false)
{
+ options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
zmq::pub_t::~pub_t ()
{
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->term ();
- pipes.clear ();
+ zmq_assert (pipes.empty ());
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
+
+ outpipe_->set_event_sink (this);
+
pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
-}
-void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
-{
- zmq_assert (false);
+ if (terminating) {
+ register_term_acks (1);
+ outpipe_->terminate ();
+ }
}
-void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::pub_t::process_term ()
{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
- if (pipes.index (pipe_) < active)
- active--;
- pipes.erase (pipe_);
-}
+ terminating = true;
-void zmq::pub_t::xkill (class reader_t *pipe_)
-{
- zmq_assert (false);
-}
+ // Start shutdown process for all the pipes.
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->terminate ();
-void zmq::pub_t::xrevive (class reader_t *pipe_)
-{
- zmq_assert (false);
+ // Wait for pipes to terminate before terminating yourself.
+ register_term_acks (pipes.size ());
+
+ // Continue with the termination immediately.
+ socket_base_t::process_term ();
}
-void zmq::pub_t::xrevive (class writer_t *pipe_)
+void zmq::pub_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
-int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
+void zmq::pub_t::terminated (writer_t *pipe_)
{
- errno = EINVAL;
- return -1;
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ if (pipes.index (pipe_) < active)
+ active--;
+ pipes.erase (pipe_);
+
+ // If we are already terminating, wait for one term ack less.
+ if (terminating)
+ unregister_term_ack ();
}
int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
@@ -101,7 +106,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i != active;)
+ for (pipes_t::size_type i = 0; i < active;)
if (write (pipes [i], msg_))
i++;
int rc = zmq_msg_init (msg_);
@@ -133,7 +138,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Push the message to all destinations.
- for (pipes_t::size_type i = 0; i != active;) {
+ for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
content->refcnt.sub (1);
else
@@ -147,17 +152,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
-{
- errno = ENOTSUP;
- return -1;
-}
-
-bool zmq::pub_t::xhas_in ()
-{
- return false;
-}
-
bool zmq::pub_t::xhas_out ()
{
return true;