From 0f6f7276e32c01ccfe86fb76741a52ac6ffc87af Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 23 May 2011 20:30:01 +0200 Subject: Move the pipe termination code to socket_base_t So far, the pipe termination code was spread among socket type classes, fair queuer, load balancer, etc. This patch moves all the associated logic to a single place. Signed-off-by: Martin Sustrik --- src/array.hpp | 38 ++++++++++++++++++++++----- src/dist.cpp | 25 ++---------------- src/dist.hpp | 15 +++-------- src/fq.cpp | 30 ++------------------- src/fq.hpp | 14 +++------- src/lb.cpp | 25 ++---------------- src/lb.hpp | 15 +++-------- src/own.cpp | 8 +++--- src/own.hpp | 3 +++ src/pair.cpp | 31 +++------------------- src/pair.hpp | 17 +++--------- src/pipe.cpp | 4 +-- src/pull.cpp | 21 +++------------ src/pull.hpp | 14 +++------- src/push.cpp | 20 +++----------- src/push.hpp | 14 +++------- src/reaper.cpp | 6 +---- src/socket_base.cpp | 59 ++++++++++++++++++++++++++++++++++++++++-- src/socket_base.hpp | 38 +++++++++++++++------------ src/sub.hpp | 2 +- src/xpub.cpp | 23 +++------------- src/xpub.hpp | 14 +++------- src/xrep.cpp | 33 ++++------------------- src/xrep.hpp | 18 +++---------- src/xreq.cpp | 20 ++++---------- src/xreq.hpp | 15 +++-------- src/xsub.cpp | 18 ++----------- src/xsub.hpp | 14 +++------- tests/test_shutdown_stress.cpp | 2 +- 29 files changed, 190 insertions(+), 366 deletions(-) diff --git a/src/array.hpp b/src/array.hpp index e7b5266..bb3c0ea 100644 --- a/src/array.hpp +++ b/src/array.hpp @@ -38,7 +38,8 @@ namespace zmq inline array_item_t () : array_index1 (-1), - array_index2 (-1) + array_index2 (-1), + array_index3 (-1) { } @@ -68,10 +69,21 @@ namespace zmq return array_index2; } + inline void set_array_index3 (int index_) + { + array_index3 = index_; + } + + inline int get_array_index3 () + { + return array_index3; + } + private: int array_index1; int array_index2; + int array_index3; array_item_t (const array_item_t&); const array_item_t &operator = (const array_item_t&); @@ -117,8 +129,10 @@ namespace zmq if (item_) { if (N == 1) item_->set_array_index1 ((int) items.size ()); - else + else if (N == 2) item_->set_array_index2 ((int) items.size ()); + else + item_->set_array_index3 ((int) items.size ()); } items.push_back (item_); } @@ -127,16 +141,20 @@ namespace zmq { if (N == 1) erase (item_->get_array_index1 ()); - else + else if (N == 2) erase (item_->get_array_index2 ()); + else + erase (item_->get_array_index3 ()); } inline void erase (size_type index_) { if (items.back ()) { if (N == 1) items.back ()->set_array_index1 ((int) index_); - else + else if (N == 2) items.back ()->set_array_index2 ((int) index_); + else + items.back ()->set_array_index3 ((int) index_); } items [index_] = items.back (); items.pop_back (); @@ -150,12 +168,18 @@ namespace zmq if (items [index2_]) items [index2_]->set_array_index1 ((int) index1_); } - else { + else if (N == 2) { if (items [index1_]) items [index1_]->set_array_index2 ((int) index2_); if (items [index2_]) items [index2_]->set_array_index2 ((int) index1_); } + else { + if (items [index1_]) + items [index1_]->set_array_index3 ((int) index2_); + if (items [index2_]) + items [index2_]->set_array_index3 ((int) index1_); + } std::swap (items [index1_], items [index2_]); } @@ -168,8 +192,10 @@ namespace zmq { if (N == 1) return (size_type) item_->get_array_index1 (); - else + else if (N == 2) return (size_type) item_->get_array_index2 (); + else + return (size_type) item_->get_array_index3 (); } private: diff --git a/src/dist.cpp b/src/dist.cpp index 3afa196..f7f0488 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -21,16 +21,13 @@ #include "dist.hpp" #include "pipe.hpp" #include "err.hpp" -#include "own.hpp" #include "msg.hpp" #include "likely.hpp" -zmq::dist_t::dist_t (own_t *sink_) : +zmq::dist_t::dist_t () : active (0), eligible (0), - more (false), - sink (sink_), - terminating (false) + more (false) { } @@ -55,21 +52,6 @@ void zmq::dist_t::attach (pipe_t *pipe_) active++; eligible++; } - - if (unlikely (terminating)) { - sink->register_term_acks (1); - pipe_->terminate (); - } -} - -void zmq::dist_t::terminate () -{ - zmq_assert (!terminating); - terminating = true; - - sink->register_term_acks ((int) pipes.size ()); - for (pipes_t::size_type i = 0; i != pipes.size (); i++) - pipes [i]->terminate (); } void zmq::dist_t::terminated (pipe_t *pipe_) @@ -81,9 +63,6 @@ void zmq::dist_t::terminated (pipe_t *pipe_) if (pipes.index (pipe_) < eligible) eligible--; pipes.erase (pipe_); - - if (unlikely (terminating)) - sink->unregister_term_ack (); } void zmq::dist_t::activated (pipe_t *pipe_) diff --git a/src/dist.hpp b/src/dist.hpp index c137332..10613c1 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -35,17 +35,16 @@ namespace zmq { public: - dist_t (class own_t *sink_); + dist_t (); ~dist_t (); void attach (class pipe_t *pipe_); - void terminate (); - int send (class msg_t *msg_, int flags_); - bool has_out (); - void activated (class pipe_t *pipe_); void terminated (class pipe_t *pipe_); + int send (class msg_t *msg_, int flags_); + bool has_out (); + private: // Write the message to the pipe. Make the pipe inactive if writing @@ -74,12 +73,6 @@ namespace zmq // True if last we are in the middle of a multipart message. bool more; - // Object to send events to. - class own_t *sink; - - // If true, termination process is already underway. - bool terminating; - dist_t (const dist_t&); const dist_t &operator = (const dist_t&); }; diff --git a/src/fq.cpp b/src/fq.cpp index b4ee641..7318822 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -21,15 +21,12 @@ #include "fq.hpp" #include "pipe.hpp" #include "err.hpp" -#include "own.hpp" #include "msg.hpp" -zmq::fq_t::fq_t (own_t *sink_) : +zmq::fq_t::fq_t () : active (0), current (0), - more (false), - sink (sink_), - terminating (false) + more (false) { } @@ -43,20 +40,10 @@ void zmq::fq_t::attach (pipe_t *pipe_) pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; - - // If we are already terminating, ask the pipe to terminate straight away. - if (terminating) { - sink->register_term_acks (1); - pipe_->terminate (); - } } void zmq::fq_t::terminated (pipe_t *pipe_) { - // Make sure that we are not closing current pipe while - // message is half-read. - zmq_assert (terminating || (!more || pipes [current] != pipe_)); - // Remove the pipe from the list; adjust number of active pipes // accordingly. if (pipes.index (pipe_) < active) { @@ -65,19 +52,6 @@ void zmq::fq_t::terminated (pipe_t *pipe_) current = 0; } pipes.erase (pipe_); - - if (terminating) - sink->unregister_term_ack (); -} - -void zmq::fq_t::terminate () -{ - zmq_assert (!terminating); - terminating = true; - - sink->register_term_acks ((int) pipes.size ()); - for (pipes_t::size_type i = 0; i != pipes.size (); i++) - pipes [i]->terminate (); } void zmq::fq_t::activated (pipe_t *pipe_) diff --git a/src/fq.hpp b/src/fq.hpp index bbe1b59..106e978 100644 --- a/src/fq.hpp +++ b/src/fq.hpp @@ -35,18 +35,16 @@ namespace zmq { public: - fq_t (class own_t *sink_); + fq_t (); ~fq_t (); void attach (pipe_t *pipe_); - void terminate (); + void activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); int recv (msg_t *msg_, int flags_); bool has_in (); - void activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); - private: // Inbound pipes. @@ -64,12 +62,6 @@ namespace zmq // there are following parts still waiting in the current pipe. bool more; - // Object to send events to. - class own_t *sink; - - // If true, termination process is already underway. - bool terminating; - fq_t (const fq_t&); const fq_t &operator = (const fq_t&); }; diff --git a/src/lb.cpp b/src/lb.cpp index 2ba902a..7aeef9e 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -21,16 +21,13 @@ #include "lb.hpp" #include "pipe.hpp" #include "err.hpp" -#include "own.hpp" #include "msg.hpp" -zmq::lb_t::lb_t (own_t *sink_) : +zmq::lb_t::lb_t () : active (0), current (0), more (false), - dropping (false), - sink (sink_), - terminating (false) + dropping (false) { } @@ -44,21 +41,6 @@ void zmq::lb_t::attach (pipe_t *pipe_) pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; - - if (terminating) { - sink->register_term_acks (1); - pipe_->terminate (); - } -} - -void zmq::lb_t::terminate () -{ - zmq_assert (!terminating); - terminating = true; - - sink->register_term_acks ((int) pipes.size ()); - for (pipes_t::size_type i = 0; i != pipes.size (); i++) - pipes [i]->terminate (); } void zmq::lb_t::terminated (pipe_t *pipe_) @@ -78,9 +60,6 @@ void zmq::lb_t::terminated (pipe_t *pipe_) current = 0; } pipes.erase (pipe_); - - if (terminating) - sink->unregister_term_ack (); } void zmq::lb_t::activated (pipe_t *pipe_) diff --git a/src/lb.hpp b/src/lb.hpp index d764f6d..0dfd25e 100644 --- a/src/lb.hpp +++ b/src/lb.hpp @@ -34,17 +34,16 @@ namespace zmq { public: - lb_t (class own_t *sink_); + lb_t (); ~lb_t (); void attach (pipe_t *pipe_); - void terminate (); - int send (msg_t *msg_, int flags_); - bool has_out (); - void activated (pipe_t *pipe_); void terminated (pipe_t *pipe_); + int send (msg_t *msg_, int flags_); + bool has_out (); + private: // List of outbound pipes. @@ -64,12 +63,6 @@ namespace zmq // True if we are dropping current message. bool dropping; - // Object to send events to. - class own_t *sink; - - // If true, termination process is already underway. - bool terminating; - lb_t (const lb_t&); const lb_t &operator = (const lb_t&); }; diff --git a/src/own.cpp b/src/own.cpp index cdf20a4..f2ca4b2 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -153,6 +153,11 @@ void zmq::own_t::terminate () send_term_req (owner, this); } +bool zmq::own_t::is_terminating () +{ + return terminating; +} + void zmq::own_t::process_term (int linger_) { // Double termination should never happen. @@ -173,7 +178,6 @@ void zmq::own_t::process_term (int linger_) void zmq::own_t::register_term_acks (int count_) { term_acks += count_; - printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks); } void zmq::own_t::unregister_term_ack () @@ -181,8 +185,6 @@ void zmq::own_t::unregister_term_ack () zmq_assert (term_acks > 0); term_acks--; - printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks); - // This may be a last ack we are waiting for before termination... check_term_acks (); } diff --git a/src/own.hpp b/src/own.hpp index 5023a30..0902f73 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -76,6 +76,9 @@ namespace zmq // called more than once. void terminate (); + // Returns true if the object is in process of termination. + bool is_terminating (); + // Derived object destroys own_t. There's no point in allowing // others to invoke the destructor. At the same time, it has to be // virtual so that generic own_t deallocation mechanism destroys diff --git a/src/pair.cpp b/src/pair.cpp index 93a4327..30b56e6 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -25,8 +25,7 @@ zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - pipe (NULL), - terminating (false) + pipe (NULL) { options.type = ZMQ_PAIR; } @@ -39,44 +38,22 @@ zmq::pair_t::~pair_t () void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (!pipe); - pipe = pipe_; - pipe->set_event_sink (this); - - if (terminating) { - register_term_acks (1); - pipe_->terminate (); - } } -void zmq::pair_t::terminated (pipe_t *pipe_) +void zmq::pair_t::xterminated (pipe_t *pipe_) { zmq_assert (pipe_ == pipe); pipe = NULL; - - if (terminating) - unregister_term_ack (); -} - -void zmq::pair_t::process_term (int linger_) -{ - terminating = true; - - if (pipe) { - register_term_acks (1); - pipe->terminate (); - } - - socket_base_t::process_term (linger_); } -void zmq::pair_t::read_activated (pipe_t *pipe_) +void zmq::pair_t::xread_activated (pipe_t *pipe_) { // There's just one pipe. No lists of active and inactive pipes. // There's nothing to do here. } -void zmq::pair_t::write_activated (pipe_t *pipe_) +void zmq::pair_t::xwrite_activated (pipe_t *pipe_) { // There's just one pipe. No lists of active and inactive pipes. // There's nothing to do here. diff --git a/src/pair.hpp b/src/pair.hpp index 2cb050a..1ddf50e 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -22,14 +22,12 @@ #define __ZMQ_PAIR_HPP_INCLUDED__ #include "socket_base.hpp" -#include "pipe.hpp" namespace zmq { class pair_t : - public socket_base_t, - public i_pipe_events + public socket_base_t { public: @@ -42,21 +40,14 @@ namespace zmq int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); - - // i_pipe_events interface implementation. - void read_activated (class pipe_t *pipe_); - void write_activated (class pipe_t *pipe_); - void terminated (class pipe_t *pipe_); + void xread_activated (class pipe_t *pipe_); + void xwrite_activated (class pipe_t *pipe_); + void xterminated (class pipe_t *pipe_); private: - // Hook into termination process. - void process_term (int linger_); - class pipe_t *pipe; - bool terminating; - pair_t (const pair_t&); const pair_t &operator = (const pair_t&); }; diff --git a/src/pipe.cpp b/src/pipe.cpp index fb03042..3d0c0a6 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -108,7 +108,7 @@ bool zmq::pipe_t::check_read () // If pipe_term was already received but wasn't processed because // of pending messages, we can ack it now. - if (terminating) + if (term_recvd) send_pipe_term_ack (peer); return false; @@ -133,7 +133,7 @@ bool zmq::pipe_t::read (msg_t *msg_) // If pipe_term was already received but wasn't processed because // of pending messages, we can ack it now. - if (terminating) + if (term_recvd) send_pipe_term_ack (peer); return false; diff --git a/src/pull.cpp b/src/pull.cpp index 66457b8..5e48777 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -21,10 +21,10 @@ #include "pull.hpp" #include "err.hpp" #include "msg.hpp" +#include "pipe.hpp" zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), - fq (this) + socket_base_t (parent_, tid_) { options.type = ZMQ_PULL; } @@ -36,32 +36,19 @@ zmq::pull_t::~pull_t () void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); - pipe_->set_event_sink (this); fq.attach (pipe_); } -void zmq::pull_t::read_activated (pipe_t *pipe_) +void zmq::pull_t::xread_activated (pipe_t *pipe_) { fq.activated (pipe_); } -void zmq::pull_t::write_activated (pipe_t *pipe_) -{ - // There are no outbound messages in pull socket. This should never happen. - zmq_assert (false); -} - -void zmq::pull_t::terminated (pipe_t *pipe_) +void zmq::pull_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); } -void zmq::pull_t::process_term (int linger_) -{ - fq.terminate (); - socket_base_t::process_term (linger_); -} - int zmq::pull_t::xrecv (msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); diff --git a/src/pull.hpp b/src/pull.hpp index af59724..cbcf05a 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -22,15 +22,13 @@ #define __ZMQ_PULL_HPP_INCLUDED__ #include "socket_base.hpp" -#include "pipe.hpp" #include "fq.hpp" namespace zmq { class pull_t : - public socket_base_t, - public i_pipe_events + public socket_base_t { public: @@ -43,17 +41,11 @@ namespace zmq void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); + void xread_activated (class pipe_t *pipe_); + void xterminated (class pipe_t *pipe_); private: - // i_pipe_events interface implementation. - void read_activated (pipe_t *pipe_); - void write_activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); - - // Hook into the termination process. - void process_term (int linger_); - // Fair queueing object for inbound pipes. fq_t fq; diff --git a/src/push.cpp b/src/push.cpp index 12fc8d2..44ebc07 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -24,8 +24,7 @@ #include "msg.hpp" zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), - lb (this) + socket_base_t (parent_, tid_) { options.type = ZMQ_PUSH; } @@ -37,32 +36,19 @@ zmq::push_t::~push_t () void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); - pipe_->set_event_sink (this); lb.attach (pipe_); } -void zmq::push_t::read_activated (pipe_t *pipe_) -{ - // There are no inbound messages in push socket. This should never happen. - zmq_assert (false); -} - -void zmq::push_t::write_activated (pipe_t *pipe_) +void zmq::push_t::xwrite_activated (pipe_t *pipe_) { lb.activated (pipe_); } -void zmq::push_t::terminated (pipe_t *pipe_) +void zmq::push_t::xterminated (pipe_t *pipe_) { lb.terminated (pipe_); } -void zmq::push_t::process_term (int linger_) -{ - lb.terminate (); - socket_base_t::process_term (linger_); -} - int zmq::push_t::xsend (msg_t *msg_, int flags_) { return lb.send (msg_, flags_); diff --git a/src/push.hpp b/src/push.hpp index 67763eb..5dabe14 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -22,15 +22,13 @@ #define __ZMQ_PUSH_HPP_INCLUDED__ #include "socket_base.hpp" -#include "pipe.hpp" #include "lb.hpp" namespace zmq { class push_t : - public socket_base_t, - public i_pipe_events + public socket_base_t { public: @@ -43,17 +41,11 @@ namespace zmq void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); int xsend (class msg_t *msg_, int flags_); bool xhas_out (); + void xwrite_activated (class pipe_t *pipe_); + void xterminated (class pipe_t *pipe_); private: - // i_pipe_events interface implementation. - void read_activated (pipe_t *pipe_); - void write_activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); - - // Hook into the termination process. - void process_term (int linger_); - // Load balancer managing the outbound pipes. lb_t lb; diff --git a/src/reaper.cpp b/src/reaper.cpp index d3ebbba..0295137 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -87,7 +87,7 @@ void zmq::reaper_t::process_stop () { terminating = true; - // If there are no sockets beig reaped finish immediately. + // If there are no sockets being reaped finish immediately. if (!sockets) { send_done (); poller->rm_fd (mailbox_handle); @@ -100,10 +100,6 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_) // Add the socket to the poller. socket_->start_reaping (poller); - // Start termination of associated I/O object hierarchy. - socket_->terminate (); - socket_->check_destroy (); - ++sockets; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index baa4bd2..fae55f2 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -211,10 +211,15 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_, +void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - // If the peer haven't specified it's identity, let's generate one. + // First, register the pipe so that we can terminate it later on. + pipe_->set_event_sink (this); + pipes.push_back (pipe_); + + // Then, pass the pipe to the specific socket type. + // If the peer haven't specified it's identity, let's generate one. if (peer_identity_.size ()) { xattach_pipe (pipe_, peer_identity_); } @@ -223,6 +228,13 @@ void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_, generate_uuid ((unsigned char*) identity.data () + 1); xattach_pipe (pipe_, identity); } + + // If the socket is already being closed, ask any new pipes to terminate + // straight away. + if (is_terminating ()) { + register_term_acks (1); + pipe_->terminate (); + } } int zmq::socket_base_t::setsockopt (int option_, const void *optval_, @@ -635,9 +647,15 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) void zmq::socket_base_t::start_reaping (poller_t *poller_) { + // Plug the socket to the reaper thread. poller = poller_; handle = poller->add_fd (mailbox.get_fd (), this); poller->set_pollin (handle); + + // Initialise the termination and check whether it can be deallocated + // immediately. + terminate (); + check_destroy (); } int zmq::socket_base_t::process_commands (bool block_, bool throttle_) @@ -720,6 +738,11 @@ void zmq::socket_base_t::process_term (int linger_) // will be initiated. unregister_endpoints (this); + // Ask all attached pipes to terminate. + for (pipes_t::size_type i = 0; i != pipes.size (); ++i) + pipes [i]->terminate (); + register_term_acks (pipes.size ()); + // Continue the termination process immediately. own_t::process_term (linger_); } @@ -758,6 +781,15 @@ int zmq::socket_base_t::xrecv (msg_t *msg_, int options_) return -1; } +void zmq::socket_base_t::xread_activated (pipe_t *pipe_) +{ + zmq_assert (false); +} +void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_) +{ + zmq_assert (false); +} + void zmq::socket_base_t::in_event () { // Process any commands from other threads/sockets that may be available @@ -794,3 +826,26 @@ void zmq::socket_base_t::check_destroy () own_t::process_destroy (); } } + +void zmq::socket_base_t::read_activated (pipe_t *pipe_) +{ + xread_activated (pipe_); +} + +void zmq::socket_base_t::write_activated (pipe_t *pipe_) +{ + xwrite_activated (pipe_); +} + +void zmq::socket_base_t::terminated (pipe_t *pipe_) +{ + // Notify the specific socket type about the pipe termination. + xterminated (pipe_); + + // Remove the pipe from the list of attached pipes and confirm its + // termination if we are already shutting down. + pipes.erase (pipe_); + if (is_terminating ()) + unregister_term_ack (); +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 531751b..7126733 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -34,6 +34,7 @@ #include "mailbox.hpp" #include "stdint.hpp" #include "blob.hpp" +#include "pipe.hpp" #include "own.hpp" namespace zmq @@ -42,7 +43,8 @@ namespace zmq class socket_base_t : public own_t, public array_item_t, - public i_poll_events + public i_poll_events, + public i_pipe_events { friend class reaper_t; @@ -81,14 +83,6 @@ namespace zmq void unregister_session (const blob_t &name_); class session_t *find_session (const blob_t &name_); - // i_reader_events interface implementation. - void activated (class reader_t *pipe_); - void terminated (class reader_t *pipe_); - - // i_writer_events interface implementation. - void activated (class writer_t *pipe_); - void terminated (class writer_t *pipe_); - // Using this function reaper thread ask the socket to regiter with // its poller. void start_reaping (poller_t *poller_); @@ -99,9 +93,10 @@ namespace zmq void out_event (); void timer_event (int id_); - // To be called after processing commands or invoking any command - // handlers explicitly. If required, it will deallocate the socket. - void check_destroy (); + // i_pipe_events interface implementation. + void read_activated (pipe_t *pipe_); + void write_activated (pipe_t *pipe_); + void terminated (pipe_t *pipe_); protected: @@ -127,16 +122,20 @@ namespace zmq virtual bool xhas_in (); virtual int xrecv (class msg_t *msg_, int options_); - // We are declaring termination handler as protected so that - // individual socket types can hook into the termination process - // by overloading it. - void process_term (int linger_); + // i_pipe_events will be forwarded to these functions. + virtual void xread_activated (pipe_t *pipe_); + virtual void xwrite_activated (pipe_t *pipe_); + virtual void xterminated (pipe_t *pipe_) = 0; // Delay actual destruction of the socket. void process_destroy (); private: + // To be called after processing commands or invoking any command + // handlers explicitly. If required, it will deallocate the socket. + void check_destroy (); + // Used to check whether the object is a socket. uint32_t tag; @@ -156,7 +155,7 @@ namespace zmq // bind, is available and compatible with the socket type. int check_protocol (const std::string &protocol_); - // If no identity is set, generate one and call xattach_pipe (). + // Register the pipe with this socket. void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); // Processes commands sent to this socket (if any). If 'block' is @@ -169,10 +168,15 @@ namespace zmq void process_stop (); void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_); void process_unplug (); + void process_term (int linger_); // Socket's mailbox object. mailbox_t mailbox; + // List of attached pipes. + typedef array_t pipes_t; + pipes_t pipes; + // Reaper's poller and handle of this socket within it. poller_t *poller; poller_t::handle_t handle; diff --git a/src/sub.hpp b/src/sub.hpp index 8575961..8ba8987 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -37,7 +37,7 @@ namespace zmq int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (class msg_t *msg_, int options_); - bool xhas_out (); + bool xhas_out (); private: diff --git a/src/xpub.cpp b/src/xpub.cpp index 888b42d..d5dba9f 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -24,8 +24,7 @@ #include "msg.hpp" zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), - dist (this) + socket_base_t (parent_, tid_) { options.type = ZMQ_XPUB; } @@ -37,35 +36,19 @@ zmq::xpub_t::~xpub_t () void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); - pipe_->set_event_sink (this); dist.attach (pipe_); } -void zmq::xpub_t::read_activated (pipe_t *pipe_) -{ - // PUB socket never receives messages. This should never happen. - zmq_assert (false); -} - -void zmq::xpub_t::write_activated (pipe_t *pipe_) +void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) { dist.activated (pipe_); } -void zmq::xpub_t::terminated (pipe_t *pipe_) +void zmq::xpub_t::xterminated (pipe_t *pipe_) { dist.terminated (pipe_); } -void zmq::xpub_t::process_term (int linger_) -{ - // Terminate the outbound pipes. - dist.terminate (); - - // Continue with the termination immediately. - socket_base_t::process_term (linger_); -} - int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { return dist.send (msg_, flags_); diff --git a/src/xpub.hpp b/src/xpub.hpp index 48efd17..8a6ff73 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -23,15 +23,13 @@ #include "socket_base.hpp" #include "array.hpp" -#include "pipe.hpp" #include "dist.hpp" namespace zmq { class xpub_t : - public socket_base_t, - public i_pipe_events + public socket_base_t { public: @@ -44,17 +42,11 @@ namespace zmq bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); + void xwrite_activated (class pipe_t *pipe_); + void xterminated (class pipe_t *pipe_); private: - // i_pipe_events interface implementation. - void read_activated (pipe_t *pipe_); - void write_activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); - - // Hook into the termination process. - void process_term (int linger_); - // Distributor of messages holding the list of outbound pipes. dist_t dist; diff --git a/src/xrep.cpp b/src/xrep.cpp index d82890d..920be8d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -28,8 +28,7 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : prefetched (false), more_in (false), current_out (NULL), - more_out (false), - terminating (false) + more_out (false) { options.type = ZMQ_XREP; @@ -47,7 +46,6 @@ zmq::xrep_t::~xrep_t () void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); - pipe_->set_event_sink (this); // Add the pipe to the map out outbound pipes. // TODO: What if new connection has same peer identity as the old one? @@ -59,27 +57,9 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) // Add the pipe to the list of inbound pipes. inpipe_t inpipe = {pipe_, peer_identity_, true}; inpipes.push_back (inpipe); - - // In case we are already terminating, ask this pipe to terminate as well. - if (terminating) { - register_term_acks (1); - pipe_->terminate (); - } } -void zmq::xrep_t::process_term (int linger_) -{ - terminating = true; - - register_term_acks ((int) (inpipes.size () + outpipes.size ())); - - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) - it->pipe->terminate (); - - socket_base_t::process_term (linger_); -} - -void zmq::xrep_t::terminated (pipe_t *pipe_) +void zmq::xrep_t::xterminated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { @@ -89,8 +69,6 @@ void zmq::xrep_t::terminated (pipe_t *pipe_) inpipes.erase (it); if (current_in >= inpipes.size ()) current_in = 0; - if (terminating) - unregister_term_ack (); goto clean_outpipes; } } @@ -103,15 +81,13 @@ clean_outpipes: outpipes.erase (it); if (pipe_ == current_out) current_out = NULL; - if (terminating) - unregister_term_ack (); return; } } zmq_assert (false); } -void zmq::xrep_t::read_activated (pipe_t *pipe_) +void zmq::xrep_t::xread_activated (pipe_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it) { @@ -124,7 +100,7 @@ void zmq::xrep_t::read_activated (pipe_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::write_activated (pipe_t *pipe_) +void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) { for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { @@ -312,3 +288,4 @@ bool zmq::xrep_t::xhas_out () } + diff --git a/src/xrep.hpp b/src/xrep.hpp index d0378c2..fbc7385 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -26,7 +26,6 @@ #include "socket_base.hpp" #include "blob.hpp" -#include "pipe.hpp" #include "msg.hpp" namespace zmq @@ -34,8 +33,7 @@ namespace zmq // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. class xrep_t : - public socket_base_t, - public i_pipe_events + public socket_base_t { public: @@ -48,6 +46,9 @@ namespace zmq int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + void xread_activated (class pipe_t *pipe_); + void xwrite_activated (class pipe_t *pipe_); + void xterminated (class pipe_t *pipe_); protected: @@ -56,14 +57,6 @@ namespace zmq private: - // Hook into the termination process. - void process_term (int linger_); - - // i_pipe_events interface implementation. - void read_activated (pipe_t *pipe_); - void write_activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); - struct inpipe_t { class pipe_t *pipe; @@ -103,9 +96,6 @@ namespace zmq // If true, more outgoing message parts are expected. bool more_out; - // If true, termination process is already underway. - bool terminating; - xrep_t (const xrep_t&); const xrep_t &operator = (const xrep_t&); }; diff --git a/src/xreq.cpp b/src/xreq.cpp index 4a6e67e..2371a34 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,9 +23,7 @@ #include "msg.hpp" zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), - fq (this), - lb (this) + socket_base_t (parent_, tid_) { options.type = ZMQ_XREQ; } @@ -34,21 +32,13 @@ zmq::xreq_t::~xreq_t () { } -void zmq::xreq_t::xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); - pipe_->set_event_sink (this); fq.attach (pipe_); lb.attach (pipe_); } -void zmq::xreq_t::process_term (int linger_) -{ - fq.terminate (); - lb.terminate (); - socket_base_t::process_term (linger_); -} - int zmq::xreq_t::xsend (msg_t *msg_, int flags_) { return lb.send (msg_, flags_); @@ -69,17 +59,17 @@ bool zmq::xreq_t::xhas_out () return lb.has_out (); } -void zmq::xreq_t::read_activated (pipe_t *pipe_) +void zmq::xreq_t::xread_activated (pipe_t *pipe_) { fq.activated (pipe_); } -void zmq::xreq_t::write_activated (pipe_t *pipe_) +void zmq::xreq_t::xwrite_activated (pipe_t *pipe_) { lb.activated (pipe_); } -void zmq::xreq_t::terminated (pipe_t *pipe_) +void zmq::xreq_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); lb.terminated (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index a75e5c8..5bf1a03 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -23,7 +23,6 @@ #define __ZMQ_XREQ_HPP_INCLUDED__ #include "socket_base.hpp" -#include "pipe.hpp" #include "fq.hpp" #include "lb.hpp" @@ -31,8 +30,7 @@ namespace zmq { class xreq_t : - public socket_base_t, - public i_pipe_events + public socket_base_t { public: @@ -47,17 +45,12 @@ namespace zmq int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + void xread_activated (class pipe_t *pipe_); + void xwrite_activated (class pipe_t *pipe_); + void xterminated (class pipe_t *pipe_); private: - // i_pipe_events interface implementation. - void read_activated (pipe_t *pipe_); - void write_activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); - - // Hook into the termination process. - void process_term (int linger_); - // Messages are fair-queued from inbound pipes. And load-balanced to // the outbound pipes. fq_t fq; diff --git a/src/xsub.cpp b/src/xsub.cpp index dc30d71..c5f610f 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -25,7 +25,6 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : socket_base_t (parent_, tid_), - fq (this), has_message (false), more (false) { @@ -43,32 +42,19 @@ zmq::xsub_t::~xsub_t () void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { zmq_assert (pipe_); - pipe_->set_event_sink (this); fq.attach (pipe_); } -void zmq::xsub_t::read_activated (pipe_t *pipe_) +void zmq::xsub_t::xread_activated (pipe_t *pipe_) { fq.activated (pipe_); } -void zmq::xsub_t::write_activated (pipe_t *pipe_) -{ - // SUB socket never sends messages. This should never happen. - zmq_assert (false); -} - -void zmq::xsub_t::terminated (pipe_t *pipe_) +void zmq::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); } -void zmq::xsub_t::process_term (int linger_) -{ - fq.terminate (); - socket_base_t::process_term (linger_); -} - int zmq::xsub_t::xsend (msg_t *msg_, int options_) { size_t size = msg_->size (); diff --git a/src/xsub.hpp b/src/xsub.hpp index ed9c462..58ddae5 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -23,7 +23,6 @@ #include "trie.hpp" #include "socket_base.hpp" -#include "pipe.hpp" #include "msg.hpp" #include "fq.hpp" @@ -31,8 +30,7 @@ namespace zmq { class xsub_t : - public socket_base_t, - public i_pipe_events + public socket_base_t { public: @@ -47,17 +45,11 @@ namespace zmq bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); + void xread_activated (class pipe_t *pipe_); + void xterminated (class pipe_t *pipe_); private: - // i_pipe_events interface implementation. - void read_activated (pipe_t *pipe_); - void write_activated (pipe_t *pipe_); - void terminated (pipe_t *pipe_); - - // Hook into the termination process. - void process_term (int linger_); - // Check whether the message matches at least one subscription. bool match (class msg_t *msg_); diff --git a/tests/test_shutdown_stress.cpp b/tests/test_shutdown_stress.cpp index dccf91f..ef81758 100644 --- a/tests/test_shutdown_stress.cpp +++ b/tests/test_shutdown_stress.cpp @@ -58,7 +58,7 @@ int main (int argc, char *argv []) ctx = zmq_init (7); assert (ctx); - s1 = zmq_socket (ctx, ZMQ_REP); + s1 = zmq_socket (ctx, ZMQ_PUB); assert (s1); rc = zmq_bind (s1, "tcp://127.0.0.1:5560"); -- cgit v1.2.3