diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/dist.cpp | 4 | ||||
-rw-r--r-- | src/encoder.cpp | 4 | ||||
-rw-r--r-- | src/fq.cpp | 2 | ||||
-rw-r--r-- | src/lb.cpp | 4 | ||||
-rw-r--r-- | src/msg.hpp | 5 | ||||
-rw-r--r-- | src/pipe.cpp | 9 | ||||
-rw-r--r-- | src/rep.cpp | 6 | ||||
-rw-r--r-- | src/req.cpp | 8 | ||||
-rw-r--r-- | src/session.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 25 | ||||
-rw-r--r-- | src/socket_base.hpp | 5 | ||||
-rw-r--r-- | src/xpub.cpp | 2 | ||||
-rw-r--r-- | src/xrep.cpp | 11 | ||||
-rw-r--r-- | src/xsub.cpp | 8 |
14 files changed, 62 insertions, 33 deletions
diff --git a/src/dist.cpp b/src/dist.cpp index 15bd168..6b95b2e 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -111,7 +111,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = msg_->flags () & msg_t::more; + bool msg_more = msg_->flags () & (msg_t::more | msg_t::label); // Push the message to matching pipes. distribute (msg_, flags_); @@ -170,7 +170,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) eligible--; return false; } - if (!(msg_->flags () & msg_t::more)) + if (!(msg_->flags () & (msg_t::more | msg_t::label))) pipe_->flush (); return true; } diff --git a/src/encoder.cpp b/src/encoder.cpp index a42f06f..f579deb 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -81,14 +81,14 @@ bool zmq::encoder_t::message_ready () tmpbuf [0] = (unsigned char) size; tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 2, &encoder_t::size_ready, - !(in_progress.flags () & msg_t::more)); + !(in_progress.flags () & (msg_t::more | msg_t::label))); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 10, &encoder_t::size_ready, - !(in_progress.flags () & msg_t::more)); + !(in_progress.flags () & (msg_t::more | msg_t::label))); } return true; } @@ -90,7 +90,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_) if (fetched) { if (pipe_) *pipe_ = pipes [current]; - more = msg_->flags () & msg_t::more; + more = msg_->flags () & (msg_t::more | msg_t::label); if (!more) { current++; if (current >= active) @@ -75,7 +75,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) // switch back to non-dropping mode. if (dropping) { - more = msg_->flags () & msg_t::more; + more = msg_->flags () & (msg_t::more | msg_t::label); if (!more) dropping = false; @@ -88,7 +88,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) while (active > 0) { if (pipes [current]->write (msg_)) { - more = msg_->flags () & msg_t::more; + more = msg_->flags () & (msg_t::more | msg_t::label); break; } diff --git a/src/msg.hpp b/src/msg.hpp index 466a96a..602ae55 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -47,8 +47,9 @@ namespace zmq // Mesage flags. enum { - more = 1, - shared = 128 + label = 1, + shared = 64, + more = 128 }; bool check (); diff --git a/src/pipe.cpp b/src/pipe.cpp index 9d9614b..1813ca0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -125,7 +125,7 @@ bool zmq::pipe_t::read (msg_t *msg_) return false; } - if (!(msg_->flags () & msg_t::more)) + if (!(msg_->flags () & (msg_t::more | msg_t::label))) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -154,8 +154,9 @@ bool zmq::pipe_t::write (msg_t *msg_) if (unlikely (!check_write (msg_))) return false; - outpipe->write (*msg_, msg_->flags () & msg_t::more); - if (!(msg_->flags () & msg_t::more)) + bool more = msg_->flags () & (msg_t::more | msg_t::label); + outpipe->write (*msg_, more); + if (!more) msgs_written++; return true; @@ -167,7 +168,7 @@ void zmq::pipe_t::rollback () msg_t msg; if (outpipe) { while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); + zmq_assert (msg.flags () & (msg_t::more | msg_t::label)); int rc = msg.close (); errno_assert (rc == 0); } diff --git a/src/rep.cpp b/src/rep.cpp index 8878bcd..b987d9c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -42,7 +42,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_) return -1; } - bool more = (msg_->flags () & msg_t::more); + bool more = msg_->flags () & (msg_t::more | msg_t::label); // Push message to the reply pipe. int rc = xrep_t::xsend (msg_, flags_); @@ -77,7 +77,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) if (rc != 0) return rc; - if (msg_->flags () & msg_t::more) { + if (msg_->flags () & (msg_t::more | msg_t::label)) { // Empty message part delimits the traceback stack. bool bottom = (msg_->size () == 0); @@ -111,7 +111,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) return rc; // If whole request is read, flip the FSM to reply-sending state. - if (!(msg_->flags () & msg_t::more)) { + if (!(msg_->flags () & (msg_t::more | msg_t::label))) { sending_reply = true; request_begins = true; } diff --git a/src/req.cpp b/src/req.cpp index 6bf502f..b0e58dc 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -48,14 +48,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) msg_t prefix; int rc = prefix.init (); errno_assert (rc == 0); - prefix.set_flags (msg_t::more); + prefix.set_flags (msg_t::label); rc = xreq_t::xsend (&prefix, flags_); if (rc != 0) return rc; message_begins = false; } - bool more = msg_->flags () & msg_t::more; + bool more = msg_->flags () & (msg_t::more | msg_t::label); int rc = xreq_t::xsend (msg_, flags_); if (rc != 0) @@ -83,7 +83,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) int rc = xreq_t::xrecv (msg_, flags_); if (rc != 0) return rc; - zmq_assert (msg_->flags () & msg_t::more); + zmq_assert (msg_->flags () & msg_t::label); zmq_assert (msg_->size () == 0); message_begins = false; } @@ -93,7 +93,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // If the reply is fully received, flip the FSM into request-sending state. - if (!(msg_->flags () & msg_t::more)) { + if (!(msg_->flags () & (msg_t::more | msg_t::label))) { receiving_reply = false; message_begins = true; } diff --git a/src/session.cpp b/src/session.cpp index e700313..334763a 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -71,7 +71,7 @@ bool zmq::session_t::read (msg_t *msg_) if (!pipe->read (msg_)) return false; - incomplete_in = msg_->flags () & msg_t::more; + incomplete_in = msg_->flags () & (msg_t::more | msg_t::label); return true; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 16e3f0d..8e8676c 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -119,6 +119,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : destroyed (false), last_tsc (0), ticks (0), + rcvlabel (false), rcvmore (false) { } @@ -263,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return -1; } + if (option_ == ZMQ_RCVLABEL) { + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = rcvlabel ? 1 : 0; + *optvallen_ = sizeof (int); + return 0; + } + if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; @@ -479,7 +490,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) if (unlikely (rc != 0)) return -1; - // At this point we impose the MORE flag on the message. + // At this point we impose the LABEL & MORE flags on the message. + if (flags_ & ZMQ_SNDLABEL) + msg_->set_flags (msg_t::label); if (flags_ & ZMQ_SNDMORE) msg_->set_flags (msg_t::more); @@ -558,6 +571,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { + rcvlabel = msg_->flags () & msg_t::label; + if (rcvlabel) + msg_->reset_flags (msg_t::label); rcvmore = msg_->flags () & msg_t::more; if (rcvmore) msg_->reset_flags (msg_t::more); @@ -575,6 +591,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) rc = xrecv (msg_, flags_); if (rc == 0) { + rcvlabel = msg_->flags () & msg_t::label; + if (rcvlabel) + msg_->reset_flags (msg_t::label); rcvmore = msg_->flags () & msg_t::more; if (rcvmore) msg_->reset_flags (msg_t::more); @@ -611,6 +630,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) } } + // Extract LABEL & MORE flags from the message. + rcvlabel = msg_->flags () & msg_t::label; + if (rcvlabel) + msg_->reset_flags (msg_t::label); rcvmore = msg_->flags () & msg_t::more; if (rcvmore) msg_->reset_flags (msg_t::more); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 69a8aac..f114e9d 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -189,7 +189,10 @@ namespace zmq // Number of messages received since last command processing. int ticks; - // If true there's a half-read message in the socket. + // True if the last message received had LABEL flag set. + bool rcvlabel; + + // True if the last message received had MORE flag set. bool rcvmore; // Lists of existing sessions. This list is never referenced from diff --git a/src/xpub.cpp b/src/xpub.cpp index a102b68..f4fe7a1 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -100,7 +100,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { - bool msg_more = msg_->flags () & msg_t::more; + bool msg_more = msg_->flags () & (msg_t::more | msg_t::label); // For the first part of multi-part message, find the matching pipes. if (!more) diff --git a/src/xrep.cpp b/src/xrep.cpp index 920be8d..b935c06 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -122,7 +122,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. - if (msg_->flags () & msg_t::more) { + // TODO: The connections should be killed instead. + if (msg_->flags () & msg_t::label) { more_out = true; @@ -158,7 +159,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & msg_t::more; + more_out = msg_->flags () & (msg_t::more | msg_t::label); // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -187,7 +188,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & msg_t::more; + more_in = msg_->flags () & (msg_t::more | msg_t::label); prefetched = false; return 0; } @@ -201,7 +202,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) zmq_assert (inpipes [current_in].active); bool fetched = inpipes [current_in].pipe->read (msg_); zmq_assert (fetched); - more_in = msg_->flags () & msg_t::more; + more_in = msg_->flags () & (msg_t::more | msg_t::label); if (!more_in) { current_in++; if (current_in >= inpipes.size ()) @@ -223,7 +224,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) errno_assert (rc == 0); memcpy (msg_->data (), inpipes [current_in].identity.data (), msg_->size ()); - msg_->set_flags (msg_t::more); + msg_->set_flags (msg_t::label); return 0; } diff --git a/src/xsub.cpp b/src/xsub.cpp index f4160c0..bfe12a3 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -116,7 +116,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->move (message); errno_assert (rc == 0); has_message = false; - more = msg_->flags () & msg_t::more; + more = msg_->flags () & (msg_t::more | msg_t::label); return 0; } @@ -136,13 +136,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) // Check whether the message matches at least one subscription. // Non-initial parts of the message are passed if (more || !options.filter || match (msg_)) { - more = msg_->flags () & msg_t::more; + more = msg_->flags () & (msg_t::more | msg_t::label); return 0; } // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (msg_->flags () & msg_t::more) { + while (msg_->flags () & (msg_t::more | msg_t::label)) { rc = fq.recv (msg_, ZMQ_DONTWAIT); zmq_assert (rc == 0); } @@ -182,7 +182,7 @@ bool zmq::xsub_t::xhas_in () // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (message.flags () & msg_t::more) { + while (message.flags () & (msg_t::more | msg_t::label)) { rc = fq.recv (&message, ZMQ_DONTWAIT); zmq_assert (rc == 0); } |