From ad6fa9d0d4f1cf29ce63998d7efe337b1a784ef6 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 27 Apr 2010 17:36:00 +0200 Subject: initial version of multi-hop REQ/REP --- src/req.cpp | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) (limited to 'src/req.cpp') diff --git a/src/req.cpp b/src/req.cpp index c8b7b98..969755b 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -190,7 +190,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } - // Push message to the selected pipe. + // If we are starting to send the request, generate a prefix. + if (!more) { + zmq_msg_t prefix; + int rc = zmq_msg_init (&prefix); + zmq_assert (rc == 0); + prefix.flags |= ZMQ_MSG_MORE; + bool written = out_pipes [current]->write (&prefix); + zmq_assert (written); + } + + // Push the message to the selected pipe. bool written = out_pipes [current]->write (msg_); zmq_assert (written); more = msg_->flags & ZMQ_MSG_MORE; @@ -218,7 +228,8 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) { // Deallocate old content of the message. - zmq_msg_close (msg_); + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); // If request wasn't send, we can't wait for reply. if (!receiving_reply) { @@ -234,6 +245,19 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) return -1; } + // If we are starting to receive new reply, check whether prefix + // is well-formed and drop it. + if (!more) { + zmq_assert (msg_->flags & ZMQ_MSG_MORE); + zmq_assert (zmq_msg_size (msg_) == 0); + rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + } + + // Get the actual reply. + bool recvd = reply_pipe->read (msg_); + zmq_assert (recvd); + // If this was last part of the reply, switch to request phase. more = msg_->flags & ZMQ_MSG_MORE; if (!more) { -- cgit v1.2.3