diff options
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/connect_session.cpp | 124 | ||||
-rw-r--r-- | src/connect_session.hpp | 65 | ||||
-rw-r--r-- | src/decoder.cpp | 10 | ||||
-rw-r--r-- | src/decoder.hpp | 4 | ||||
-rw-r--r-- | src/encoder.cpp | 10 | ||||
-rw-r--r-- | src/encoder.hpp | 4 | ||||
-rw-r--r-- | src/i_engine.hpp | 21 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 17 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 4 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 11 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 99 | ||||
-rw-r--r-- | src/session.hpp | 32 | ||||
-rw-r--r-- | src/socket_base.cpp | 6 | ||||
-rw-r--r-- | src/transient_session.cpp | 43 | ||||
-rw-r--r-- | src/transient_session.hpp | 52 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 51 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 9 | ||||
-rw-r--r-- | src/zmq_listener.cpp | 6 |
20 files changed, 168 insertions, 406 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 126a321..b02e302 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -12,7 +12,6 @@ libzmq_la_SOURCES = \ clock.hpp \ command.hpp \ config.hpp \ - connect_session.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ @@ -64,7 +63,6 @@ libzmq_la_SOURCES = \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - transient_session.hpp \ trie.hpp \ windows.hpp \ wire.hpp \ @@ -79,7 +77,6 @@ libzmq_la_SOURCES = \ zmq_listener.hpp \ clock.cpp \ ctx.cpp \ - connect_session.cpp \ decoder.cpp \ devpoll.cpp \ dist.cpp \ @@ -122,7 +119,6 @@ libzmq_la_SOURCES = \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - transient_session.cpp \ trie.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/connect_session.cpp b/src/connect_session.cpp deleted file mode 100644 index 14666a6..0000000 --- a/src/connect_session.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "connect_session.hpp" -#include "zmq_connecter.hpp" -#include "pgm_sender.hpp" -#include "pgm_receiver.hpp" -#include "err.hpp" - -zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_t (io_thread_, socket_, options_), - protocol (protocol_), - address (address_) -{ -} - -zmq::connect_session_t::~connect_session_t () -{ -} - -void zmq::connect_session_t::process_plug () -{ - // Start connection process immediately. - start_connecting (false); -} - -void zmq::connect_session_t::start_connecting (bool wait_) -{ - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create the connecter object. - - // Both TCP and IPC transports are using the same infrastructure. - if (protocol == "tcp" || protocol == "ipc") { - - zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( - io_thread, this, options, protocol.c_str (), address.c_str (), - wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } - -#if defined ZMQ_HAVE_OPENPGM - - // Both PGM and EPGM transports are using the same infrastructure. - if (protocol == "pgm" || protocol == "epgm") { - - // For EPGM transport with UDP encapsulation of PGM is used. - bool udp_encapsulation = (protocol == "epgm"); - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - - // PGM sender. - pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - io_thread, options); - alloc_assert (pgm_sender); - - int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); - - send_attach (this, pgm_sender); - } - else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { - - // PGM receiver. - pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - io_thread, options); - alloc_assert (pgm_receiver); - - int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); - - send_attach (this, pgm_receiver); - } - else - zmq_assert (false); - - return; - } -#endif - - zmq_assert (false); -} - -bool zmq::connect_session_t::xattached () -{ - return true; -} - -bool zmq::connect_session_t::xdetached () -{ - // Reconnect. - start_connecting (true); - - // Don't tear the session down. - return true; -} - diff --git a/src/connect_session.hpp b/src/connect_session.hpp deleted file mode 100644 index cdd78f8..0000000 --- a/src/connect_session.hpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__ -#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__ - -#include <string> - -#include "session.hpp" - -namespace zmq -{ - - // Connect session contains an address to connect to. On disconnect it - // attempts to reconnect. - - class connect_session_t : public session_t - { - public: - - connect_session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); - ~connect_session_t (); - - private: - - // Handlers for events from session base class. - bool xattached (); - bool xdetached (); - - // Start the connection process. - void start_connecting (bool wait_); - - // Command handlers. - void process_plug (); - - // Address to connect to. - std::string protocol; - std::string address; - - connect_session_t (const connect_session_t&); - const connect_session_t &operator = (const connect_session_t&); - }; - -} - -#endif diff --git a/src/decoder.cpp b/src/decoder.cpp index 8fc1d5e..01ce0bb 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -22,13 +22,13 @@ #include <string.h> #include "decoder.hpp" -#include "i_engine.hpp" +#include "session.hpp" #include "wire.hpp" #include "err.hpp" zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : decoder_base_t <decoder_t> (bufsize_), - sink (NULL), + session (NULL), maxmsgsize (maxmsgsize_) { int rc = in_progress.init (); @@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t () errno_assert (rc == 0); } -void zmq::decoder_t::set_sink (i_engine_sink *sink_) +void zmq::decoder_t::set_session (session_t *session_) { - sink = sink_; + session = session_; } bool zmq::decoder_t::one_byte_size_ready () @@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) - if (!sink || !sink->write (&in_progress)) + if (!session || !session->write (&in_progress)) return false; next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); diff --git a/src/decoder.hpp b/src/decoder.hpp index 17fab4b..3ac4f7c 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -184,7 +184,7 @@ namespace zmq decoder_t (size_t bufsize_, int64_t maxmsgsize_); ~decoder_t (); - void set_sink (struct i_engine_sink *sink_); + void set_session (class session_t *session_); private: @@ -193,7 +193,7 @@ namespace zmq bool flags_ready (); bool message_ready (); - struct i_engine_sink *sink; + class session_t *session; unsigned char tmpbuf [8]; msg_t in_progress; diff --git a/src/encoder.cpp b/src/encoder.cpp index a9be68c..087735d 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -19,12 +19,12 @@ */ #include "encoder.hpp" -#include "i_engine.hpp" +#include "session.hpp" #include "wire.hpp" zmq::encoder_t::encoder_t (size_t bufsize_) : encoder_base_t <encoder_t> (bufsize_), - sink (NULL) + session (NULL) { int rc = in_progress.init (); errno_assert (rc == 0); @@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t () errno_assert (rc == 0); } -void zmq::encoder_t::set_sink (i_engine_sink *sink_) +void zmq::encoder_t::set_session (session_t *session_) { - sink = sink_; + session = session_; } bool zmq::encoder_t::size_ready () @@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready () // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!sink || !sink->read (&in_progress)) { + if (!session || !session->read (&in_progress)) { rc = in_progress.init (); errno_assert (rc == 0); return false; diff --git a/src/encoder.hpp b/src/encoder.hpp index 4bc22ce..b8784a3 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -163,14 +163,14 @@ namespace zmq encoder_t (size_t bufsize_); ~encoder_t (); - void set_sink (struct i_engine_sink *sink_); + void set_session (class session_t *session_); private: bool size_ready (); bool message_ready (); - struct i_engine_sink *sink; + class session_t *session; msg_t in_progress; unsigned char tmpbuf [10]; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 636985f..c49a107 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -32,7 +32,7 @@ namespace zmq // Plug the engine to the session. virtual void plug (class io_thread_t *io_thread_, - struct i_engine_sink *sink_) = 0; + class session_t *session_) = 0; // Unplug the engine from the session. virtual void unplug () = 0; @@ -50,25 +50,6 @@ namespace zmq virtual void activate_out () = 0; }; - // Abstract interface to be implemented by engine sinks such as sessions. - - struct i_engine_sink - { - virtual ~i_engine_sink () {} - - // Engine asks for a message to send to the network. - virtual bool read (class msg_t *msg_) = 0; - - // Engine received message from the network and sends it further on. - virtual bool write (class msg_t *msg_) = 0; - - // Flush all the previously written messages. - virtual void flush () = 0; - - // Engine is dead. Drop all the references to it. - virtual void detach () = 0; - }; - } #endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index b859241..dcd002e 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -29,9 +29,10 @@ #endif #include "pgm_receiver.hpp" -#include "err.hpp" +#include "session.hpp" #include "stdint.hpp" #include "wire.hpp" +#include "err.hpp" zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, const options_t &options_) : @@ -39,7 +40,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, has_rx_timer (false), pgm_socket (true, options_), options (options_), - sink (NULL), + session (NULL), mru_decoder (NULL), pending_bytes (0) { @@ -56,7 +57,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) return pgm_socket.init (udp_encapsulation_, network_); } -void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_) { // Retrieve PGM fds and start polling. int socket_fd; @@ -67,7 +68,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) set_pollin (pipe_handle); set_pollin (socket_handle); - sink = sink_; + session = session_; // If there are any subscriptions already queued in the session, drop them. drop_subscriptions (); @@ -93,7 +94,7 @@ void zmq::pgm_receiver_t::unplug () rm_fd (socket_handle); rm_fd (pipe_handle); - sink = NULL; + session = NULL; } void zmq::pgm_receiver_t::terminate () @@ -220,7 +221,7 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder = new (std::nothrow) decoder_t (0, options.maxmsgsize); alloc_assert (it->second.decoder); - it->second.decoder->set_sink (sink); + it->second.decoder->set_session (session); } mru_decoder = it->second.decoder; @@ -246,7 +247,7 @@ void zmq::pgm_receiver_t::in_event () } // Flush any messages decoder may have produced. - sink->flush (); + session->flush (); } void zmq::pgm_receiver_t::timer_event (int token) @@ -261,7 +262,7 @@ void zmq::pgm_receiver_t::timer_event (int token) void zmq::pgm_receiver_t::drop_subscriptions () { msg_t msg; - while (sink->read (&msg)) + while (session->read (&msg)) msg.close (); } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index aa010dd..f66c592 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -52,7 +52,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); + void plug (class io_thread_t *io_thread_, class session_t *session_); void unplug (); void terminate (); void activate_in (); @@ -105,7 +105,7 @@ namespace zmq options_t options; // Associated session. - i_engine_sink *sink; + class session_t *session; // Most recently used decoder. decoder_t *mru_decoder; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 9b1e215..d2419ea 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -30,6 +30,7 @@ #include "io_thread.hpp" #include "pgm_sender.hpp" +#include "session.hpp" #include "err.hpp" #include "wire.hpp" #include "stdint.hpp" @@ -61,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) return rc; } -void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_t *session_) { // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; @@ -69,7 +70,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) int rdata_notify_fd = 0; int pending_notify_fd = 0; - encoder.set_sink (sink_); + encoder.set_session (session_); // Fill fds from PGM transport and add them to the poller. pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, @@ -94,9 +95,9 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) // subscribe for all the messages. msg_t msg; msg.init (); - bool ok = sink_->write (&msg); + bool ok = session_->write (&msg); zmq_assert (ok); - sink_->flush (); + session_->flush (); } void zmq::pgm_sender_t::unplug () @@ -115,7 +116,7 @@ void zmq::pgm_sender_t::unplug () rm_fd (uplink_handle); rm_fd (rdata_notify_handle); rm_fd (pending_notify_handle); - encoder.set_sink (NULL); + encoder.set_session (NULL); } void zmq::pgm_sender_t::terminate () diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index c29dd12..366e385 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -50,7 +50,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); + void plug (class io_thread_t *io_thread_, class session_t *session_); void unplug (); void terminate (); void activate_in (); diff --git a/src/session.cpp b/src/session.cpp index 0dd0e34..ed9b44e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -24,11 +24,16 @@ #include "err.hpp" #include "pipe.hpp" #include "likely.hpp" +#include "zmq_connecter.hpp" +#include "pgm_sender.hpp" +#include "pgm_receiver.hpp" -zmq::session_t::session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_) : +zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : own_t (io_thread_, options_), io_object_t (io_thread_), + connect (connect_), pipe (NULL), incomplete_in (false), pending (false), @@ -37,6 +42,10 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, io_thread (io_thread_), has_linger_timer (false) { + if (protocol_) + protocol = protocol_; + if (address_) + address = address_; } zmq::session_t::~session_t () @@ -157,6 +166,8 @@ void zmq::session_t::hiccuped (pipe_t *pipe_) void zmq::session_t::process_plug () { + if (connect) + start_connecting (false); } void zmq::session_t::process_attach (i_engine *engine_) @@ -169,12 +180,6 @@ void zmq::session_t::process_attach (i_engine *engine_) return; } - // Trigger the notfication event about the attachment. - if (!attached ()) { - delete engine_; - return; - } - // Create the pipe if it does not exist yet. if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; @@ -271,25 +276,87 @@ void zmq::session_t::timer_event (int id_) pipe->terminate (false); } -bool zmq::session_t::attached () -{ - return xattached (); -} - void zmq::session_t::detached () { - if (!xdetached ()) { - - // Derived session type have asked for session termination. + // Transient session self-destructs after peer disconnects. + if (!connect) { terminate (); return; } + // Reconnect. + start_connecting (true); + // For subscriber sockets we hiccup the inbound pipe, which will cause // the socket object to resend all the subscriptions. if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) pipe->hiccup (); } +void zmq::session_t::start_connecting (bool wait_) +{ + zmq_assert (connect); + + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + + // Create the connecter object. + + // Both TCP and IPC transports are using the same infrastructure. + if (protocol == "tcp" || protocol == "ipc") { + + zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( + io_thread, this, options, protocol.c_str (), address.c_str (), + wait_); + alloc_assert (connecter); + launch_child (connecter); + return; + } + +#if defined ZMQ_HAVE_OPENPGM + // Both PGM and EPGM transports are using the same infrastructure. + if (protocol == "pgm" || protocol == "epgm") { + + // For EPGM transport with UDP encapsulation of PGM is used. + bool udp_encapsulation = (protocol == "epgm"); + + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + + // PGM sender. + pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( + io_thread, options); + alloc_assert (pgm_sender); + + int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); + + send_attach (this, pgm_sender); + } + else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { + + // PGM receiver. + pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( + io_thread, options); + alloc_assert (pgm_receiver); + + int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); + + send_attach (this, pgm_receiver); + } + else + zmq_assert (false); + + return; + } +#endif + + zmq_assert (false); +} diff --git a/src/session.hpp b/src/session.hpp index 60aa7c5..a155357 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -21,6 +21,8 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ +#include <string> + #include "own.hpp" #include "i_engine.hpp" #include "io_object.hpp" @@ -32,18 +34,18 @@ namespace zmq class session_t : public own_t, public io_object_t, - public i_engine_sink, public i_pipe_events { public: - session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_); + session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); // To be used once only, when creating the session. void attach_pipe (class pipe_t *pipe_); - // i_engine_sink interface implementation. + // Following functions are the interface exposed towards the engine. bool read (msg_t *msg_); bool write (msg_t *msg_); void flush (); @@ -55,22 +57,12 @@ namespace zmq void hiccuped (class pipe_t *pipe_); void terminated (class pipe_t *pipe_); - protected: - - // Events from the engine. Attached is triggered when session is - // attached to a peer. The function can reject the new peer by - // returning false. Detached is triggered at the beginning of - // the termination process when session is about to be detached from - // the peer. If it returns false, session will be terminated. - // To be overloaded by the derived session type. - virtual bool xattached () = 0; - virtual bool xdetached () = 0; + private: ~session_t (); - private: + void start_connecting (bool wait_); - bool attached (); void detached (); // Handlers for incoming commands. @@ -88,6 +80,10 @@ namespace zmq // Call this function to move on with the delayed process_term. void proceed_with_term (); + // If true, this session (re)connects to the peer. Otherwise, it's + // a transient session created by the listener. + bool connect; + // Pipe connecting the session to its socket. class pipe_t *pipe; @@ -115,6 +111,10 @@ namespace zmq // True is linger timer is running. bool has_linger_timer; + // Protocol and address to use when connecting. + std::string protocol; + std::string address; + session_t (const session_t&); const session_t &operator = (const session_t&); }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 92715b1..2ec3998 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -37,7 +37,7 @@ #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" -#include "connect_session.hpp" +#include "session.hpp" #include "config.hpp" #include "clock.hpp" #include "pipe.hpp" @@ -441,8 +441,8 @@ int zmq::socket_base_t::connect (const char *addr_) } // Create session. - connect_session_t *session = new (std::nothrow) connect_session_t ( - io_thread, this, options, protocol.c_str (), address.c_str ()); + session_t *session = new (std::nothrow) session_t ( + io_thread, true, this, options, protocol.c_str (), address.c_str ()); alloc_assert (session); // Create a bi-directional pipe. diff --git a/src/transient_session.cpp b/src/transient_session.cpp deleted file mode 100644 index d389ff4..0000000 --- a/src/transient_session.cpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "transient_session.hpp" - -zmq::transient_session_t::transient_session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_) : - session_t (io_thread_, socket_, options_) -{ -} - -zmq::transient_session_t::~transient_session_t () -{ -} - -bool zmq::transient_session_t::xattached () -{ - // Transient session is always valid. - return true; -} - -bool zmq::transient_session_t::xdetached () -{ - // There's no way to reestablish a transient session. Tear it down. - return false; -} diff --git a/src/transient_session.hpp b/src/transient_session.hpp deleted file mode 100644 index eff6b65..0000000 --- a/src/transient_session.hpp +++ /dev/null @@ -1,52 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__ -#define __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__ - -#include "session.hpp" - -namespace zmq -{ - - // Transient session is created by the listener when the connected peer - // stays anonymous. Transient session is destroyed on disconnect. - - class transient_session_t : public session_t - { - public: - - transient_session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_); - ~transient_session_t (); - - private: - - // Handlers for events from session base class. - bool xattached (); - bool xdetached (); - - transient_session_t (const transient_session_t&); - const transient_session_t &operator = (const transient_session_t&); - }; - -} - -#endif diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index b0a7df1..fa1bd45 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -29,6 +29,7 @@ #include "zmq_engine.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" +#include "session.hpp" #include "config.hpp" #include "err.hpp" @@ -39,8 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outpos (NULL), outsize (0), encoder (out_batch_size), - sink (NULL), - ephemeral_sink (NULL), + session (NULL), + leftover_session (NULL), options (options_), plugged (false) { @@ -54,18 +55,18 @@ zmq::zmq_engine_t::~zmq_engine_t () zmq_assert (!plugged); } -void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, session_t *session_) { zmq_assert (!plugged); plugged = true; - ephemeral_sink = NULL; + leftover_session = NULL; - // Connect to session/init object. - zmq_assert (!sink); - zmq_assert (sink_); - encoder.set_sink (sink_); - decoder.set_sink (sink_); - sink = sink_; + // Connect to session object. + zmq_assert (!session); + zmq_assert (session_); + encoder.set_session (session_); + decoder.set_session (session_); + session = session_; // Connect to I/O threads poller object. io_object_t::plug (io_thread_); @@ -88,11 +89,11 @@ void zmq::zmq_engine_t::unplug () // Disconnect from I/O threads poller object. io_object_t::unplug (); - // Disconnect from init/session object. - encoder.set_sink (NULL); - decoder.set_sink (NULL); - ephemeral_sink = sink; - sink = NULL; + // Disconnect from session object. + encoder.set_session (NULL); + decoder.set_session (NULL); + leftover_session = session; + session = NULL; } void zmq::zmq_engine_t::terminate () @@ -133,9 +134,7 @@ void zmq::zmq_engine_t::in_event () // Stop polling for input if we got stuck. if (processed < insize) { - // This may happen if queue limits are in effect or when - // init object reads all required information from the socket - // and rejects to read more data. + // This may happen if queue limits are in effect. if (plugged) reset_pollin (handle); } @@ -148,13 +147,13 @@ void zmq::zmq_engine_t::in_event () // Flush all messages the decoder may have produced. // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_sink); - ephemeral_sink->flush (); + zmq_assert (leftover_session); + leftover_session->flush (); } else { - sink->flush (); + session->flush (); } - if (sink && disconnection) + if (session && disconnection) error (); } @@ -168,8 +167,8 @@ void zmq::zmq_engine_t::out_event () // If IO handler has unplugged engine, flush transient IO handler. if (unlikely (!plugged)) { - zmq_assert (ephemeral_sink); - ephemeral_sink->flush (); + zmq_assert (leftover_session); + leftover_session->flush (); return; } @@ -218,8 +217,8 @@ void zmq::zmq_engine_t::activate_in () void zmq::zmq_engine_t::error () { - zmq_assert (sink); - sink->detach (); + zmq_assert (session); + session->detach (); unplug (); delete this; } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 7f09775..ad4bc8a 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -43,7 +43,7 @@ namespace zmq ~zmq_engine_t (); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); + void plug (class io_thread_t *io_thread_, class session_t *session_); void unplug (); void terminate (); void activate_in (); @@ -69,10 +69,11 @@ namespace zmq size_t outsize; encoder_t encoder; - i_engine_sink *sink; + // The session this engine is attached to. + class session_t *session; - // Detached transient sink. - i_engine_sink *ephemeral_sink; + // Detached transient session. + class session_t *leftover_session; options_t options; diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 2d5e7bb..cb73ad8 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -21,9 +21,9 @@ #include <new> #include "zmq_listener.hpp" -#include "transient_session.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" +#include "session.hpp" #include "err.hpp" zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_, @@ -74,8 +74,8 @@ void zmq::zmq_listener_t::in_event () zmq_assert (io_thread); // Create and launch a session object. - transient_session_t *session = new (std::nothrow) - transient_session_t (io_thread, socket, options); + session_t *session = new (std::nothrow) + session_t (io_thread, false, socket, options, NULL, NULL); alloc_assert (session); session->inc_seqnum (); launch_child (session); |