summaryrefslogtreecommitdiff
path: root/src/req.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-04-27 17:36:00 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-04-27 17:36:00 +0200
commitad6fa9d0d4f1cf29ce63998d7efe337b1a784ef6 (patch)
tree3d938f5e634b8bf140d3717565623517be0a97bf /src/req.cpp
parent1ad6ade0ed465030716ce720077f3aa31e6cd136 (diff)
initial version of multi-hop REQ/REP
Diffstat (limited to 'src/req.cpp')
-rw-r--r--src/req.cpp28
1 files changed, 26 insertions, 2 deletions
diff --git a/src/req.cpp b/src/req.cpp
index c8b7b98..969755b 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -190,7 +190,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- // Push message to the selected pipe.
+ // If we are starting to send the request, generate a prefix.
+ if (!more) {
+ zmq_msg_t prefix;
+ int rc = zmq_msg_init (&prefix);
+ zmq_assert (rc == 0);
+ prefix.flags |= ZMQ_MSG_MORE;
+ bool written = out_pipes [current]->write (&prefix);
+ zmq_assert (written);
+ }
+
+ // Push the message to the selected pipe.
bool written = out_pipes [current]->write (msg_);
zmq_assert (written);
more = msg_->flags & ZMQ_MSG_MORE;
@@ -218,7 +228,8 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
- zmq_msg_close (msg_);
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
// If request wasn't send, we can't wait for reply.
if (!receiving_reply) {
@@ -234,6 +245,19 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
+ // If we are starting to receive new reply, check whether prefix
+ // is well-formed and drop it.
+ if (!more) {
+ zmq_assert (msg_->flags & ZMQ_MSG_MORE);
+ zmq_assert (zmq_msg_size (msg_) == 0);
+ rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ }
+
+ // Get the actual reply.
+ bool recvd = reply_pipe->read (msg_);
+ zmq_assert (recvd);
+
// If this was last part of the reply, switch to request phase.
more = msg_->flags & ZMQ_MSG_MORE;
if (!more) {