summaryrefslogtreecommitdiff
path: root/src/dist.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-04-21 22:27:48 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-04-21 22:27:48 +0200
commite0246e32d79d71f8e73207b43aed8b23648e4fc7 (patch)
tree9952ee6fd39f4e27bbe932f6b6f30f0073009369 /src/dist.cpp
parent581697695aac72894f2d3fefac904b9d50b3ba67 (diff)
Message-related functionality factored out into msg_t class.
This patch addresses serveral issues: 1. It gathers message related functionality scattered over whole codebase into a single class. 2. It makes zmq_msg_t an opaque datatype. Internals of the class don't pollute zmq.h header file. 3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33% of memory in scenarios with large amount of small messages. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/dist.cpp')
-rw-r--r--src/dist.cpp66
1 files changed, 17 insertions, 49 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index 9d50368..093da79 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -18,8 +18,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "dist.hpp"
#include "pipe.hpp"
#include "err.hpp"
@@ -89,10 +87,10 @@ void zmq::dist_t::activated (writer_t *pipe_)
active++;
}
-int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
+int zmq::dist_t::send (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more = msg_->flags & ZMQ_MSG_MORE;
+ bool msg_more = msg_->flags () & msg_t::more;
// Push the message to active pipes.
distribute (msg_, flags_);
@@ -106,63 +104,33 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
return 0;
}
-void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_)
+void zmq::dist_t::distribute (msg_t *msg_, int flags_)
{
// If there are no active pipes available, simply drop the message.
if (active == 0) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- msg_content_t *content = (msg_content_t*) msg_->content;
-
- // For VSMs the copying is straighforward.
- if (content == (msg_content_t*) ZMQ_VSM) {
- for (pipes_t::size_type i = 0; i < active;)
- if (write (pipes [i], msg_))
- i++;
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return;
- }
-
- // Optimisation for the case when there's only a single pipe
- // to send the message to - no refcount adjustment i.e. no atomic
- // operations are needed.
- if (active == 1) {
- if (!write (pipes [0], msg_)) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- }
- int rc = zmq_msg_init (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
zmq_assert (rc == 0);
return;
}
- // There are at least 2 destinations for the message. That means we have
- // to deal with reference counting. First add N-1 references to
- // the content (we are holding one reference anyway, that's why -1).
- if (msg_->flags & ZMQ_MSG_SHARED)
- content->refcnt.add (active - 1);
- else {
- content->refcnt.set (active);
- msg_->flags |= ZMQ_MSG_SHARED;
- }
+ // Add active-1 references to the message. We already hold one reference,
+ // that's why -1.
+ msg_->add_refs (active - 1);
- // Push the message to all destinations.
+ // Push copy of the message to each active pipe.
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
- content->refcnt.sub (1);
+ msg_->rm_refs (1);
else
i++;
}
- // Detach the original message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ // Detach the original message from the data buffer. Note that we don't
+ // close the message. That's because we've already used all the references.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
}
bool zmq::dist_t::has_out ()
@@ -170,14 +138,14 @@ bool zmq::dist_t::has_out ()
return true;
}
-bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
+bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_)
{
if (!pipe_->write (msg_)) {
active--;
pipes.swap (pipes.index (pipe_), active);
return false;
}
- if (!(msg_->flags & ZMQ_MSG_MORE))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}