diff options
Diffstat (limited to 'src/zmq_init.cpp')
-rw-r--r-- | src/zmq_init.cpp | 44 |
1 files changed, 27 insertions, 17 deletions
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index a796faa..7fa04fc 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, socket_base_t *socket_, session_t *session_, fd_t fd_, const options_t &options_) : own_t (io_thread_, options_), + ephemeral_engine (NULL), sent (false), received (false), socket (socket_), @@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) options.identity.size ()); sent = true; - // If initialisation is done, pass the engine to the session and - // destroy the init object. + // Try finalize initialization. finalise_initialisation (); return true; @@ -92,6 +92,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) received = true; + // Try finalize initialization. + finalise_initialisation (); + return true; } @@ -101,9 +104,9 @@ void zmq::zmq_init_t::flush () if (!received) return; - // If initialisation is done, pass the engine to the session and - // destroy the init object. - finalise_initialisation (); + // Initialization is done, dispatch engine. + if (ephemeral_engine) + dispatch_engine (); } void zmq::zmq_init_t::detach () @@ -135,17 +138,30 @@ void zmq::zmq_init_t::process_unplug () void zmq::zmq_init_t::finalise_initialisation () { + // Unplug and prepare to dispatch engine. + if (sent && received) { + ephemeral_engine = engine; + engine = NULL; + ephemeral_engine->unplug (); + return; + } +} + +void zmq::zmq_init_t::dispatch_engine () +{ if (sent && received) { + // Engine must be detached. + zmq_assert (!engine); + zmq_assert (ephemeral_engine); + // If we know what session we belong to, it's easy, just send the // engine to that session and destroy the init object. Note that we // know about the session only if this object is owned by it. Thus, // lifetime of this object in contained in the lifetime of the session // so the pointer cannot become invalid without notice. if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, true); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, true); terminate (); return; } @@ -165,9 +181,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -178,9 +192,7 @@ void zmq::zmq_init_t::finalise_initialisation () // than by send_attach. session = socket->find_session (peer_identity); if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -194,9 +206,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } |