summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/array.hpp38
-rw-r--r--src/dist.cpp25
-rw-r--r--src/dist.hpp15
-rw-r--r--src/fq.cpp30
-rw-r--r--src/fq.hpp14
-rw-r--r--src/lb.cpp25
-rw-r--r--src/lb.hpp15
-rw-r--r--src/own.cpp8
-rw-r--r--src/own.hpp3
-rw-r--r--src/pair.cpp31
-rw-r--r--src/pair.hpp17
-rw-r--r--src/pipe.cpp4
-rw-r--r--src/pull.cpp21
-rw-r--r--src/pull.hpp14
-rw-r--r--src/push.cpp20
-rw-r--r--src/push.hpp14
-rw-r--r--src/reaper.cpp6
-rw-r--r--src/socket_base.cpp59
-rw-r--r--src/socket_base.hpp38
-rw-r--r--src/sub.hpp2
-rw-r--r--src/xpub.cpp23
-rw-r--r--src/xpub.hpp14
-rw-r--r--src/xrep.cpp33
-rw-r--r--src/xrep.hpp18
-rw-r--r--src/xreq.cpp20
-rw-r--r--src/xreq.hpp15
-rw-r--r--src/xsub.cpp18
-rw-r--r--src/xsub.hpp14
-rw-r--r--tests/test_shutdown_stress.cpp2
29 files changed, 190 insertions, 366 deletions
diff --git a/src/array.hpp b/src/array.hpp
index e7b5266..bb3c0ea 100644
--- a/src/array.hpp
+++ b/src/array.hpp
@@ -38,7 +38,8 @@ namespace zmq
inline array_item_t () :
array_index1 (-1),
- array_index2 (-1)
+ array_index2 (-1),
+ array_index3 (-1)
{
}
@@ -68,10 +69,21 @@ namespace zmq
return array_index2;
}
+ inline void set_array_index3 (int index_)
+ {
+ array_index3 = index_;
+ }
+
+ inline int get_array_index3 ()
+ {
+ return array_index3;
+ }
+
private:
int array_index1;
int array_index2;
+ int array_index3;
array_item_t (const array_item_t&);
const array_item_t &operator = (const array_item_t&);
@@ -117,8 +129,10 @@ namespace zmq
if (item_) {
if (N == 1)
item_->set_array_index1 ((int) items.size ());
- else
+ else if (N == 2)
item_->set_array_index2 ((int) items.size ());
+ else
+ item_->set_array_index3 ((int) items.size ());
}
items.push_back (item_);
}
@@ -127,16 +141,20 @@ namespace zmq
{
if (N == 1)
erase (item_->get_array_index1 ());
- else
+ else if (N == 2)
erase (item_->get_array_index2 ());
+ else
+ erase (item_->get_array_index3 ());
}
inline void erase (size_type index_) {
if (items.back ()) {
if (N == 1)
items.back ()->set_array_index1 ((int) index_);
- else
+ else if (N == 2)
items.back ()->set_array_index2 ((int) index_);
+ else
+ items.back ()->set_array_index3 ((int) index_);
}
items [index_] = items.back ();
items.pop_back ();
@@ -150,12 +168,18 @@ namespace zmq
if (items [index2_])
items [index2_]->set_array_index1 ((int) index1_);
}
- else {
+ else if (N == 2) {
if (items [index1_])
items [index1_]->set_array_index2 ((int) index2_);
if (items [index2_])
items [index2_]->set_array_index2 ((int) index1_);
}
+ else {
+ if (items [index1_])
+ items [index1_]->set_array_index3 ((int) index2_);
+ if (items [index2_])
+ items [index2_]->set_array_index3 ((int) index1_);
+ }
std::swap (items [index1_], items [index2_]);
}
@@ -168,8 +192,10 @@ namespace zmq
{
if (N == 1)
return (size_type) item_->get_array_index1 ();
- else
+ else if (N == 2)
return (size_type) item_->get_array_index2 ();
+ else
+ return (size_type) item_->get_array_index3 ();
}
private:
diff --git a/src/dist.cpp b/src/dist.cpp
index 3afa196..f7f0488 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -21,16 +21,13 @@
#include "dist.hpp"
#include "pipe.hpp"
#include "err.hpp"
-#include "own.hpp"
#include "msg.hpp"
#include "likely.hpp"
-zmq::dist_t::dist_t (own_t *sink_) :
+zmq::dist_t::dist_t () :
active (0),
eligible (0),
- more (false),
- sink (sink_),
- terminating (false)
+ more (false)
{
}
@@ -55,21 +52,6 @@ void zmq::dist_t::attach (pipe_t *pipe_)
active++;
eligible++;
}
-
- if (unlikely (terminating)) {
- sink->register_term_acks (1);
- pipe_->terminate ();
- }
-}
-
-void zmq::dist_t::terminate ()
-{
- zmq_assert (!terminating);
- terminating = true;
-
- sink->register_term_acks ((int) pipes.size ());
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->terminate ();
}
void zmq::dist_t::terminated (pipe_t *pipe_)
@@ -81,9 +63,6 @@ void zmq::dist_t::terminated (pipe_t *pipe_)
if (pipes.index (pipe_) < eligible)
eligible--;
pipes.erase (pipe_);
-
- if (unlikely (terminating))
- sink->unregister_term_ack ();
}
void zmq::dist_t::activated (pipe_t *pipe_)
diff --git a/src/dist.hpp b/src/dist.hpp
index c137332..10613c1 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -35,17 +35,16 @@ namespace zmq
{
public:
- dist_t (class own_t *sink_);
+ dist_t ();
~dist_t ();
void attach (class pipe_t *pipe_);
- void terminate ();
- int send (class msg_t *msg_, int flags_);
- bool has_out ();
-
void activated (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_);
+ int send (class msg_t *msg_, int flags_);
+ bool has_out ();
+
private:
// Write the message to the pipe. Make the pipe inactive if writing
@@ -74,12 +73,6 @@ namespace zmq
// True if last we are in the middle of a multipart message.
bool more;
- // Object to send events to.
- class own_t *sink;
-
- // If true, termination process is already underway.
- bool terminating;
-
dist_t (const dist_t&);
const dist_t &operator = (const dist_t&);
};
diff --git a/src/fq.cpp b/src/fq.cpp
index b4ee641..7318822 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -21,15 +21,12 @@
#include "fq.hpp"
#include "pipe.hpp"
#include "err.hpp"
-#include "own.hpp"
#include "msg.hpp"
-zmq::fq_t::fq_t (own_t *sink_) :
+zmq::fq_t::fq_t () :
active (0),
current (0),
- more (false),
- sink (sink_),
- terminating (false)
+ more (false)
{
}
@@ -43,20 +40,10 @@ void zmq::fq_t::attach (pipe_t *pipe_)
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
-
- // If we are already terminating, ask the pipe to terminate straight away.
- if (terminating) {
- sink->register_term_acks (1);
- pipe_->terminate ();
- }
}
void zmq::fq_t::terminated (pipe_t *pipe_)
{
- // Make sure that we are not closing current pipe while
- // message is half-read.
- zmq_assert (terminating || (!more || pipes [current] != pipe_));
-
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active) {
@@ -65,19 +52,6 @@ void zmq::fq_t::terminated (pipe_t *pipe_)
current = 0;
}
pipes.erase (pipe_);
-
- if (terminating)
- sink->unregister_term_ack ();
-}
-
-void zmq::fq_t::terminate ()
-{
- zmq_assert (!terminating);
- terminating = true;
-
- sink->register_term_acks ((int) pipes.size ());
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->terminate ();
}
void zmq::fq_t::activated (pipe_t *pipe_)
diff --git a/src/fq.hpp b/src/fq.hpp
index bbe1b59..106e978 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -35,18 +35,16 @@ namespace zmq
{
public:
- fq_t (class own_t *sink_);
+ fq_t ();
~fq_t ();
void attach (pipe_t *pipe_);
- void terminate ();
+ void activated (pipe_t *pipe_);
+ void terminated (pipe_t *pipe_);
int recv (msg_t *msg_, int flags_);
bool has_in ();
- void activated (pipe_t *pipe_);
- void terminated (pipe_t *pipe_);
-
private:
// Inbound pipes.
@@ -64,12 +62,6 @@ namespace zmq
// there are following parts still waiting in the current pipe.
bool more;
- // Object to send events to.
- class own_t *sink;
-
- // If true, termination process is already underway.
- bool terminating;
-
fq_t (const fq_t&);
const fq_t &operator = (const fq_t&);
};
diff --git a/src/lb.cpp b/src/lb.cpp
index 2ba902a..7aeef9e 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -21,16 +21,13 @@
#include "lb.hpp"
#include "pipe.hpp"
#include "err.hpp"
-#include "own.hpp"
#include "msg.hpp"
-zmq::lb_t::lb_t (own_t *sink_) :
+zmq::lb_t::lb_t () :
active (0),
current (0),
more (false),
- dropping (false),
- sink (sink_),
- terminating (false)
+ dropping (false)
{
}
@@ -44,21 +41,6 @@ void zmq::lb_t::attach (pipe_t *pipe_)
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
-
- if (terminating) {
- sink->register_term_acks (1);
- pipe_->terminate ();
- }
-}
-
-void zmq::lb_t::terminate ()
-{
- zmq_assert (!terminating);
- terminating = true;
-
- sink->register_term_acks ((int) pipes.size ());
- for (pipes_t::size_type i = 0; i != pipes.size (); i++)
- pipes [i]->terminate ();
}
void zmq::lb_t::terminated (pipe_t *pipe_)
@@ -78,9 +60,6 @@ void zmq::lb_t::terminated (pipe_t *pipe_)
current = 0;
}
pipes.erase (pipe_);
-
- if (terminating)
- sink->unregister_term_ack ();
}
void zmq::lb_t::activated (pipe_t *pipe_)
diff --git a/src/lb.hpp b/src/lb.hpp
index d764f6d..0dfd25e 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -34,17 +34,16 @@ namespace zmq
{
public:
- lb_t (class own_t *sink_);
+ lb_t ();
~lb_t ();
void attach (pipe_t *pipe_);
- void terminate ();
- int send (msg_t *msg_, int flags_);
- bool has_out ();
-
void activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
+ int send (msg_t *msg_, int flags_);
+ bool has_out ();
+
private:
// List of outbound pipes.
@@ -64,12 +63,6 @@ namespace zmq
// True if we are dropping current message.
bool dropping;
- // Object to send events to.
- class own_t *sink;
-
- // If true, termination process is already underway.
- bool terminating;
-
lb_t (const lb_t&);
const lb_t &operator = (const lb_t&);
};
diff --git a/src/own.cpp b/src/own.cpp
index cdf20a4..f2ca4b2 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -153,6 +153,11 @@ void zmq::own_t::terminate ()
send_term_req (owner, this);
}
+bool zmq::own_t::is_terminating ()
+{
+ return terminating;
+}
+
void zmq::own_t::process_term (int linger_)
{
// Double termination should never happen.
@@ -173,7 +178,6 @@ void zmq::own_t::process_term (int linger_)
void zmq::own_t::register_term_acks (int count_)
{
term_acks += count_;
- printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks);
}
void zmq::own_t::unregister_term_ack ()
@@ -181,8 +185,6 @@ void zmq::own_t::unregister_term_ack ()
zmq_assert (term_acks > 0);
term_acks--;
- printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks);
-
// This may be a last ack we are waiting for before termination...
check_term_acks ();
}
diff --git a/src/own.hpp b/src/own.hpp
index 5023a30..0902f73 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -76,6 +76,9 @@ namespace zmq
// called more than once.
void terminate ();
+ // Returns true if the object is in process of termination.
+ bool is_terminating ();
+
// Derived object destroys own_t. There's no point in allowing
// others to invoke the destructor. At the same time, it has to be
// virtual so that generic own_t deallocation mechanism destroys
diff --git a/src/pair.cpp b/src/pair.cpp
index 93a4327..30b56e6 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -25,8 +25,7 @@
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- pipe (NULL),
- terminating (false)
+ pipe (NULL)
{
options.type = ZMQ_PAIR;
}
@@ -39,44 +38,22 @@ zmq::pair_t::~pair_t ()
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (!pipe);
-
pipe = pipe_;
- pipe->set_event_sink (this);
-
- if (terminating) {
- register_term_acks (1);
- pipe_->terminate ();
- }
}
-void zmq::pair_t::terminated (pipe_t *pipe_)
+void zmq::pair_t::xterminated (pipe_t *pipe_)
{
zmq_assert (pipe_ == pipe);
pipe = NULL;
-
- if (terminating)
- unregister_term_ack ();
-}
-
-void zmq::pair_t::process_term (int linger_)
-{
- terminating = true;
-
- if (pipe) {
- register_term_acks (1);
- pipe->terminate ();
- }
-
- socket_base_t::process_term (linger_);
}
-void zmq::pair_t::read_activated (pipe_t *pipe_)
+void zmq::pair_t::xread_activated (pipe_t *pipe_)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
}
-void zmq::pair_t::write_activated (pipe_t *pipe_)
+void zmq::pair_t::xwrite_activated (pipe_t *pipe_)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
diff --git a/src/pair.hpp b/src/pair.hpp
index 2cb050a..1ddf50e 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -22,14 +22,12 @@
#define __ZMQ_PAIR_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "pipe.hpp"
namespace zmq
{
class pair_t :
- public socket_base_t,
- public i_pipe_events
+ public socket_base_t
{
public:
@@ -42,21 +40,14 @@ namespace zmq
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
-
- // i_pipe_events interface implementation.
- void read_activated (class pipe_t *pipe_);
- void write_activated (class pipe_t *pipe_);
- void terminated (class pipe_t *pipe_);
+ void xread_activated (class pipe_t *pipe_);
+ void xwrite_activated (class pipe_t *pipe_);
+ void xterminated (class pipe_t *pipe_);
private:
- // Hook into termination process.
- void process_term (int linger_);
-
class pipe_t *pipe;
- bool terminating;
-
pair_t (const pair_t&);
const pair_t &operator = (const pair_t&);
};
diff --git a/src/pipe.cpp b/src/pipe.cpp
index fb03042..3d0c0a6 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -108,7 +108,7 @@ bool zmq::pipe_t::check_read ()
// If pipe_term was already received but wasn't processed because
// of pending messages, we can ack it now.
- if (terminating)
+ if (term_recvd)
send_pipe_term_ack (peer);
return false;
@@ -133,7 +133,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
// If pipe_term was already received but wasn't processed because
// of pending messages, we can ack it now.
- if (terminating)
+ if (term_recvd)
send_pipe_term_ack (peer);
return false;
diff --git a/src/pull.cpp b/src/pull.cpp
index 66457b8..5e48777 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -21,10 +21,10 @@
#include "pull.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "pipe.hpp"
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- fq (this)
+ socket_base_t (parent_, tid_)
{
options.type = ZMQ_PULL;
}
@@ -36,32 +36,19 @@ zmq::pull_t::~pull_t ()
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
- pipe_->set_event_sink (this);
fq.attach (pipe_);
}
-void zmq::pull_t::read_activated (pipe_t *pipe_)
+void zmq::pull_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
-void zmq::pull_t::write_activated (pipe_t *pipe_)
-{
- // There are no outbound messages in pull socket. This should never happen.
- zmq_assert (false);
-}
-
-void zmq::pull_t::terminated (pipe_t *pipe_)
+void zmq::pull_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
}
-void zmq::pull_t::process_term (int linger_)
-{
- fq.terminate ();
- socket_base_t::process_term (linger_);
-}
-
int zmq::pull_t::xrecv (msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
diff --git a/src/pull.hpp b/src/pull.hpp
index af59724..cbcf05a 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -22,15 +22,13 @@
#define __ZMQ_PULL_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "pipe.hpp"
#include "fq.hpp"
namespace zmq
{
class pull_t :
- public socket_base_t,
- public i_pipe_events
+ public socket_base_t
{
public:
@@ -43,17 +41,11 @@ namespace zmq
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
+ void xread_activated (class pipe_t *pipe_);
+ void xterminated (class pipe_t *pipe_);
private:
- // i_pipe_events interface implementation.
- void read_activated (pipe_t *pipe_);
- void write_activated (pipe_t *pipe_);
- void terminated (pipe_t *pipe_);
-
- // Hook into the termination process.
- void process_term (int linger_);
-
// Fair queueing object for inbound pipes.
fq_t fq;
diff --git a/src/push.cpp b/src/push.cpp
index 12fc8d2..44ebc07 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -24,8 +24,7 @@
#include "msg.hpp"
zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- lb (this)
+ socket_base_t (parent_, tid_)
{
options.type = ZMQ_PUSH;
}
@@ -37,32 +36,19 @@ zmq::push_t::~push_t ()
void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
- pipe_->set_event_sink (this);
lb.attach (pipe_);
}
-void zmq::push_t::read_activated (pipe_t *pipe_)
-{
- // There are no inbound messages in push socket. This should never happen.
- zmq_assert (false);
-}
-
-void zmq::push_t::write_activated (pipe_t *pipe_)
+void zmq::push_t::xwrite_activated (pipe_t *pipe_)
{
lb.activated (pipe_);
}
-void zmq::push_t::terminated (pipe_t *pipe_)
+void zmq::push_t::xterminated (pipe_t *pipe_)
{
lb.terminated (pipe_);
}
-void zmq::push_t::process_term (int linger_)
-{
- lb.terminate ();
- socket_base_t::process_term (linger_);
-}
-
int zmq::push_t::xsend (msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
diff --git a/src/push.hpp b/src/push.hpp
index 67763eb..5dabe14 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -22,15 +22,13 @@
#define __ZMQ_PUSH_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "pipe.hpp"
#include "lb.hpp"
namespace zmq
{
class push_t :
- public socket_base_t,
- public i_pipe_events
+ public socket_base_t
{
public:
@@ -43,17 +41,11 @@ namespace zmq
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
+ void xwrite_activated (class pipe_t *pipe_);
+ void xterminated (class pipe_t *pipe_);
private:
- // i_pipe_events interface implementation.
- void read_activated (pipe_t *pipe_);
- void write_activated (pipe_t *pipe_);
- void terminated (pipe_t *pipe_);
-
- // Hook into the termination process.
- void process_term (int linger_);
-
// Load balancer managing the outbound pipes.
lb_t lb;
diff --git a/src/reaper.cpp b/src/reaper.cpp
index d3ebbba..0295137 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -87,7 +87,7 @@ void zmq::reaper_t::process_stop ()
{
terminating = true;
- // If there are no sockets beig reaped finish immediately.
+ // If there are no sockets being reaped finish immediately.
if (!sockets) {
send_done ();
poller->rm_fd (mailbox_handle);
@@ -100,10 +100,6 @@ void zmq::reaper_t::process_reap (socket_base_t *socket_)
// Add the socket to the poller.
socket_->start_reaping (poller);
- // Start termination of associated I/O object hierarchy.
- socket_->terminate ();
- socket_->check_destroy ();
-
++sockets;
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index baa4bd2..fae55f2 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -211,10 +211,15 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
-void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
+void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
const blob_t &peer_identity_)
{
- // If the peer haven't specified it's identity, let's generate one.
+ // First, register the pipe so that we can terminate it later on.
+ pipe_->set_event_sink (this);
+ pipes.push_back (pipe_);
+
+ // Then, pass the pipe to the specific socket type.
+ // If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) {
xattach_pipe (pipe_, peer_identity_);
}
@@ -223,6 +228,13 @@ void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
generate_uuid ((unsigned char*) identity.data () + 1);
xattach_pipe (pipe_, identity);
}
+
+ // If the socket is already being closed, ask any new pipes to terminate
+ // straight away.
+ if (is_terminating ()) {
+ register_term_acks (1);
+ pipe_->terminate ();
+ }
}
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
@@ -635,9 +647,15 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
+ // Plug the socket to the reaper thread.
poller = poller_;
handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (handle);
+
+ // Initialise the termination and check whether it can be deallocated
+ // immediately.
+ terminate ();
+ check_destroy ();
}
int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
@@ -720,6 +738,11 @@ void zmq::socket_base_t::process_term (int linger_)
// will be initiated.
unregister_endpoints (this);
+ // Ask all attached pipes to terminate.
+ for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
+ pipes [i]->terminate ();
+ register_term_acks (pipes.size ());
+
// Continue the termination process immediately.
own_t::process_term (linger_);
}
@@ -758,6 +781,15 @@ int zmq::socket_base_t::xrecv (msg_t *msg_, int options_)
return -1;
}
+void zmq::socket_base_t::xread_activated (pipe_t *pipe_)
+{
+ zmq_assert (false);
+}
+void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_)
+{
+ zmq_assert (false);
+}
+
void zmq::socket_base_t::in_event ()
{
// Process any commands from other threads/sockets that may be available
@@ -794,3 +826,26 @@ void zmq::socket_base_t::check_destroy ()
own_t::process_destroy ();
}
}
+
+void zmq::socket_base_t::read_activated (pipe_t *pipe_)
+{
+ xread_activated (pipe_);
+}
+
+void zmq::socket_base_t::write_activated (pipe_t *pipe_)
+{
+ xwrite_activated (pipe_);
+}
+
+void zmq::socket_base_t::terminated (pipe_t *pipe_)
+{
+ // Notify the specific socket type about the pipe termination.
+ xterminated (pipe_);
+
+ // Remove the pipe from the list of attached pipes and confirm its
+ // termination if we are already shutting down.
+ pipes.erase (pipe_);
+ if (is_terminating ())
+ unregister_term_ack ();
+}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 531751b..7126733 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -34,6 +34,7 @@
#include "mailbox.hpp"
#include "stdint.hpp"
#include "blob.hpp"
+#include "pipe.hpp"
#include "own.hpp"
namespace zmq
@@ -42,7 +43,8 @@ namespace zmq
class socket_base_t :
public own_t,
public array_item_t,
- public i_poll_events
+ public i_poll_events,
+ public i_pipe_events
{
friend class reaper_t;
@@ -81,14 +83,6 @@ namespace zmq
void unregister_session (const blob_t &name_);
class session_t *find_session (const blob_t &name_);
- // i_reader_events interface implementation.
- void activated (class reader_t *pipe_);
- void terminated (class reader_t *pipe_);
-
- // i_writer_events interface implementation.
- void activated (class writer_t *pipe_);
- void terminated (class writer_t *pipe_);
-
// Using this function reaper thread ask the socket to regiter with
// its poller.
void start_reaping (poller_t *poller_);
@@ -99,9 +93,10 @@ namespace zmq
void out_event ();
void timer_event (int id_);
- // To be called after processing commands or invoking any command
- // handlers explicitly. If required, it will deallocate the socket.
- void check_destroy ();
+ // i_pipe_events interface implementation.
+ void read_activated (pipe_t *pipe_);
+ void write_activated (pipe_t *pipe_);
+ void terminated (pipe_t *pipe_);
protected:
@@ -127,16 +122,20 @@ namespace zmq
virtual bool xhas_in ();
virtual int xrecv (class msg_t *msg_, int options_);
- // We are declaring termination handler as protected so that
- // individual socket types can hook into the termination process
- // by overloading it.
- void process_term (int linger_);
+ // i_pipe_events will be forwarded to these functions.
+ virtual void xread_activated (pipe_t *pipe_);
+ virtual void xwrite_activated (pipe_t *pipe_);
+ virtual void xterminated (pipe_t *pipe_) = 0;
// Delay actual destruction of the socket.
void process_destroy ();
private:
+ // To be called after processing commands or invoking any command
+ // handlers explicitly. If required, it will deallocate the socket.
+ void check_destroy ();
+
// Used to check whether the object is a socket.
uint32_t tag;
@@ -156,7 +155,7 @@ namespace zmq
// bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_);
- // If no identity is set, generate one and call xattach_pipe ().
+ // Register the pipe with this socket.
void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
// Processes commands sent to this socket (if any). If 'block' is
@@ -169,10 +168,15 @@ namespace zmq
void process_stop ();
void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_);
void process_unplug ();
+ void process_term (int linger_);
// Socket's mailbox object.
mailbox_t mailbox;
+ // List of attached pipes.
+ typedef array_t <pipe_t, 3> pipes_t;
+ pipes_t pipes;
+
// Reaper's poller and handle of this socket within it.
poller_t *poller;
poller_t::handle_t handle;
diff --git a/src/sub.hpp b/src/sub.hpp
index 8575961..8ba8987 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -37,7 +37,7 @@ namespace zmq
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (class msg_t *msg_, int options_);
- bool xhas_out ();
+ bool xhas_out ();
private:
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 888b42d..d5dba9f 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -24,8 +24,7 @@
#include "msg.hpp"
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- dist (this)
+ socket_base_t (parent_, tid_)
{
options.type = ZMQ_XPUB;
}
@@ -37,35 +36,19 @@ zmq::xpub_t::~xpub_t ()
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
- pipe_->set_event_sink (this);
dist.attach (pipe_);
}
-void zmq::xpub_t::read_activated (pipe_t *pipe_)
-{
- // PUB socket never receives messages. This should never happen.
- zmq_assert (false);
-}
-
-void zmq::xpub_t::write_activated (pipe_t *pipe_)
+void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
{
dist.activated (pipe_);
}
-void zmq::xpub_t::terminated (pipe_t *pipe_)
+void zmq::xpub_t::xterminated (pipe_t *pipe_)
{
dist.terminated (pipe_);
}
-void zmq::xpub_t::process_term (int linger_)
-{
- // Terminate the outbound pipes.
- dist.terminate ();
-
- // Continue with the termination immediately.
- socket_base_t::process_term (linger_);
-}
-
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
return dist.send (msg_, flags_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 48efd17..8a6ff73 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -23,15 +23,13 @@
#include "socket_base.hpp"
#include "array.hpp"
-#include "pipe.hpp"
#include "dist.hpp"
namespace zmq
{
class xpub_t :
- public socket_base_t,
- public i_pipe_events
+ public socket_base_t
{
public:
@@ -44,17 +42,11 @@ namespace zmq
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
+ void xwrite_activated (class pipe_t *pipe_);
+ void xterminated (class pipe_t *pipe_);
private:
- // i_pipe_events interface implementation.
- void read_activated (pipe_t *pipe_);
- void write_activated (pipe_t *pipe_);
- void terminated (pipe_t *pipe_);
-
- // Hook into the termination process.
- void process_term (int linger_);
-
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
diff --git a/src/xrep.cpp b/src/xrep.cpp
index d82890d..920be8d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -28,8 +28,7 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
prefetched (false),
more_in (false),
current_out (NULL),
- more_out (false),
- terminating (false)
+ more_out (false)
{
options.type = ZMQ_XREP;
@@ -47,7 +46,6 @@ zmq::xrep_t::~xrep_t ()
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
- pipe_->set_event_sink (this);
// Add the pipe to the map out outbound pipes.
// TODO: What if new connection has same peer identity as the old one?
@@ -59,27 +57,9 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
// Add the pipe to the list of inbound pipes.
inpipe_t inpipe = {pipe_, peer_identity_, true};
inpipes.push_back (inpipe);
-
- // In case we are already terminating, ask this pipe to terminate as well.
- if (terminating) {
- register_term_acks (1);
- pipe_->terminate ();
- }
}
-void zmq::xrep_t::process_term (int linger_)
-{
- terminating = true;
-
- register_term_acks ((int) (inpipes.size () + outpipes.size ()));
-
- for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it)
- it->pipe->terminate ();
-
- socket_base_t::process_term (linger_);
-}
-
-void zmq::xrep_t::terminated (pipe_t *pipe_)
+void zmq::xrep_t::xterminated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
@@ -89,8 +69,6 @@ void zmq::xrep_t::terminated (pipe_t *pipe_)
inpipes.erase (it);
if (current_in >= inpipes.size ())
current_in = 0;
- if (terminating)
- unregister_term_ack ();
goto clean_outpipes;
}
}
@@ -103,15 +81,13 @@ clean_outpipes:
outpipes.erase (it);
if (pipe_ == current_out)
current_out = NULL;
- if (terminating)
- unregister_term_ack ();
return;
}
}
zmq_assert (false);
}
-void zmq::xrep_t::read_activated (pipe_t *pipe_)
+void zmq::xrep_t::xread_activated (pipe_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) {
@@ -124,7 +100,7 @@ void zmq::xrep_t::read_activated (pipe_t *pipe_)
zmq_assert (false);
}
-void zmq::xrep_t::write_activated (pipe_t *pipe_)
+void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
@@ -312,3 +288,4 @@ bool zmq::xrep_t::xhas_out ()
}
+
diff --git a/src/xrep.hpp b/src/xrep.hpp
index d0378c2..fbc7385 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -26,7 +26,6 @@
#include "socket_base.hpp"
#include "blob.hpp"
-#include "pipe.hpp"
#include "msg.hpp"
namespace zmq
@@ -34,8 +33,7 @@ namespace zmq
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t :
- public socket_base_t,
- public i_pipe_events
+ public socket_base_t
{
public:
@@ -48,6 +46,9 @@ namespace zmq
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ void xread_activated (class pipe_t *pipe_);
+ void xwrite_activated (class pipe_t *pipe_);
+ void xterminated (class pipe_t *pipe_);
protected:
@@ -56,14 +57,6 @@ namespace zmq
private:
- // Hook into the termination process.
- void process_term (int linger_);
-
- // i_pipe_events interface implementation.
- void read_activated (pipe_t *pipe_);
- void write_activated (pipe_t *pipe_);
- void terminated (pipe_t *pipe_);
-
struct inpipe_t
{
class pipe_t *pipe;
@@ -103,9 +96,6 @@ namespace zmq
// If true, more outgoing message parts are expected.
bool more_out;
- // If true, termination process is already underway.
- bool terminating;
-
xrep_t (const xrep_t&);
const xrep_t &operator = (const xrep_t&);
};
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 4a6e67e..2371a34 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -23,9 +23,7 @@
#include "msg.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- fq (this),
- lb (this)
+ socket_base_t (parent_, tid_)
{
options.type = ZMQ_XREQ;
}
@@ -34,21 +32,13 @@ zmq::xreq_t::~xreq_t ()
{
}
-void zmq::xreq_t::xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
- pipe_->set_event_sink (this);
fq.attach (pipe_);
lb.attach (pipe_);
}
-void zmq::xreq_t::process_term (int linger_)
-{
- fq.terminate ();
- lb.terminate ();
- socket_base_t::process_term (linger_);
-}
-
int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
@@ -69,17 +59,17 @@ bool zmq::xreq_t::xhas_out ()
return lb.has_out ();
}
-void zmq::xreq_t::read_activated (pipe_t *pipe_)
+void zmq::xreq_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
-void zmq::xreq_t::write_activated (pipe_t *pipe_)
+void zmq::xreq_t::xwrite_activated (pipe_t *pipe_)
{
lb.activated (pipe_);
}
-void zmq::xreq_t::terminated (pipe_t *pipe_)
+void zmq::xreq_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
lb.terminated (pipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index a75e5c8..5bf1a03 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -23,7 +23,6 @@
#define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "pipe.hpp"
#include "fq.hpp"
#include "lb.hpp"
@@ -31,8 +30,7 @@ namespace zmq
{
class xreq_t :
- public socket_base_t,
- public i_pipe_events
+ public socket_base_t
{
public:
@@ -47,17 +45,12 @@ namespace zmq
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
+ void xread_activated (class pipe_t *pipe_);
+ void xwrite_activated (class pipe_t *pipe_);
+ void xterminated (class pipe_t *pipe_);
private:
- // i_pipe_events interface implementation.
- void read_activated (pipe_t *pipe_);
- void write_activated (pipe_t *pipe_);
- void terminated (pipe_t *pipe_);
-
- // Hook into the termination process.
- void process_term (int linger_);
-
// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
fq_t fq;
diff --git a/src/xsub.cpp b/src/xsub.cpp
index dc30d71..c5f610f 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -25,7 +25,6 @@
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_),
- fq (this),
has_message (false),
more (false)
{
@@ -43,32 +42,19 @@ zmq::xsub_t::~xsub_t ()
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert (pipe_);
- pipe_->set_event_sink (this);
fq.attach (pipe_);
}
-void zmq::xsub_t::read_activated (pipe_t *pipe_)
+void zmq::xsub_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
-void zmq::xsub_t::write_activated (pipe_t *pipe_)
-{
- // SUB socket never sends messages. This should never happen.
- zmq_assert (false);
-}
-
-void zmq::xsub_t::terminated (pipe_t *pipe_)
+void zmq::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
}
-void zmq::xsub_t::process_term (int linger_)
-{
- fq.terminate ();
- socket_base_t::process_term (linger_);
-}
-
int zmq::xsub_t::xsend (msg_t *msg_, int options_)
{
size_t size = msg_->size ();
diff --git a/src/xsub.hpp b/src/xsub.hpp
index ed9c462..58ddae5 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -23,7 +23,6 @@
#include "trie.hpp"
#include "socket_base.hpp"
-#include "pipe.hpp"
#include "msg.hpp"
#include "fq.hpp"
@@ -31,8 +30,7 @@ namespace zmq
{
class xsub_t :
- public socket_base_t,
- public i_pipe_events
+ public socket_base_t
{
public:
@@ -47,17 +45,11 @@ namespace zmq
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
+ void xread_activated (class pipe_t *pipe_);
+ void xterminated (class pipe_t *pipe_);
private:
- // i_pipe_events interface implementation.
- void read_activated (pipe_t *pipe_);
- void write_activated (pipe_t *pipe_);
- void terminated (pipe_t *pipe_);
-
- // Hook into the termination process.
- void process_term (int linger_);
-
// Check whether the message matches at least one subscription.
bool match (class msg_t *msg_);
diff --git a/tests/test_shutdown_stress.cpp b/tests/test_shutdown_stress.cpp
index dccf91f..ef81758 100644
--- a/tests/test_shutdown_stress.cpp
+++ b/tests/test_shutdown_stress.cpp
@@ -58,7 +58,7 @@ int main (int argc, char *argv [])
ctx = zmq_init (7);
assert (ctx);
- s1 = zmq_socket (ctx, ZMQ_REP);
+ s1 = zmq_socket (ctx, ZMQ_PUB);
assert (s1);
rc = zmq_bind (s1, "tcp://127.0.0.1:5560");