summaryrefslogtreecommitdiff
path: root/src/xsub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r--src/xsub.cpp41
1 files changed, 21 insertions, 20 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp
index b0c5795..b0e8cd2 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -20,8 +20,6 @@
#include <string.h>
-#include "../include/zmq.h"
-
#include "xsub.hpp"
#include "err.hpp"
@@ -34,12 +32,14 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
options.type = ZMQ_XSUB;
options.requires_in = true;
options.requires_out = false;
- zmq_msg_init (&message);
+ int rc = message.init ();
+ errno_assert (rc == 0);
}
zmq::xsub_t::~xsub_t ()
{
- zmq_msg_close (&message);
+ int rc = message.close ();
+ errno_assert (rc == 0);
}
void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_,
@@ -55,10 +55,10 @@ void zmq::xsub_t::process_term (int linger_)
socket_base_t::process_term (linger_);
}
-int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_)
+int zmq::xsub_t::xsend (msg_t *msg_, int options_)
{
- size_t size = zmq_msg_size (msg_);
- unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ size_t size = msg_->size ();
+ unsigned char *data = (unsigned char*) msg_->data ();
// Malformed subscriptions are dropped silently.
if (size >= 1) {
@@ -72,10 +72,10 @@ int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_)
subscriptions.rm (data + 1, size - 1);
}
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
return 0;
}
@@ -85,14 +85,15 @@ bool zmq::xsub_t::xhas_out ()
return true;
}
-int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
- zmq_msg_move (msg_, &message);
+ int rc = msg_->move (message);
+ errno_assert (rc == 0);
has_message = false;
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
return 0;
}
@@ -112,13 +113,13 @@ int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
- more = msg_->flags & ZMQ_MSG_MORE;
+ more = msg_->flags () & msg_t::more;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags & ZMQ_MSG_MORE) {
+ while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -158,15 +159,15 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags & ZMQ_MSG_MORE) {
+ while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
}
}
-bool zmq::xsub_t::match (zmq_msg_t *msg_)
+bool zmq::xsub_t::match (msg_t *msg_)
{
- return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
+ return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
}
+