From b19ee99bb1a2b19a6bf78c7fa2159a85aa608a10 Mon Sep 17 00:00:00 2001 From: Dhammika Pathirana Date: Fri, 17 Dec 2010 15:30:56 +0100 Subject: fix race condition in session init Signed-off-by: Dhammika Pathirana --- src/zmq_engine.cpp | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) (limited to 'src/zmq_engine.cpp') diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 0c1070d..c9a1d9f 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outsize (0), encoder (out_batch_size), inout (NULL), + ephemeral_inout (NULL), options (options_), plugged (false) { @@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { zmq_assert (!plugged); plugged = true; + ephemeral_inout = NULL; - // Conncet to session/init object. + // Connect to session/init object. zmq_assert (!inout); zmq_assert (inout_); encoder.set_inout (inout_); @@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug () // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); + ephemeral_inout = inout; inout = NULL; } @@ -139,7 +142,13 @@ void zmq::zmq_engine_t::in_event () } // Flush all messages the decoder may have produced. - inout->flush (); + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + } else { + inout->flush (); + } if (disconnection) error (); @@ -152,7 +161,14 @@ void zmq::zmq_engine_t::out_event () outpos = NULL; encoder.get_data (&outpos, &outsize); - + + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + return; + } + // If there is no data to send, stop polling for output. if (outsize == 0) { reset_pollout (handle); -- cgit v1.2.3