diff options
Diffstat (limited to 'src')
51 files changed, 1441 insertions, 994 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 937372f..d7509dc 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -55,6 +55,7 @@ libzmq_la_SOURCES = \ blob.hpp \ command.hpp \ config.hpp \ + connect_session.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ @@ -70,15 +71,17 @@ libzmq_la_SOURCES = \ ip.hpp \ i_engine.hpp \ i_poll_events.hpp \ + i_terminate_events.hpp \ kqueue.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ msg_store.hpp \ mutex.hpp \ + named_session.hpp \ object.hpp \ options.hpp \ - owned.hpp \ + own.hpp \ pgm_receiver.hpp \ pgm_sender.hpp \ pgm_socket.hpp \ @@ -106,6 +109,7 @@ libzmq_la_SOURCES = \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ + transient_session.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -123,6 +127,7 @@ libzmq_la_SOURCES = \ zmq_listener.hpp \ command.cpp \ ctx.cpp \ + connect_session.cpp \ devpoll.cpp \ epoll.cpp \ err.cpp \ @@ -134,9 +139,10 @@ libzmq_la_SOURCES = \ kqueue.cpp \ lb.cpp \ msg_store.cpp \ + named_session.cpp \ object.cpp \ options.cpp \ - owned.cpp \ + own.cpp \ pair.cpp \ pgm_receiver.cpp \ pgm_sender.cpp \ @@ -160,6 +166,7 @@ libzmq_la_SOURCES = \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ + transient_session.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/command.hpp b/src/command.hpp index 3d00cd7..a924b4e 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -61,7 +61,7 @@ namespace zmq // Sent to socket to let it know about the newly created object. struct { - class owned_t *object; + class own_t *object; } own; // Attach the engine to the session. @@ -104,7 +104,7 @@ namespace zmq // Sent by I/O object ot the socket to request the shutdown of // the I/O object. struct { - class owned_t *object; + class own_t *object; } term_req; // Sent by socket to I/O object to start its shutdown. diff --git a/src/connect_session.cpp b/src/connect_session.cpp new file mode 100644 index 0000000..5c088f6 --- /dev/null +++ b/src/connect_session.cpp @@ -0,0 +1,115 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "connect_session.hpp" +#include "zmq_connecter.hpp" + +zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_t (io_thread_, socket_, options_), + protocol (protocol_), + address (address_) +{ +} + +zmq::connect_session_t::~connect_session_t () +{ +} + +void zmq::connect_session_t::process_plug () +{ + // Start connection process immediately. + start_connecting (); +} + +void zmq::connect_session_t::start_connecting () +{ + // Create the connecter object. + + // Both TCP and IPC transports are using the same infrastructure. + if (protocol == "tcp" || protocol == "ipc") { + zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( + choose_io_thread (options.affinity), this, options, + protocol.c_str (), address.c_str ()); + zmq_assert (connecter); + launch_child (connecter); + return; + } + +#if defined ZMQ_HAVE_OPENPGM + + // Both PGM and EPGM transports are using the same infrastructure. + if (addr_type == "pgm" || addr_type == "epgm") { + + // For EPGM transport with UDP encapsulation of PGM is used. + bool udp_encapsulation = (addr_type == "epgm"); + + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. + if (options.requires_out) { + + // PGM sender. + pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( + choose_io_thread (options.affinity), options); + zmq_assert (pgm_sender); + + int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); + if (rc != 0) { + delete pgm_sender; + return -1; + } + + send_attach (this, pgm_sender, blob_t ()); + } + else if (options.requires_in) { + + // PGM receiver. + pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( + choose_io_thread (options.affinity), options); + zmq_assert (pgm_receiver); + + int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); + if (rc != 0) { + delete pgm_receiver; + return -1; + } + + send_attach (this, pgm_receiver, blob_t ()); + } + else + zmq_assert (false); + + return; + } +#endif + + zmq_assert (false); +} + +void zmq::connect_session_t::detach () +{ + // Clean up the mess left over by the failed connection. + clean_pipes (); + + // Reconnect. + start_connecting (); +} + diff --git a/src/connect_session.hpp b/src/connect_session.hpp new file mode 100644 index 0000000..8303dda --- /dev/null +++ b/src/connect_session.hpp @@ -0,0 +1,60 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__ +#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__ + +#include <string> + +#include "session.hpp" + +namespace zmq +{ + + // Connect session contains an address to connect to. On disconnect it + // attempts to reconnect. + + class connect_session_t : public session_t + { + public: + + connect_session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~connect_session_t (); + + // i_inout interface implementation. + void detach (); + + private: + + // Start the connection process. + void start_connecting (); + + // Command handlers. + void process_plug (); + + // Address to connect to. + std::string protocol; + std::string address; + }; + +} + +#endif diff --git a/src/ctx.cpp b/src/ctx.cpp index 91157a5..d096b91 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -119,6 +119,7 @@ int zmq::ctx_t::term () // We don't even have to synchronise access to data. zmq_assert (sockets.empty ()); +// TODO: We are accessing the list of zombies in unsynchronised manner here! // Get rid of remaining zombie sockets. while (!zombies.empty ()) { dezombify (); @@ -173,7 +174,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) return s; } -void zmq::ctx_t::zombify (socket_base_t *socket_) +void zmq::ctx_t::zombify_socket (socket_base_t *socket_) { // Zombification of socket basically means that its ownership is tranferred // from the application that created it to the context. @@ -284,7 +285,8 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) void zmq::ctx_t::dezombify () { - // Try to dezombify each zombie in the list. + // Try to dezombify each zombie in the list. Note that caller is + // responsible for calling this method in the slot_sync critical section. for (zombies_t::size_type i = 0; i != zombies.size ();) if (zombies [i]->dezombify ()) { empty_slots.push_back (zombies [i]->get_slot ()); diff --git a/src/ctx.hpp b/src/ctx.hpp index cb9a2d9..c44cca6 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -58,7 +58,7 @@ namespace zmq class socket_base_t *create_socket (int type_); // Make socket a zombie. - void zombify (socket_base_t *socket_); + void zombify_socket (socket_base_t *socket_); // Send command to the destination slot. void send_command (uint32_t slot_, const command_t &command_); @@ -22,11 +22,14 @@ #include "fq.hpp" #include "pipe.hpp" #include "err.hpp" +#include "i_terminate_events.hpp" -zmq::fq_t::fq_t () : +zmq::fq_t::fq_t (i_terminate_events *sink_) : active (0), current (0), - more (false) + more (false), + sink (sink_), + terminating (false) { } @@ -42,6 +45,10 @@ void zmq::fq_t::attach (reader_t *pipe_) pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; + + // If we are already terminating, ask the pipe to terminate straight away. + if (terminating) + pipe_->terminate (); } void zmq::fq_t::terminated (reader_t *pipe_) @@ -59,15 +66,15 @@ void zmq::fq_t::terminated (reader_t *pipe_) current = 0; } pipes.erase (pipe_); -} -bool zmq::fq_t::has_pipes () -{ - return !pipes.empty (); + if (terminating && pipes.empty ()) + sink->terminated (); } -void zmq::fq_t::term_pipes () +void zmq::fq_t::terminate () { + terminating = true; + for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); } @@ -33,12 +33,11 @@ namespace zmq { public: - fq_t (); + fq_t (struct i_terminate_events *sink_); ~fq_t (); void attach (reader_t *pipe_); - bool has_pipes (); - void term_pipes (); + void terminate (); int recv (zmq_msg_t *msg_, int flags_); bool has_in (); @@ -64,6 +63,12 @@ namespace zmq // there are following parts still waiting in the current pipe. bool more; + // Object to send events to. + i_terminate_events *sink; + + // If true, termination process is already underway. + bool terminating; + fq_t (const fq_t&); void operator = (const fq_t&); }; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index ea6b850..0ba94f5 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -20,8 +20,6 @@ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__ -#include <stddef.h> - namespace zmq { @@ -30,18 +28,19 @@ namespace zmq virtual ~i_engine () {} // Plug the engine to the session. - virtual void plug (struct i_inout *inout_) = 0; + virtual void plug (class io_thread_t *io_thread_, + struct i_inout *inout_) = 0; // Unplug the engine from the session. virtual void unplug () = 0; - // This method is called by the session to signalise that there - // are messages to send available. - virtual void revive () = 0; - // This method is called by the session to signalise that more // messages can be written to the pipe. - virtual void resume_input () = 0; + virtual void activate_in () = 0; + + // This method is called by the session to signalise that there + // are messages to send available. + virtual void activate_out () = 0; }; } diff --git a/src/i_inout.hpp b/src/i_inout.hpp index 21d1838..60bc518 100644 --- a/src/i_inout.hpp +++ b/src/i_inout.hpp @@ -31,28 +31,17 @@ namespace zmq { virtual ~i_inout () {} - // Engine asks to get a message to send to the network. + // Engine asks for a message to send to the network. virtual bool read (::zmq_msg_t *msg_) = 0; - // Engine sends the incoming message further on downstream. + // Engine received message from the network and sends it further on. virtual bool write (::zmq_msg_t *msg_) = 0; - // Flush all the previously written messages downstream. + // Flush all the previously written messages. virtual void flush () = 0; - - // Drop all the references to the engine. The parameter is the object - // to use to reconnect. If reconnection is not required, the argument - // is set to NULL. - virtual void detach (class owned_t *reconnecter_) = 0; - // Returns least loaded I/O thread. - virtual class io_thread_t *get_io_thread () = 0; - - // Return pointer to the owning socket. - virtual class socket_base_t *get_owner () = 0; - - // Return ordinal number of the session. - virtual uint64_t get_ordinal () = 0; + // Engine is dead. Drop all the references to it. + virtual void detach () = 0; }; } diff --git a/src/i_terminate_events.hpp b/src/i_terminate_events.hpp new file mode 100644 index 0000000..08599ff --- /dev/null +++ b/src/i_terminate_events.hpp @@ -0,0 +1,38 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__ +#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__ + +namespace zmq +{ + + // Algorithms such as fair queueing (fq_t) and load balancing (lb_t) + // use this interface to communicate termination event to the socket. + + struct i_terminate_events + { + virtual ~i_terminate_events () {} + + virtual void terminated () = 0; + }; + +} + +#endif diff --git a/src/io_object.cpp b/src/io_object.cpp index 086f173..b3b45ee 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -21,21 +21,35 @@ #include "io_thread.hpp" #include "err.hpp" -zmq::io_object_t::io_object_t (io_thread_t *io_thread_) +zmq::io_object_t::io_object_t (io_thread_t *io_thread_) : + poller (NULL) { - // Retrieve the poller from the thread we are running in. - poller = io_thread_->get_poller (); + if (io_thread_) + plug (io_thread_); } zmq::io_object_t::~io_object_t () { } -void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) +void zmq::io_object_t::plug (io_thread_t *io_thread_) { + zmq_assert (io_thread_); + zmq_assert (!poller); + + // Retrieve the poller from the thread we are running in. poller = io_thread_->get_poller (); } +void zmq::io_object_t::unplug () +{ + zmq_assert (poller); + + // Forget about old poller in preparation to be migrated + // to a different I/O thread. + poller = NULL; +} + zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_) { return poller->add_fd (fd_, this); diff --git a/src/io_object.hpp b/src/io_object.hpp index 655e7f5..284e6d1 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -40,15 +40,15 @@ namespace zmq io_object_t (class io_thread_t *io_thread_ = NULL); ~io_object_t (); + // When migrating an object from one I/O thread to another, first + // unplug it, then migrate it, then plug it to the new thread. + void plug (class io_thread_t *io_thread_); + void unplug (); + protected: typedef poller_t::handle_t handle_t; - // Derived class can init/swap the underlying I/O thread. - // Caution: Remove all the file descriptors from the old I/O thread - // before swapping to the new one! - void set_io_thread (class io_thread_t *io_thread_); - // Methods to access underlying poller object. handle_t add_fd (fd_t fd_); void rm_fd (handle_t handle_); @@ -22,11 +22,14 @@ #include "lb.hpp" #include "pipe.hpp" #include "err.hpp" +#include "i_terminate_events.hpp" -zmq::lb_t::lb_t () : +zmq::lb_t::lb_t (i_terminate_events *sink_) : active (0), current (0), - more (false) + more (false), + sink (sink_), + terminating (false) { } @@ -42,17 +45,22 @@ void zmq::lb_t::attach (writer_t *pipe_) pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; + + if (terminating) + pipe_->terminate (); } -void zmq::lb_t::term_pipes () +void zmq::lb_t::terminate () { + terminating = true; + for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); } void zmq::lb_t::terminated (writer_t *pipe_) { - // ??? + // TODO: ??? zmq_assert (!more || pipes [current] != pipe_); // Remove the pipe from the list; adjust number of active pipes @@ -63,11 +71,9 @@ void zmq::lb_t::terminated (writer_t *pipe_) current = 0; } pipes.erase (pipe_); -} -bool zmq::lb_t::has_pipes () -{ - return !pipes.empty (); + if (terminating && pipes.empty ()) + sink->terminated (); } void zmq::lb_t::activated (writer_t *pipe_) @@ -32,12 +32,11 @@ namespace zmq { public: - lb_t (); + lb_t (struct i_terminate_events *sink_); ~lb_t (); void attach (writer_t *pipe_); - void term_pipes (); - bool has_pipes (); + void terminate (); int send (zmq_msg_t *msg_, int flags_); bool has_out (); @@ -61,6 +60,12 @@ namespace zmq // True if last we are in the middle of a multipart message. bool more; + // Object to send events to. + struct i_terminate_events *sink; + + // If true, termination process is already underway. + bool terminating; + lb_t (const lb_t&); void operator = (const lb_t&); }; diff --git a/src/named_session.cpp b/src/named_session.cpp new file mode 100644 index 0000000..d219286 --- /dev/null +++ b/src/named_session.cpp @@ -0,0 +1,87 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "named_session.hpp" +#include "socket_base.hpp" + +/* +zmq::named_session_t::named_session_t (class io_thread_t *io_thread_, + socket_base_t *socket_, const options_t &options_, + const blob_t &name_) : + session_t (io_thread_, socket_, options_), + name (name_) +{ + // Make double sure that the session has valid name. + zmq_assert (!name.empty ()); + zmq_assert (name [0] != 0); + + if (!socket_->register_session (name, this)) { + + // TODO: There's already a session with the specified + // identity. We should log the error and drop the + // session. + zmq_assert (false); + } +} + +zmq::named_session_t::~named_session_t () +{ +} + +void zmq::named_session_t::detach () +{ + // TODO: + zmq_assert (false); +} + +void zmq::named_session_t::attached (const blob_t &peer_identity_) +{ + if (!peer_identity.empty ()) { + + // If both IDs are temporary, no checking is needed. + // TODO: Old ID should be reused in this case... + if (peer_identity.empty () || peer_identity [0] != 0 || + peer_identity_.empty () || peer_identity_ [0] != 0) { + + // If we already know the peer name do nothing, just check whether + // it haven't changed. + zmq_assert (peer_identity == peer_identity_); + } + } + else if (!peer_identity_.empty ()) { + + // Store the peer identity. + peer_identity = peer_identity_; + + // Register the session using the peer name. + if (!register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } + } +} + +void zmq::named_session_t::detached () +{ + socket->unregister_session (peer_identity); +} +*/ diff --git a/src/named_session.hpp b/src/named_session.hpp new file mode 100644 index 0000000..7248c8f --- /dev/null +++ b/src/named_session.hpp @@ -0,0 +1,56 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_NAMED_SESSION_HPP_INCLUDED__ +#define __ZMQ_NAMED_SESSION_HPP_INCLUDED__ + +#include "session.hpp" +#include "blob.hpp" + +namespace zmq +{ + + // Named session is created by listener object when the peer identifies + // itself by a strong name. Named session survives reconnections. + + class named_session_t : public session_t + { + public: + + named_session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_, + const blob_t &name_); + ~named_session_t (); + + // i_inout interface implementation. + void detach (); + + // Handle events from session_t base class. + void attached (const blob_t &peer_identity_); + void detached (); + + private: + + // Name of the session. Corresponds to the peer's strong identity. + blob_t name; + }; + +} + +#endif diff --git a/src/object.cpp b/src/object.cpp index cdb177f..a8294b0 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -24,7 +24,6 @@ #include "err.hpp" #include "pipe.hpp" #include "io_thread.hpp" -#include "owned.hpp" #include "session.hpp" #include "socket_base.hpp" @@ -143,9 +142,9 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) return ctx->choose_io_thread (taskset_); } -void zmq::object_t::zombify (socket_base_t *socket_) +void zmq::object_t::zombify_socket (socket_base_t *socket_) { - ctx->zombify (socket_); + ctx->zombify_socket (socket_); } void zmq::object_t::send_stop () @@ -158,7 +157,7 @@ void zmq::object_t::send_stop () ctx->send_command (slot, cmd); } -void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) +void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -169,7 +168,7 @@ void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) send_command (cmd); } -void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) +void zmq::object_t::send_own (own_t *destination_, own_t *object_) { destination_->inc_seqnum (); command_t cmd; @@ -206,9 +205,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, send_command (cmd); } -void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, - bool inc_seqnum_) +void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, + writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -269,8 +267,8 @@ void zmq::object_t::send_pipe_term_ack (reader_t *destination_) send_command (cmd); } -void zmq::object_t::send_term_req (socket_base_t *destination_, - owned_t *object_) +void zmq::object_t::send_term_req (own_t *destination_, + own_t *object_) { command_t cmd; cmd.destination = destination_; @@ -279,7 +277,7 @@ void zmq::object_t::send_term_req (socket_base_t *destination_, send_command (cmd); } -void zmq::object_t::send_term (owned_t *destination_) +void zmq::object_t::send_term (own_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -287,7 +285,7 @@ void zmq::object_t::send_term (owned_t *destination_) send_command (cmd); } -void zmq::object_t::send_term_ack (socket_base_t *destination_) +void zmq::object_t::send_term_ack (own_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -305,7 +303,7 @@ void zmq::object_t::process_plug () zmq_assert (false); } -void zmq::object_t::process_own (owned_t *object_) +void zmq::object_t::process_own (own_t *object_) { zmq_assert (false); } @@ -342,7 +340,7 @@ void zmq::object_t::process_pipe_term_ack () zmq_assert (false); } -void zmq::object_t::process_term_req (owned_t *object_) +void zmq::object_t::process_term_req (own_t *object_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index c75a95a..e083ce3 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -53,18 +53,19 @@ namespace zmq // Zombify particular socket. In other words, pass the ownership to // the context. - void zombify (class socket_base_t *socket_); + void zombify_socket (class socket_base_t *socket_); // Derived object can use these functions to send commands // to other objects. void send_stop (); - void send_plug (class owned_t *destination_, bool inc_seqnum_ = true); - void send_own (class socket_base_t *destination_, - class owned_t *object_); + void send_plug (class own_t *destination_, + bool inc_seqnum_ = true); + void send_own (class own_t *destination_, + class own_t *object_); void send_attach (class session_t *destination_, struct i_engine *engine_, const blob_t &peer_identity_, bool inc_seqnum_ = true); - void send_bind (class socket_base_t *destination_, + void send_bind (class own_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_ = true); void send_revive (class object_t *destination_); @@ -72,16 +73,16 @@ namespace zmq uint64_t msgs_read_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); - void send_term_req (class socket_base_t *destination_, - class owned_t *object_); - void send_term (class owned_t *destination_); - void send_term_ack (class socket_base_t *destination_); + void send_term_req (class own_t *destination_, + class own_t *object_); + void send_term (class own_t *destination_); + void send_term_ack (class own_t *destination_); // These handlers can be overloaded by the derived objects. They are // called when command arrives from another thread. virtual void process_stop (); virtual void process_plug (); - virtual void process_own (class owned_t *object_); + virtual void process_own (class own_t *object_); virtual void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, @@ -90,7 +91,7 @@ namespace zmq virtual void process_reader_info (uint64_t msgs_read_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); - virtual void process_term_req (class owned_t *object_); + virtual void process_term_req (class own_t *object_); virtual void process_term (); virtual void process_term_ack (); diff --git a/src/own.cpp b/src/own.cpp new file mode 100644 index 0000000..d90e9c4 --- /dev/null +++ b/src/own.cpp @@ -0,0 +1,198 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "own.hpp" +#include "err.hpp" +#include "io_thread.hpp" + +zmq::own_t::own_t (class ctx_t *parent_, uint32_t slot_) : + object_t (parent_, slot_), + terminating (false), + sent_seqnum (0), + processed_seqnum (0), + owner (NULL), + term_acks (0) +{ +} + +zmq::own_t::own_t (io_thread_t *io_thread_) : + object_t (io_thread_), + terminating (false), + sent_seqnum (0), + processed_seqnum (0), + owner (NULL), + term_acks (0) +{ +} + +zmq::own_t::~own_t () +{ +} + +void zmq::own_t::set_owner (own_t *owner_) +{ + zmq_assert (!owner); + owner = owner_; +} + +void zmq::own_t::inc_seqnum () +{ + // This function may be called from a different thread! + sent_seqnum.add (1); +} + +void zmq::own_t::process_seqnum () +{ + // Catch up with counter of processed commands. + processed_seqnum++; + + // We may have catched up and still have pending terms acks. + check_term_acks (); +} + +void zmq::own_t::launch_child (own_t *object_) +{ + // Specify the owner of the object. + object_->set_owner (this); + + // Plug the object into the I/O thread. + send_plug (object_); + + // Take ownership of the object. + send_own (this, object_); +} + +void zmq::own_t::launch_sibling (own_t *object_) +{ + // Specify the owner of the object. + object_->set_owner (owner); + + // Plug the object into its I/O thread. + send_plug (object_); + + // Take ownership of the object. + send_own (owner, object_); +} + +void zmq::own_t::process_term_req (own_t *object_) +{ + // When shutting down we can ignore termination requests from owned + // objects. The termination request was already sent to the object. + if (terminating) + return; + + // If I/O object is well and alive let's ask it to terminate. + owned_t::iterator it = std::find (owned.begin (), owned.end (), object_); + + // If not found, we assume that termination request was already sent to + // the object so we can safely ignore the request. + if (it == owned.end ()) + return; + + owned.erase (it); + register_term_acks (1); + send_term (object_); +} + +void zmq::own_t::process_own (own_t *object_) +{ + // If the object is already being shut down, new owned objects are + // immediately asked to terminate. + if (terminating) { + register_term_acks (1); + send_term (object_); + return; + } + + // Store the reference to the owned object. + owned.insert (object_); +} + +void zmq::own_t::terminate () +{ + // If termination is already underway, there's no point + // in starting it anew. + if (terminating) + return; + + // As for the root of the ownership tree, there's noone to terminate it, + // so it has to terminate itself. + if (!owner) { + process_term (); + return; + } + + // If I am an owned object, I'll ask my owner to terminate me. + send_term_req (owner, this); +} + +void zmq::own_t::process_term () +{ + // Double termination should never happen. + zmq_assert (!terminating); + + // Send termination request to all owned objects. + for (owned_t::iterator it = owned.begin (); it != owned.end (); it++) + send_term (*it); + register_term_acks (owned.size ()); + owned.clear (); + + // Start termination process and check whether by chance we cannot + // terminate immediately. + terminating = true; + check_term_acks (); +} + +void zmq::own_t::register_term_acks (int count_) +{ + term_acks += count_; +} + +void zmq::own_t::unregister_term_ack () +{ + zmq_assert (term_acks > 0); + term_acks--; + + // This may be a last ack we are waiting for before termination... + check_term_acks (); +} + +void zmq::own_t::process_term_ack () +{ + unregister_term_ack (); +} + +void zmq::own_t::check_term_acks () +{ + if (terminating && processed_seqnum == sent_seqnum.get () && + term_acks == 0) { + + // Sanity check. There should be no active children at this point. + zmq_assert (owned.empty ()); + + // The root object has nobody to confirm the termination to. + // Other nodes will confirm the termination to the owner. + if (owner) + send_term_ack (owner); + + // Deallocate the resources. + delete this; + } +} + diff --git a/src/own.hpp b/src/own.hpp new file mode 100644 index 0000000..dc14fcc --- /dev/null +++ b/src/own.hpp @@ -0,0 +1,132 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_OWN_HPP_INCLUDED__ +#define __ZMQ_OWN_HPP_INCLUDED__ + +#include <set> +#include <algorithm> + +#include "object.hpp" +#include "atomic_counter.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // Base class for objects forming a part of ownership hierarchy. + // It handles initialisation and destruction of such objects. + + class own_t : public object_t + { + public: + + // Note that the owner is unspecified in the constructor. + // It'll be supplied later on when the object is plugged in. + + // The object is not living within an I/O thread. It has it's own + // thread outside of 0MQ infrastructure. + own_t (class ctx_t *parent_, uint32_t slot_); + + // The object is living within I/O thread. + own_t (class io_thread_t *io_thread_); + + // When another owned object wants to send command to this object + // it calls this function to let it know it should not shut down + // before the command is delivered. + void inc_seqnum (); + + protected: + + // Launch the supplied object and become its owner. + void launch_child (own_t *object_); + + // Launch the supplied object and make it your sibling (make your + // owner become its owner as well). + void launch_sibling (own_t *object_); + + // Ask owner object to terminate this object. It may take a while + // while actual termination is started. This function should not be + // called more than once. + void terminate (); + + // Derived object destroys own_t. There's no point in allowing + // others to invoke the destructor. At the same time, it has to be + // virtual so that generic own_t deallocation mechanism destroys + // specific type of the owned object correctly. + virtual ~own_t (); + + // Term handler is protocted rather than private so that it can + // be intercepted by the derived class. This is useful to add custom + // steps to the beginning of the termination process. + void process_term (); + + // Use following two functions to wait for arbitrary events before + // terminating. Just add number of events to wait for using + // register_tem_acks functions. When event occurs, call + // remove_term_ack. When number of pending acks reaches zero + // object will be deallocated. + void register_term_acks (int count_); + void unregister_term_ack (); + + private: + + // Set owner of the object + void set_owner (own_t *owner_); + + // Handlers for incoming commands. + void process_own (own_t *object_); + void process_term_req (own_t *object_); + void process_term_ack (); + + void process_seqnum (); + + // Check whether all the peding term acks were delivered. + // If so, deallocate this object. + void check_term_acks (); + + // True if termination was already initiated. If so, we can destroy + // the object if there are no more child objects or pending term acks. + bool terminating; + + // Sequence number of the last command sent to this object. + atomic_counter_t sent_seqnum; + + // Sequence number of the last command processed by this object. + uint64_t processed_seqnum; + + // Socket owning this object. It's responsible for shutting down + // this object. + own_t *owner; + + // List of all objects owned by this socket. We are responsible + // for deallocating them before we quit. + typedef std::set <own_t*> owned_t; + owned_t owned; + + // Number of events we have to get before we can destroy the object. + int term_acks; + + own_t (const own_t&); + void operator = (const own_t&); + }; + +} + +#endif diff --git a/src/owned.cpp b/src/owned.cpp deleted file mode 100644 index 7d1cf5e..0000000 --- a/src/owned.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "owned.hpp" -#include "err.hpp" - -zmq::owned_t::owned_t (object_t *parent_, socket_base_t *owner_) : - object_t (parent_), - owner (owner_), - sent_seqnum (0), - processed_seqnum (0), - shutting_down (false) -{ -} - -zmq::owned_t::~owned_t () -{ -} - -void zmq::owned_t::inc_seqnum () -{ - // This function may be called from a different thread! - sent_seqnum.add (1); -} - -void zmq::owned_t::term () -{ - send_term_req (owner, this); -} - -void zmq::owned_t::process_term () -{ - zmq_assert (!shutting_down); - shutting_down = true; - finalise (); -} - -void zmq::owned_t::process_seqnum () -{ - // Catch up with counter of processed commands. - processed_seqnum++; - finalise (); -} - -void zmq::owned_t::finalise () -{ - // If termination request was already received and there are no more - // commands to wait for, terminate the object. - if (shutting_down && processed_seqnum == sent_seqnum.get () - && is_terminable ()) { - process_unplug (); - send_term_ack (owner); - delete this; - } -} - -bool zmq::owned_t::is_terminable () -{ - return true; -} - diff --git a/src/owned.hpp b/src/owned.hpp deleted file mode 100644 index 80cf42f..0000000 --- a/src/owned.hpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_OWNED_HPP_INCLUDED__ -#define __ZMQ_OWNED_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "atomic_counter.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - // Base class for objects owned by individual sockets. Handles - // initialisation and destruction of such objects. - - class owned_t : public object_t - { - public: - - // The object will live in parent's thread, however, its lifetime - // will be managed by its owner socket. - owned_t (object_t *parent_, socket_base_t *owner_); - - // When another owned object wants to send command to this object - // it calls this function to let it know it should not shut down - // before the command is delivered. - void inc_seqnum (); - - protected: - - // A mechanism allowing derived owned objects to postpone the - // termination process. Default implementation defines no such delay. - // Note that the derived object has to call finalise method when the - // delay is over. - virtual bool is_terminable (); - void finalise (); - - // Ask owner socket to terminate this object. - void term (); - - // Derived object destroys owned_t. No point in allowing others to - // invoke the destructor. At the same time, it has to be virtual so - // that generic owned_t deallocation mechanism destroys specific type - // of the owned object correctly. - virtual ~owned_t (); - - // io_object_t defines a new handler used to disconnect the object - // from the poller object. Implement the handlen in the derived - // classes to ensure sane cleanup. - virtual void process_unplug () = 0; - - // Socket owning this object. When the socket is being closed it's - // responsible for shutting down this object. - socket_base_t *owner; - - private: - - // Handlers for incoming commands. - void process_term (); - void process_seqnum (); - - // Sequence number of the last command sent to this object. - atomic_counter_t sent_seqnum; - - // Sequence number of the last command processed by this object. - uint64_t processed_seqnum; - - // If true, the object is already shutting down. - bool shutting_down; - - owned_t (const owned_t&); - void operator = (const owned_t&); - }; - -} - -#endif diff --git a/src/pair.cpp b/src/pair.cpp index 1ff2e1a..8db2ffc 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -28,7 +28,8 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) : inpipe (NULL), outpipe (NULL), inpipe_alive (false), - outpipe_alive (false) + outpipe_alive (false), + terminating (false) { options.requires_in = true; options.requires_out = true; @@ -43,6 +44,7 @@ zmq::pair_t::~pair_t () void zmq::pair_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { + zmq_assert (!terminating); zmq_assert (!inpipe && !outpipe); inpipe = inpipe_; @@ -59,6 +61,9 @@ void zmq::pair_t::terminated (class reader_t *pipe_) zmq_assert (pipe_ == inpipe); inpipe = NULL; inpipe_alive = false; + + if (terminating) + unregister_term_ack (); } void zmq::pair_t::terminated (class writer_t *pipe_) @@ -66,19 +71,22 @@ void zmq::pair_t::terminated (class writer_t *pipe_) zmq_assert (pipe_ == outpipe); outpipe = NULL; outpipe_alive = false; -} -void zmq::pair_t::xterm_pipes () -{ - if (inpipe) - inpipe->terminate (); - if (outpipe) - outpipe->terminate (); + if (terminating) + unregister_term_ack (); } -bool zmq::pair_t::xhas_pipes () +void zmq::pair_t::process_term () { - return inpipe != NULL || outpipe != NULL; + zmq_assert (inpipe && outpipe); + + terminating = true; + + register_term_acks (2); + inpipe->terminate (); + outpipe->terminate (); + + socket_base_t::process_term (); } void zmq::pair_t::activated (class reader_t *pipe_) diff --git a/src/pair.hpp b/src/pair.hpp index 0c484d7..65b474e 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -39,8 +39,6 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xterm_pipes (); - bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); @@ -56,12 +54,17 @@ namespace zmq private: + // Hook into termination process. + void process_term (); + class reader_t *inpipe; class writer_t *outpipe; bool inpipe_alive; bool outpipe_alive; + bool terminating; + pair_t (const pair_t&); void operator = (const pair_t&); }; diff --git a/src/pub.cpp b/src/pub.cpp index d1d1c72..2d0dea2 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -26,7 +26,8 @@ zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) : socket_base_t (parent_, slot_), - active (0) + active (0), + terminating (false) { options.requires_in = false; options.requires_out = true; @@ -40,6 +41,7 @@ zmq::pub_t::~pub_t () void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { + zmq_assert (!terminating); zmq_assert (!inpipe_); outpipe_->set_event_sink (this); @@ -47,18 +49,26 @@ void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, pipes.push_back (outpipe_); pipes.swap (active, pipes.size () - 1); active++; + + if (terminating) { + register_term_acks (1); + outpipe_->terminate (); + } } -void zmq::pub_t::xterm_pipes () +void zmq::pub_t::process_term () { + terminating = true; + // Start shutdown process for all the pipes. for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); -} -bool zmq::pub_t::xhas_pipes () -{ - return !pipes.empty (); + // Wait for pipes to terminate before terminating yourself. + register_term_acks (pipes.size ()); + + // Continue with the termination immediately. + socket_base_t::process_term (); } void zmq::pub_t::activated (writer_t *pipe_) @@ -75,6 +85,10 @@ void zmq::pub_t::terminated (writer_t *pipe_) if (pipes.index (pipe_) < active) active--; pipes.erase (pipe_); + + // If we are already terminating, wait for one term ack less. + if (terminating) + unregister_term_ack (); } int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) diff --git a/src/pub.hpp b/src/pub.hpp index a81edfe..edc9b53 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -37,8 +37,6 @@ namespace zmq // Implementations of virtual functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xterm_pipes (); - bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); bool xhas_out (); @@ -48,6 +46,9 @@ namespace zmq private: + // Hook into the termination process. + void process_term (); + // Write the message to the pipe. Make the pipe inactive if writing // fails. In such a case false is returned. bool write (class writer_t *pipe_, zmq_msg_t *msg_); @@ -60,6 +61,9 @@ namespace zmq // beginning of the pipes array. pipes_t::size_type active; + // True if termination process is already underway. + bool terminating; + pub_t (const pub_t&); void operator = (const pub_t&); }; diff --git a/src/pull.cpp b/src/pull.cpp index 4f4a8b3..e7b5239 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -23,7 +23,8 @@ #include "err.hpp" zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t slot_) : - socket_base_t (parent_, slot_) + socket_base_t (parent_, slot_), + fq (this) { options.requires_in = true; options.requires_out = false; @@ -40,14 +41,17 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, fq.attach (inpipe_); } -void zmq::pull_t::xterm_pipes () +void zmq::pull_t::process_term () { - fq.term_pipes (); + register_term_acks (1); + fq.terminate (); + + socket_base_t::process_term (); } -bool zmq::pull_t::xhas_pipes () +void zmq::pull_t::terminated () { - return fq.has_pipes (); + unregister_term_ack (); } int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) diff --git a/src/pull.hpp b/src/pull.hpp index 4be40dd..997eebf 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -17,32 +17,39 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ -#define __ZMQ_UPSTREAM_HPP_INCLUDED__ +#ifndef __ZMQ_PULL_HPP_INCLUDED__ +#define __ZMQ_PULL_HPP_INCLUDED__ +#include "i_terminate_events.hpp" #include "socket_base.hpp" #include "fq.hpp" namespace zmq { - class pull_t : public socket_base_t + class pull_t : public socket_base_t, public i_terminate_events { public: pull_t (class ctx_t *parent_, uint32_t slot_); ~pull_t (); + protected: + // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xterm_pipes (); - bool xhas_pipes (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); private: + // i_terminate_events interface implementation. + void terminated (); + + // Hook into the termination process. + void process_term (); + // Fair queueing object for inbound pipes. fq_t fq; diff --git a/src/push.cpp b/src/push.cpp index 3a3d258..f587cef 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -24,7 +24,8 @@ #include "pipe.hpp" zmq::push_t::push_t (class ctx_t *parent_, uint32_t slot_) : - socket_base_t (parent_, slot_) + socket_base_t (parent_, slot_), + lb (this) { options.requires_in = false; options.requires_out = true; @@ -41,14 +42,17 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_, lb.attach (outpipe_); } -void zmq::push_t::xterm_pipes () +void zmq::push_t::process_term () { - lb.term_pipes (); + register_term_acks (1); + lb.terminate (); + + socket_base_t::process_term (); } -bool zmq::push_t::xhas_pipes () +void zmq::push_t::terminated () { - return lb.has_pipes (); + unregister_term_ack (); } int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) diff --git a/src/push.hpp b/src/push.hpp index e604abc..aed2662 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -17,32 +17,39 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ -#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#ifndef __ZMQ_PUSH_HPP_INCLUDED__ +#define __ZMQ_PUSH_HPP_INCLUDED__ +#include "i_terminate_events.hpp" #include "socket_base.hpp" #include "lb.hpp" namespace zmq { - class push_t : public socket_base_t + class push_t : public socket_base_t, public i_terminate_events { public: push_t (class ctx_t *parent_, uint32_t slot_); ~push_t (); + protected: + // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xterm_pipes (); - bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); bool xhas_out (); private: + // i_terminate_events interface implementation. + void terminated (); + + // Hook into the termination process. + void process_term (); + // Load balancer managing the outbound pipes. lb_t lb; diff --git a/src/session.cpp b/src/session.cpp index 3c74898..0494ff1 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -20,47 +20,25 @@ #include <new> #include "session.hpp" +#include "socket_base.hpp" #include "i_engine.hpp" #include "err.hpp" #include "pipe.hpp" -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_) : - owned_t (parent_, owner_), +zmq::session_t::session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_), + options (options_), in_pipe (NULL), incomplete_in (false), active (true), out_pipe (NULL), engine (NULL), - options (options_) + socket (socket_), + io_thread (io_thread_), + attach_processed (false), + term_processed (false) { - // It's possible to register the session at this point as it will be - // searched for only on reconnect, i.e. no race condition (session found - // before it is plugged into it's I/O thread) is possible. - ordinal = owner->register_session (this); -} - -zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_) : - owned_t (parent_, owner_), - in_pipe (NULL), - incomplete_in (false), - active (true), - out_pipe (NULL), - engine (NULL), - ordinal (0), - peer_identity (peer_identity_), - options (options_) -{ - if (!peer_identity.empty () && peer_identity [0] != 0) { - if (!owner->register_session (peer_identity, this)) { - - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } } zmq::session_t::~session_t () @@ -69,10 +47,10 @@ zmq::session_t::~session_t () zmq_assert (!out_pipe); } -bool zmq::session_t::is_terminable () +void zmq::session_t::terminate () { - // The session won't send term_ack until both in & out pipe are closed. - return !in_pipe && !out_pipe; + // TODO: + zmq_assert (false); } bool zmq::session_t::read (::zmq_msg_t *msg_) @@ -105,17 +83,8 @@ void zmq::session_t::flush () out_pipe->flush (); } -void zmq::session_t::detach (owned_t *reconnecter_) +void zmq::session_t::clean_pipes () { - // Plug in the reconnecter object if any. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } - - // Engine is terminating itself. No need to deallocate it from here. - engine = NULL; - // Get rid of half-processed messages in the out pipe. Flush any // unflushed messages upstream. if (out_pipe) { @@ -135,26 +104,6 @@ void zmq::session_t::detach (owned_t *reconnecter_) zmq_msg_close (&msg); } } - - // Terminate transient session. - if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) - term (); -} - -zmq::io_thread_t *zmq::session_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::session_t::get_owner () -{ - return owner; -} - -uint64_t zmq::session_t::get_ordinal () -{ - zmq_assert (ordinal); - return ordinal; } void zmq::session_t::attach_pipes (class reader_t *inpipe_, @@ -172,6 +121,9 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, out_pipe = outpipe_; out_pipe->set_event_sink (this); } + + attach_processed = true; + finalise (); } void zmq::session_t::terminated (reader_t *pipe_) @@ -192,14 +144,14 @@ void zmq::session_t::activated (reader_t *pipe_) zmq_assert (in_pipe == pipe_); active = true; if (engine) - engine->revive (); + engine->activate_out (); } void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) - engine->resume_input (); + engine->activate_in (); } void zmq::session_t::process_plug () @@ -214,10 +166,9 @@ void zmq::session_t::process_unplug () // there may be some commands being sent to the session right now. // Unregister the session from the socket. - if (ordinal) - owner->unregister_session (ordinal); - else if (!peer_identity.empty () && peer_identity [0] != 0) - owner->unregister_session (peer_identity); +// if (!peer_identity.empty () && peer_identity [0] != 0) +// unregister_session (peer_identity); +// TODO: Should be done in named session. // Ask associated pipes to terminate. if (in_pipe) @@ -232,63 +183,65 @@ void zmq::session_t::process_unplug () } } +void zmq::session_t::finalise () +{ + // If all conditions are met, proceed with termination: + // 1. Owner object already asked us to terminate. + // 2. The pipes were already attached to the session. + // 3. Both pipes have already terminated. Note that inbound pipe + // is terminated after delimiter is read, i.e. all messages + // were already sent to the wire. + if (term_processed && attach_processed && !in_pipe && !out_pipe) + own_t::process_term (); +} + void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { - if (!peer_identity.empty ()) { - - // If both IDs are temporary, no checking is needed. - // TODO: Old ID should be reused in this case... - if (peer_identity.empty () || peer_identity [0] != 0 || - peer_identity_.empty () || peer_identity_ [0] != 0) { - - // If we already know the peer name do nothing, just check whether - // it haven't changed. - zmq_assert (peer_identity == peer_identity_); - } - } - else if (!peer_identity_.empty ()) { - - // Store the peer identity. - peer_identity = peer_identity_; - - // If the session is not registered with the ordinal, let's register - // it using the peer name. - if (!ordinal) { - if (!owner->register_session (peer_identity, this)) { - - // TODO: There's already a session with the specified - // identity. We should presumably syslog it and drop the - // session. - zmq_assert (false); - } - } - } - // Check whether the required pipes already exist. If not so, we'll // create them and bind them to the socket object. reader_t *socket_reader = NULL; writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - create_pipe (owner, this, options.hwm, options.swap, &socket_reader, + create_pipe (socket, this, options.hwm, options.swap, &socket_reader, &out_pipe); out_pipe->set_event_sink (this); } if (options.requires_out && !in_pipe) { - create_pipe (this, owner, options.hwm, options.swap, &in_pipe, + create_pipe (this, socket, options.hwm, options.swap, &in_pipe, &socket_writer); in_pipe->set_event_sink (this); } if (socket_reader || socket_writer) - send_bind (owner, socket_reader, socket_writer, peer_identity); + send_bind (socket, socket_reader, socket_writer, peer_identity); // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; - engine->plug (this); + engine->plug (io_thread, this); + + // Trigger the notfication about the attachment. + attached (peer_identity_); +} + +void zmq::session_t::process_term () +{ + // Here we are pugging into the own_t's termination mechanism. + // The goal is to postpone the termination till all the pending messages + // are sent to the peer. + term_processed = true; + finalise (); +} + +void zmq::session_t::attached (const blob_t &peer_identity_) +{ +} + +void zmq::session_t::detached () +{ } diff --git a/src/session.hpp b/src/session.hpp index 603b50c..ba259dc 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,8 +20,8 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ +#include "own.hpp" #include "i_inout.hpp" -#include "owned.hpp" #include "options.hpp" #include "blob.hpp" #include "pipe.hpp" @@ -30,29 +30,22 @@ namespace zmq { class session_t : - public owned_t, + public own_t, public i_inout, public i_reader_events, public i_writer_events { public: - // Creates unnamed session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_); + session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_); - // Creates named session. - session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const blob_t &peer_identity_); - - // i_inout interface implementation. + // i_inout interface implementation. Note that detach method is not + // implemented by generic session. Different session types may handle + // engine disconnection in different ways. bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (owned_t *reconnecter_); - class io_thread_t *get_io_thread (); - class socket_base_t *get_owner (); - uint64_t get_ordinal (); void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); @@ -65,19 +58,40 @@ namespace zmq void activated (class writer_t *pipe_); void terminated (class writer_t *pipe_); - private: + protected: + + // Forcefully close this session (without sending + // outbound messages to the wire). + void terminate (); + + // Two events for the derived session type. Attached is triggered + // when session is attached to a peer, detached is triggered at the + // beginning of the termination process when session is about to + // be detached from the peer. + virtual void attached (const blob_t &peer_identity_); + virtual void detached (); ~session_t (); - // Define the delayed termination. (I.e. termination is postponed - // till all the data is flushed to the kernel.) - bool is_terminable (); + // Remove any half processed messages. Flush unflushed messages. + // Call this function when engine disconnect to get rid of leftovers. + void clean_pipes (); + + // Inherited socket options. These are visible to all session classes. + options_t options; + + private: // Handlers for incoming commands. void process_plug (); void process_unplug (); void process_attach (struct i_engine *engine_, const blob_t &peer_identity_); + void process_term (); + + // Check whether object is ready for termination. If so proceed + // with closing child objects. + void finalise (); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -92,18 +106,25 @@ namespace zmq // Outbound pipe, i.e. one the socket is sending messages to. class writer_t *out_pipe; + // The protocol I/O engine connected to the session. struct i_engine *engine; - // Session is identified by ordinal in the case when it was created - // before connection to the peer was established and thus we are - // unaware of peer's identity. - uint64_t ordinal; - - // Identity of the peer. + // Identity of the peer (say the component on the other side + // of TCP connection). blob_t peer_identity; - // Inherited socket options. - options_t options; + // The socket the session belongs to. + class socket_base_t *socket; + + // I/O thread the session is living in. It will be used to plug in + // the engines into the same thread. + class io_thread_t *io_thread; + + // True if pipes were already attached. + bool attach_processed; + + // True if term command was already processed. + bool term_processed; session_t (const session_t&); void operator = (const session_t&); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 5d3175a..903e781 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -38,9 +38,8 @@ #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" -#include "session.hpp" +#include "connect_session.hpp" #include "config.hpp" -#include "owned.hpp" #include "pipe.hpp" #include "err.hpp" #include "ctx.hpp" @@ -109,20 +108,20 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, } zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : - object_t (parent_, slot_), + own_t (parent_, slot_), zombie (false), last_processing_time (0), - pending_term_acks (0), ticks (0), - rcvmore (false), - sent_seqnum (0), - processed_seqnum (0), - next_ordinal (1) + rcvmore (false) { } zmq::socket_base_t::~socket_base_t () { + // Check whether there are no session leaks. + sessions_sync.lock (); + zmq_assert (sessions.empty ()); + sessions_sync.unlock (); } zmq::signaler_t *zmq::socket_base_t::get_signaler () @@ -139,6 +138,46 @@ void zmq::socket_base_t::stop () send_stop (); } +int zmq::socket_base_t::check_protocol (const std::string &protocol_) +{ + // First check out whether the protcol is something we are aware of. + if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && + protocol_ != "pgm" && protocol_ != "epgm") { + errno = EPROTONOSUPPORT; + return -1; + } + + // If 0MQ is not compiled with OpenPGM, pgm and epgm transports + // are not avaialble. +#if !defined ZMQ_HAVE_OPENPGM + if (protocol_ == "pgm" || protocol_ == "epgm") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif + + // IPC transport is not available on Windows and OpenVMS. +#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + if (protocol_ != "ipc") { + // Unknown protocol. + errno = EPROTONOSUPPORT; + return -1; + } +#endif + + // Check whether socket type and transport protocol match. + // Specifically, multicast protocols can't be combined with + // bi-directional messaging patterns (socket types). + if ((protocol_ == "pgm" || protocol_ == "epgm") && + options.requires_in && options.requires_out) { + errno = ENOCOMPATPROTO; + return -1; + } + + // Protocol is available. + return 0; +} + void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { @@ -225,56 +264,48 @@ int zmq::socket_base_t::bind (const char *addr_) } // Parse addr_ string. - std::string addr_type; - std::string addr_args; - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - - if (pos == std::string::npos) { - errno = EINVAL; - return -1; + std::string protocol; + std::string address; + { + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + if (pos == std::string::npos) { + errno = EINVAL; + return -1; + } + protocol = addr.substr (0, pos); + address = addr.substr (pos + 3); } - addr_type = addr.substr (0, pos); - addr_args = addr.substr (pos + 3); - - if (addr_type == "inproc") - return register_endpoint (addr_args.c_str (), this); - - if (addr_type == "tcp" || addr_type == "ipc") { + int rc = check_protocol (protocol); + if (rc != 0) + return -1; -#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS - if (addr_type == "ipc") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif + if (protocol == "inproc") + return register_endpoint (address.c_str (), this); + if (protocol == "tcp" || protocol == "ipc") { zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( choose_io_thread (options.affinity), this, options); zmq_assert (listener); - int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ()); + int rc = listener->set_address (protocol.c_str(), address.c_str ()); if (rc != 0) { delete listener; return -1; } + launch_child (listener); - send_plug (listener); - send_own (this, listener); return 0; } -#if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm" || addr_type == "epgm") { - // In the case of PGM bind behaves the same like connect. + if (protocol == "pgm" || protocol == "epgm") { + + // For convenience's sake, bind can be used interchageable with + // connect for PGM and EPGM transports. return connect (addr_); } -#endif - // Unknown protocol. - errno = EPROTONOSUPPORT; - return -1; + zmq_assert (false); } int zmq::socket_base_t::connect (const char *addr_) @@ -285,28 +316,31 @@ int zmq::socket_base_t::connect (const char *addr_) } // Parse addr_ string. - std::string addr_type; - std::string addr_args; - - std::string addr (addr_); - std::string::size_type pos = addr.find ("://"); - - if (pos == std::string::npos) { - errno = EINVAL; - return -1; + std::string protocol; + std::string address; + { + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + if (pos == std::string::npos) { + errno = EINVAL; + return -1; + } + protocol = addr.substr (0, pos); + address = addr.substr (pos + 3); } - addr_type = addr.substr (0, pos); - addr_args = addr.substr (pos + 3); + int rc = check_protocol (protocol); + if (rc != 0) + return -1; - if (addr_type == "inproc") { + if (protocol == "inproc") { // TODO: inproc connect is specific with respect to creating pipes // as there's no 'reconnect' functionality implemented. Once that // is in place we should follow generic pipe creation algorithm. // Find the peer socket. - socket_base_t *peer = find_endpoint (addr_args.c_str ()); + socket_base_t *peer = find_endpoint (address.c_str ()); if (!peer) return -1; @@ -329,18 +363,18 @@ int zmq::socket_base_t::connect (const char *addr_) attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); // Attach the pipes to the peer socket. Note that peer's seqnum - // was incremented in find_endpoint function. The callee is notified - // about the fact via the last parameter. + // was incremented in find_endpoint function. We don't need it + // increased here. send_bind (peer, outpipe_reader, inpipe_writer, options.identity, false); return 0; } - // Create unnamed session. - io_thread_t *io_thread = choose_io_thread (options.affinity); - session_t *session = new (std::nothrow) session_t (io_thread, - this, options); + // Create session. + connect_session_t *session = new (std::nothrow) connect_session_t ( + choose_io_thread (options.affinity), this, options, + protocol.c_str (), address.c_str ()); zmq_assert (session); // If 'immediate connect' feature is required, we'll create the pipes @@ -370,95 +404,10 @@ int zmq::socket_base_t::connect (const char *addr_) session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); } - // Activate the session. - send_plug (session); - send_own (this, session); - - if (addr_type == "tcp" || addr_type == "ipc") { - -#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS - // Windows named pipes are not compatible with Winsock API. - // There's no UNIX domain socket implementation on OpenVMS. - if (addr_type == "ipc") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif - - // Create the connecter object. Supply it with the session name - // so that it can bind the new connection to the session once - // it is established. - zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( - choose_io_thread (options.affinity), this, options, - session->get_ordinal (), false); - zmq_assert (connecter); - int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ()); - if (rc != 0) { - delete connecter; - return -1; - } - send_plug (connecter); - send_own (this, connecter); - - return 0; - } - -#if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm" || addr_type == "epgm") { - - // If the socket type requires bi-directional communication - // multicast is not an option (it is uni-directional). - if (options.requires_in && options.requires_out) { - errno = ENOCOMPATPROTO; - return -1; - } - - // For epgm, pgm transport with UDP encapsulation is used. - bool udp_encapsulation = (addr_type == "epgm"); - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with PGM anyway. - if (options.requires_out) { - - // PGM sender. - pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - choose_io_thread (options.affinity), options); - zmq_assert (pgm_sender); + // Activate the session. Make it a child of this socket. + launch_child (session); - int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_sender; - return -1; - } - - send_attach (session, pgm_sender, blob_t ()); - } - else if (options.requires_in) { - - // PGM receiver. - pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - choose_io_thread (options.affinity), options); - zmq_assert (pgm_receiver); - - int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); - if (rc != 0) { - delete pgm_receiver; - return -1; - } - - send_attach (session, pgm_receiver, blob_t ()); - } - else - zmq_assert (false); - - return 0; - } -#endif - - // Unknown protoco. - errno = EPROTONOSUPPORT; - return -1; + return 0; } int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) @@ -587,72 +536,23 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { + zmq_assert (!zombie); + // Socket becomes a zombie. From now on all new arrived pipes (bind - // command) and I/O objects (own command) are immediately terminated. - // Also, any further requests form I/O object termination are ignored - // (we are going to shut them down anyway -- this way we assure that - // we do so once only). + // command) are immediately terminated. zombie = true; - // Unregister all inproc endpoints associated with this socket. - // Doing this we make sure that no new pipes from other sockets (inproc) - // will be initiated. However, there may be some inproc pipes already - // on the fly, but not yet received by this socket. To get finished - // with them we'll do the subsequent waiting from on-the-fly commands. - // This should happen very quickly. There's no way to block here for - // extensive period of time. - unregister_endpoints (this); - while (processed_seqnum != sent_seqnum.get ()) - process_commands (true, false); - // TODO: My feeling is that the above has to be done in the dezombification - // loop, otherwise we may end up with number of i/o object dropping to zero - // even though there are more i/o objects on the way. - - // The above process ensures that only pipes that will arrive from now on - // are those initiated by sessions. These in turn have a nice property of - // not arriving totally asynchronously. When a session -- being an I/O - // object -- acknowledges its termination we are 100% sure that we'll get - // no new pipe from it. - - // Start termination of all the pipes presently associated with the socket. - xterm_pipes (); - - // Send termination request to all associated I/O objects. - // Start waiting for the acks. Note that the actual waiting is not done - // in this function. Rather it is done in delayed manner as socket is - // being dezombified. The reason is that I/O object shutdown can take - // considerable amount of time in case there's still a lot of data to - // push to the network. - for (io_objects_t::iterator it = io_objects.begin (); - it != io_objects.end (); it++) - send_term (*it); - pending_term_acks += io_objects.size (); - io_objects.clear (); - - // Note that new I/O objects may arrive even in zombie state (say new - // session initiated by a listener object), however, in such case number - // of pending acks never drops to zero. Here's the scenario: We have an - // pending ack for the listener object. Then 'own' commands arrives from - // the listener notifying the socket about new session. It immediately - // triggers termination request and number of of pending acks if - // incremented. Then term_acks arrives from the listener. Number of pending - // acks is decremented. Later on, the session itself will ack its - // termination. During the process, number of pending acks never dropped - // to zero and thus the socket remains safely in the zombie state. - - // Transfer the ownership of the socket from this application thread + // Start termination of associated I/O object hierarchy. + terminate (); + + // Ask context to zombify this socket. In other words, transfer + // the ownership of the socket from this application thread // to the context which will take care of the rest of shutdown process. - zombify (this); + zombify_socket (this); return 0; } -void zmq::socket_base_t::inc_seqnum () -{ - // Be aware: This function may be called from a different thread! - sent_seqnum.add (1); -} - bool zmq::socket_base_t::has_in () { return xhas_in (); @@ -667,7 +567,7 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, session_t *session_) { sessions_sync.lock (); - bool registered = named_sessions.insert ( + bool registered = sessions.insert ( std::make_pair (peer_identity_, session_)).second; sessions_sync.unlock (); return registered; @@ -676,17 +576,17 @@ bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (peer_identity_); - zmq_assert (it != named_sessions.end ()); - named_sessions.erase (it); + sessions_t::iterator it = sessions.find (peer_identity_); + zmq_assert (it != sessions.end ()); + sessions.erase (it); sessions_sync.unlock (); } zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (peer_identity_); - if (it == named_sessions.end ()) { + sessions_t::iterator it = sessions.find (peer_identity_); + if (it == sessions.end ()) { sessions_sync.unlock (); return NULL; } @@ -699,74 +599,16 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) return session; } -uint64_t zmq::socket_base_t::register_session (session_t *session_) -{ - sessions_sync.lock (); - uint64_t ordinal = next_ordinal; - next_ordinal++; - unnamed_sessions.insert (std::make_pair (ordinal, session_)); - sessions_sync.unlock (); - return ordinal; -} - -void zmq::socket_base_t::unregister_session (uint64_t ordinal_) -{ - sessions_sync.lock (); - unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); - zmq_assert (it != unnamed_sessions.end ()); - unnamed_sessions.erase (it); - sessions_sync.unlock (); -} - -zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_) -{ - sessions_sync.lock (); - - unnamed_sessions_t::iterator it = unnamed_sessions.find (ordinal_); - if (it == unnamed_sessions.end ()) { - sessions_sync.unlock (); - return NULL; - } - session_t *session = it->second; - - // Prepare the session for subsequent attach command. - session->inc_seqnum (); - - sessions_sync.unlock (); - return session; -} - bool zmq::socket_base_t::dezombify () { zmq_assert (zombie); // Process any commands from other threads/sockets that may be available - // at the moment. + // at the moment. Ultimately, socket will be destroyed. process_commands (false, false); - // If there are no more pipes attached and there are no more I/O objects - // owned by the socket, we can kill the zombie. - if (!pending_term_acks && !xhas_pipes ()) { - - // If all objects have acknowledged their termination there should - // definitely be no I/O object remaining in the list. - zmq_assert (io_objects.empty ()); - - // Check whether there are no session leaks. - sessions_sync.lock (); - zmq_assert (named_sessions.empty ()); - zmq_assert (unnamed_sessions.empty ()); - sessions_sync.unlock (); - - // Deallocate all the resources tied to this socket. - delete this; - - // Notify the caller about the fact that the zombie is finally dead. - return true; - } - - // The zombie remains undead. - return false; +// TODO: ??? + return true; } void zmq::socket_base_t::process_commands (bool block_, bool throttle_) @@ -828,19 +670,6 @@ void zmq::socket_base_t::process_stop () zombie = true; } -void zmq::socket_base_t::process_own (owned_t *object_) -{ - // If the socket is already being shut down, new owned objects are - // immediately asked to terminate. - if (zombie) { - send_term (object_); - pending_term_acks++; - return; - } - - io_objects.insert (object_); -} - void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_) { @@ -857,37 +686,21 @@ void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, attach_pipes (in_pipe_, out_pipe_, peer_identity_); } -void zmq::socket_base_t::process_term_req (owned_t *object_) +void zmq::socket_base_t::process_unplug () { - // When shutting down we can ignore termination requests from owned - // objects. It means the termination request was already sent to - // the object. - if (zombie) - return; - - // If I/O object is well and alive ask it to terminate. - io_objects_t::iterator it = std::find (io_objects.begin (), - io_objects.end (), object_); - - // If not found, we assume that termination request was already sent to - // the object so we can safely ignore the request. - if (it == io_objects.end ()) - return; - - pending_term_acks++; - io_objects.erase (it); - send_term (object_); } -void zmq::socket_base_t::process_term_ack () +void zmq::socket_base_t::process_term () { - zmq_assert (pending_term_acks); - pending_term_acks--; -} + zmq_assert (zombie); -void zmq::socket_base_t::process_seqnum () -{ - processed_seqnum++; + // Unregister all inproc endpoints associated with this socket. + // Doing this we make sure that no new pipes from other sockets (inproc) + // will be initiated. + unregister_endpoints (this); + + // Continue the termination process immediately. + own_t::process_term (); } int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 386fdbb..f76dc4c 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -20,13 +20,12 @@ #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ -#include <set> #include <map> #include <vector> #include "../include/zmq.h" -#include "object.hpp" +#include "own.hpp" #include "yarray_item.hpp" #include "mutex.hpp" #include "options.hpp" @@ -35,12 +34,13 @@ #include "signaler.hpp" #include "stdint.hpp" #include "blob.hpp" +#include "own.hpp" namespace zmq { class socket_base_t : - public object_t, + public own_t, public yarray_item_t { public: @@ -65,30 +65,16 @@ namespace zmq int recv (zmq_msg_t *msg_, int flags_); int close (); - // When another owned object wants to send command to this object - // it calls this function to let it know it should not shut down - // before the command is delivered. - void inc_seqnum (); - // These functions are used by the polling mechanism to determine // which events are to be reported from this socket. bool has_in (); bool has_out (); - // The list of sessions cannot be accessed via inter-thread - // commands as it is unacceptable to wait for the completion of the - // action till user application yields control of the application - // thread to 0MQ. Locking is used instead. - // There are two distinct types of sessions: those identified by name - // and those identified by ordinal number. Thus two sets of session - // management functions. + // Registry of named sessions. bool register_session (const blob_t &peer_identity_, class session_t *session_); void unregister_session (const blob_t &peer_identity_); class session_t *find_session (const blob_t &peer_identity_); - uint64_t register_session (class session_t *session_); - void unregister_session (uint64_t ordinal_); - class session_t *find_session (uint64_t ordinal_); // i_reader_events interface implementation. void activated (class reader_t *pipe_); @@ -99,7 +85,7 @@ namespace zmq void terminated (class writer_t *pipe_); // This function should be called only on zombie sockets. It tries - // to deallocate the zombie and returns true is successful. + // to deallocate the zombie. Returns true if zombie is finally dead. bool dezombify (); protected: @@ -109,11 +95,8 @@ namespace zmq // Concrete algorithms for the x- methods are to be defined by // individual socket types. - virtual void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) = 0; - virtual void xterm_pipes () = 0; - virtual bool xhas_pipes () = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, overload this @@ -132,12 +115,22 @@ namespace zmq // Socket options. options_t options; + // We are declaring termination handler as protected so that + // individual socket types can hook into the termination process + // by overloading it. + void process_term (); + + private: + +// TODO: Check whether we still need this flag... // If true, socket was already closed but not yet deallocated // because either shutdown is in process or there are still pipes // attached to the socket. bool zombie; - private: + // Check whether transport protocol, as specified in connect or + // bind, is available and compatible with the socket type. + int check_protocol (const std::string &protocol_); // If no identity set generate one and call xattach_pipes (). void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, @@ -151,12 +144,9 @@ namespace zmq // Handlers for incoming commands. void process_stop (); - void process_own (class owned_t *object_); void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_); - void process_term_req (class owned_t *object_); - void process_term_ack (); - void process_seqnum (); + void process_unplug (); // App thread's signaler object. signaler_t signaler; @@ -164,36 +154,18 @@ namespace zmq // Timestamp of when commands were processed the last time. uint64_t last_processing_time; - // List of all I/O objects owned by this socket. The socket is - // responsible for deallocating them before it quits. - typedef std::set <class owned_t*> io_objects_t; - io_objects_t io_objects; - - // Number of I/O objects that were already asked to terminate - // but haven't acknowledged it yet. - int pending_term_acks; - // Number of messages received since last command processing. int ticks; // If true there's a half-read message in the socket. bool rcvmore; - // Sequence number of the last command sent to this object. - atomic_counter_t sent_seqnum; - - // Sequence number of the last command processed by this object. - uint64_t processed_seqnum; - // Lists of existing sessions. This lists are never referenced from - // within the socket, instead they are used by I/O objects owned by + // within the socket, instead they are used by objects owned by // the socket. As those objects can live in different threads, // the access is synchronised by mutex. - typedef std::map <blob_t, session_t*> named_sessions_t; - named_sessions_t named_sessions; - typedef std::map <uint64_t, session_t*> unnamed_sessions_t; - unnamed_sessions_t unnamed_sessions; - uint64_t next_ordinal; + typedef std::map <blob_t, session_t*> sessions_t; + sessions_t sessions; mutex_t sessions_sync; socket_base_t (const socket_base_t&); diff --git a/src/sub.cpp b/src/sub.cpp index a1e8fb7..89df106 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -26,6 +26,7 @@ zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t slot_) : socket_base_t (parent_, slot_), + fq (this), has_message (false), more (false) { @@ -46,14 +47,17 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, fq.attach (inpipe_); } -void zmq::sub_t::xterm_pipes () +void zmq::sub_t::process_term () { - fq.term_pipes (); + register_term_acks (1); + fq.terminate (); + + socket_base_t::process_term (); } -bool zmq::sub_t::xhas_pipes () +void zmq::sub_t::terminated () { - return fq.has_pipes (); + unregister_term_ack (); } int zmq::sub_t::xsetsockopt (int option_, const void *optval_, diff --git a/src/sub.hpp b/src/sub.hpp index da69892..a832c48 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -24,12 +24,13 @@ #include "prefix_tree.hpp" #include "socket_base.hpp" +#include "i_terminate_events.hpp" #include "fq.hpp" namespace zmq { - class sub_t : public socket_base_t + class sub_t : public socket_base_t, public i_terminate_events { public: @@ -41,14 +42,18 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xterm_pipes (); - bool xhas_pipes (); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); private: + // i_terminate_events interface implementation. + void terminated (); + + // Hook into the termination process. + void process_term (); + // Check whether the message matches at least one subscription. bool match (zmq_msg_t *msg_); diff --git a/src/transient_session.cpp b/src/transient_session.cpp new file mode 100644 index 0000000..9dd7431 --- /dev/null +++ b/src/transient_session.cpp @@ -0,0 +1,36 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "transient_session.hpp" + +zmq::transient_session_t::transient_session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_) : + session_t (io_thread_, socket_, options_) +{ +} + +zmq::transient_session_t::~transient_session_t () +{ +} + +void zmq::transient_session_t::detach () +{ + // There's no way to reestablish a transient session. Tear it down. + terminate (); +} diff --git a/src/transient_session.hpp b/src/transient_session.hpp new file mode 100644 index 0000000..30d4c29 --- /dev/null +++ b/src/transient_session.hpp @@ -0,0 +1,46 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__ +#define __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__ + +#include "session.hpp" + +namespace zmq +{ + + // Transient session is created by the listener when the connected peer + // stays anonymous. Transient session is destroyed on disconnect. + + class transient_session_t : public session_t + { + public: + + transient_session_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_); + ~transient_session_t (); + + // i_inout interface implementation. + void detach (); + + }; + +} + +#endif diff --git a/src/xrep.cpp b/src/xrep.cpp index 73d7856..c1cfb00 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) : prefetched (false), more_in (false), current_out (NULL), - more_out (false) + more_out (false), + terminating (false) { options.requires_in = true; options.requires_out = true; @@ -62,16 +63,27 @@ void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, inpipe_t inpipe = {inpipe_, peer_identity_, true}; inpipes.push_back (inpipe); + + if (terminating) { + register_term_acks (1); + inpipe_->terminate (); + } } -void zmq::xrep_t::xterm_pipes () +void zmq::xrep_t::process_term () { + terminating = true; + + register_term_acks (inpipes.size () + outpipes.size ()); + for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++) it->reader->terminate (); for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); it++) it->second.writer->terminate (); + + socket_base_t::process_term (); } void zmq::xrep_t::terminated (reader_t *pipe_) @@ -80,6 +92,8 @@ void zmq::xrep_t::terminated (reader_t *pipe_) it++) { if (it->reader == pipe_) { inpipes.erase (it); + if (terminating) + unregister_term_ack (); return; } } @@ -94,17 +108,14 @@ void zmq::xrep_t::terminated (writer_t *pipe_) outpipes.erase (it); if (pipe_ == current_out) current_out = NULL; + if (terminating) + unregister_term_ack (); return; } } zmq_assert (false); } -bool zmq::xrep_t::xhas_pipes () -{ - return !inpipes.empty () || !outpipes.empty (); -} - void zmq::xrep_t::activated (reader_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); diff --git a/src/xrep.hpp b/src/xrep.hpp index 1c240ff..b16682d 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -44,13 +44,16 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, const blob_t &peer_identity_); - void xterm_pipes (); - bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + private: + + // Hook into the termination process. + void process_term (); + // i_reader_events interface implementation. void activated (reader_t *pipe_); void terminated (reader_t *pipe_); @@ -59,8 +62,6 @@ namespace zmq void activated (writer_t *pipe_); void terminated (writer_t *pipe_); - private: - struct inpipe_t { class reader_t *reader; @@ -100,6 +101,9 @@ namespace zmq // If true, more outgoing message parts are expected. bool more_out; + // If true, termination process is already underway. + bool terminating; + xrep_t (const xrep_t&); void operator = (const xrep_t&); }; diff --git a/src/xreq.cpp b/src/xreq.cpp index 893c18e..e511660 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,7 +23,9 @@ #include "err.hpp" zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t slot_) : - socket_base_t (parent_, slot_) + socket_base_t (parent_, slot_), + fq (this), + lb (this) { options.requires_in = true; options.requires_out = true; @@ -41,15 +43,18 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, lb.attach (outpipe_); } -void zmq::xreq_t::xterm_pipes () +void zmq::xreq_t::process_term () { - fq.term_pipes (); - lb.term_pipes (); + register_term_acks (2); + fq.terminate (); + lb.terminate (); + + socket_base_t::process_term (); } -bool zmq::xreq_t::xhas_pipes () +void zmq::xreq_t::terminated () { - return fq.has_pipes () || lb.has_pipes (); + unregister_term_ack (); } int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) diff --git a/src/xreq.hpp b/src/xreq.hpp index b8b9a0b..9dc10c5 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -21,24 +21,25 @@ #define __ZMQ_XREQ_HPP_INCLUDED__ #include "socket_base.hpp" +#include "i_terminate_events.hpp" #include "fq.hpp" #include "lb.hpp" namespace zmq { - class xreq_t : public socket_base_t + class xreq_t : public socket_base_t, public i_terminate_events { public: xreq_t (class ctx_t *parent_, uint32_t slot_); ~xreq_t (); + protected: + // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xterm_pipes (); - bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); @@ -46,6 +47,12 @@ namespace zmq private: + // i_terminate_events interface implementation. + void terminated (); + + // Hook into the termination process. + void process_term (); + // Messages are fair-queued from inbound pipes. And load-balanced to // the outbound pipes. fq_t fq; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index ebd7572..6223c45 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -25,33 +25,24 @@ #include "io_thread.hpp" #include "err.hpp" -zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, - socket_base_t *owner_, const options_t &options_, - uint64_t session_ordinal_, bool wait_) : - owned_t (parent_, owner_), - io_object_t (parent_), +zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, + class session_t *session_, const options_t &options_, + const char *protocol_, const char *address_) : + own_t (io_thread_), + io_object_t (io_thread_), handle_valid (false), - wait (wait_), - session_ordinal (session_ordinal_), + wait (false), + session (session_), options (options_) { + int rc = tcp_connecter.set_address (protocol_, address_); + zmq_assert (rc == 0); } zmq::zmq_connecter_t::~zmq_connecter_t () { } -int zmq::zmq_connecter_t::set_address (const char *protocol_, - const char *address_) -{ - int rc = tcp_connecter.set_address (protocol_, address_); - if (rc != 0) - return rc; - protocol = protocol_; - address = address_; - return 0; -} - void zmq::zmq_connecter_t::process_plug () { if (wait) @@ -92,15 +83,12 @@ void zmq::zmq_connecter_t::out_event () // Create an init object. zmq_init_t *init = new (std::nothrow) zmq_init_t ( - choose_io_thread (options.affinity), owner, - fd, options, true, protocol.c_str (), address.c_str (), - session_ordinal); + choose_io_thread (options.affinity), NULL, session, fd, options); zmq_assert (init); - send_plug (init); - send_own (owner, init); + launch_sibling (init); - // Ask owner socket to shut the connecter down. - term (); + // Shut the connecter down. + terminate (); } void zmq::zmq_connecter_t::timer_event () diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index 328dd6a..191e5b2 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -20,9 +20,7 @@ #ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ #define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ -#include <string> - -#include "owned.hpp" +#include "own.hpp" #include "io_object.hpp" #include "tcp_connecter.hpp" #include "options.hpp" @@ -31,17 +29,17 @@ namespace zmq { - class zmq_connecter_t : public owned_t, public io_object_t + class zmq_connecter_t : public own_t, public io_object_t { public: - zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, - const options_t &options_, uint64_t session_ordinal_, bool wait_); + // If 'wait' is true connecter first waits for a while, then starts + // connection process. + zmq_connecter_t (class io_thread_t *io_thread_, + class session_t *session_, const options_t &options_, + const char *protocol_, const char *address_); ~zmq_connecter_t (); - // Set address to connect to. - int set_address (const char *protocol_, const char *address_); - private: // Handlers for incoming commands. @@ -69,16 +67,12 @@ namespace zmq // If true, connecter is waiting a while before trying to connect. bool wait; - // Ordinal of the session to attach to. - uint64_t session_ordinal; + // Reference to the session we belong to. + class session_t *session; // Associated socket options. options_t options; - // Protocol and address to connect to. - std::string protocol; - std::string address; - zmq_connecter_t (const zmq_connecter_t&); void operator = (const zmq_connecter_t&); }; diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 41b10c8..de26b27 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -32,10 +32,7 @@ #include "config.hpp" #include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, - const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_) : - io_object_t (parent_), +zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : inpos (NULL), insize (0), decoder (in_batch_size), @@ -43,14 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, outsize (0), encoder (out_batch_size), inout (NULL), - options (options_), - reconnect (reconnect_) + options (options_) { - if (reconnect) { - protocol = protocol_; - address = address_; - } - // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); zmq_assert (rc == 0); @@ -60,26 +51,37 @@ zmq::zmq_engine_t::~zmq_engine_t () { } -void zmq::zmq_engine_t::plug (i_inout *inout_) +void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { + // Conncet to session/init object. zmq_assert (!inout); - + zmq_assert (inout_); encoder.set_inout (inout_); decoder.set_inout (inout_); + inout = inout_; + + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); handle = add_fd (tcp_socket.get_fd ()); set_pollin (handle); set_pollout (handle); - inout = inout_; - // Flush all the data that may have been already received downstream. in_event (); + + // TODO: Re-plug to the new I/O thread & poller! } void zmq::zmq_engine_t::unplug () { + // Cancel all fd subscriptions. rm_fd (handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); inout = NULL; @@ -155,7 +157,7 @@ void zmq::zmq_engine_t::out_event () outsize -= nbytes; } -void zmq::zmq_engine_t::revive () +void zmq::zmq_engine_t::activate_out () { set_pollout (handle); @@ -166,30 +168,18 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::resume_input () +void zmq::zmq_engine_t::activate_in () { set_pollin (handle); + // Speculative read. in_event (); } void zmq::zmq_engine_t::error () { zmq_assert (inout); - - zmq_connecter_t *reconnecter = NULL; - if (reconnect) { - - // Create a connecter object to attempt reconnect. - // Ask it to wait for a while before reconnecting. - reconnecter = new (std::nothrow) zmq_connecter_t ( - inout->get_io_thread (), inout->get_owner (), - options, inout->get_ordinal (), true); - zmq_assert (reconnecter); - reconnecter->set_address (protocol.c_str(), address.c_str ()); - } - - inout->detach (reconnecter); + inout->detach (); unplug (); delete this; } diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index d89dccc..328ec95 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -38,16 +38,14 @@ namespace zmq { public: - zmq_engine_t (class io_thread_t *parent_, fd_t fd_, - const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_); + zmq_engine_t (fd_t fd_, const options_t &options_); ~zmq_engine_t (); // i_engine interface implementation. - void plug (struct i_inout *inout_); + void plug (class io_thread_t *io_thread_, struct i_inout *inout_); void unplug (); - void revive (); - void resume_input (); + void activate_in (); + void activate_out (); // i_poll_events interface implementation. void in_event (); @@ -73,10 +71,6 @@ namespace zmq options_t options; - bool reconnect; - std::string protocol; - std::string address; - zmq_engine_t (const zmq_engine_t&); void operator = (const zmq_engine_t&); }; diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 5824f5c..68007a4 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -20,24 +20,29 @@ #include <string.h> #include "zmq_init.hpp" +#include "transient_session.hpp" +#include "named_session.hpp" +#include "socket_base.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" #include "session.hpp" #include "uuid.hpp" +#include "blob.hpp" #include "err.hpp" -zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_, uint64_t session_ordinal_) : - owned_t (parent_, owner_), +zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, + socket_base_t *socket_, session_t *session_, fd_t fd_, + const options_t &options_) : + own_t (io_thread_), sent (false), received (false), - session_ordinal (session_ordinal_), - options (options_) + socket (socket_), + session (session_), + options (options_), + io_thread (io_thread_) { // Create the engine object for this connection. - engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, - reconnect_, protocol_, address_); + engine = new (std::nothrow) zmq_engine_t (fd_, options); zmq_assert (engine); } @@ -62,7 +67,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) // If initialisation is done, pass the engine to the session and // destroy the init object. - finalise (); + finalise_initialisation (); return true; } @@ -99,44 +104,28 @@ void zmq::zmq_init_t::flush () // If initialisation is done, pass the engine to the session and // destroy the init object. - finalise (); + finalise_initialisation (); } -void zmq::zmq_init_t::detach (owned_t *reconnecter_) +void zmq::zmq_init_t::detach () { // This function is called by engine when disconnection occurs. - // If required, launch the reconnecter. - if (reconnecter_) { - send_plug (reconnecter_); - send_own (owner, reconnecter_); - } + // If there is an associated session, send it a null engine to let it know + // that connection process was unsuccesful. + if (session) + send_attach (session, NULL, blob_t (), true); // The engine will destroy itself, so let's just drop the pointer here and // start termination of the init object. engine = NULL; - term (); -} - -zmq::io_thread_t *zmq::zmq_init_t::get_io_thread () -{ - return choose_io_thread (options.affinity); -} - -class zmq::socket_base_t *zmq::zmq_init_t::get_owner () -{ - return owner; -} - -uint64_t zmq::zmq_init_t::get_ordinal () -{ - return session_ordinal; + terminate (); } void zmq::zmq_init_t::process_plug () { zmq_assert (engine); - engine->plug (this); + engine->plug (io_thread, this); } void zmq::zmq_init_t::process_unplug () @@ -145,51 +134,62 @@ void zmq::zmq_init_t::process_unplug () engine->unplug (); } -void zmq::zmq_init_t::finalise () +void zmq::zmq_init_t::finalise_initialisation () { if (sent && received) { - // Disconnect the engine from the init object. - engine->unplug (); + // If we know what session we belong to, it's easy, just send the + // engine to that session and destroy the init object. + if (session) { + engine->unplug (); + send_attach (session, engine, peer_identity, true); + engine = NULL; + terminate (); + return; + } - session_t *session = NULL; - - // If we have the session ordinal, let's use it to find the session. - // If it is not found, it means socket is already being shut down - // and the session have been deallocated. - // TODO: We should check whether the name of the peer haven't changed - // upon reconnection. - if (session_ordinal) { - session = owner->find_session (session_ordinal); - if (!session) { - term (); - return; - } + // All the cases below are listener-based. Therefore we need the socket + // reference so that new sessions can bind to that socket. + zmq_assert (socket); + + // We have no associated session. If the peer has no identity we'll + // create a transient session for the connection. + if (peer_identity [0] == 0) { + session = new (std::nothrow) transient_session_t (io_thread, + socket, options); + zmq_assert (session); + launch_sibling (session); + engine->unplug (); + send_attach (session, engine, peer_identity, true); + engine = NULL; + terminate (); + return; } - else { - - // If the peer has a unique name, find the associated session. - // If it does not exist, create it. - zmq_assert (!peer_identity.empty ()); - session = owner->find_session (peer_identity); - if (!session) { - session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, - peer_identity); - zmq_assert (session); - send_plug (session); - send_own (owner, session); - - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); - } + + // Try to find the session corresponding to the peer's identity. + // If found, send the engine to that session and destroy this object. + // Note that session's seqnum is incremented by find_session rather + // than by send_attach. + session = socket->find_session (peer_identity); + if (session) { + engine->unplug (); + send_attach (session, engine, peer_identity, false); + engine = NULL; + terminate (); + return; } - // No need to increment seqnum as it was already incremented above. - send_attach (session, engine, peer_identity, false); - - // Destroy the init object. + // There's no such named session. We have to create one. +// TODO: +zmq_assert (false); +// session = new (std::nothrow) named_session_t (io_thread, socket, +// options, peer_identity); + zmq_assert (session); + launch_sibling (session); + engine->unplug (); + send_attach (session, engine, peer_identity, true); engine = NULL; - term (); + terminate (); + return; } } diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 6f935c2..887f070 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -22,7 +22,7 @@ #include "i_inout.hpp" #include "i_engine.hpp" -#include "owned.hpp" +#include "own.hpp" #include "fd.hpp" #include "stdint.hpp" #include "options.hpp" @@ -34,28 +34,23 @@ namespace zmq // The class handles initialisation phase of 0MQ wire-level protocol. - class zmq_init_t : public owned_t, public i_inout + class zmq_init_t : public own_t, public i_inout { public: - zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, - fd_t fd_, const options_t &options_, bool reconnect_, - const char *protocol_, const char *address_, - uint64_t session_ordinal_); + zmq_init_t (class io_thread_t *io_thread_, class socket_base_t *socket_, + class session_t *session_, fd_t fd_, const options_t &options_); ~zmq_init_t (); private: - void finalise (); + void finalise_initialisation (); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); void flush (); - void detach (owned_t *reconnecter_); - class io_thread_t *get_io_thread (); - class socket_base_t *get_owner (); - uint64_t get_ordinal (); + void detach (); // Handlers for incoming commands. void process_plug (); @@ -70,16 +65,24 @@ namespace zmq // True if peer's identity was already received. bool received; + // Socket the object belongs to. + class socket_base_t *socket; + + // Reference to the session the init object belongs to. + // If the associated session is unknown and should be found + // depending on peer identity this value is NULL. + class session_t *session; + // Identity of the peer socket. blob_t peer_identity; - // TCP connecter creates session before the name of the peer is known. - // Thus we know only its ordinal number. - uint64_t session_ordinal; - // Associated socket options. options_t options; + // I/O thread the object is living in. It will be used to plug + // the engine into the same I/O thread. + class io_thread_t *io_thread; + zmq_init_t (const zmq_init_t&); void operator = (const zmq_init_t&); }; diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index d7cf292..93a68f5 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -24,11 +24,12 @@ #include "io_thread.hpp" #include "err.hpp" -zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, - socket_base_t *owner_, const options_t &options_) : - owned_t (parent_, owner_), - io_object_t (parent_), - options (options_) +zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_, + socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_), + io_object_t (io_thread_), + options (options_), + socket (socket_) { } @@ -62,13 +63,11 @@ void zmq::zmq_listener_t::in_event () if (fd == retired_fd) return; - // Create an init object. - io_thread_t *io_thread = choose_io_thread (options.affinity); + // Create and launch an init object. zmq_init_t *init = new (std::nothrow) zmq_init_t ( - io_thread, owner, fd, options, false, NULL, NULL, 0); + choose_io_thread (options.affinity), socket, NULL, fd, options); zmq_assert (init); - send_plug (init); - send_own (owner, init); + launch_sibling (init); } diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp index c990b02..05f4522 100644 --- a/src/zmq_listener.hpp +++ b/src/zmq_listener.hpp @@ -20,7 +20,7 @@ #ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ #define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ -#include "owned.hpp" +#include "own.hpp" #include "io_object.hpp" #include "tcp_listener.hpp" #include "options.hpp" @@ -29,12 +29,12 @@ namespace zmq { - class zmq_listener_t : public owned_t, public io_object_t + class zmq_listener_t : public own_t, public io_object_t { public: - zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_, - const options_t &options_); + zmq_listener_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_); ~zmq_listener_t (); // Set address to listen on. @@ -58,6 +58,9 @@ namespace zmq // Associated socket options. options_t options; + // Socket the listerner belongs to. + class socket_base_t *socket; + zmq_listener_t (const zmq_listener_t&); void operator = (const zmq_listener_t&); }; |