summaryrefslogtreecommitdiff
path: root/src/socket_base.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-23 20:30:01 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-23 20:30:01 +0200
commit0f6f7276e32c01ccfe86fb76741a52ac6ffc87af (patch)
tree3f2cec589f6243742da7e79028633d35f8b362db /src/socket_base.hpp
parentacf0b0e515515e51ad32ba7a2d147ce703579478 (diff)
Move the pipe termination code to socket_base_t
So far, the pipe termination code was spread among socket type classes, fair queuer, load balancer, etc. This patch moves all the associated logic to a single place. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/socket_base.hpp')
-rw-r--r--src/socket_base.hpp38
1 files changed, 21 insertions, 17 deletions
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 531751b..7126733 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -34,6 +34,7 @@
#include "mailbox.hpp"
#include "stdint.hpp"
#include "blob.hpp"
+#include "pipe.hpp"
#include "own.hpp"
namespace zmq
@@ -42,7 +43,8 @@ namespace zmq
class socket_base_t :
public own_t,
public array_item_t,
- public i_poll_events
+ public i_poll_events,
+ public i_pipe_events
{
friend class reaper_t;
@@ -81,14 +83,6 @@ namespace zmq
void unregister_session (const blob_t &name_);
class session_t *find_session (const blob_t &name_);
- // i_reader_events interface implementation.
- void activated (class reader_t *pipe_);
- void terminated (class reader_t *pipe_);
-
- // i_writer_events interface implementation.
- void activated (class writer_t *pipe_);
- void terminated (class writer_t *pipe_);
-
// Using this function reaper thread ask the socket to regiter with
// its poller.
void start_reaping (poller_t *poller_);
@@ -99,9 +93,10 @@ namespace zmq
void out_event ();
void timer_event (int id_);
- // To be called after processing commands or invoking any command
- // handlers explicitly. If required, it will deallocate the socket.
- void check_destroy ();
+ // i_pipe_events interface implementation.
+ void read_activated (pipe_t *pipe_);
+ void write_activated (pipe_t *pipe_);
+ void terminated (pipe_t *pipe_);
protected:
@@ -127,16 +122,20 @@ namespace zmq
virtual bool xhas_in ();
virtual int xrecv (class msg_t *msg_, int options_);
- // We are declaring termination handler as protected so that
- // individual socket types can hook into the termination process
- // by overloading it.
- void process_term (int linger_);
+ // i_pipe_events will be forwarded to these functions.
+ virtual void xread_activated (pipe_t *pipe_);
+ virtual void xwrite_activated (pipe_t *pipe_);
+ virtual void xterminated (pipe_t *pipe_) = 0;
// Delay actual destruction of the socket.
void process_destroy ();
private:
+ // To be called after processing commands or invoking any command
+ // handlers explicitly. If required, it will deallocate the socket.
+ void check_destroy ();
+
// Used to check whether the object is a socket.
uint32_t tag;
@@ -156,7 +155,7 @@ namespace zmq
// bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_);
- // If no identity is set, generate one and call xattach_pipe ().
+ // Register the pipe with this socket.
void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
// Processes commands sent to this socket (if any). If 'block' is
@@ -169,10 +168,15 @@ namespace zmq
void process_stop ();
void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_);
void process_unplug ();
+ void process_term (int linger_);
// Socket's mailbox object.
mailbox_t mailbox;
+ // List of attached pipes.
+ typedef array_t <pipe_t, 3> pipes_t;
+ pipes_t pipes;
+
// Reaper's poller and handle of this socket within it.
poller_t *poller;
poller_t::handle_t handle;