summaryrefslogtreecommitdiff
path: root/src/dist.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dist.cpp')
-rw-r--r--src/dist.cpp66
1 files changed, 17 insertions, 49 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index 9d50368..093da79 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "dist.hpp"
#include "pipe.hpp"
#include "err.hpp"
@@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_)
active++;
}
-int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
+int zmq::dist_t::send (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more = msg_->flags & ZMQ_MSG_MORE;
+ bool msg_more = msg_->flags () & msg_t::more;
// Push the message to active pipes.
distribute (msg_, flags_);
@@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
return 0;
}
-void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_)
+void zmq::dist_t::distribute (msg_t *msg_, int flags_)
{
// If there are no active pipes available, simply drop the message.
if (active == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i < active;)
- if (write (pipes [i], msg_))
- i++;
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (active == 1) {
- if (!write (pipes [0], msg_)) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- }
- int rc = zmq_msg_init (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
zmq_assert (rc == 0);
return;
}
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (active - 1);
- else {
- content->refcnt.set (active);
- msg_->flags |= ZMQ_MSG_SHARED;
- }
+ // Add active-1 references to the message. We already hold one reference,
+ // that's why -1.
+ msg_->add_refs (active - 1);
- // Push the message to all destinations.
+ // Push copy of the message to each active pipe.
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
- content->refcnt.sub (1);
+ msg_->rm_refs (1);
else
i++;
}
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ // Detach the original message from the data buffer. Note that we don't
+ // close the message. That's because we've already used all the references.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
}
bool zmq::dist_t::has_out ()
@@ -170,14 +138,14 @@ bool zmq::dist_t::has_out ()
return true;
}
-bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
+bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
{
if (!pipe_->write (msg_)) {
active--;
pipes.swap (pipes.index (pipe_), active);
return false;
}
- if (!(msg_->flags & ZMQ_MSG_MORE))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}