summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-04-21 22:27:48 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-04-21 22:27:48 +0200
commite0246e32d79d71f8e73207b43aed8b23648e4fc7 (patch)
tree9952ee6fd39f4e27bbe932f6b6f30f0073009369 /src/xrep.cpp
parent581697695aac72894f2d3fefac904b9d50b3ba67 (diff)
Message-related functionality factored out into msg_t class.
This patch addresses serveral issues: 1. It gathers message related functionality scattered over whole codebase into a single class. 2. It makes zmq_msg_t an opaque datatype. Internals of the class don't pollute zmq.h header file. 3. zmq_msg_t size decreases from 48 to 32 bytes. That saves ~33% of memory in scenarios with large amount of small messages. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp69
1 files changed, 34 insertions, 35 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 75dc30e..5e01e2f 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -18,11 +18,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zmq.h"
-
#include "xrep.hpp"
-#include "err.hpp"
#include "pipe.hpp"
+#include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
@@ -159,7 +157,7 @@ void zmq::xrep_t::activated (writer_t *pipe_)
zmq_assert (false);
}
-int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
{
// If this is the first part of the message it's the identity of the
// peer to send the message to.
@@ -168,44 +166,43 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
- if (msg_->flags & ZMQ_MSG_MORE) {
+ if (msg_->flags () & msg_t::more) {
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
- blob_t identity ((unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
+ blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) {
current_out = it->second.writer;
- zmq_msg_t empty;
- int rc = zmq_msg_init (&empty);
- zmq_assert (rc == 0);
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
if (!current_out->check_write (&empty)) {
it->second.active = false;
more_out = false;
current_out = NULL;
- rc = zmq_msg_close (&empty);
- zmq_assert (rc == 0);
+ rc = empty.close ();
+ errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
- rc = zmq_msg_close (&empty);
- zmq_assert (rc == 0);
+ rc = empty.close ();
+ errno_assert (rc == 0);
}
}
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
// Check whether this is the last part of the message.
- more_out = msg_->flags & ZMQ_MSG_MORE;
+ more_out = msg_->flags () & msg_t::more;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -217,36 +214,38 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
}
}
else {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
}
// Detach the message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
-int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
{
// If there is a prefetched message, return it.
if (prefetched) {
- zmq_msg_move (msg_, &prefetched_msg);
- more_in = msg_->flags & ZMQ_MSG_MORE;
+ int rc = msg_->move (prefetched_msg);
+ errno_assert (rc == 0);
+ more_in = msg_->flags () & msg_t::more;
prefetched = false;
return 0;
}
// Deallocate old content of the message.
- zmq_msg_close (msg_);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
// If we are in the middle of reading a message, just grab next part of it.
if (more_in) {
zmq_assert (inpipes [current_in].active);
bool fetched = inpipes [current_in].reader->read (msg_);
zmq_assert (fetched);
- more_in = msg_->flags & ZMQ_MSG_MORE;
+ more_in = msg_->flags () & msg_t::more;
if (!more_in) {
current_in++;
if (current_in >= inpipes.size ())
@@ -264,12 +263,11 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// If we have a message, create a prefix and return it to the caller.
if (prefetched) {
- int rc = zmq_msg_init_size (msg_,
- inpipes [current_in].identity.size ());
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
- zmq_msg_size (msg_));
- msg_->flags |= ZMQ_MSG_MORE;
+ int rc = msg_->init_size (inpipes [current_in].identity.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), inpipes [current_in].identity.data (),
+ msg_->size ());
+ msg_->set_flags (msg_t::more);
return 0;
}
@@ -283,7 +281,8 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// No message is available. Initialise the output parameter
// to be a 0-byte message.
- zmq_msg_init (msg_);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}