summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/zmq_engine.cpp22
-rw-r--r--src/zmq_engine.hpp3
-rw-r--r--src/zmq_init.cpp44
-rw-r--r--src/zmq_init.hpp4
4 files changed, 53 insertions, 20 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 0c1070d..c9a1d9f 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outsize (0),
encoder (out_batch_size),
inout (NULL),
+ ephemeral_inout (NULL),
options (options_),
plugged (false)
{
@@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
zmq_assert (!plugged);
plugged = true;
+ ephemeral_inout = NULL;
- // Conncet to session/init object.
+ // Connect to session/init object.
zmq_assert (!inout);
zmq_assert (inout_);
encoder.set_inout (inout_);
@@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug ()
// Disconnect from init/session object.
encoder.set_inout (NULL);
decoder.set_inout (NULL);
+ ephemeral_inout = inout;
inout = NULL;
}
@@ -139,7 +142,13 @@ void zmq::zmq_engine_t::in_event ()
}
// Flush all messages the decoder may have produced.
- inout->flush ();
+ // If IO handler has unplugged engine, flush transient IO handler.
+ if (unlikely (!plugged)) {
+ zmq_assert (ephemeral_inout);
+ ephemeral_inout->flush ();
+ } else {
+ inout->flush ();
+ }
if (disconnection)
error ();
@@ -152,7 +161,14 @@ void zmq::zmq_engine_t::out_event ()
outpos = NULL;
encoder.get_data (&outpos, &outsize);
-
+
+ // If IO handler has unplugged engine, flush transient IO handler.
+ if (unlikely (!plugged)) {
+ zmq_assert (ephemeral_inout);
+ ephemeral_inout->flush ();
+ return;
+ }
+
// If there is no data to send, stop polling for output.
if (outsize == 0) {
reset_pollout (handle);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index c5f95dc..65bb85b 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -70,6 +70,9 @@ namespace zmq
i_inout *inout;
+ // Detached transient inout handler.
+ i_inout *ephemeral_inout;
+
options_t options;
bool plugged;
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index a796faa..7fa04fc 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
own_t (io_thread_, options_),
+ ephemeral_engine (NULL),
sent (false),
received (false),
socket (socket_),
@@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
options.identity.size ());
sent = true;
- // If initialisation is done, pass the engine to the session and
- // destroy the init object.
+ // Try finalize initialization.
finalise_initialisation ();
return true;
@@ -92,6 +92,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
received = true;
+ // Try finalize initialization.
+ finalise_initialisation ();
+
return true;
}
@@ -101,9 +104,9 @@ void zmq::zmq_init_t::flush ()
if (!received)
return;
- // If initialisation is done, pass the engine to the session and
- // destroy the init object.
- finalise_initialisation ();
+ // Initialization is done, dispatch engine.
+ if (ephemeral_engine)
+ dispatch_engine ();
}
void zmq::zmq_init_t::detach ()
@@ -135,17 +138,30 @@ void zmq::zmq_init_t::process_unplug ()
void zmq::zmq_init_t::finalise_initialisation ()
{
+ // Unplug and prepare to dispatch engine.
+ if (sent && received) {
+ ephemeral_engine = engine;
+ engine = NULL;
+ ephemeral_engine->unplug ();
+ return;
+ }
+}
+
+void zmq::zmq_init_t::dispatch_engine ()
+{
if (sent && received) {
+ // Engine must be detached.
+ zmq_assert (!engine);
+ zmq_assert (ephemeral_engine);
+
// If we know what session we belong to, it's easy, just send the
// engine to that session and destroy the init object. Note that we
// know about the session only if this object is owned by it. Thus,
// lifetime of this object in contained in the lifetime of the session
// so the pointer cannot become invalid without notice.
if (session) {
- engine->unplug ();
- send_attach (session, engine, peer_identity, true);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, true);
terminate ();
return;
}
@@ -165,9 +181,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
- engine->unplug ();
- send_attach (session, engine, peer_identity, false);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
@@ -178,9 +192,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
// than by send_attach.
session = socket->find_session (peer_identity);
if (session) {
- engine->unplug ();
- send_attach (session, engine, peer_identity, false);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
@@ -194,9 +206,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
zmq_assert (session);
session->inc_seqnum ();
launch_sibling (session);
- engine->unplug ();
- send_attach (session, engine, peer_identity, false);
- engine = NULL;
+ send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 511f141..12558a5 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -44,6 +44,7 @@ namespace zmq
private:
void finalise_initialisation ();
+ void dispatch_engine ();
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -58,6 +59,9 @@ namespace zmq
// Associated wire-protocol engine.
i_engine *engine;
+ // Detached transient engine.
+ i_engine *ephemeral_engine;
+
// True if our own identity was already sent to the peer.
bool sent;