diff options
-rw-r--r-- | src/command.hpp | 11 | ||||
-rw-r--r-- | src/object.cpp | 43 | ||||
-rw-r--r-- | src/object.hpp | 19 | ||||
-rw-r--r-- | src/owned.cpp | 53 | ||||
-rw-r--r-- | src/owned.hpp | 33 | ||||
-rw-r--r-- | src/session.cpp | 5 | ||||
-rw-r--r-- | src/session.hpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 40 | ||||
-rw-r--r-- | src/socket_base.hpp | 26 | ||||
-rw-r--r-- | src/zmq_connecter.cpp | 4 | ||||
-rw-r--r-- | src/zmq_connecter.hpp | 2 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 3 | ||||
-rw-r--r-- | src/zmq_init.cpp | 8 | ||||
-rw-r--r-- | src/zmq_init.hpp | 2 | ||||
-rw-r--r-- | src/zmq_listener.cpp | 4 | ||||
-rw-r--r-- | src/zmq_listener.hpp | 2 |
16 files changed, 184 insertions, 73 deletions
diff --git a/src/command.hpp b/src/command.hpp index de94ca3..41c7d6c 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -37,11 +37,11 @@ namespace zmq stop, plug, own, + attach, bind, term_req, term, term_ack - } type; union { @@ -57,9 +57,14 @@ namespace zmq // Sent to socket to let it know about the newly created object. struct { - class object_t *object; + class owned_t *object; } own; + // Attach the engine to the session. + struct { + class zmq_engine_t *engine; + } attach; + // Sent between objects to establish pipe(s) between them. struct { } bind; @@ -67,7 +72,7 @@ namespace zmq // Sent by I/O object ot the socket to request the shutdown of // the I/O object. struct { - class object_t *object; + class owned_t *object; } term_req; // Sent by socket to I/O object to start its shutdown. diff --git a/src/object.cpp b/src/object.cpp index e2267d6..b1b7c8a 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -22,6 +22,10 @@ #include "err.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" +#include "owned.hpp" +#include "session.hpp" +#include "socket_base.hpp" +#include "zmq_engine.hpp" // TODO: remove this line zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : dispatcher (dispatcher_), @@ -65,6 +69,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_own (cmd_.args.own.object); return; + case command_t::attach: + process_attach (cmd_.args.attach.engine); + return; + case command_t::bind: process_bind (); return; @@ -101,15 +109,18 @@ void zmq::object_t::send_stop () dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_plug (object_t *destination_) +void zmq::object_t::send_plug (owned_t *destination_) { + // Let the object know that it cannot shut down till it gets this command. + destination_->inc_seqnum (); + command_t cmd; cmd.destination = destination_; cmd.type = command_t::plug; send_command (cmd); } -void zmq::object_t::send_own (object_t *destination_, object_t *object_) +void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) { command_t cmd; cmd.destination = destination_; @@ -118,6 +129,18 @@ void zmq::object_t::send_own (object_t *destination_, object_t *object_) send_command (cmd); } +void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_) +{ + // Let the object know that it cannot shut down till it gets this command. + destination_->inc_seqnum (); + + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::attach; + cmd.args.attach.engine = engine_; + send_command (cmd); +} + void zmq::object_t::send_bind (object_t *destination_) { command_t cmd; @@ -126,7 +149,8 @@ void zmq::object_t::send_bind (object_t *destination_) send_command (cmd); } -void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) +void zmq::object_t::send_term_req (socket_base_t *destination_, + owned_t *object_) { command_t cmd; cmd.destination = destination_; @@ -135,7 +159,7 @@ void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) send_command (cmd); } -void zmq::object_t::send_term (object_t *destination_) +void zmq::object_t::send_term (owned_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -143,7 +167,7 @@ void zmq::object_t::send_term (object_t *destination_) send_command (cmd); } -void zmq::object_t::send_term_ack (object_t *destination_) +void zmq::object_t::send_term_ack (socket_base_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -161,7 +185,12 @@ void zmq::object_t::process_plug () zmq_assert (false); } -void zmq::object_t::process_own (object_t *object_) +void zmq::object_t::process_own (owned_t *object_) +{ + zmq_assert (false); +} + +void zmq::object_t::process_attach (zmq_engine_t *engine_) { zmq_assert (false); } @@ -171,7 +200,7 @@ void zmq::object_t::process_bind () zmq_assert (false); } -void zmq::object_t::process_term_req (object_t *object_) +void zmq::object_t::process_term_req (owned_t *object_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 7357549..02a071a 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -49,20 +49,25 @@ namespace zmq // Derived object can use these functions to send commands // to other objects. void send_stop (); - void send_plug (object_t *destination_); - void send_own (object_t *destination_, object_t *object_); + void send_plug (class owned_t *destination_); + void send_own (class socket_base_t *destination_, + class owned_t *object_); + void send_attach (class session_t *destination_, + class zmq_engine_t *engine_); void send_bind (object_t *destination_); - void send_term_req (object_t *destination_, object_t *object_); - void send_term (object_t *destination_); - void send_term_ack (object_t *destination_); + void send_term_req (class socket_base_t *destination_, + class owned_t *object_); + void send_term (class owned_t *destination_); + void send_term_ack (class socket_base_t *destination_); // These handlers can be overloaded by the derived objects. They are // called when command arrives from another thread. virtual void process_stop (); virtual void process_plug (); - virtual void process_own (object_t *object_); + virtual void process_own (class owned_t *object_); + virtual void process_attach (class zmq_engine_t *engine_); virtual void process_bind (); - virtual void process_term_req (object_t *object_); + virtual void process_term_req (class owned_t *object_); virtual void process_term (); virtual void process_term_ack (); diff --git a/src/owned.cpp b/src/owned.cpp index 22e257f..6995a39 100644 --- a/src/owned.cpp +++ b/src/owned.cpp @@ -20,11 +20,12 @@ #include "owned.hpp" #include "err.hpp" -zmq::owned_t::owned_t (object_t *parent_, object_t *owner_) : +zmq::owned_t::owned_t (object_t *parent_, socket_base_t *owner_) : object_t (parent_), owner (owner_), - plugged_in (false), - terminated (false) + sent_seqnum (0), + processed_seqnum (0), + shutting_down (false) { } @@ -32,21 +33,18 @@ zmq::owned_t::~owned_t () { } -void zmq::owned_t::process_plug () +void zmq::owned_t::inc_seqnum () { - zmq_assert (!plugged_in); + // NB: This function may be called from a different thread! + sent_seqnum.add (1); +} - // If termination of the object was already requested, destroy it and - // send the termination acknowledgement. - if (terminated) { - send_term_ack (owner); - delete this; - return; - } +void zmq::owned_t::process_plug () +{ + // Keep track of how many commands were processed so far. + processed_seqnum++; - // Notify the generic termination mechanism (io_object_t) that the object - // is already plugged in. - plugged_in = true; + finalise_command (); } void zmq::owned_t::term () @@ -56,19 +54,20 @@ void zmq::owned_t::term () void zmq::owned_t::process_term () { - zmq_assert (!terminated); + zmq_assert (!shutting_down); + shutting_down = true; - // If termination request has occured even before the object was plugged in - // wait till plugging in happens, then acknowledge the termination. - if (!plugged_in) { - terminated = true; - return; - } + finalise_command (); +} - // Otherwise, destroy the object and acknowledge the termination - // straight away. - send_term_ack (owner); - process_unplug (); - delete this; +void zmq::owned_t::finalise_command () +{ + // If termination request was already received and there are no more + // commands to wait for, terminate the object. + if (shutting_down && processed_seqnum == sent_seqnum.get ()) { + send_term_ack (owner); + process_unplug (); + delete this; + } } diff --git a/src/owned.hpp b/src/owned.hpp index 164622e..22595d1 100644 --- a/src/owned.hpp +++ b/src/owned.hpp @@ -20,7 +20,9 @@ #ifndef __ZMQ_OWNED_HPP_INCLUDED__ #define __ZMQ_OWNED_HPP_INCLUDED__ -#include "object.hpp" +#include "socket_base.hpp" +#include "atomic_counter.hpp" +#include "stdint.hpp" namespace zmq { @@ -34,7 +36,12 @@ namespace zmq // The object will live in parent's thread, however, its lifetime // will be managed by its owner socket. - owned_t (object_t *parent_, object_t *owner_); + owned_t (object_t *parent_, socket_base_t *owner_); + + // When another owned object wants to send command to this object + // it calls this function to let it know it should not shut down + // before the command is delivered. + void inc_seqnum (); protected: @@ -57,21 +64,27 @@ namespace zmq // classes to ensure sane cleanup. virtual void process_unplug () = 0; - // Socket owning this object. It is responsible for destroying - // it when it's being closed. - object_t *owner; + // Socket owning this object. When the socket is being closed it's + // responsible for shutting down this object. + socket_base_t *owner; private: // Handlers for incoming commands. void process_term (); - // Set to true when object is plugged in. - bool plugged_in; + // Generic command handler (to be called from all command handlers + // once the processing is done). + void finalise_command (); + + // Sequence number of the last command sent to this object. + atomic_counter_t sent_seqnum; + + // Sequence number of the last command processed by this object. + uint64_t processed_seqnum; - // Set to true when object was terminated before it was plugged in. - // In such case destruction is delayed till 'plug' command arrives. - bool terminated; + // If true, the object is already shutting down. + bool shutting_down; owned_t (const owned_t&); void operator = (const owned_t&); diff --git a/src/session.cpp b/src/session.cpp index fa29dd3..2bb4ff6 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -21,7 +21,7 @@ #include "zmq_engine.hpp" #include "err.hpp" -zmq::session_t::session_t (object_t *parent_, object_t *owner_, +zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, zmq_engine_t *engine_) : owned_t (parent_, owner_), engine (engine_) @@ -48,11 +48,14 @@ void zmq::session_t::flush () void zmq::session_t::process_plug () { + zmq_assert (engine); engine->plug (this); owned_t::process_plug (); } void zmq::session_t::process_unplug () { + zmq_assert (engine); engine->unplug (); + delete engine; } diff --git a/src/session.hpp b/src/session.hpp index 4228fd9..2cb8e18 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -30,7 +30,7 @@ namespace zmq { public: - session_t (object_t *parent_, object_t *owner_, + session_t (object_t *parent_, socket_base_t *owner_, class zmq_engine_t *engine_); private: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 9701f65..fa6c1e3 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -27,7 +27,9 @@ #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" +#include "session.hpp" #include "config.hpp" +#include "owned.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -60,6 +62,9 @@ zmq::socket_base_t::~socket_base_t () while (pending_term_acks) app_thread->process_commands (true); } + + // Check whether there are no session leaks. + zmq_assert (sessions.empty ()); } int zmq::socket_base_t::setsockopt (int option_, void *optval_, @@ -169,12 +174,43 @@ int zmq::socket_base_t::close () return 0; } -void zmq::socket_base_t::process_own (object_t *object_) +void zmq::socket_base_t::register_session (const char *name_, + session_t *session_) +{ + sessions_sync.lock (); + bool inserted = sessions.insert (std::make_pair (name_, session_)).second; + zmq_assert (inserted); + sessions_sync.unlock (); +} + +void zmq::socket_base_t::unregister_session (const char *name_) +{ + sessions_sync.lock (); + sessions_t::iterator it = sessions.find (name_); + zmq_assert (it != sessions.end ()); + sessions.erase (it); + sessions_sync.unlock (); +} + +zmq::session_t *zmq::socket_base_t::get_session (const char *name_) +{ + sessions_sync.lock (); + sessions_t::iterator it = sessions.find (name_); + session_t *session = NULL; + if (it != sessions.end ()) { + session = it->second; + session->inc_seqnum (); + } + sessions_sync.unlock (); + return session; +} + +void zmq::socket_base_t::process_own (owned_t *object_) { io_objects.insert (object_); } -void zmq::socket_base_t::process_term_req (object_t *object_) +void zmq::socket_base_t::process_term_req (owned_t *object_) { // If I/O object is well and alive ask it to terminate. io_objects_t::iterator it = std::find (io_objects.begin (), diff --git a/src/socket_base.hpp b/src/socket_base.hpp index e96cc2d..8e99654 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -21,9 +21,11 @@ #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ #include <set> +#include <map> #include <string> #include "object.hpp" +#include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" @@ -46,16 +48,26 @@ namespace zmq virtual int recv (struct zmq_msg *msg_, int flags_); virtual int close (); + // Functions that owned objects use to manipulate socket's list + // of existing sessions. + // Note that this functionality cannot be implemented via inter-thread + // commands as it is unacceptable to wait for the completion of the + // action till user application yields control of the application + // thread to 0MQ. + void register_session (const char *name_, class session_t *session_); + void unregister_session (const char *name_); + class session_t *get_session (const char *name_); + private: // Handlers for incoming commands. - void process_own (object_t *object_); - void process_term_req (object_t *object_); + void process_own (class owned_t *object_); + void process_term_req (class owned_t *object_); void process_term_ack (); // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. - typedef std::set <object_t*> io_objects_t; + typedef std::set <class owned_t*> io_objects_t; io_objects_t io_objects; // Number of I/O objects that were already asked to terminate @@ -68,6 +80,14 @@ namespace zmq // Socket options. options_t options; + // List of existing sessions. This list is never referenced from within + // the socket, instead it is used by I/O objects owned by the session. + // As those objects can live in different threads, the access is + // synchronised using 'sessions_sync' mutex. + typedef std::map <std::string, session_t*> sessions_t; + sessions_t sessions; + mutex_t sessions_sync; + socket_base_t (const socket_base_t&); void operator = (const socket_base_t&); }; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 4416a70..00c8cb2 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -22,8 +22,8 @@ #include "io_thread.hpp" #include "err.hpp" -zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, object_t *owner_, - const options_t &options_) : +zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_, + socket_base_t *owner_, const options_t &options_) : owned_t (parent_, owner_), io_object_t (parent_), handle_valid (false), diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index 93497cb..dcdec19 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -33,7 +33,7 @@ namespace zmq { public: - zmq_connecter_t (class io_thread_t *parent_, object_t *owner_, + zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_, const options_t &options_); // Set IP address to connect to. diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 3620d30..d8b8cfc 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -73,7 +73,6 @@ void zmq::zmq_engine_t::in_event () // Read as much data as possible to the read buffer. insize = tcp_socket.read (inbuf, in_batch_size); -printf ("%d bytes read\n", (int) insize); inpos = 0; // Check whether the peer has closed the connection. @@ -132,5 +131,5 @@ void zmq::zmq_engine_t::out_event () void zmq::zmq_engine_t::error () { - zmq_assert (false); +// zmq_assert (false); } diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index fea1452..124622d 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -22,8 +22,8 @@ #include "session.hpp" #include "err.hpp" -zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, object_t *owner_, fd_t fd_, - bool connected_, const options_t &options_) : +zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, + fd_t fd_, bool connected_, const options_t &options_) : owned_t (parent_, owner_), connected (connected_), options (options_) @@ -81,13 +81,15 @@ void zmq::zmq_init_t::flush () void zmq::zmq_init_t::process_plug () { + zmq_assert (engine); engine->plug (this); owned_t::process_plug (); } void zmq::zmq_init_t::process_unplug () { - engine->unplug (); + if (engine) + engine->unplug (); } void zmq::zmq_init_t::create_session () diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 2e0910a..5eb289e 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -44,7 +44,7 @@ namespace zmq // Set 'connected' to true if the connection was created by 'connect' // function. If it was accepted from a listening socket, set it to // false. - zmq_init_t (class io_thread_t *parent_, object_t *owner_, fd_t fd_, + zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_, bool connected_, const options_t &options); ~zmq_init_t (); diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index c990468..49bbf61 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -22,8 +22,8 @@ #include "io_thread.hpp" #include "err.hpp" -zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_, - const options_t &options_) : +zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, + socket_base_t *owner_, const options_t &options_) : owned_t (parent_, owner_), io_object_t (parent_), options (options_) diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp index f85ad5a..899aaf9 100644 --- a/src/zmq_listener.hpp +++ b/src/zmq_listener.hpp @@ -33,7 +33,7 @@ namespace zmq { public: - zmq_listener_t (class io_thread_t *parent_, object_t *owner_, + zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_, const options_t &options_); // Set IP address to listen on. |