summaryrefslogtreecommitdiff
path: root/src/rep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-06-22 11:02:16 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-06-22 11:02:16 +0200
commitec81f8fb2523e1e2fe45eaadc05311a35bf551d7 (patch)
treee6fbd9b7a789d72678fa02ca06883de15a932beb /src/rep.cpp
parent10a93bb79fd3d4be1b3ffedfa6785564fbcc082b (diff)
New wire format for REQ/REP pattern
This patch introduces two changes: 1. 32-bit ID is used to identify the peer instead of UUID 2. REQ socket seeds the label stack with unique 32-bit request ID It also drops any replies with non-matching request ID Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/rep.cpp')
-rw-r--r--src/rep.cpp52
1 files changed, 15 insertions, 37 deletions
diff --git a/src/rep.cpp b/src/rep.cpp
index b987d9c..a5d1462 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -64,54 +64,32 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
return -1;
}
+ // First thing to do when receiving a request is to copy all the labels
+ // to the reply pipe.
if (request_begins) {
-
- // Copy the backtrace stack to the reply pipe.
while (true) {
-
- // TODO: If request can be read but reply pipe is not
- // ready for writing, we should drop the reply.
-
- // Get next part of the backtrace stack.
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
+ if (!(msg_->flags () & msg_t::label))
+ break;
- if (msg_->flags () & (msg_t::more | msg_t::label)) {
-
- // Empty message part delimits the traceback stack.
- bool bottom = (msg_->size () == 0);
-
- // Push it to the reply pipe.
- rc = xrep_t::xsend (msg_, flags_);
- zmq_assert (rc == 0);
-
- // The end of the traceback, move to processing message body.
- if (bottom)
- break;
- }
- else {
-
- // If the traceback stack is malformed, discard anything
- // already sent to pipe (we're at end of invalid message)
- // and continue reading -- that'll switch us to the next pipe
- // and next request.
- rc = xrep_t::rollback ();
- zmq_assert (rc == 0);
- }
+ // TODO: If the reply cannot be sent to the peer because
+ // od congestion, we should drop it.
+ rc = xrep_t::xsend (msg_, flags_);
+ zmq_assert (rc == 0);
}
-
request_begins = false;
}
-
- // Now the routing info is safely stored. Get the first part
- // of the message payload and exit.
- int rc = xrep_t::xrecv (msg_, flags_);
- if (rc != 0)
- return rc;
+ else {
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ }
+ zmq_assert (!(msg_->flags () & msg_t::label));
// If whole request is read, flip the FSM to reply-sending state.
- if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
+ if (!(msg_->flags () & msg_t::more)) {
sending_reply = true;
request_begins = true;
}