From 7c1dca546d9e49e7af372e4fff9e6a87058a7f12 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 24 Jul 2011 18:25:30 +0200 Subject: Session classes merged into a single class Removal of ZMQ_IDENTITY resulted in various session classes doing almost the same thing. This patch merges the classes into a single class. Signed-off-by: Martin Sustrik --- src/session.cpp | 99 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 83 insertions(+), 16 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index 0dd0e34..ed9b44e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -24,11 +24,16 @@ #include "err.hpp" #include "pipe.hpp" #include "likely.hpp" +#include "zmq_connecter.hpp" +#include "pgm_sender.hpp" +#include "pgm_receiver.hpp" -zmq::session_t::session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_) : +zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : own_t (io_thread_, options_), io_object_t (io_thread_), + connect (connect_), pipe (NULL), incomplete_in (false), pending (false), @@ -37,6 +42,10 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, io_thread (io_thread_), has_linger_timer (false) { + if (protocol_) + protocol = protocol_; + if (address_) + address = address_; } zmq::session_t::~session_t () @@ -157,6 +166,8 @@ void zmq::session_t::hiccuped (pipe_t *pipe_) void zmq::session_t::process_plug () { + if (connect) + start_connecting (false); } void zmq::session_t::process_attach (i_engine *engine_) @@ -169,12 +180,6 @@ void zmq::session_t::process_attach (i_engine *engine_) return; } - // Trigger the notfication event about the attachment. - if (!attached ()) { - delete engine_; - return; - } - // Create the pipe if it does not exist yet. if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; @@ -271,25 +276,87 @@ void zmq::session_t::timer_event (int id_) pipe->terminate (false); } -bool zmq::session_t::attached () -{ - return xattached (); -} - void zmq::session_t::detached () { - if (!xdetached ()) { - - // Derived session type have asked for session termination. + // Transient session self-destructs after peer disconnects. + if (!connect) { terminate (); return; } + // Reconnect. + start_connecting (true); + // For subscriber sockets we hiccup the inbound pipe, which will cause // the socket object to resend all the subscriptions. if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) pipe->hiccup (); } +void zmq::session_t::start_connecting (bool wait_) +{ + zmq_assert (connect); + + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + + // Create the connecter object. + + // Both TCP and IPC transports are using the same infrastructure. + if (protocol == "tcp" || protocol == "ipc") { + + zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( + io_thread, this, options, protocol.c_str (), address.c_str (), + wait_); + alloc_assert (connecter); + launch_child (connecter); + return; + } + +#if defined ZMQ_HAVE_OPENPGM + // Both PGM and EPGM transports are using the same infrastructure. + if (protocol == "pgm" || protocol == "epgm") { + + // For EPGM transport with UDP encapsulation of PGM is used. + bool udp_encapsulation = (protocol == "epgm"); + + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + + // PGM sender. + pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( + io_thread, options); + alloc_assert (pgm_sender); + + int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); + + send_attach (this, pgm_sender); + } + else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { + + // PGM receiver. + pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( + io_thread, options); + alloc_assert (pgm_receiver); + + int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); + + send_attach (this, pgm_receiver); + } + else + zmq_assert (false); + + return; + } +#endif + + zmq_assert (false); +} -- cgit v1.2.3