From 06538fc11790a0cf895c43d137a33febf97f3a28 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 27 Mar 2010 14:24:57 +0100 Subject: multi-part messages work with REQ/REP sockets --- src/rep.cpp | 71 ++++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 47 insertions(+), 24 deletions(-) (limited to 'src/rep.cpp') diff --git a/src/rep.cpp b/src/rep.cpp index eaeff41..881f39a 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -27,7 +27,8 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : socket_base_t (parent_), active (0), current (0), - waiting_for_reply (false), + sending_reply (false), + tbc (false), reply_pipe (NULL) { options.requires_in = true; @@ -58,6 +59,8 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) { + zmq_assert (sending_reply || !tbc || in_pipes [current] != pipe_); + zmq_assert (pipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); @@ -90,6 +93,8 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) { + zmq_assert (!sending_reply || !tbc || reply_pipe != pipe_); + zmq_assert (pipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); @@ -98,7 +103,7 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) // If the connection we've got the request from disconnects, // there's nowhere to send the reply. Forget about the reply pipe. // Once the reply is sent it will be dropped. - if (waiting_for_reply && pipe_ == reply_pipe) + if (sending_reply && pipe_ == reply_pipe) reply_pipe = NULL; // If corresponding inpipe is still in place simply nullify the pointer @@ -157,29 +162,36 @@ int zmq::rep_t::xsetsockopt (int option_, const void *optval_, int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) { - if (!waiting_for_reply) { + if (!sending_reply) { errno = EFSM; return -1; } - // Push message to the selected pipe. If requester have disconnected - // in the meantime, drop the reply. + // Check whether it's last part of the reply. + tbc = msg_->flags & ZMQ_MSG_TBC; + if (reply_pipe) { + + // Push message to the reply pipe. bool written = reply_pipe->write (msg_); - if (written) - reply_pipe->flush (); - else - // The pipe is full; just drop the reference to - // the message content. - // TODO: Tear down the underlying connection. - zmq_msg_close (msg_); + zmq_assert (!tbc || written); + + // The pipe is full... + // TODO: Tear down the underlying connection (?) + zmq_assert (written); } else { + + // If the requester have disconnected in the meantime, drop the reply. zmq_msg_close (msg_); } - waiting_for_reply = false; - reply_pipe = NULL; + // Flush the reply to the requester. + if (!tbc) { + reply_pipe->flush (); + sending_reply = false; + reply_pipe = NULL; + } // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); @@ -193,7 +205,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); - if (waiting_for_reply) { + if (sending_reply) { errno = EFSM; return -1; } @@ -201,15 +213,19 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) // Round-robin over the pipes to get next message. for (int count = active; count != 0; count--) { bool fetched = in_pipes [current]->read (msg_); + zmq_assert (!(tbc && !fetched)); + if (fetched) { - reply_pipe = out_pipes [current]; - waiting_for_reply = true; - } - current++; - if (current >= active) - current = 0; - if (fetched) + tbc = msg_->flags & ZMQ_MSG_TBC; + if (!tbc) { + reply_pipe = out_pipes [current]; + sending_reply = true; + current++; + if (current >= active) + current = 0; + } return 0; + } } // No message is available. Initialise the output parameter @@ -221,9 +237,12 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) bool zmq::rep_t::xhas_in () { + if (!sending_reply && tbc) + return true; + for (int count = active; count != 0; count--) { if (in_pipes [current]->check_read ()) - return !waiting_for_reply; + return !sending_reply; current++; if (current >= active) current = 0; @@ -234,6 +253,10 @@ bool zmq::rep_t::xhas_in () bool zmq::rep_t::xhas_out () { - return waiting_for_reply; + if (sending_reply && tbc) + return true; + + // TODO: No check for write here... + return sending_reply; } -- cgit v1.2.3