From f78d9b6bfca13e298c29fadabbbc870b37a0a573 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 15 Sep 2011 10:00:23 +0200 Subject: Session class separated into socket-type-specific sessions This is a preliminary patch allowing for socket-type-specific functionality in the I/O thread. For example, message format can be checked asynchronously and misbehaved connections dropped straight away. Signed-off-by: Martin Sustrik --- src/Makefile.am | 4 +- src/decoder.cpp | 4 +- src/decoder.hpp | 4 +- src/encoder.cpp | 4 +- src/encoder.hpp | 4 +- src/i_engine.hpp | 2 +- src/ipc_connecter.cpp | 2 +- src/ipc_connecter.hpp | 4 +- src/ipc_listener.cpp | 8 +- src/object.cpp | 6 +- src/object.hpp | 2 +- src/pair.cpp | 12 ++ src/pair.hpp | 16 ++ src/pgm_receiver.cpp | 5 +- src/pgm_receiver.hpp | 5 +- src/pgm_sender.cpp | 4 +- src/pgm_sender.hpp | 3 +- src/pub.cpp | 12 ++ src/pub.hpp | 15 ++ src/pull.cpp | 12 ++ src/pull.hpp | 16 ++ src/push.cpp | 12 ++ src/push.hpp | 16 ++ src/rep.cpp | 12 ++ src/rep.hpp | 15 ++ src/req.cpp | 11 ++ src/req.hpp | 15 ++ src/router.cpp | 10 ++ src/router.hpp | 16 ++ src/session.cpp | 383 ----------------------------------------- src/session.hpp | 124 -------------- src/session_base.cpp | 457 +++++++++++++++++++++++++++++++++++++++++++++++++ src/session_base.hpp | 131 ++++++++++++++ src/socket_base.cpp | 8 +- src/stream_engine.cpp | 5 +- src/stream_engine.hpp | 7 +- src/sub.cpp | 12 ++ src/sub.hpp | 15 ++ src/tcp_connecter.cpp | 2 +- src/tcp_connecter.hpp | 4 +- src/tcp_listener.cpp | 8 +- src/vtcp_connecter.cpp | 2 +- src/vtcp_connecter.hpp | 4 +- src/vtcp_listener.cpp | 6 +- src/xpub.cpp | 12 ++ src/xpub.hpp | 16 ++ src/xrep.cpp | 10 ++ src/xrep.hpp | 16 ++ src/xreq.cpp | 12 ++ src/xreq.hpp | 16 ++ src/xsub.cpp | 11 ++ src/xsub.hpp | 16 ++ 52 files changed, 970 insertions(+), 558 deletions(-) delete mode 100644 src/session.cpp delete mode 100644 src/session.hpp create mode 100644 src/session_base.cpp create mode 100644 src/session_base.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 7992ab8..3b7dec6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -57,7 +57,7 @@ libzmq_la_SOURCES = \ req.hpp \ router.hpp \ select.hpp \ - session.hpp \ + session_base.hpp \ signaler.hpp \ socket_base.hpp \ stdint.hpp \ @@ -117,7 +117,7 @@ libzmq_la_SOURCES = \ rep.cpp \ req.cpp \ select.cpp \ - session.cpp \ + session_base.cpp \ signaler.cpp \ socket_base.cpp \ stream_engine.cpp \ diff --git a/src/decoder.cpp b/src/decoder.cpp index 01ce0bb..9e93b73 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -22,7 +22,7 @@ #include #include "decoder.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "wire.hpp" #include "err.hpp" @@ -44,7 +44,7 @@ zmq::decoder_t::~decoder_t () errno_assert (rc == 0); } -void zmq::decoder_t::set_session (session_t *session_) +void zmq::decoder_t::set_session (session_base_t *session_) { session = session_; } diff --git a/src/decoder.hpp b/src/decoder.hpp index 3ac4f7c..01021c4 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -184,7 +184,7 @@ namespace zmq decoder_t (size_t bufsize_, int64_t maxmsgsize_); ~decoder_t (); - void set_session (class session_t *session_); + void set_session (class session_base_t *session_); private: @@ -193,7 +193,7 @@ namespace zmq bool flags_ready (); bool message_ready (); - class session_t *session; + class session_base_t *session; unsigned char tmpbuf [8]; msg_t in_progress; diff --git a/src/encoder.cpp b/src/encoder.cpp index 087735d..6d09384 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -19,7 +19,7 @@ */ #include "encoder.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "wire.hpp" zmq::encoder_t::encoder_t (size_t bufsize_) : @@ -39,7 +39,7 @@ zmq::encoder_t::~encoder_t () errno_assert (rc == 0); } -void zmq::encoder_t::set_session (session_t *session_) +void zmq::encoder_t::set_session (session_base_t *session_) { session = session_; } diff --git a/src/encoder.hpp b/src/encoder.hpp index b8784a3..f7e3cbc 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -163,14 +163,14 @@ namespace zmq encoder_t (size_t bufsize_); ~encoder_t (); - void set_session (class session_t *session_); + void set_session (class session_base_t *session_); private: bool size_ready (); bool message_ready (); - class session_t *session; + class session_base_t *session; msg_t in_progress; unsigned char tmpbuf [10]; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index c49a107..26e475b 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -32,7 +32,7 @@ namespace zmq // Plug the engine to the session. virtual void plug (class io_thread_t *io_thread_, - class session_t *session_) = 0; + class session_base_t *session_) = 0; // Unplug the engine from the session. virtual void unplug () = 0; diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 9b8520d..a54e8fe 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -38,7 +38,7 @@ #include zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, + class session_base_t *session_, const options_t &options_, const char *address_, bool wait_) : own_t (io_thread_, options_), io_object_t (io_thread_), diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 0bb9d69..721bcf4 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -41,7 +41,7 @@ namespace zmq // If 'delay' is true connecter first waits for a while, then starts // connection process. ipc_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, + class session_base_t *session_, const options_t &options_, const char *address_, bool delay_); ~ipc_connecter_t (); @@ -101,7 +101,7 @@ namespace zmq bool wait; // Reference to the session we belong to. - class session_t *session; + class session_base_t *session; // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index cad58ba..5ba41be 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -29,7 +29,7 @@ #include "stream_engine.hpp" #include "ipc_address.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "config.hpp" #include "err.hpp" #include "ip.hpp" @@ -87,9 +87,9 @@ void zmq::ipc_listener_t::in_event () zmq_assert (io_thread); // Create and launch a session object. - session_t *session = new (std::nothrow) - session_t (io_thread, false, socket, options, NULL, NULL); - alloc_assert (session); + session_base_t *session = session_base_t::create (io_thread, false, socket, + options, NULL, NULL); + errno_assert (session); session->inc_seqnum (); launch_child (session); send_attach (session, engine, false); diff --git a/src/object.cpp b/src/object.cpp index 7f7d7f8..807fb04 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -26,7 +26,7 @@ #include "err.hpp" #include "pipe.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "socket_base.hpp" zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : @@ -201,8 +201,8 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_) send_command (cmd); } -void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - bool inc_seqnum_) +void zmq::object_t::send_attach (session_base_t *destination_, + i_engine *engine_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); diff --git a/src/object.hpp b/src/object.hpp index e05b958..1a38b24 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -62,7 +62,7 @@ namespace zmq bool inc_seqnum_ = true); void send_own (class own_t *destination_, class own_t *object_); - void send_attach (class session_t *destination_, + void send_attach (class session_base_t *destination_, struct i_engine *engine_, bool inc_seqnum_ = true); void send_bind (class own_t *destination_, class pipe_t *pipe_, bool inc_seqnum_ = true); diff --git a/src/pair.cpp b/src/pair.cpp index 12a1881..2fa4eac 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -116,3 +116,15 @@ bool zmq::pair_t::xhas_out () return result; } +zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::pair_session_t::~pair_session_t () +{ +} + diff --git a/src/pair.hpp b/src/pair.hpp index 59300ae..e7390d6 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -22,6 +22,7 @@ #define __ZMQ_PAIR_HPP_INCLUDED__ #include "socket_base.hpp" +#include "session_base.hpp" namespace zmq { @@ -52,6 +53,21 @@ namespace zmq const pair_t &operator = (const pair_t&); }; + class pair_session_t : public session_base_t + { + public: + + pair_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~pair_session_t (); + + private: + + pair_session_t (const pair_session_t&); + const pair_session_t &operator = (const pair_session_t&); + }; + } #endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 5c1517d..6c292cd 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -29,7 +29,7 @@ #endif #include "pgm_receiver.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "stdint.hpp" #include "wire.hpp" #include "err.hpp" @@ -57,7 +57,8 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) return pgm_socket.init (udp_encapsulation_, network_); } -void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, + session_base_t *session_) { // Retrieve PGM fds and start polling. fd_t socket_fd = retired_fd; diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f66c592..b9e9a05 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -52,7 +52,8 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, class session_t *session_); + void plug (class io_thread_t *io_thread_, + class session_base_t *session_); void unplug (); void terminate (); void activate_in (); @@ -105,7 +106,7 @@ namespace zmq options_t options; // Associated session. - class session_t *session; + class session_base_t *session; // Most recently used decoder. decoder_t *mru_decoder; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index e103d9a..733b1ec 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -30,7 +30,7 @@ #include "io_thread.hpp" #include "pgm_sender.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "err.hpp" #include "wire.hpp" #include "stdint.hpp" @@ -62,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) return rc; } -void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_t *session_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) { // Alocate 2 fds for PGM socket. fd_t downlink_socket_fd = retired_fd; diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 366e385..d3d5924 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -50,7 +50,8 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, class session_t *session_); + void plug (class io_thread_t *io_thread_, + class session_base_t *session_); void unplug (); void terminate (); void activate_in (); diff --git a/src/pub.cpp b/src/pub.cpp index 4787c32..15ec291 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -43,3 +43,15 @@ bool zmq::pub_t::xhas_in () return false; } +zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + xpub_session_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::pub_session_t::~pub_session_t () +{ +} + diff --git a/src/pub.hpp b/src/pub.hpp index c8db55f..4a4da0f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -43,6 +43,21 @@ namespace zmq const pub_t &operator = (const pub_t&); }; + class pub_session_t : public xpub_session_t + { + public: + + pub_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~pub_session_t (); + + private: + + pub_session_t (const pub_session_t&); + const pub_session_t &operator = (const pub_session_t&); + }; + } #endif diff --git a/src/pull.cpp b/src/pull.cpp index afde236..06575da 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -59,3 +59,15 @@ bool zmq::pull_t::xhas_in () return fq.has_in (); } +zmq::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::pull_session_t::~pull_session_t () +{ +} + diff --git a/src/pull.hpp b/src/pull.hpp index be82af9..6a46ead 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -22,6 +22,7 @@ #define __ZMQ_PULL_HPP_INCLUDED__ #include "socket_base.hpp" +#include "session_base.hpp" #include "fq.hpp" namespace zmq @@ -54,6 +55,21 @@ namespace zmq }; + class pull_session_t : public session_base_t + { + public: + + pull_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~pull_session_t (); + + private: + + pull_session_t (const pull_session_t&); + const pull_session_t &operator = (const pull_session_t&); + }; + } #endif diff --git a/src/push.cpp b/src/push.cpp index 77cc9d8..e91b789 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -59,3 +59,15 @@ bool zmq::push_t::xhas_out () return lb.has_out (); } +zmq::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::push_session_t::~push_session_t () +{ +} + diff --git a/src/push.hpp b/src/push.hpp index 222a62d..1feb71d 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -22,6 +22,7 @@ #define __ZMQ_PUSH_HPP_INCLUDED__ #include "socket_base.hpp" +#include "session_base.hpp" #include "lb.hpp" namespace zmq @@ -53,6 +54,21 @@ namespace zmq const push_t &operator = (const push_t&); }; + class push_session_t : public session_base_t + { + public: + + push_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~push_session_t (); + + private: + + push_session_t (const push_session_t&); + const push_session_t &operator = (const push_session_t&); + }; + } #endif diff --git a/src/rep.cpp b/src/rep.cpp index 2ad494d..564fa89 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -110,3 +110,15 @@ bool zmq::rep_t::xhas_out () return xrep_t::xhas_out (); } +zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + xrep_session_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::rep_session_t::~rep_session_t () +{ +} + diff --git a/src/rep.hpp b/src/rep.hpp index a13853d..55d57bd 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -54,6 +54,21 @@ namespace zmq }; + class rep_session_t : public xrep_session_t + { + public: + + rep_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~rep_session_t (); + + private: + + rep_session_t (const rep_session_t&); + const rep_session_t &operator = (const rep_session_t&); + }; + } #endif diff --git a/src/req.cpp b/src/req.cpp index b3a9359..323e058 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -146,4 +146,15 @@ bool zmq::req_t::xhas_out () return xreq_t::xhas_out (); } +zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + xreq_session_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::req_session_t::~req_session_t () +{ +} diff --git a/src/req.hpp b/src/req.hpp index 50dcb44..2c2cbc4 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -58,6 +58,21 @@ namespace zmq const req_t &operator = (const req_t&); }; + class req_session_t : public xreq_session_t + { + public: + + req_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~req_session_t (); + + private: + + req_session_t (const req_session_t&); + const req_session_t &operator = (const req_session_t&); + }; + } #endif diff --git a/src/router.cpp b/src/router.cpp index 2c9ade9..c8cc278 100755 --- a/src/router.cpp +++ b/src/router.cpp @@ -270,5 +270,15 @@ bool zmq::router_t::xhas_out () return true; } +zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} +zmq::router_session_t::~router_session_t () +{ +} diff --git a/src/router.hpp b/src/router.hpp index aeac865..9a5c0f9 100755 --- a/src/router.hpp +++ b/src/router.hpp @@ -25,6 +25,7 @@ #include #include "socket_base.hpp" +#include "session_base.hpp" #include "stdint.hpp" #include "msg.hpp" #include "fq.hpp" @@ -102,6 +103,21 @@ namespace zmq const router_t &operator = (const router_t&); }; + class router_session_t : public session_base_t + { + public: + + router_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~router_session_t (); + + private: + + router_session_t (const router_session_t&); + const router_session_t &operator = (const router_session_t&); + }; + } #endif diff --git a/src/session.cpp b/src/session.cpp deleted file mode 100644 index 8001ba8..0000000 --- a/src/session.cpp +++ /dev/null @@ -1,383 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "session.hpp" -#include "socket_base.hpp" -#include "i_engine.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "likely.hpp" -#include "tcp_connecter.hpp" -#include "ipc_connecter.hpp" -#include "vtcp_connecter.hpp" -#include "pgm_sender.hpp" -#include "pgm_receiver.hpp" - -zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - connect (connect_), - pipe (NULL), - incomplete_in (false), - pending (false), - engine (NULL), - socket (socket_), - io_thread (io_thread_), - has_linger_timer (false) -{ - if (protocol_) - protocol = protocol_; - if (address_) - address = address_; -} - -zmq::session_t::~session_t () -{ - zmq_assert (!pipe); - - // If there's still a pending linger timer, remove it. - if (has_linger_timer) { - cancel_timer (linger_timer_id); - has_linger_timer = false; - } - - // Close the engine. - if (engine) - engine->terminate (); -} - -void zmq::session_t::attach_pipe (pipe_t *pipe_) -{ - zmq_assert (!is_terminating ()); - zmq_assert (!pipe); - zmq_assert (pipe_); - pipe = pipe_; - pipe->set_event_sink (this); -} - -bool zmq::session_t::read (msg_t *msg_) -{ - if (!pipe) - return false; - - if (!pipe->read (msg_)) - return false; - - incomplete_in = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - return true; -} - -bool zmq::session_t::write (msg_t *msg_) -{ - if (pipe && pipe->write (msg_)) { - int rc = msg_->init (); - errno_assert (rc == 0); - return true; - } - - return false; -} - -void zmq::session_t::flush () -{ - if (pipe) - pipe->flush (); -} - -void zmq::session_t::clean_pipes () -{ - if (pipe) { - - // Get rid of half-processed messages in the out pipe. Flush any - // unflushed messages upstream. - pipe->rollback (); - pipe->flush (); - - // Remove any half-read message from the in pipe. - while (incomplete_in) { - msg_t msg; - int rc = msg.init (); - errno_assert (rc == 0); - if (!read (&msg)) { - zmq_assert (!incomplete_in); - break; - } - rc = msg.close (); - errno_assert (rc == 0); - } - } -} - -void zmq::session_t::terminated (pipe_t *pipe_) -{ - // Drop the reference to the deallocated pipe. - zmq_assert (pipe == pipe_); - pipe = NULL; - - // If we are waiting for pending messages to be sent, at this point - // we are sure that there will be no more messages and we can proceed - // with termination safely. - if (pending) - proceed_with_term (); -} - -void zmq::session_t::read_activated (pipe_t *pipe_) -{ - zmq_assert (pipe == pipe_); - - if (likely (engine != NULL)) - engine->activate_out (); - else - pipe->check_read (); -} - -void zmq::session_t::write_activated (pipe_t *pipe_) -{ - zmq_assert (pipe == pipe_); - - if (engine) - engine->activate_in (); -} - -void zmq::session_t::hiccuped (pipe_t *pipe_) -{ - // Hiccups are always sent from session to socket, not the other - // way round. - zmq_assert (false); -} - -void zmq::session_t::process_plug () -{ - if (connect) - start_connecting (false); -} - -void zmq::session_t::process_attach (i_engine *engine_) -{ - // If some other object (e.g. init) notifies us that the connection failed - // without creating an engine we need to start the reconnection process. - if (!engine_) { - zmq_assert (!engine); - detached (); - return; - } - - // Create the pipe if it does not exist yet. - if (!pipe && !is_terminating ()) { - object_t *parents [2] = {this, socket}; - pipe_t *pipes [2] = {NULL, NULL}; - int hwms [2] = {options.rcvhwm, options.sndhwm}; - bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays); - errno_assert (rc == 0); - - // Plug the local end of the pipe. - pipes [0]->set_event_sink (this); - - // Remember the local end of the pipe. - zmq_assert (!pipe); - pipe = pipes [0]; - - // Ask socket to plug into the remote end of the pipe. - send_bind (socket, pipes [1]); - } - - // Plug in the engine. - zmq_assert (!engine); - engine = engine_; - engine->plug (io_thread, this); -} - -void zmq::session_t::detach () -{ - // Engine is dead. Let's forget about it. - engine = NULL; - - // Remove any half-done messages from the pipes. - clean_pipes (); - - // Send the event to the derived class. - detached (); - - // Just in case there's only a delimiter in the pipe. - if (pipe) - pipe->check_read (); -} - -void zmq::session_t::process_term (int linger_) -{ - zmq_assert (!pending); - - // If the termination of the pipe happens before the term command is - // delivered there's nothing much to do. We can proceed with the - // stadard termination immediately. - if (!pipe) { - proceed_with_term (); - return; - } - - pending = true; - - // If there's finite linger value, delay the termination. - // If linger is infinite (negative) we don't even have to set - // the timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; - } - - // Start pipe termination process. Delay the termination till all messages - // are processed in case the linger time is non-zero. - pipe->terminate (linger_ != 0); - - // TODO: Should this go into pipe_t::terminate ? - // In case there's no engine and there's only delimiter in the - // pipe it wouldn't be ever read. Thus we check for it explicitly. - pipe->check_read (); -} - -void zmq::session_t::proceed_with_term () -{ - // The pending phase have just ended. - pending = false; - - // Continue with standard termination. - own_t::process_term (0); -} - -void zmq::session_t::timer_event (int id_) -{ - // Linger period expired. We can proceed with termination even though - // there are still pending messages to be sent. - zmq_assert (id_ == linger_timer_id); - has_linger_timer = false; - - // Ask pipe to terminate even though there may be pending messages in it. - zmq_assert (pipe); - pipe->terminate (false); -} - -void zmq::session_t::detached () -{ - // Transient session self-destructs after peer disconnects. - if (!connect) { - terminate (); - return; - } - - // Reconnect. - start_connecting (true); - - // For subscriber sockets we hiccup the inbound pipe, which will cause - // the socket object to resend all the subscriptions. - if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) - pipe->hiccup (); -} - -void zmq::session_t::start_connecting (bool wait_) -{ - zmq_assert (connect); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create the connecter object. - - if (protocol == "tcp") { - tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( - io_thread, this, options, address.c_str (), wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } - -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS - if (protocol == "ipc") { - ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( - io_thread, this, options, address.c_str (), wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - -#if defined ZMQ_HAVE_VTCP - if (protocol == "vtcp") { - - vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t ( - io_thread, this, options, address.c_str (), - wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - -#if defined ZMQ_HAVE_OPENPGM - - // Both PGM and EPGM transports are using the same infrastructure. - if (protocol == "pgm" || protocol == "epgm") { - - // For EPGM transport with UDP encapsulation of PGM is used. - bool udp_encapsulation = (protocol == "epgm"); - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { - - // PGM sender. - pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - io_thread, options); - alloc_assert (pgm_sender); - - int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); - - send_attach (this, pgm_sender); - } - else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { - - // PGM receiver. - pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - io_thread, options); - alloc_assert (pgm_receiver); - - int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); - zmq_assert (rc == 0); - - send_attach (this, pgm_receiver); - } - else - zmq_assert (false); - - return; - } -#endif - - zmq_assert (false); -} - diff --git a/src/session.hpp b/src/session.hpp deleted file mode 100644 index a155357..0000000 --- a/src/session.hpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_SESSION_HPP_INCLUDED__ -#define __ZMQ_SESSION_HPP_INCLUDED__ - -#include - -#include "own.hpp" -#include "i_engine.hpp" -#include "io_object.hpp" -#include "pipe.hpp" - -namespace zmq -{ - - class session_t : - public own_t, - public io_object_t, - public i_pipe_events - { - public: - - session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); - - // To be used once only, when creating the session. - void attach_pipe (class pipe_t *pipe_); - - // Following functions are the interface exposed towards the engine. - bool read (msg_t *msg_); - bool write (msg_t *msg_); - void flush (); - void detach (); - - // i_pipe_events interface implementation. - void read_activated (class pipe_t *pipe_); - void write_activated (class pipe_t *pipe_); - void hiccuped (class pipe_t *pipe_); - void terminated (class pipe_t *pipe_); - - private: - - ~session_t (); - - void start_connecting (bool wait_); - - void detached (); - - // Handlers for incoming commands. - void process_plug (); - void process_attach (struct i_engine *engine_); - void process_term (int linger_); - - // i_poll_events handlers. - void timer_event (int id_); - - // Remove any half processed messages. Flush unflushed messages. - // Call this function when engine disconnect to get rid of leftovers. - void clean_pipes (); - - // Call this function to move on with the delayed process_term. - void proceed_with_term (); - - // If true, this session (re)connects to the peer. Otherwise, it's - // a transient session created by the listener. - bool connect; - - // Pipe connecting the session to its socket. - class pipe_t *pipe; - - // This flag is true if the remainder of the message being processed - // is still in the in pipe. - bool incomplete_in; - - // True if termination have been suspended to push the pending - // messages to the network. - bool pending; - - // The protocol I/O engine connected to the session. - struct i_engine *engine; - - // The socket the session belongs to. - class socket_base_t *socket; - - // I/O thread the session is living in. It will be used to plug in - // the engines into the same thread. - class io_thread_t *io_thread; - - // ID of the linger timer - enum {linger_timer_id = 0x20}; - - // True is linger timer is running. - bool has_linger_timer; - - // Protocol and address to use when connecting. - std::string protocol; - std::string address; - - session_t (const session_t&); - const session_t &operator = (const session_t&); - }; - -} - -#endif diff --git a/src/session_base.cpp b/src/session_base.cpp new file mode 100644 index 0000000..7d4c5ab --- /dev/null +++ b/src/session_base.cpp @@ -0,0 +1,457 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "session_base.hpp" +#include "socket_base.hpp" +#include "i_engine.hpp" +#include "err.hpp" +#include "pipe.hpp" +#include "likely.hpp" +#include "tcp_connecter.hpp" +#include "ipc_connecter.hpp" +#include "vtcp_connecter.hpp" +#include "pgm_sender.hpp" +#include "pgm_receiver.hpp" + +#include "req.hpp" +#include "xreq.hpp" +#include "rep.hpp" +#include "xrep.hpp" +#include "pub.hpp" +#include "xpub.hpp" +#include "sub.hpp" +#include "xsub.hpp" +#include "push.hpp" +#include "pull.hpp" +#include "router.hpp" +#include "pair.hpp" + +zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, + bool connect_, class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) +{ + session_base_t *s = NULL; + switch (options_.type) { + case ZMQ_REQ: + s = new (std::nothrow) req_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_XREQ: + s = new (std::nothrow) xreq_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + case ZMQ_REP: + s = new (std::nothrow) rep_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_XREP: + s = new (std::nothrow) xrep_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_PUB: + s = new (std::nothrow) pub_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_XPUB: + s = new (std::nothrow) xpub_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_SUB: + s = new (std::nothrow) sub_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_XSUB: + s = new (std::nothrow) xsub_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_PUSH: + s = new (std::nothrow) push_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_PULL: + s = new (std::nothrow) pull_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_ROUTER: + s = new (std::nothrow) router_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case ZMQ_PAIR: + s = new (std::nothrow) pair_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + default: + errno = EINVAL; + return NULL; + } + alloc_assert (s); + return s; +} + +zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, + bool connect_, class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), + connect (connect_), + pipe (NULL), + incomplete_in (false), + pending (false), + engine (NULL), + socket (socket_), + io_thread (io_thread_), + has_linger_timer (false) +{ + if (protocol_) + protocol = protocol_; + if (address_) + address = address_; +} + +zmq::session_base_t::~session_base_t () +{ + zmq_assert (!pipe); + + // If there's still a pending linger timer, remove it. + if (has_linger_timer) { + cancel_timer (linger_timer_id); + has_linger_timer = false; + } + + // Close the engine. + if (engine) + engine->terminate (); +} + +void zmq::session_base_t::attach_pipe (pipe_t *pipe_) +{ + zmq_assert (!is_terminating ()); + zmq_assert (!pipe); + zmq_assert (pipe_); + pipe = pipe_; + pipe->set_event_sink (this); +} + +bool zmq::session_base_t::read (msg_t *msg_) +{ + if (!pipe) + return false; + + if (!pipe->read (msg_)) + return false; + + incomplete_in = + msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + return true; +} + +bool zmq::session_base_t::write (msg_t *msg_) +{ + if (pipe && pipe->write (msg_)) { + int rc = msg_->init (); + errno_assert (rc == 0); + return true; + } + + return false; +} + +void zmq::session_base_t::flush () +{ + if (pipe) + pipe->flush (); +} + +void zmq::session_base_t::clean_pipes () +{ + if (pipe) { + + // Get rid of half-processed messages in the out pipe. Flush any + // unflushed messages upstream. + pipe->rollback (); + pipe->flush (); + + // Remove any half-read message from the in pipe. + while (incomplete_in) { + msg_t msg; + int rc = msg.init (); + errno_assert (rc == 0); + if (!read (&msg)) { + zmq_assert (!incomplete_in); + break; + } + rc = msg.close (); + errno_assert (rc == 0); + } + } +} + +void zmq::session_base_t::terminated (pipe_t *pipe_) +{ + // Drop the reference to the deallocated pipe. + zmq_assert (pipe == pipe_); + pipe = NULL; + + // If we are waiting for pending messages to be sent, at this point + // we are sure that there will be no more messages and we can proceed + // with termination safely. + if (pending) + proceed_with_term (); +} + +void zmq::session_base_t::read_activated (pipe_t *pipe_) +{ + zmq_assert (pipe == pipe_); + + if (likely (engine != NULL)) + engine->activate_out (); + else + pipe->check_read (); +} + +void zmq::session_base_t::write_activated (pipe_t *pipe_) +{ + zmq_assert (pipe == pipe_); + + if (engine) + engine->activate_in (); +} + +void zmq::session_base_t::hiccuped (pipe_t *pipe_) +{ + // Hiccups are always sent from session to socket, not the other + // way round. + zmq_assert (false); +} + +void zmq::session_base_t::process_plug () +{ + if (connect) + start_connecting (false); +} + +void zmq::session_base_t::process_attach (i_engine *engine_) +{ + // If some other object (e.g. init) notifies us that the connection failed + // without creating an engine we need to start the reconnection process. + if (!engine_) { + zmq_assert (!engine); + detached (); + return; + } + + // Create the pipe if it does not exist yet. + if (!pipe && !is_terminating ()) { + object_t *parents [2] = {this, socket}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {options.rcvhwm, options.sndhwm}; + bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; + int rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); + + // Plug the local end of the pipe. + pipes [0]->set_event_sink (this); + + // Remember the local end of the pipe. + zmq_assert (!pipe); + pipe = pipes [0]; + + // Ask socket to plug into the remote end of the pipe. + send_bind (socket, pipes [1]); + } + + // Plug in the engine. + zmq_assert (!engine); + engine = engine_; + engine->plug (io_thread, this); +} + +void zmq::session_base_t::detach () +{ + // Engine is dead. Let's forget about it. + engine = NULL; + + // Remove any half-done messages from the pipes. + clean_pipes (); + + // Send the event to the derived class. + detached (); + + // Just in case there's only a delimiter in the pipe. + if (pipe) + pipe->check_read (); +} + +void zmq::session_base_t::process_term (int linger_) +{ + zmq_assert (!pending); + + // If the termination of the pipe happens before the term command is + // delivered there's nothing much to do. We can proceed with the + // stadard termination immediately. + if (!pipe) { + proceed_with_term (); + return; + } + + pending = true; + + // If there's finite linger value, delay the termination. + // If linger is infinite (negative) we don't even have to set + // the timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; + } + + // Start pipe termination process. Delay the termination till all messages + // are processed in case the linger time is non-zero. + pipe->terminate (linger_ != 0); + + // TODO: Should this go into pipe_t::terminate ? + // In case there's no engine and there's only delimiter in the + // pipe it wouldn't be ever read. Thus we check for it explicitly. + pipe->check_read (); +} + +void zmq::session_base_t::proceed_with_term () +{ + // The pending phase have just ended. + pending = false; + + // Continue with standard termination. + own_t::process_term (0); +} + +void zmq::session_base_t::timer_event (int id_) +{ + // Linger period expired. We can proceed with termination even though + // there are still pending messages to be sent. + zmq_assert (id_ == linger_timer_id); + has_linger_timer = false; + + // Ask pipe to terminate even though there may be pending messages in it. + zmq_assert (pipe); + pipe->terminate (false); +} + +void zmq::session_base_t::detached () +{ + // Transient session self-destructs after peer disconnects. + if (!connect) { + terminate (); + return; + } + + // Reconnect. + start_connecting (true); + + // For subscriber sockets we hiccup the inbound pipe, which will cause + // the socket object to resend all the subscriptions. + if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) + pipe->hiccup (); +} + +void zmq::session_base_t::start_connecting (bool wait_) +{ + zmq_assert (connect); + + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + + // Create the connecter object. + + if (protocol == "tcp") { + tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( + io_thread, this, options, address.c_str (), wait_); + alloc_assert (connecter); + launch_child (connecter); + return; + } + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + if (protocol == "ipc") { + ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( + io_thread, this, options, address.c_str (), wait_); + alloc_assert (connecter); + launch_child (connecter); + return; + } +#endif + +#if defined ZMQ_HAVE_VTCP + if (protocol == "vtcp") { + + vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t ( + io_thread, this, options, address.c_str (), + wait_); + alloc_assert (connecter); + launch_child (connecter); + return; + } +#endif + +#if defined ZMQ_HAVE_OPENPGM + + // Both PGM and EPGM transports are using the same infrastructure. + if (protocol == "pgm" || protocol == "epgm") { + + // For EPGM transport with UDP encapsulation of PGM is used. + bool udp_encapsulation = (protocol == "epgm"); + + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + + // PGM sender. + pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( + io_thread, options); + alloc_assert (pgm_sender); + + int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); + + send_attach (this, pgm_sender); + } + else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { + + // PGM receiver. + pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( + io_thread, options); + alloc_assert (pgm_receiver); + + int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); + zmq_assert (rc == 0); + + send_attach (this, pgm_receiver); + } + else + zmq_assert (false); + + return; + } +#endif + + zmq_assert (false); +} + diff --git a/src/session_base.hpp b/src/session_base.hpp new file mode 100644 index 0000000..175a11d --- /dev/null +++ b/src/session_base.hpp @@ -0,0 +1,131 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__ +#define __ZMQ_SESSION_BASE_HPP_INCLUDED__ + +#include + +#include "own.hpp" +#include "i_engine.hpp" +#include "io_object.hpp" +#include "pipe.hpp" + +namespace zmq +{ + + class session_base_t : + public own_t, + public io_object_t, + public i_pipe_events + { + public: + + // Create a session of the particular type. + static session_base_t *create (class io_thread_t *io_thread_, + bool connect_, class socket_base_t *socket_, + const options_t &options_, const char *protocol_, + const char *address_); + + // To be used once only, when creating the session. + void attach_pipe (class pipe_t *pipe_); + + // Following functions are the interface exposed towards the engine. + bool read (msg_t *msg_); + bool write (msg_t *msg_); + void flush (); + void detach (); + + // i_pipe_events interface implementation. + void read_activated (class pipe_t *pipe_); + void write_activated (class pipe_t *pipe_); + void hiccuped (class pipe_t *pipe_); + void terminated (class pipe_t *pipe_); + + protected: + + session_base_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~session_base_t (); + + private: + + void start_connecting (bool wait_); + + void detached (); + + // Handlers for incoming commands. + void process_plug (); + void process_attach (struct i_engine *engine_); + void process_term (int linger_); + + // i_poll_events handlers. + void timer_event (int id_); + + // Remove any half processed messages. Flush unflushed messages. + // Call this function when engine disconnect to get rid of leftovers. + void clean_pipes (); + + // Call this function to move on with the delayed process_term. + void proceed_with_term (); + + // If true, this session (re)connects to the peer. Otherwise, it's + // a transient session created by the listener. + bool connect; + + // Pipe connecting the session to its socket. + class pipe_t *pipe; + + // This flag is true if the remainder of the message being processed + // is still in the in pipe. + bool incomplete_in; + + // True if termination have been suspended to push the pending + // messages to the network. + bool pending; + + // The protocol I/O engine connected to the session. + struct i_engine *engine; + + // The socket the session belongs to. + class socket_base_t *socket; + + // I/O thread the session is living in. It will be used to plug in + // the engines into the same thread. + class io_thread_t *io_thread; + + // ID of the linger timer + enum {linger_timer_id = 0x20}; + + // True is linger timer is running. + bool has_linger_timer; + + // Protocol and address to use when connecting. + std::string protocol; + std::string address; + + session_base_t (const session_base_t&); + const session_base_t &operator = (const session_base_t&); + }; + +} + +#endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4209a69..a4d89db 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -39,7 +39,7 @@ #include "vtcp_listener.hpp" #include "tcp_connecter.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "config.hpp" #include "clock.hpp" #include "pipe.hpp" @@ -480,9 +480,9 @@ int zmq::socket_base_t::connect (const char *addr_) } // Create session. - session_t *session = new (std::nothrow) session_t ( - io_thread, true, this, options, protocol.c_str (), address.c_str ()); - alloc_assert (session); + session_base_t *session = session_base_t::create (io_thread, true, this, + options, protocol.c_str (), address.c_str ()); + errno_assert (session); // Create a bi-directional pipe. object_t *parents [2] = {this, session}; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 15e7c21..2647795 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -36,7 +36,7 @@ #include "stream_engine.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "config.hpp" #include "err.hpp" #include "ip.hpp" @@ -102,7 +102,8 @@ zmq::stream_engine_t::~stream_engine_t () } } -void zmq::stream_engine_t::plug (io_thread_t *io_thread_, session_t *session_) +void zmq::stream_engine_t::plug (io_thread_t *io_thread_, + session_base_t *session_) { zmq_assert (!plugged); plugged = true; diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index ac9a5be..92fc55f 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -44,7 +44,8 @@ namespace zmq ~stream_engine_t (); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, class session_t *session_); + void plug (class io_thread_t *io_thread_, + class session_base_t *session_); void unplug (); void terminate (); void activate_in (); @@ -84,10 +85,10 @@ namespace zmq encoder_t encoder; // The session this engine is attached to. - class session_t *session; + class session_base_t *session; // Detached transient session. - class session_t *leftover_session; + class session_base_t *leftover_session; options_t options; diff --git a/src/sub.cpp b/src/sub.cpp index 81082a2..d9f2f2e 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -79,3 +79,15 @@ bool zmq::sub_t::xhas_out () return false; } +zmq::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + xsub_session_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::sub_session_t::~sub_session_t () +{ +} + diff --git a/src/sub.hpp b/src/sub.hpp index b5980ba..7d3cf0b 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -45,6 +45,21 @@ namespace zmq const sub_t &operator = (const sub_t&); }; + class sub_session_t : public xsub_session_t + { + public: + + sub_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~sub_session_t (); + + private: + + sub_session_t (const sub_session_t&); + const sub_session_t &operator = (const sub_session_t&); + }; + } #endif diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 487f4f5..fe99252 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -46,7 +46,7 @@ #endif zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, + class session_base_t *session_, const options_t &options_, const char *address_, bool wait_) : own_t (io_thread_, options_), io_object_t (io_thread_), diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 17f8a70..d1a93cd 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -37,7 +37,7 @@ namespace zmq // If 'delay' is true connecter first waits for a while, then starts // connection process. tcp_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, + class session_base_t *session_, const options_t &options_, const char *address_, bool delay_); ~tcp_connecter_t (); @@ -97,7 +97,7 @@ namespace zmq bool wait; // Reference to the session we belong to. - class session_t *session; + class session_base_t *session; // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index da476a4..9b6068c 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -26,7 +26,7 @@ #include "tcp_listener.hpp" #include "stream_engine.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "config.hpp" #include "err.hpp" #include "ip.hpp" @@ -97,9 +97,9 @@ void zmq::tcp_listener_t::in_event () zmq_assert (io_thread); // Create and launch a session object. - session_t *session = new (std::nothrow) - session_t (io_thread, false, socket, options, NULL, NULL); - alloc_assert (session); + session_base_t *session = session_base_t::create (io_thread, false, socket, + options, NULL, NULL); + errno_assert (session); session->inc_seqnum (); launch_child (session); send_attach (session, engine, false); diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp index 3d2900e..5dc147e 100644 --- a/src/vtcp_connecter.cpp +++ b/src/vtcp_connecter.cpp @@ -50,7 +50,7 @@ #endif zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, + class session_base_t *session_, const options_t &options_, const char *address_, bool wait_) : own_t (io_thread_, options_), io_object_t (io_thread_), diff --git a/src/vtcp_connecter.hpp b/src/vtcp_connecter.hpp index f467b5f..fe5260e 100644 --- a/src/vtcp_connecter.hpp +++ b/src/vtcp_connecter.hpp @@ -43,7 +43,7 @@ namespace zmq // If 'delay' is true connecter first waits for a while, then starts // connection process. vtcp_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, + class session_base_t *session_, const options_t &options_, const char *address_, bool delay_); ~vtcp_connecter_t (); @@ -104,7 +104,7 @@ namespace zmq bool wait; // Reference to the session we belong to. - class session_t *session; + class session_base_t *session; // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp index b394833..7e496e5 100644 --- a/src/vtcp_listener.cpp +++ b/src/vtcp_listener.cpp @@ -27,7 +27,7 @@ #include #include "stream_engine.hpp" -#include "session.hpp" +#include "session_base.hpp" #include "stdint.hpp" #include "err.hpp" #include "ip.hpp" @@ -113,8 +113,8 @@ void zmq::vtcp_listener_t::in_event () zmq_assert (io_thread); // Create and launch a session object. - session_t *session = new (std::nothrow) - session_t (io_thread, false, socket, options, NULL, NULL); + session_base_t *session = session_base_t::create (io_thread, false, socket, + options, NULL, NULL); alloc_assert (session); session->inc_seqnum (); launch_child (session); diff --git a/src/xpub.cpp b/src/xpub.cpp index 8da9cf9..a245fea 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -169,3 +169,15 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, } } +zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::xpub_session_t::~xpub_session_t () +{ +} + diff --git a/src/xpub.hpp b/src/xpub.hpp index 001fa2d..b410e6c 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -25,6 +25,7 @@ #include #include "socket_base.hpp" +#include "session_base.hpp" #include "mtrie.hpp" #include "array.hpp" #include "dist.hpp" @@ -79,6 +80,21 @@ namespace zmq const xpub_t &operator = (const xpub_t&); }; + class xpub_session_t : public session_base_t + { + public: + + xpub_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~xpub_session_t (); + + private: + + xpub_session_t (const xpub_session_t&); + const xpub_session_t &operator = (const xpub_session_t&); + }; + } #endif diff --git a/src/xrep.cpp b/src/xrep.cpp index a11b8c1..c304463 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -243,5 +243,15 @@ bool zmq::xrep_t::xhas_out () return true; } +zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} +zmq::xrep_session_t::~xrep_session_t () +{ +} diff --git a/src/xrep.hpp b/src/xrep.hpp index 07f10ba..562f87d 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -24,6 +24,7 @@ #include #include "socket_base.hpp" +#include "session_base.hpp" #include "stdint.hpp" #include "msg.hpp" #include "fq.hpp" @@ -93,6 +94,21 @@ namespace zmq const xrep_t &operator = (const xrep_t&); }; + class xrep_session_t : public session_base_t + { + public: + + xrep_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~xrep_session_t (); + + private: + + xrep_session_t (const xrep_session_t&); + const xrep_session_t &operator = (const xrep_session_t&); + }; + } #endif diff --git a/src/xreq.cpp b/src/xreq.cpp index 7b66137..79b3b94 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -79,3 +79,15 @@ void zmq::xreq_t::xterminated (pipe_t *pipe_) lb.terminated (pipe_); } +zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::xreq_session_t::~xreq_session_t () +{ +} + diff --git a/src/xreq.hpp b/src/xreq.hpp index a427ba9..d7e28c4 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -23,6 +23,7 @@ #define __ZMQ_XREQ_HPP_INCLUDED__ #include "socket_base.hpp" +#include "session_base.hpp" #include "fq.hpp" #include "lb.hpp" @@ -60,6 +61,21 @@ namespace zmq const xreq_t &operator = (const xreq_t&); }; + class xreq_session_t : public session_base_t + { + public: + + xreq_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~xreq_session_t (); + + private: + + xreq_session_t (const xreq_session_t&); + const xreq_session_t &operator = (const xreq_session_t&); + }; + } #endif diff --git a/src/xsub.cpp b/src/xsub.cpp index 4122c67..b24f082 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -213,4 +213,15 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_, zmq_assert (sent); } +zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +zmq::xsub_session_t::~xsub_session_t () +{ +} diff --git a/src/xsub.hpp b/src/xsub.hpp index ea59cdb..310df6e 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -22,6 +22,7 @@ #define __ZMQ_XSUB_HPP_INCLUDED__ #include "socket_base.hpp" +#include "session_base.hpp" #include "dist.hpp" #include "fq.hpp" #include "trie.hpp" @@ -83,6 +84,21 @@ namespace zmq const xsub_t &operator = (const xsub_t&); }; + class xsub_session_t : public session_base_t + { + public: + + xsub_session_t (class io_thread_t *io_thread_, bool connect_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~xsub_session_t (); + + private: + + xsub_session_t (const xsub_session_t&); + const xsub_session_t &operator = (const xsub_session_t&); + }; + } #endif -- cgit v1.2.3