summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-20 11:33:54 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-20 11:33:54 +0200
commitab99975ad44ed0fe9ab651f31cc47d493e7fb77e (patch)
treeec7d05592886bf48cb4ed60d10aa8f5eaf02dacd /src
parentada5d424721c0c0139b8011a5e9de348d061ba2f (diff)
LABEL flag added to the wire format
So far there was no distinction between message parts used by 0MQ and message parts used by user. Now, the message parts used by 0MQ are marked as 'LABEL'. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/dist.cpp4
-rw-r--r--src/encoder.cpp4
-rw-r--r--src/fq.cpp2
-rw-r--r--src/lb.cpp4
-rw-r--r--src/msg.hpp5
-rw-r--r--src/pipe.cpp9
-rw-r--r--src/rep.cpp6
-rw-r--r--src/req.cpp8
-rw-r--r--src/session.cpp2
-rw-r--r--src/socket_base.cpp25
-rw-r--r--src/socket_base.hpp5
-rw-r--r--src/xpub.cpp2
-rw-r--r--src/xrep.cpp11
-rw-r--r--src/xsub.cpp8
14 files changed, 62 insertions, 33 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index 15bd168..6b95b2e 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -111,7 +111,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more = msg_->flags () & msg_t::more;
+ bool msg_more = msg_->flags () & (msg_t::more | msg_t::label);
// Push the message to matching pipes.
distribute (msg_, flags_);
@@ -170,7 +170,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
eligible--;
return false;
}
- if (!(msg_->flags () & msg_t::more))
+ if (!(msg_->flags () & (msg_t::more | msg_t::label)))
pipe_->flush ();
return true;
}
diff --git a/src/encoder.cpp b/src/encoder.cpp
index a42f06f..f579deb 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -81,14 +81,14 @@ bool zmq::encoder_t::message_ready ()
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags () & msg_t::more));
+ !(in_progress.flags () & (msg_t::more | msg_t::label)));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags () & msg_t::more));
+ !(in_progress.flags () & (msg_t::more | msg_t::label)));
}
return true;
}
diff --git a/src/fq.cpp b/src/fq.cpp
index 63a50ff..1a2a0c0 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -90,7 +90,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
if (fetched) {
if (pipe_)
*pipe_ = pipes [current];
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
if (!more) {
current++;
if (current >= active)
diff --git a/src/lb.cpp b/src/lb.cpp
index 7aeef9e..6101c69 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -75,7 +75,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
if (!more)
dropping = false;
@@ -88,7 +88,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
while (active > 0) {
if (pipes [current]->write (msg_)) {
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
break;
}
diff --git a/src/msg.hpp b/src/msg.hpp
index 466a96a..602ae55 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -47,8 +47,9 @@ namespace zmq
// Mesage flags.
enum
{
- more = 1,
- shared = 128
+ label = 1,
+ shared = 64,
+ more = 128
};
bool check ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 9d9614b..1813ca0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -125,7 +125,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return false;
}
- if (!(msg_->flags () & msg_t::more))
+ if (!(msg_->flags () & (msg_t::more | msg_t::label)))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@@ -154,8 +154,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
- outpipe->write (*msg_, msg_->flags () & msg_t::more);
- if (!(msg_->flags () & msg_t::more))
+ bool more = msg_->flags () & (msg_t::more | msg_t::label);
+ outpipe->write (*msg_, more);
+ if (!more)
msgs_written++;
return true;
@@ -167,7 +168,7 @@ void zmq::pipe_t::rollback ()
msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & msg_t::more);
+ zmq_assert (msg.flags () & (msg_t::more | msg_t::label));
int rc = msg.close ();
errno_assert (rc == 0);
}
diff --git a/src/rep.cpp b/src/rep.cpp
index 8878bcd..b987d9c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -42,7 +42,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return -1;
}
- bool more = (msg_->flags () & msg_t::more);
+ bool more = msg_->flags () & (msg_t::more | msg_t::label);
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
@@ -77,7 +77,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0)
return rc;
- if (msg_->flags () & msg_t::more) {
+ if (msg_->flags () & (msg_t::more | msg_t::label)) {
// Empty message part delimits the traceback stack.
bool bottom = (msg_->size () == 0);
@@ -111,7 +111,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If whole request is read, flip the FSM to reply-sending state.
- if (!(msg_->flags () & msg_t::more)) {
+ if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
sending_reply = true;
request_begins = true;
}
diff --git a/src/req.cpp b/src/req.cpp
index 6bf502f..b0e58dc 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -48,14 +48,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
msg_t prefix;
int rc = prefix.init ();
errno_assert (rc == 0);
- prefix.set_flags (msg_t::more);
+ prefix.set_flags (msg_t::label);
rc = xreq_t::xsend (&prefix, flags_);
if (rc != 0)
return rc;
message_begins = false;
}
- bool more = msg_->flags () & msg_t::more;
+ bool more = msg_->flags () & (msg_t::more | msg_t::label);
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -83,7 +83,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
int rc = xreq_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- zmq_assert (msg_->flags () & msg_t::more);
+ zmq_assert (msg_->flags () & msg_t::label);
zmq_assert (msg_->size () == 0);
message_begins = false;
}
@@ -93,7 +93,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags () & msg_t::more)) {
+ if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
receiving_reply = false;
message_begins = true;
}
diff --git a/src/session.cpp b/src/session.cpp
index e700313..334763a 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -71,7 +71,7 @@ bool zmq::session_t::read (msg_t *msg_)
if (!pipe->read (msg_))
return false;
- incomplete_in = msg_->flags () & msg_t::more;
+ incomplete_in = msg_->flags () & (msg_t::more | msg_t::label);
return true;
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 16e3f0d..8e8676c 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -119,6 +119,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
destroyed (false),
last_tsc (0),
ticks (0),
+ rcvlabel (false),
rcvmore (false)
{
}
@@ -263,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1;
}
+ if (option_ == ZMQ_RCVLABEL) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvlabel ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -479,7 +490,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0))
return -1;
- // At this point we impose the MORE flag on the message.
+ // At this point we impose the LABEL & MORE flags on the message.
+ if (flags_ & ZMQ_SNDLABEL)
+ msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
@@ -558,6 +571,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
@@ -575,6 +591,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
if (rc == 0) {
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
@@ -611,6 +630,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
}
+ // Extract LABEL & MORE flags from the message.
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 69a8aac..f114e9d 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -189,7 +189,10 @@ namespace zmq
// Number of messages received since last command processing.
int ticks;
- // If true there's a half-read message in the socket.
+ // True if the last message received had LABEL flag set.
+ bool rcvlabel;
+
+ // True if the last message received had MORE flag set.
bool rcvmore;
// Lists of existing sessions. This list is never referenced from
diff --git a/src/xpub.cpp b/src/xpub.cpp
index a102b68..f4fe7a1 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -100,7 +100,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- bool msg_more = msg_->flags () & msg_t::more;
+ bool msg_more = msg_->flags () & (msg_t::more | msg_t::label);
// For the first part of multi-part message, find the matching pipes.
if (!more)
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 920be8d..b935c06 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -122,7 +122,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
- if (msg_->flags () & msg_t::more) {
+ // TODO: The connections should be killed instead.
+ if (msg_->flags () & msg_t::label) {
more_out = true;
@@ -158,7 +159,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
- more_out = msg_->flags () & msg_t::more;
+ more_out = msg_->flags () & (msg_t::more | msg_t::label);
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -187,7 +188,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
- more_in = msg_->flags () & msg_t::more;
+ more_in = msg_->flags () & (msg_t::more | msg_t::label);
prefetched = false;
return 0;
}
@@ -201,7 +202,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
zmq_assert (inpipes [current_in].active);
bool fetched = inpipes [current_in].pipe->read (msg_);
zmq_assert (fetched);
- more_in = msg_->flags () & msg_t::more;
+ more_in = msg_->flags () & (msg_t::more | msg_t::label);
if (!more_in) {
current_in++;
if (current_in >= inpipes.size ())
@@ -223,7 +224,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
errno_assert (rc == 0);
memcpy (msg_->data (), inpipes [current_in].identity.data (),
msg_->size ());
- msg_->set_flags (msg_t::more);
+ msg_->set_flags (msg_t::label);
return 0;
}
diff --git a/src/xsub.cpp b/src/xsub.cpp
index f4160c0..bfe12a3 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -116,7 +116,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
return 0;
}
@@ -136,13 +136,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || !options.filter || match (msg_)) {
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags () & msg_t::more) {
+ while (msg_->flags () & (msg_t::more | msg_t::label)) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -182,7 +182,7 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags () & msg_t::more) {
+ while (message.flags () & (msg_t::more | msg_t::label)) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}