From ec81f8fb2523e1e2fe45eaadc05311a35bf551d7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 22 Jun 2011 11:02:16 +0200 Subject: New wire format for REQ/REP pattern This patch introduces two changes: 1. 32-bit ID is used to identify the peer instead of UUID 2. REQ socket seeds the label stack with unique 32-bit request ID It also drops any replies with non-matching request ID Signed-off-by: Martin Sustrik --- src/rep.cpp | 52 +++++++++++++++------------------------------------- 1 file changed, 15 insertions(+), 37 deletions(-) (limited to 'src/rep.cpp') diff --git a/src/rep.cpp b/src/rep.cpp index b987d9c..a5d1462 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -64,54 +64,32 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) return -1; } + // First thing to do when receiving a request is to copy all the labels + // to the reply pipe. if (request_begins) { - - // Copy the backtrace stack to the reply pipe. while (true) { - - // TODO: If request can be read but reply pipe is not - // ready for writing, we should drop the reply. - - // Get next part of the backtrace stack. int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; + if (!(msg_->flags () & msg_t::label)) + break; - if (msg_->flags () & (msg_t::more | msg_t::label)) { - - // Empty message part delimits the traceback stack. - bool bottom = (msg_->size () == 0); - - // Push it to the reply pipe. - rc = xrep_t::xsend (msg_, flags_); - zmq_assert (rc == 0); - - // The end of the traceback, move to processing message body. - if (bottom) - break; - } - else { - - // If the traceback stack is malformed, discard anything - // already sent to pipe (we're at end of invalid message) - // and continue reading -- that'll switch us to the next pipe - // and next request. - rc = xrep_t::rollback (); - zmq_assert (rc == 0); - } + // TODO: If the reply cannot be sent to the peer because + // od congestion, we should drop it. + rc = xrep_t::xsend (msg_, flags_); + zmq_assert (rc == 0); } - request_begins = false; } - - // Now the routing info is safely stored. Get the first part - // of the message payload and exit. - int rc = xrep_t::xrecv (msg_, flags_); - if (rc != 0) - return rc; + else { + int rc = xrep_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; + } + zmq_assert (!(msg_->flags () & msg_t::label)); // If whole request is read, flip the FSM to reply-sending state. - if (!(msg_->flags () & (msg_t::more | msg_t::label))) { + if (!(msg_->flags () & msg_t::more)) { sending_reply = true; request_begins = true; } -- cgit v1.2.3