From da1ef4d2685c02e8320847d49a38726b2b6d52f0 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Sun, 8 May 2011 09:02:47 +0200 Subject: Fixed REP assert on missing envelope Signed-off-by: Pieter Hintjens --- src/rep.cpp | 34 ++++++++++++++++++++++++---------- src/xrep.cpp | 13 ++++++++++++- src/xrep.hpp | 5 +++++ 3 files changed, 41 insertions(+), 11 deletions(-) (limited to 'src') diff --git a/src/rep.cpp b/src/rep.cpp index ef0defc..8878bcd 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -67,24 +67,38 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) if (request_begins) { // Copy the backtrace stack to the reply pipe. - bool bottom = false; - while (!bottom) { + while (true) { - // TODO: What if request can be read but reply pipe is not - // ready for writing? + // TODO: If request can be read but reply pipe is not + // ready for writing, we should drop the reply. // Get next part of the backtrace stack. int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; - zmq_assert (msg_->flags () & msg_t::more); - // Empty message part delimits the traceback stack. - bottom = (msg_->size () == 0); + if (msg_->flags () & msg_t::more) { - // Push it to the reply pipe. - rc = xrep_t::xsend (msg_, flags_); - zmq_assert (rc == 0); + // Empty message part delimits the traceback stack. + bool bottom = (msg_->size () == 0); + + // Push it to the reply pipe. + rc = xrep_t::xsend (msg_, flags_); + zmq_assert (rc == 0); + + // The end of the traceback, move to processing message body. + if (bottom) + break; + } + else { + + // If the traceback stack is malformed, discard anything + // already sent to pipe (we're at end of invalid message) + // and continue reading -- that'll switch us to the next pipe + // and next request. + rc = xrep_t::rollback (); + zmq_assert (rc == 0); + } } request_begins = false; diff --git a/src/xrep.cpp b/src/xrep.cpp index 77481c3..f662aaf 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -61,7 +61,7 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, if (terminating) { register_term_acks (1); - outpipe_->terminate (); + outpipe_->terminate (); } } @@ -289,6 +289,17 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) return -1; } +int zmq::xrep_t::rollback (void) +{ + if (current_out) { + current_out->rollback (); + current_out = NULL; + more_out = false; + } + return 0; +} + + bool zmq::xrep_t::xhas_in () { // There are subsequent parts of the partly-read message available. diff --git a/src/xrep.hpp b/src/xrep.hpp index 1c45655..7ca138c 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -51,6 +51,11 @@ namespace zmq bool xhas_in (); bool xhas_out (); + protected: + + // Rollback any message parts that were sent but not yet flushed. + int rollback (); + private: // Hook into the termination process. -- cgit v1.2.3