From ad6fa9d0d4f1cf29ce63998d7efe337b1a784ef6 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 27 Apr 2010 17:36:00 +0200 Subject: initial version of multi-hop REQ/REP --- src/rep.cpp | 97 +++++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 32 deletions(-) (limited to 'src/rep.cpp') diff --git a/src/rep.cpp b/src/rep.cpp index f77dfce..6711509 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -167,15 +167,15 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) if (reply_pipe) { - // Push message to the reply pipe. + // Push message to the reply pipe. bool written = reply_pipe->write (msg_); zmq_assert (!more || written); - // The pipe is full... - // When this happens, we simply return an error. - // This makes REP sockets vulnerable to DoS attack when - // misbehaving requesters stop collecting replies. - // TODO: Tear down the underlying connection (?) + // The pipe is full... + // When this happens, we simply return an error. + // This makes REP sockets vulnerable to DoS attack when + // misbehaving requesters stop collecting replies. + // TODO: Tear down the underlying connection (?) if (!written) { errno = EAGAIN; return -1; @@ -185,12 +185,12 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) } else { - // If the requester have disconnected in the meantime, drop the reply. + // If the requester have disconnected in the meantime, drop the reply. more = msg_->flags & ZMQ_MSG_MORE; zmq_msg_close (msg_); } - // Flush the reply to the requester. + // Flush the reply to the requester. if (!more) { if (reply_pipe) reply_pipe->flush (); @@ -198,7 +198,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) reply_pipe = NULL; } - // Detach the message from the data buffer. + // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); @@ -207,37 +207,70 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) { - // Deallocate old content of the message. - zmq_msg_close (msg_); - + // If we are in middle of sending a reply, we cannot receive next request. if (sending_reply) { errno = EFSM; return -1; } - // Round-robin over the pipes to get next message. - for (int count = active; count != 0; count--) { - bool fetched = in_pipes [current]->read (msg_); - zmq_assert (!(more && !fetched)); - - if (fetched) { - more = msg_->flags & ZMQ_MSG_MORE; - if (!more) { - reply_pipe = out_pipes [current]; - sending_reply = true; - current++; - if (current >= active) - current = 0; - } - return 0; + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // We haven't started reading a request yet... + if (!more) { + + // Round-robin over the pipes to get next message. + int count; + for (count = active; count != 0; count--) { + if (in_pipes [current]->read (msg_)) + break; + current++; + if (current >= active) + current = 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + if (count == 0) { + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; + } + + // We are aware of a new message now. Setup the reply pipe. + reply_pipe = out_pipes [current]; + + // Copy the routing info to the reply pipe. + while (true) { + + // Push message to the reply pipe. + // TODO: What if the pipe is full? + // Tear down the underlying connection? + bool written = reply_pipe->write (msg_); + zmq_assert (written); + + // Message part of zero size delimits the traceback stack. + if (zmq_msg_size (msg_) == 0) + break; + + // Get next part of the message. + bool fetched = in_pipes [current]->read (msg_); + zmq_assert (fetched); } } - // No message is available. Initialise the output parameter - // to be a 0-byte message. - zmq_msg_init (msg_); - errno = EAGAIN; - return -1; + // Now the routing info is processed. Get the first part + // of the message payload and exit. + bool fetched = in_pipes [current]->read (msg_); + zmq_assert (fetched); + more = msg_->flags & ZMQ_MSG_MORE; + if (!more) { + current++; + if (current >= active) + current = 0; + sending_reply = true; + } + return 0; } bool zmq::rep_t::xhas_in () -- cgit v1.2.3