summaryrefslogtreecommitdiff
path: root/src/rep.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-04-27 17:36:00 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-04-27 17:36:00 +0200
commitad6fa9d0d4f1cf29ce63998d7efe337b1a784ef6 (patch)
tree3d938f5e634b8bf140d3717565623517be0a97bf /src/rep.cpp
parent1ad6ade0ed465030716ce720077f3aa31e6cd136 (diff)
initial version of multi-hop REQ/REP
Diffstat (limited to 'src/rep.cpp')
-rw-r--r--src/rep.cpp97
1 files changed, 65 insertions, 32 deletions
diff --git a/src/rep.cpp b/src/rep.cpp
index f77dfce..6711509 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -167,15 +167,15 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
if (reply_pipe) {
- // Push message to the reply pipe.
+ // Push message to the reply pipe.
bool written = reply_pipe->write (msg_);
zmq_assert (!more || written);
- // The pipe is full...
- // When this happens, we simply return an error.
- // This makes REP sockets vulnerable to DoS attack when
- // misbehaving requesters stop collecting replies.
- // TODO: Tear down the underlying connection (?)
+ // The pipe is full...
+ // When this happens, we simply return an error.
+ // This makes REP sockets vulnerable to DoS attack when
+ // misbehaving requesters stop collecting replies.
+ // TODO: Tear down the underlying connection (?)
if (!written) {
errno = EAGAIN;
return -1;
@@ -185,12 +185,12 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
}
else {
- // If the requester have disconnected in the meantime, drop the reply.
+ // If the requester have disconnected in the meantime, drop the reply.
more = msg_->flags & ZMQ_MSG_MORE;
zmq_msg_close (msg_);
}
- // Flush the reply to the requester.
+ // Flush the reply to the requester.
if (!more) {
if (reply_pipe)
reply_pipe->flush ();
@@ -198,7 +198,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
reply_pipe = NULL;
}
- // Detach the message from the data buffer.
+ // Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
@@ -207,37 +207,70 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
+ // If we are in middle of sending a reply, we cannot receive next request.
if (sending_reply) {
errno = EFSM;
return -1;
}
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
- bool fetched = in_pipes [current]->read (msg_);
- zmq_assert (!(more && !fetched));
-
- if (fetched) {
- more = msg_->flags & ZMQ_MSG_MORE;
- if (!more) {
- reply_pipe = out_pipes [current];
- sending_reply = true;
- current++;
- if (current >= active)
- current = 0;
- }
- return 0;
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // We haven't started reading a request yet...
+ if (!more) {
+
+ // Round-robin over the pipes to get next message.
+ int count;
+ for (count = active; count != 0; count--) {
+ if (in_pipes [current]->read (msg_))
+ break;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ if (count == 0) {
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // We are aware of a new message now. Setup the reply pipe.
+ reply_pipe = out_pipes [current];
+
+ // Copy the routing info to the reply pipe.
+ while (true) {
+
+ // Push message to the reply pipe.
+ // TODO: What if the pipe is full?
+ // Tear down the underlying connection?
+ bool written = reply_pipe->write (msg_);
+ zmq_assert (written);
+
+ // Message part of zero size delimits the traceback stack.
+ if (zmq_msg_size (msg_) == 0)
+ break;
+
+ // Get next part of the message.
+ bool fetched = in_pipes [current]->read (msg_);
+ zmq_assert (fetched);
}
}
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- errno = EAGAIN;
- return -1;
+ // Now the routing info is processed. Get the first part
+ // of the message payload and exit.
+ bool fetched = in_pipes [current]->read (msg_);
+ zmq_assert (fetched);
+ more = msg_->flags & ZMQ_MSG_MORE;
+ if (!more) {
+ current++;
+ if (current >= active)
+ current = 0;
+ sending_reply = true;
+ }
+ return 0;
}
bool zmq::rep_t::xhas_in ()