summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-01-14 12:05:10 +0100
committerMartin Sustrik <sustrik@250bpm.com>2011-01-14 12:05:10 +0100
commit8eae7d8507b1c96aec28bca20a157bb7537c3eb8 (patch)
tree0d9d8d7c2a7270c452d5cf3ae9676df9d880fac1
parent18f29ded6a83875c27395d254c25e3d152ae1cc6 (diff)
'message distribution mechanism' separated from XPUB socket
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/Makefile.am2
-rw-r--r--src/dist.cpp161
-rw-r--r--src/dist.hpp76
-rw-r--r--src/xpub.cpp119
-rw-r--r--src/xpub.hpp23
5 files changed, 249 insertions, 132 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 3286bc4..9e362f2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -73,6 +73,7 @@ libzmq_la_SOURCES = \
decoder.hpp \
device.hpp \
devpoll.hpp \
+ dist.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -141,6 +142,7 @@ libzmq_la_SOURCES = \
decoder.cpp \
device.cpp \
devpoll.cpp \
+ dist.cpp \
encoder.cpp \
epoll.cpp \
err.cpp \
diff --git a/src/dist.cpp b/src/dist.cpp
new file mode 100644
index 0000000..4986e81
--- /dev/null
+++ b/src/dist.cpp
@@ -0,0 +1,161 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../include/zmq.h"
+
+#include "dist.hpp"
+#include "pipe.hpp"
+#include "err.hpp"
+#include "own.hpp"
+#include "msg_content.hpp"
+
+zmq::dist_t::dist_t (own_t *sink_) :
+ active (0),
+ more (false),
+ sink (sink_),
+ terminating (false)
+{
+}
+
+zmq::dist_t::~dist_t ()
+{
+ zmq_assert (pipes.empty ());
+}
+
+void zmq::dist_t::attach (writer_t *pipe_)
+{
+ pipe_->set_event_sink (this);
+
+ pipes.push_back (pipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+
+ if (terminating) {
+ sink->register_term_acks (1);
+ pipe_->terminate ();
+ }
+}
+
+void zmq::dist_t::terminate ()
+{
+ zmq_assert (!terminating);
+ terminating = true;
+
+ sink->register_term_acks (pipes.size ());
+ for (pipes_t::size_type i = 0; i != pipes.size (); i++)
+ pipes [i]->terminate ();
+}
+
+void zmq::dist_t::terminated (writer_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ if (pipes.index (pipe_) < active)
+ active--;
+ pipes.erase (pipe_);
+
+ if (terminating)
+ sink->unregister_term_ack ();
+}
+
+void zmq::dist_t::activated (writer_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no active pipes available, simply drop the message.
+ if (active == 0) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ msg_content_t *content = (msg_content_t*) msg_->content;
+
+ // For VSMs the copying is straighforward.
+ if (content == (msg_content_t*) ZMQ_VSM) {
+ for (pipes_t::size_type i = 0; i < active;)
+ if (write (pipes [i], msg_))
+ i++;
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Optimisation for the case when there's only a single pipe
+ // to send the message to - no refcount adjustment i.e. no atomic
+ // operations are needed.
+ if (active == 1) {
+ if (!write (pipes [0], msg_)) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ }
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // There are at least 2 destinations for the message. That means we have
+ // to deal with reference counting. First add N-1 references to
+ // the content (we are holding one reference anyway, that's why -1).
+ if (msg_->flags & ZMQ_MSG_SHARED)
+ content->refcnt.add (active - 1);
+ else {
+ content->refcnt.set (active);
+ msg_->flags |= ZMQ_MSG_SHARED;
+ }
+
+ // Push the message to all destinations.
+ for (pipes_t::size_type i = 0; i < active;) {
+ if (!write (pipes [i], msg_))
+ content->refcnt.sub (1);
+ else
+ i++;
+ }
+
+ // Detach the original message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+bool zmq::dist_t::has_out ()
+{
+ return true;
+}
+
+bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
+{
+ if (!pipe_->write (msg_)) {
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+ return false;
+ }
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ pipe_->flush ();
+ return true;
+}
+
diff --git a/src/dist.hpp b/src/dist.hpp
new file mode 100644
index 0000000..4949062
--- /dev/null
+++ b/src/dist.hpp
@@ -0,0 +1,76 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DIST_HPP_INCLUDED__
+#define __ZMQ_DIST_HPP_INCLUDED__
+
+#include "array.hpp"
+#include "pipe.hpp"
+
+namespace zmq
+{
+
+ // Class manages a set of outbound pipes. It sends each messages to
+ // each of them.
+ class dist_t : public i_writer_events
+ {
+ public:
+
+ dist_t (class own_t *sink_);
+ ~dist_t ();
+
+ void attach (writer_t *pipe_);
+ void terminate ();
+ int send (zmq_msg_t *msg_, int flags_);
+ bool has_out ();
+
+ // i_writer_events interface implementation.
+ void activated (writer_t *pipe_);
+ void terminated (writer_t *pipe_);
+
+ private:
+
+ // Write the message to the pipe. Make the pipe inactive if writing
+ // fails. In such a case false is returned.
+ bool write (class writer_t *pipe_, zmq_msg_t *msg_);
+
+ // List of outbound pipes.
+ typedef array_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // True if last we are in the middle of a multipart message.
+ bool more;
+
+ // Object to send events to.
+ class own_t *sink;
+
+ // If true, termination process is already underway.
+ bool terminating;
+
+ dist_t (const dist_t&);
+ const dist_t &operator = (const dist_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 601ec23..ed9c0e5 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -21,13 +21,11 @@
#include "xpub.hpp"
#include "err.hpp"
-#include "msg_content.hpp"
#include "pipe.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- active (0),
- terminating (false)
+ dist (this)
{
options.type = ZMQ_XPUB;
options.requires_in = false;
@@ -36,136 +34,31 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
zmq::xpub_t::~xpub_t ()
{
- zmq_assert (pipes.empty ());
}
void zmq::xpub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
-
- outpipe_->set_event_sink (this);
-
- pipes.push_back (outpipe_);
- pipes.swap (active, pipes.size () - 1);
- active++;
-
- if (terminating) {
- register_term_acks (1);
- outpipe_->terminate ();
- }
+ dist.attach (outpipe_);
}
void zmq::xpub_t::process_term (int linger_)
{
- terminating = true;
-
- // Start shutdown process for all the pipes.
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->terminate ();
-
- // Wait for pipes to terminate before terminating yourself.
- register_term_acks (pipes.size ());
+ // Terminate the outbound pipes.
+ dist.terminate ();
// Continue with the termination immediately.
socket_base_t::process_term (linger_);
}
-void zmq::xpub_t::activated (writer_t *pipe_)
-{
- // Move the pipe to the list of active pipes.
- pipes.swap (pipes.index (pipe_), active);
- active++;
-}
-
-void zmq::xpub_t::terminated (writer_t *pipe_)
-{
- // Remove the pipe from the list; adjust number of active pipes
- // accordingly.
- if (pipes.index (pipe_) < active)
- active--;
- pipes.erase (pipe_);
-
- // If we are already terminating, wait for one term ack less.
- if (terminating)
- unregister_term_ack ();
-}
-
int zmq::xpub_t::xsend (zmq_msg_t *msg_, int flags_)
{
- // If there are no active pipes available, simply drop the message.
- if (active == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return 0;
- }
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i < active;)
- if (write (pipes [i], msg_))
- i++;
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return 0;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (active == 1) {
- if (!write (pipes [0], msg_)) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- }
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return 0;
- }
-
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (active - 1);
- else {
- content->refcnt.set (active);
- msg_->flags |= ZMQ_MSG_SHARED;
- }
-
- // Push the message to all destinations.
- for (pipes_t::size_type i = 0; i < active;) {
- if (!write (pipes [i], msg_))
- content->refcnt.sub (1);
- else
- i++;
- }
-
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
-
- return 0;
+ return dist.send (msg_, flags_);
}
bool zmq::xpub_t::xhas_out ()
{
- return true;
-}
-
-bool zmq::xpub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
-{
- if (!pipe_->write (msg_)) {
- active--;
- pipes.swap (pipes.index (pipe_), active);
- return false;
- }
- if (!(msg_->flags & ZMQ_MSG_MORE))
- pipe_->flush ();
- return true;
+ return dist.has_out ();
}
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 1a6fe76..13baf1f 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -23,11 +23,12 @@
#include "socket_base.hpp"
#include "array.hpp"
#include "pipe.hpp"
+#include "dist.hpp"
namespace zmq
{
- class xpub_t : public socket_base_t, public i_writer_events
+ class xpub_t : public socket_base_t
{
public:
@@ -40,29 +41,13 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
bool xhas_out ();
- // i_writer_events interface implementation.
- void activated (writer_t *pipe_);
- void terminated (writer_t *pipe_);
-
private:
// Hook into the termination process.
void process_term (int linger_);
- // Write the message to the pipe. Make the pipe inactive if writing
- // fails. In such a case false is returned.
- bool write (class writer_t *pipe_, zmq_msg_t *msg_);
-
- // Outbound pipes, i.e. those the socket is sending messages to.
- typedef array_t <class writer_t> pipes_t;
- pipes_t pipes;
-
- // Number of active pipes. All the active pipes are located at the
- // beginning of the pipes array.
- pipes_t::size_type active;
-
- // True if termination process is already underway.
- bool terminating;
+ // Distributor of messages holding the list of outbound pipes.
+ dist_t dist;
xpub_t (const xpub_t&);
const xpub_t &operator = (const xpub_t&);