diff options
-rw-r--r-- | src/zmq_engine.cpp | 22 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 3 | ||||
-rw-r--r-- | src/zmq_init.cpp | 44 | ||||
-rw-r--r-- | src/zmq_init.hpp | 4 |
4 files changed, 53 insertions, 20 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 0c1070d..c9a1d9f 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outsize (0), encoder (out_batch_size), inout (NULL), + ephemeral_inout (NULL), options (options_), plugged (false) { @@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { zmq_assert (!plugged); plugged = true; + ephemeral_inout = NULL; - // Conncet to session/init object. + // Connect to session/init object. zmq_assert (!inout); zmq_assert (inout_); encoder.set_inout (inout_); @@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug () // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); + ephemeral_inout = inout; inout = NULL; } @@ -139,7 +142,13 @@ void zmq::zmq_engine_t::in_event () } // Flush all messages the decoder may have produced. - inout->flush (); + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + } else { + inout->flush (); + } if (disconnection) error (); @@ -152,7 +161,14 @@ void zmq::zmq_engine_t::out_event () outpos = NULL; encoder.get_data (&outpos, &outsize); - + + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + return; + } + // If there is no data to send, stop polling for output. if (outsize == 0) { reset_pollout (handle); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index c5f95dc..65bb85b 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -70,6 +70,9 @@ namespace zmq i_inout *inout; + // Detached transient inout handler. + i_inout *ephemeral_inout; + options_t options; bool plugged; diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index a796faa..7fa04fc 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, socket_base_t *socket_, session_t *session_, fd_t fd_, const options_t &options_) : own_t (io_thread_, options_), + ephemeral_engine (NULL), sent (false), received (false), socket (socket_), @@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) options.identity.size ()); sent = true; - // If initialisation is done, pass the engine to the session and - // destroy the init object. + // Try finalize initialization. finalise_initialisation (); return true; @@ -92,6 +92,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) received = true; + // Try finalize initialization. + finalise_initialisation (); + return true; } @@ -101,9 +104,9 @@ void zmq::zmq_init_t::flush () if (!received) return; - // If initialisation is done, pass the engine to the session and - // destroy the init object. - finalise_initialisation (); + // Initialization is done, dispatch engine. + if (ephemeral_engine) + dispatch_engine (); } void zmq::zmq_init_t::detach () @@ -135,17 +138,30 @@ void zmq::zmq_init_t::process_unplug () void zmq::zmq_init_t::finalise_initialisation () { + // Unplug and prepare to dispatch engine. + if (sent && received) { + ephemeral_engine = engine; + engine = NULL; + ephemeral_engine->unplug (); + return; + } +} + +void zmq::zmq_init_t::dispatch_engine () +{ if (sent && received) { + // Engine must be detached. + zmq_assert (!engine); + zmq_assert (ephemeral_engine); + // If we know what session we belong to, it's easy, just send the // engine to that session and destroy the init object. Note that we // know about the session only if this object is owned by it. Thus, // lifetime of this object in contained in the lifetime of the session // so the pointer cannot become invalid without notice. if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, true); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, true); terminate (); return; } @@ -165,9 +181,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -178,9 +192,7 @@ void zmq::zmq_init_t::finalise_initialisation () // than by send_attach. session = socket->find_session (peer_identity); if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -194,9 +206,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 511f141..12558a5 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -44,6 +44,7 @@ namespace zmq private: void finalise_initialisation (); + void dispatch_engine (); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -58,6 +59,9 @@ namespace zmq // Associated wire-protocol engine. i_engine *engine; + // Detached transient engine. + i_engine *ephemeral_engine; + // True if our own identity was already sent to the peer. bool sent; |