diff options
author | Martin Lucina <mato@kotelna.sk> | 2010-09-08 15:25:45 +0200 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:27 +0100 |
commit | 90d73cba9cd1d1724f38ed82fc0eefb1781c9c20 (patch) | |
tree | 1760872164a93384d1adb90db9c8d41777dbb2a7 /src/xrep.cpp | |
parent | cf026feae205bfeb7e007f6afd0e8d7b283865c8 (diff) | |
parent | 5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 (diff) |
Imported Debian patch 2.0.9.dfsg-1debian/2.0.9.dfsg-1
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r-- | src/xrep.cpp | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp index 4e8d18a..5fd6cbb 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -26,6 +26,7 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_), current_in (0), + prefetched (false), more_in (false), current_out (NULL), more_out (false) @@ -142,8 +143,11 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) if (!more_out) { zmq_assert (!current_out); - // There's no such thing as prefix with no subsequent message. - zmq_assert (msg_->flags & ZMQ_MSG_MORE); + // If we have malformed message (prefix with no subsequent message) + // then just silently drop the message. + if ((msg_->flags & ZMQ_MSG_MORE) == 0) + return 0; + more_out = true; // Find the pipe associated with the identity stored in the prefix. @@ -153,7 +157,7 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) outpipes_t::iterator it = outpipes.find (identity); if (it == outpipes.end ()) return 0; - + // Remember the outgoing pipe. current_out = it->second.writer; @@ -189,6 +193,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); + if (prefetched) { + zmq_msg_move (msg_, &prefetched_msg); + more_in = msg_->flags & ZMQ_MSG_MORE; + prefetched = false; + return 0; + } + // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); @@ -207,21 +218,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) for (int count = inpipes.size (); count != 0; count--) { // Try to fetch new message. - bool fetched; - if (!inpipes [current_in].active) - fetched = false; - else - fetched = inpipes [current_in].reader->check_read (); + if (inpipes [current_in].active) + prefetched = inpipes [current_in].reader->read (&prefetched_msg); // If we have a message, create a prefix and return it to the caller. - if (fetched) { + if (prefetched) { int rc = zmq_msg_init_size (msg_, inpipes [current_in].identity.size ()); zmq_assert (rc == 0); memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), zmq_msg_size (msg_)); msg_->flags = ZMQ_MSG_MORE; - more_in = true; return 0; } @@ -241,7 +248,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) bool zmq::xrep_t::xhas_in () { // There are subsequent parts of the partly-read message available. - if (more_in) + if (prefetched || more_in) return true; // Note that messing with current doesn't break the fairness of fair |