summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-03-20 15:04:30 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-03-20 15:04:30 +0100
commitf031677100b41347e09932fc973040097a2187e4 (patch)
tree246771e78ad6b486971187a9ec193ec8fb3110b8 /src
parentdfdaff5eba1e6980adb3326c119d2070d0ad42bb (diff)
rollback of half-processed messages in case of disconnection
Diffstat (limited to 'src')
-rw-r--r--src/session.cpp27
-rw-r--r--src/session.hpp4
2 files changed, 30 insertions, 1 deletions
diff --git a/src/session.cpp b/src/session.cpp
index e54afea..9af03c8 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -28,6 +28,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_) :
owned_t (parent_, owner_),
in_pipe (NULL),
+ incomplete_in (false),
active (true),
out_pipe (NULL),
engine (NULL),
@@ -72,7 +73,11 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!in_pipe || !active)
return false;
- return in_pipe->read (msg_);
+ if (!in_pipe->read (msg_))
+ return false;
+
+ incomplete_in = msg_->flags & ZMQ_MSG_TBC;
+ return true;
}
bool zmq::session_t::write (::zmq_msg_t *msg_)
@@ -102,6 +107,26 @@ void zmq::session_t::detach (owned_t *reconnecter_)
// Engine is terminating itself. No need to deallocate it from here.
engine = NULL;
+ // Get rid of half-processed messages in the out pipe. Flush any
+ // unflushed messages upstream.
+ if (out_pipe) {
+ out_pipe->rollback ();
+ out_pipe->flush ();
+ }
+
+ // Remove any half-read message from the in pipe.
+ if (in_pipe) {
+ while (incomplete_in) {
+ zmq_msg_t msg;
+ zmq_msg_init (&msg);
+ if (!read (&msg)) {
+ zmq_assert (!incomplete_in);
+ break;
+ }
+ zmq_msg_close (&msg);
+ }
+ }
+
// Terminate transient session.
if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
diff --git a/src/session.hpp b/src/session.hpp
index 25a0d12..9bda1ad 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -72,6 +72,10 @@ namespace zmq
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
+ // This flag is true if the remainder of the message being processed
+ // is still in the in pipe.
+ bool incomplete_in;
+
// If true, in_pipe is active. Otherwise there are no messages to get.
bool active;