From 7c1dca546d9e49e7af372e4fff9e6a87058a7f12 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 24 Jul 2011 18:25:30 +0200 Subject: Session classes merged into a single class Removal of ZMQ_IDENTITY resulted in various session classes doing almost the same thing. This patch merges the classes into a single class. Signed-off-by: Martin Sustrik --- src/zmq_engine.cpp | 51 +++++++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 26 deletions(-) (limited to 'src/zmq_engine.cpp') diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index b0a7df1..fa1bd45 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -29,6 +29,7 @@ #include "zmq_engine.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" +#include "session.hpp" #include "config.hpp" #include "err.hpp" @@ -39,8 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outpos (NULL), outsize (0), encoder (out_batch_size), - sink (NULL), - ephemeral_sink (NULL), + session (NULL), + leftover_session (NULL), options (options_), plugged (false) { @@ -54,18 +55,18 @@ zmq::zmq_engine_t::~zmq_engine_t () zmq_assert (!plugged); } -void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, session_t *session_) { zmq_assert (!plugged); plugged = true; - ephemeral_sink = NULL; + leftover_session = NULL; - // Connect to session/init object. - zmq_assert (!sink); - zmq_assert (sink_); - encoder.set_sink (sink_); - decoder.set_sink (sink_); - sink = sink_; + // Connect to session object. + zmq_assert (!session); + zmq_assert (session_); + encoder.set_session (session_); + decoder.set_session (session_); + session = session_; // Connect to I/O threads poller object. io_object_t::plug (io_thread_); @@ -88,11 +89,11 @@ void zmq::zmq_engine_t::unplug () // Disconnect from I/O threads poller object. io_object_t::unplug (); - // Disconnect from init/session object. - encoder.set_sink (NULL); - decoder.set_sink (NULL); - ephemeral_sink = sink; - sink = NULL; + // Disconnect from session object. + encoder.set_session (NULL); + decoder.set_session (NULL); + leftover_session = session; + session = NULL; } void zmq::zmq_engine_t::terminate () @@ -133,9 +134,7 @@ void zmq::zmq_engine_t::in_event () // Stop polling for input if we got stuck. if (processed < insize) { - // This may happen if queue limits are in effect or when - // init object reads all required information from the socket - // and rejects to read more data. + // This may happen if queue limits are in effect. if (plugged) reset_pollin (handle); } @@ -148,13 +147,13 @@ void zmq::zmq_engine_t::in_event () // Flush all messages the decoder may have produced. // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_sink); - ephemeral_sink->flush (); + zmq_assert (leftover_session); + leftover_session->flush (); } else { - sink->flush (); + session->flush (); } - if (sink && disconnection) + if (session && disconnection) error (); } @@ -168,8 +167,8 @@ void zmq::zmq_engine_t::out_event () // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_sink); - ephemeral_sink->flush (); + zmq_assert (leftover_session); + leftover_session->flush (); return; } @@ -218,8 +217,8 @@ void zmq::zmq_engine_t::activate_in () void zmq::zmq_engine_t::error () { - zmq_assert (sink); - sink->detach (); + zmq_assert (session); + session->detach (); unplug (); delete this; } -- cgit v1.2.3