From 0f6f7276e32c01ccfe86fb76741a52ac6ffc87af Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 23 May 2011 20:30:01 +0200 Subject: Move the pipe termination code to socket_base_t So far, the pipe termination code was spread among socket type classes, fair queuer, load balancer, etc. This patch moves all the associated logic to a single place. Signed-off-by: Martin Sustrik --- src/socket_base.hpp | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) (limited to 'src/socket_base.hpp') diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 531751b..7126733 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -34,6 +34,7 @@ #include "mailbox.hpp" #include "stdint.hpp" #include "blob.hpp" +#include "pipe.hpp" #include "own.hpp" namespace zmq @@ -42,7 +43,8 @@ namespace zmq class socket_base_t : public own_t, public array_item_t, - public i_poll_events + public i_poll_events, + public i_pipe_events { friend class reaper_t; @@ -81,14 +83,6 @@ namespace zmq void unregister_session (const blob_t &name_); class session_t *find_session (const blob_t &name_); - // i_reader_events interface implementation. - void activated (class reader_t *pipe_); - void terminated (class reader_t *pipe_); - - // i_writer_events interface implementation. - void activated (class writer_t *pipe_); - void terminated (class writer_t *pipe_); - // Using this function reaper thread ask the socket to regiter with // its poller. void start_reaping (poller_t *poller_); @@ -99,9 +93,10 @@ namespace zmq void out_event (); void timer_event (int id_); - // To be called after processing commands or invoking any command - // handlers explicitly. If required, it will deallocate the socket. - void check_destroy (); + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); protected: @@ -127,16 +122,20 @@ namespace zmq virtual bool xhas_in (); virtual int xrecv (class msg_t *msg_, int options_); - // We are declaring termination handler as protected so that - // individual socket types can hook into the termination process - // by overloading it. - void process_term (int linger_); + // i_pipe_events will be forwarded to these functions. + virtual void xread_activated (pipe_t *pipe_); + virtual void xwrite_activated (pipe_t *pipe_); + virtual void xterminated (pipe_t *pipe_) = 0; // Delay actual destruction of the socket. void process_destroy (); private: + // To be called after processing commands or invoking any command + // handlers explicitly. If required, it will deallocate the socket. + void check_destroy (); + // Used to check whether the object is a socket. uint32_t tag; @@ -156,7 +155,7 @@ namespace zmq // bind, is available and compatible with the socket type. int check_protocol (const std::string &protocol_); - // If no identity is set, generate one and call xattach_pipe (). + // Register the pipe with this socket. void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); // Processes commands sent to this socket (if any). If 'block' is @@ -169,10 +168,15 @@ namespace zmq void process_stop (); void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_); void process_unplug (); + void process_term (int linger_); // Socket's mailbox object. mailbox_t mailbox; + // List of attached pipes. + typedef array_t pipes_t; + pipes_t pipes; + // Reaper's poller and handle of this socket within it. poller_t *poller; poller_t::handle_t handle; -- cgit v1.2.3