diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-06-20 11:33:54 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-06-20 11:33:54 +0200 |
commit | ab99975ad44ed0fe9ab651f31cc47d493e7fb77e (patch) | |
tree | ec7d05592886bf48cb4ed60d10aa8f5eaf02dacd /src/xrep.cpp | |
parent | ada5d424721c0c0139b8011a5e9de348d061ba2f (diff) |
LABEL flag added to the wire format
So far there was no distinction between message parts used by 0MQ
and message parts used by user. Now, the message parts used by 0MQ
are marked as 'LABEL'.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r-- | src/xrep.cpp | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp index 920be8d..b935c06 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -122,7 +122,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. - if (msg_->flags () & msg_t::more) { + // TODO: The connections should be killed instead. + if (msg_->flags () & msg_t::label) { more_out = true; @@ -158,7 +159,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & msg_t::more; + more_out = msg_->flags () & (msg_t::more | msg_t::label); // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -187,7 +188,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & msg_t::more; + more_in = msg_->flags () & (msg_t::more | msg_t::label); prefetched = false; return 0; } @@ -201,7 +202,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) zmq_assert (inpipes [current_in].active); bool fetched = inpipes [current_in].pipe->read (msg_); zmq_assert (fetched); - more_in = msg_->flags () & msg_t::more; + more_in = msg_->flags () & (msg_t::more | msg_t::label); if (!more_in) { current_in++; if (current_in >= inpipes.size ()) @@ -223,7 +224,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) errno_assert (rc == 0); memcpy (msg_->data (), inpipes [current_in].identity.data (), msg_->size ()); - msg_->set_flags (msg_t::more); + msg_->set_flags (msg_t::label); return 0; } |