From f031677100b41347e09932fc973040097a2187e4 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 20 Mar 2010 15:04:30 +0100 Subject: rollback of half-processed messages in case of disconnection --- src/session.cpp | 27 ++++++++++++++++++++++++++- src/session.hpp | 4 ++++ 2 files changed, 30 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/session.cpp b/src/session.cpp index e54afea..9af03c8 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -28,6 +28,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, const options_t &options_) : owned_t (parent_, owner_), in_pipe (NULL), + incomplete_in (false), active (true), out_pipe (NULL), engine (NULL), @@ -72,7 +73,11 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) if (!in_pipe || !active) return false; - return in_pipe->read (msg_); + if (!in_pipe->read (msg_)) + return false; + + incomplete_in = msg_->flags & ZMQ_MSG_TBC; + return true; } bool zmq::session_t::write (::zmq_msg_t *msg_) @@ -102,6 +107,26 @@ void zmq::session_t::detach (owned_t *reconnecter_) // Engine is terminating itself. No need to deallocate it from here. engine = NULL; + // Get rid of half-processed messages in the out pipe. Flush any + // unflushed messages upstream. + if (out_pipe) { + out_pipe->rollback (); + out_pipe->flush (); + } + + // Remove any half-read message from the in pipe. + if (in_pipe) { + while (incomplete_in) { + zmq_msg_t msg; + zmq_msg_init (&msg); + if (!read (&msg)) { + zmq_assert (!incomplete_in); + break; + } + zmq_msg_close (&msg); + } + } + // Terminate transient session. if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) term (); diff --git a/src/session.hpp b/src/session.hpp index 25a0d12..9bda1ad 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -72,6 +72,10 @@ namespace zmq // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; + // This flag is true if the remainder of the message being processed + // is still in the in pipe. + bool incomplete_in; + // If true, in_pipe is active. Otherwise there are no messages to get. bool active; -- cgit v1.2.3