summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp99
1 files changed, 83 insertions, 16 deletions
diff --git a/src/session.cpp b/src/session.cpp
index 0dd0e34..ed9b44e 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -24,11 +24,16 @@
#include "err.hpp"
#include "pipe.hpp"
#include "likely.hpp"
+#include "zmq_connecter.hpp"
+#include "pgm_sender.hpp"
+#include "pgm_receiver.hpp"
-zmq::session_t::session_t (class io_thread_t *io_thread_,
- class socket_base_t *socket_, const options_t &options_) :
+zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
+ connect (connect_),
pipe (NULL),
incomplete_in (false),
pending (false),
@@ -37,6 +42,10 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
io_thread (io_thread_),
has_linger_timer (false)
{
+ if (protocol_)
+ protocol = protocol_;
+ if (address_)
+ address = address_;
}
zmq::session_t::~session_t ()
@@ -157,6 +166,8 @@ void zmq::session_t::hiccuped (pipe_t *pipe_)
void zmq::session_t::process_plug ()
{
+ if (connect)
+ start_connecting (false);
}
void zmq::session_t::process_attach (i_engine *engine_)
@@ -169,12 +180,6 @@ void zmq::session_t::process_attach (i_engine *engine_)
return;
}
- // Trigger the notfication event about the attachment.
- if (!attached ()) {
- delete engine_;
- return;
- }
-
// Create the pipe if it does not exist yet.
if (!pipe && !is_terminating ()) {
object_t *parents [2] = {this, socket};
@@ -271,25 +276,87 @@ void zmq::session_t::timer_event (int id_)
pipe->terminate (false);
}
-bool zmq::session_t::attached ()
-{
- return xattached ();
-}
-
void zmq::session_t::detached ()
{
- if (!xdetached ()) {
-
- // Derived session type have asked for session termination.
+ // Transient session self-destructs after peer disconnects.
+ if (!connect) {
terminate ();
return;
}
+ // Reconnect.
+ start_connecting (true);
+
// For subscriber sockets we hiccup the inbound pipe, which will cause
// the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup ();
}
+void zmq::session_t::start_connecting (bool wait_)
+{
+ zmq_assert (connect);
+
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
+ // Create the connecter object.
+
+ // Both TCP and IPC transports are using the same infrastructure.
+ if (protocol == "tcp" || protocol == "ipc") {
+
+ zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
+ io_thread, this, options, protocol.c_str (), address.c_str (),
+ wait_);
+ alloc_assert (connecter);
+ launch_child (connecter);
+ return;
+ }
+
+#if defined ZMQ_HAVE_OPENPGM
+ // Both PGM and EPGM transports are using the same infrastructure.
+ if (protocol == "pgm" || protocol == "epgm") {
+
+ // For EPGM transport with UDP encapsulation of PGM is used.
+ bool udp_encapsulation = (protocol == "epgm");
+
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
+ if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
+
+ // PGM sender.
+ pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
+ io_thread, options);
+ alloc_assert (pgm_sender);
+
+ int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
+ zmq_assert (rc == 0);
+
+ send_attach (this, pgm_sender);
+ }
+ else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
+
+ // PGM receiver.
+ pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
+ io_thread, options);
+ alloc_assert (pgm_receiver);
+
+ int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
+ zmq_assert (rc == 0);
+
+ send_attach (this, pgm_receiver);
+ }
+ else
+ zmq_assert (false);
+
+ return;
+ }
+#endif
+
+ zmq_assert (false);
+}