summaryrefslogtreecommitdiff
path: root/src/zmq_engine.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-07-24 18:25:30 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-07-24 18:25:30 +0200
commit7c1dca546d9e49e7af372e4fff9e6a87058a7f12 (patch)
treef00c6760dcd14b944457928405e7e2eca23b1ff8 /src/zmq_engine.cpp
parentf716b571baf59c1b622c7666bb8bf2905126a3d4 (diff)
Session classes merged into a single class
Removal of ZMQ_IDENTITY resulted in various session classes doing almost the same thing. This patch merges the classes into a single class. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/zmq_engine.cpp')
-rw-r--r--src/zmq_engine.cpp51
1 files changed, 25 insertions, 26 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index b0a7df1..fa1bd45 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -29,6 +29,7 @@
#include "zmq_engine.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
+#include "session.hpp"
#include "config.hpp"
#include "err.hpp"
@@ -39,8 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outpos (NULL),
outsize (0),
encoder (out_batch_size),
- sink (NULL),
- ephemeral_sink (NULL),
+ session (NULL),
+ leftover_session (NULL),
options (options_),
plugged (false)
{
@@ -54,18 +55,18 @@ zmq::zmq_engine_t::~zmq_engine_t ()
zmq_assert (!plugged);
}
-void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
+void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, session_t *session_)
{
zmq_assert (!plugged);
plugged = true;
- ephemeral_sink = NULL;
+ leftover_session = NULL;
- // Connect to session/init object.
- zmq_assert (!sink);
- zmq_assert (sink_);
- encoder.set_sink (sink_);
- decoder.set_sink (sink_);
- sink = sink_;
+ // Connect to session object.
+ zmq_assert (!session);
+ zmq_assert (session_);
+ encoder.set_session (session_);
+ decoder.set_session (session_);
+ session = session_;
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
@@ -88,11 +89,11 @@ void zmq::zmq_engine_t::unplug ()
// Disconnect from I/O threads poller object.
io_object_t::unplug ();
- // Disconnect from init/session object.
- encoder.set_sink (NULL);
- decoder.set_sink (NULL);
- ephemeral_sink = sink;
- sink = NULL;
+ // Disconnect from session object.
+ encoder.set_session (NULL);
+ decoder.set_session (NULL);
+ leftover_session = session;
+ session = NULL;
}
void zmq::zmq_engine_t::terminate ()
@@ -133,9 +134,7 @@ void zmq::zmq_engine_t::in_event ()
// Stop polling for input if we got stuck.
if (processed < insize) {
- // This may happen if queue limits are in effect or when
- // init object reads all required information from the socket
- // and rejects to read more data.
+ // This may happen if queue limits are in effect.
if (plugged)
reset_pollin (handle);
}
@@ -148,13 +147,13 @@ void zmq::zmq_engine_t::in_event ()
// Flush all messages the decoder may have produced.
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
- zmq_assert (ephemeral_sink);
- ephemeral_sink->flush ();
+ zmq_assert (leftover_session);
+ leftover_session->flush ();
} else {
- sink->flush ();
+ session->flush ();
}
- if (sink && disconnection)
+ if (session && disconnection)
error ();
}
@@ -168,8 +167,8 @@ void zmq::zmq_engine_t::out_event ()
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
- zmq_assert (ephemeral_sink);
- ephemeral_sink->flush ();
+ zmq_assert (leftover_session);
+ leftover_session->flush ();
return;
}
@@ -218,8 +217,8 @@ void zmq::zmq_engine_t::activate_in ()
void zmq::zmq_engine_t::error ()
{
- zmq_assert (sink);
- sink->detach ();
+ zmq_assert (session);
+ session->detach ();
unplug ();
delete this;
}