summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/dist.cpp4
-rw-r--r--src/encoder.cpp4
-rw-r--r--src/fq.cpp2
-rw-r--r--src/lb.cpp4
-rw-r--r--src/msg.hpp5
-rw-r--r--src/pipe.cpp9
-rw-r--r--src/rep.cpp6
-rw-r--r--src/req.cpp8
-rw-r--r--src/session.cpp2
-rw-r--r--src/socket_base.cpp25
-rw-r--r--src/socket_base.hpp5
-rw-r--r--src/xpub.cpp2
-rw-r--r--src/xrep.cpp11
-rw-r--r--src/xsub.cpp8
14 files changed, 62 insertions, 33 deletions
diff --git a/src/dist.cpp b/src/dist.cpp
index 15bd168..6b95b2e 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -111,7 +111,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more = msg_->flags () & msg_t::more;
+ bool msg_more = msg_->flags () & (msg_t::more | msg_t::label);
// Push the message to matching pipes.
distribute (msg_, flags_);
@@ -170,7 +170,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
eligible--;
return false;
}
- if (!(msg_->flags () & msg_t::more))
+ if (!(msg_->flags () & (msg_t::more | msg_t::label)))
pipe_->flush ();
return true;
}
diff --git a/src/encoder.cpp b/src/encoder.cpp
index a42f06f..f579deb 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -81,14 +81,14 @@ bool zmq::encoder_t::message_ready ()
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags () & msg_t::more));
+ !(in_progress.flags () & (msg_t::more | msg_t::label)));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags () & msg_t::more));
+ !(in_progress.flags () & (msg_t::more | msg_t::label)));
}
return true;
}
diff --git a/src/fq.cpp b/src/fq.cpp
index 63a50ff..1a2a0c0 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -90,7 +90,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
if (fetched) {
if (pipe_)
*pipe_ = pipes [current];
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
if (!more) {
current++;
if (current >= active)
diff --git a/src/lb.cpp b/src/lb.cpp
index 7aeef9e..6101c69 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -75,7 +75,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
if (!more)
dropping = false;
@@ -88,7 +88,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
while (active > 0) {
if (pipes [current]->write (msg_)) {
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
break;
}
diff --git a/src/msg.hpp b/src/msg.hpp
index 466a96a..602ae55 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -47,8 +47,9 @@ namespace zmq
// Mesage flags.
enum
{
- more = 1,
- shared = 128
+ label = 1,
+ shared = 64,
+ more = 128
};
bool check ();
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 9d9614b..1813ca0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -125,7 +125,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return false;
}
- if (!(msg_->flags () & msg_t::more))
+ if (!(msg_->flags () & (msg_t::more | msg_t::label)))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@@ -154,8 +154,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
- outpipe->write (*msg_, msg_->flags () & msg_t::more);
- if (!(msg_->flags () & msg_t::more))
+ bool more = msg_->flags () & (msg_t::more | msg_t::label);
+ outpipe->write (*msg_, more);
+ if (!more)
msgs_written++;
return true;
@@ -167,7 +168,7 @@ void zmq::pipe_t::rollback ()
msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & msg_t::more);
+ zmq_assert (msg.flags () & (msg_t::more | msg_t::label));
int rc = msg.close ();
errno_assert (rc == 0);
}
diff --git a/src/rep.cpp b/src/rep.cpp
index 8878bcd..b987d9c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -42,7 +42,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return -1;
}
- bool more = (msg_->flags () & msg_t::more);
+ bool more = msg_->flags () & (msg_t::more | msg_t::label);
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
@@ -77,7 +77,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0)
return rc;
- if (msg_->flags () & msg_t::more) {
+ if (msg_->flags () & (msg_t::more | msg_t::label)) {
// Empty message part delimits the traceback stack.
bool bottom = (msg_->size () == 0);
@@ -111,7 +111,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If whole request is read, flip the FSM to reply-sending state.
- if (!(msg_->flags () & msg_t::more)) {
+ if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
sending_reply = true;
request_begins = true;
}
diff --git a/src/req.cpp b/src/req.cpp
index 6bf502f..b0e58dc 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -48,14 +48,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
msg_t prefix;
int rc = prefix.init ();
errno_assert (rc == 0);
- prefix.set_flags (msg_t::more);
+ prefix.set_flags (msg_t::label);
rc = xreq_t::xsend (&prefix, flags_);
if (rc != 0)
return rc;
message_begins = false;
}
- bool more = msg_->flags () & msg_t::more;
+ bool more = msg_->flags () & (msg_t::more | msg_t::label);
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -83,7 +83,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
int rc = xreq_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- zmq_assert (msg_->flags () & msg_t::more);
+ zmq_assert (msg_->flags () & msg_t::label);
zmq_assert (msg_->size () == 0);
message_begins = false;
}
@@ -93,7 +93,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags () & msg_t::more)) {
+ if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
receiving_reply = false;
message_begins = true;
}
diff --git a/src/session.cpp b/src/session.cpp
index e700313..334763a 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -71,7 +71,7 @@ bool zmq::session_t::read (msg_t *msg_)
if (!pipe->read (msg_))
return false;
- incomplete_in = msg_->flags () & msg_t::more;
+ incomplete_in = msg_->flags () & (msg_t::more | msg_t::label);
return true;
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 16e3f0d..8e8676c 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -119,6 +119,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
destroyed (false),
last_tsc (0),
ticks (0),
+ rcvlabel (false),
rcvmore (false)
{
}
@@ -263,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1;
}
+ if (option_ == ZMQ_RCVLABEL) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvlabel ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -479,7 +490,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0))
return -1;
- // At this point we impose the MORE flag on the message.
+ // At this point we impose the LABEL & MORE flags on the message.
+ if (flags_ & ZMQ_SNDLABEL)
+ msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
@@ -558,6 +571,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
@@ -575,6 +591,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
if (rc == 0) {
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
@@ -611,6 +630,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
}
+ // Extract LABEL & MORE flags from the message.
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
rcvmore = msg_->flags () & msg_t::more;
if (rcvmore)
msg_->reset_flags (msg_t::more);
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 69a8aac..f114e9d 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -189,7 +189,10 @@ namespace zmq
// Number of messages received since last command processing.
int ticks;
- // If true there's a half-read message in the socket.
+ // True if the last message received had LABEL flag set.
+ bool rcvlabel;
+
+ // True if the last message received had MORE flag set.
bool rcvmore;
// Lists of existing sessions. This list is never referenced from
diff --git a/src/xpub.cpp b/src/xpub.cpp
index a102b68..f4fe7a1 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -100,7 +100,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- bool msg_more = msg_->flags () & msg_t::more;
+ bool msg_more = msg_->flags () & (msg_t::more | msg_t::label);
// For the first part of multi-part message, find the matching pipes.
if (!more)
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 920be8d..b935c06 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -122,7 +122,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
- if (msg_->flags () & msg_t::more) {
+ // TODO: The connections should be killed instead.
+ if (msg_->flags () & msg_t::label) {
more_out = true;
@@ -158,7 +159,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
- more_out = msg_->flags () & msg_t::more;
+ more_out = msg_->flags () & (msg_t::more | msg_t::label);
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -187,7 +188,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
- more_in = msg_->flags () & msg_t::more;
+ more_in = msg_->flags () & (msg_t::more | msg_t::label);
prefetched = false;
return 0;
}
@@ -201,7 +202,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
zmq_assert (inpipes [current_in].active);
bool fetched = inpipes [current_in].pipe->read (msg_);
zmq_assert (fetched);
- more_in = msg_->flags () & msg_t::more;
+ more_in = msg_->flags () & (msg_t::more | msg_t::label);
if (!more_in) {
current_in++;
if (current_in >= inpipes.size ())
@@ -223,7 +224,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
errno_assert (rc == 0);
memcpy (msg_->data (), inpipes [current_in].identity.data (),
msg_->size ());
- msg_->set_flags (msg_t::more);
+ msg_->set_flags (msg_t::label);
return 0;
}
diff --git a/src/xsub.cpp b/src/xsub.cpp
index f4160c0..bfe12a3 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -116,7 +116,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
return 0;
}
@@ -136,13 +136,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || !options.filter || match (msg_)) {
- more = msg_->flags () & msg_t::more;
+ more = msg_->flags () & (msg_t::more | msg_t::label);
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags () & msg_t::more) {
+ while (msg_->flags () & (msg_t::more | msg_t::label)) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -182,7 +182,7 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags () & msg_t::more) {
+ while (message.flags () & (msg_t::more | msg_t::label)) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}