summaryrefslogtreecommitdiff
path: root/src/xrep.cpp
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-01-23 08:53:25 +0100
committerMartin Lucina <martin@lucina.net>2012-01-23 08:53:25 +0100
commit5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 (patch)
treedf7b144c5325fd8b3c88c49b456fafc24249abe6 /src/xrep.cpp
parenta15854bd92db69fcd0b4444fe1b8fe3610a7acf6 (diff)
Imported Upstream version 2.0.9.dfsgupstream/2.0.9.dfsg
Diffstat (limited to 'src/xrep.cpp')
-rw-r--r--src/xrep.cpp29
1 files changed, 18 insertions, 11 deletions
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 4e8d18a..5fd6cbb 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -26,6 +26,7 @@
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_),
current_in (0),
+ prefetched (false),
more_in (false),
current_out (NULL),
more_out (false)
@@ -142,8 +143,11 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
if (!more_out) {
zmq_assert (!current_out);
- // There's no such thing as prefix with no subsequent message.
- zmq_assert (msg_->flags & ZMQ_MSG_MORE);
+ // If we have malformed message (prefix with no subsequent message)
+ // then just silently drop the message.
+ if ((msg_->flags & ZMQ_MSG_MORE) == 0)
+ return 0;
+
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
@@ -153,7 +157,7 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
outpipes_t::iterator it = outpipes.find (identity);
if (it == outpipes.end ())
return 0;
-
+
// Remember the outgoing pipe.
current_out = it->second.writer;
@@ -189,6 +193,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
zmq_msg_close (msg_);
+ if (prefetched) {
+ zmq_msg_move (msg_, &prefetched_msg);
+ more_in = msg_->flags & ZMQ_MSG_MORE;
+ prefetched = false;
+ return 0;
+ }
+
// If we are in the middle of reading a message, just grab next part of it.
if (more_in) {
zmq_assert (inpipes [current_in].active);
@@ -207,21 +218,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
for (int count = inpipes.size (); count != 0; count--) {
// Try to fetch new message.
- bool fetched;
- if (!inpipes [current_in].active)
- fetched = false;
- else
- fetched = inpipes [current_in].reader->check_read ();
+ if (inpipes [current_in].active)
+ prefetched = inpipes [current_in].reader->read (&prefetched_msg);
// If we have a message, create a prefix and return it to the caller.
- if (fetched) {
+ if (prefetched) {
int rc = zmq_msg_init_size (msg_,
inpipes [current_in].identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
zmq_msg_size (msg_));
msg_->flags = ZMQ_MSG_MORE;
- more_in = true;
return 0;
}
@@ -241,7 +248,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::xrep_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
- if (more_in)
+ if (prefetched || more_in)
return true;
// Note that messing with current doesn't break the fairness of fair