From e0246e32d79d71f8e73207b43aed8b23648e4fc7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 21 Apr 2011 22:27:48 +0200 Subject: Message-related functionality factored out into msg_t class. This patch addresses serveral issues: 1. It gathers message related functionality scattered over whole codebase into a single class. 2. It makes zmq_msg_t an opaque datatype. Internals of the class don't pollute zmq.h header file. 3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33% of memory in scenarios with large amount of small messages. Signed-off-by: Martin Sustrik --- src/xrep.cpp | 69 ++++++++++++++++++++++++++++++------------------------------ 1 file changed, 34 insertions(+), 35 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 75dc30e..5e01e2f 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -18,11 +18,9 @@ along with this program. If not, see . */ -#include "../include/zmq.h" - #include "xrep.hpp" -#include "err.hpp" #include "pipe.hpp" +#include "err.hpp" zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), @@ -159,7 +157,7 @@ void zmq::xrep_t::activated (writer_t *pipe_) zmq_assert (false); } -int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::xrep_t::xsend (msg_t *msg_, int flags_) { // If this is the first part of the message it's the identity of the // peer to send the message to. @@ -168,44 +166,43 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. - if (msg_->flags & ZMQ_MSG_MORE) { + if (msg_->flags () & msg_t::more) { more_out = true; // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message. - blob_t identity ((unsigned char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); outpipes_t::iterator it = outpipes.find (identity); if (it != outpipes.end ()) { current_out = it->second.writer; - zmq_msg_t empty; - int rc = zmq_msg_init (&empty); - zmq_assert (rc == 0); + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); if (!current_out->check_write (&empty)) { it->second.active = false; more_out = false; current_out = NULL; - rc = zmq_msg_close (&empty); - zmq_assert (rc == 0); + rc = empty.close (); + errno_assert (rc == 0); errno = EAGAIN; return -1; } - rc = zmq_msg_close (&empty); - zmq_assert (rc == 0); + rc = empty.close (); + errno_assert (rc == 0); } } - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); return 0; } // Check whether this is the last part of the message. - more_out = msg_->flags & ZMQ_MSG_MORE; + more_out = msg_->flags () & msg_t::more; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -217,36 +214,38 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) } } else { - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); + int rc = msg_->close (); + errno_assert (rc == 0); } // Detach the message from the data buffer. - int rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); + int rc = msg_->init (); + errno_assert (rc == 0); return 0; } -int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) { // If there is a prefetched message, return it. if (prefetched) { - zmq_msg_move (msg_, &prefetched_msg); - more_in = msg_->flags & ZMQ_MSG_MORE; + int rc = msg_->move (prefetched_msg); + errno_assert (rc == 0); + more_in = msg_->flags () & msg_t::more; prefetched = false; return 0; } // Deallocate old content of the message. - zmq_msg_close (msg_); + int rc = msg_->close (); + errno_assert (rc == 0); // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); bool fetched = inpipes [current_in].reader->read (msg_); zmq_assert (fetched); - more_in = msg_->flags & ZMQ_MSG_MORE; + more_in = msg_->flags () & msg_t::more; if (!more_in) { current_in++; if (current_in >= inpipes.size ()) @@ -264,12 +263,11 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // If we have a message, create a prefix and return it to the caller. if (prefetched) { - int rc = zmq_msg_init_size (msg_, - inpipes [current_in].identity.size ()); - zmq_assert (rc == 0); - memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), - zmq_msg_size (msg_)); - msg_->flags |= ZMQ_MSG_MORE; + int rc = msg_->init_size (inpipes [current_in].identity.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), inpipes [current_in].identity.data (), + msg_->size ()); + msg_->set_flags (msg_t::more); return 0; } @@ -283,7 +281,8 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // No message is available. Initialise the output parameter // to be a 0-byte message. - zmq_msg_init (msg_); + rc = msg_->init (); + errno_assert (rc == 0); errno = EAGAIN; return -1; } -- cgit v1.2.3