From 5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:25 +0100 Subject: Imported Upstream version 2.0.9.dfsg --- src/xrep.cpp | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) (limited to 'src/xrep.cpp') diff --git a/src/xrep.cpp b/src/xrep.cpp index 4e8d18a..5fd6cbb 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -26,6 +26,7 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_), current_in (0), + prefetched (false), more_in (false), current_out (NULL), more_out (false) @@ -142,8 +143,11 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) if (!more_out) { zmq_assert (!current_out); - // There's no such thing as prefix with no subsequent message. - zmq_assert (msg_->flags & ZMQ_MSG_MORE); + // If we have malformed message (prefix with no subsequent message) + // then just silently drop the message. + if ((msg_->flags & ZMQ_MSG_MORE) == 0) + return 0; + more_out = true; // Find the pipe associated with the identity stored in the prefix. @@ -153,7 +157,7 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) outpipes_t::iterator it = outpipes.find (identity); if (it == outpipes.end ()) return 0; - + // Remember the outgoing pipe. current_out = it->second.writer; @@ -189,6 +193,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); + if (prefetched) { + zmq_msg_move (msg_, &prefetched_msg); + more_in = msg_->flags & ZMQ_MSG_MORE; + prefetched = false; + return 0; + } + // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); @@ -207,21 +218,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) for (int count = inpipes.size (); count != 0; count--) { // Try to fetch new message. - bool fetched; - if (!inpipes [current_in].active) - fetched = false; - else - fetched = inpipes [current_in].reader->check_read (); + if (inpipes [current_in].active) + prefetched = inpipes [current_in].reader->read (&prefetched_msg); // If we have a message, create a prefix and return it to the caller. - if (fetched) { + if (prefetched) { int rc = zmq_msg_init_size (msg_, inpipes [current_in].identity.size ()); zmq_assert (rc == 0); memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), zmq_msg_size (msg_)); msg_->flags = ZMQ_MSG_MORE; - more_in = true; return 0; } @@ -241,7 +248,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) bool zmq::xrep_t::xhas_in () { // There are subsequent parts of the partly-read message available. - if (more_in) + if (prefetched || more_in) return true; // Note that messing with current doesn't break the fairness of fair -- cgit v1.2.3