diff options
Diffstat (limited to 'src/session.cpp')
| -rw-r--r-- | src/session.cpp | 32 | 
1 files changed, 26 insertions, 6 deletions
diff --git a/src/session.cpp b/src/session.cpp index b99a370..9af03c8 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -28,6 +28,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,        const options_t &options_) :      owned_t (parent_, owner_),      in_pipe (NULL), +    incomplete_in (false),      active (true),      out_pipe (NULL),      engine (NULL), @@ -72,7 +73,11 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)      if (!in_pipe || !active)          return false; -    return in_pipe->read (msg_); +    if (!in_pipe->read (msg_)) +        return false; + +    incomplete_in = msg_->flags & ZMQ_MSG_TBC; +    return true;  }  bool zmq::session_t::write (::zmq_msg_t *msg_) @@ -102,6 +107,26 @@ void zmq::session_t::detach (owned_t *reconnecter_)      //  Engine is terminating itself. No need to deallocate it from here.      engine = NULL; +    //  Get rid of half-processed messages in the out pipe. Flush any +    //  unflushed messages upstream. +    if (out_pipe) { +        out_pipe->rollback (); +        out_pipe->flush (); +    } + +    //  Remove any half-read message from the in pipe. +    if (in_pipe) { +        while (incomplete_in) { +            zmq_msg_t msg; +            zmq_msg_init (&msg); +            if (!read (&msg)) { +                zmq_assert (!incomplete_in); +                break; +            } +            zmq_msg_close (&msg); +        } +    } +          //  Terminate transient session.      if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))          term (); @@ -264,9 +289,4 @@ void zmq::session_t::process_attach (i_engine *engine_,      zmq_assert (engine_);      engine = engine_;      engine->plug (this); - -    //  Once the initial handshaking is over tracerouting should trim prefixes -    //  from outbound messages. -    if (options.traceroute) -        engine->trim_prefix ();  }  | 
