summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-09-18 07:37:36 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-18 07:37:36 +0200
commit4c6d07d3668558d910c9b1d19d52ccdeacc90574 (patch)
tree3530c8b03e9b475165d8bb3cf2eb425f5bfdd45e
parentfb6ce536d96c82e8f4378a87a5d59aefcc57a96d (diff)
single term ack counting mechanism for every socket (no separate mechanisms for fq_t and lb_t)
-rw-r--r--src/Makefile.am1
-rw-r--r--src/fq.cpp18
-rw-r--r--src/fq.hpp4
-rw-r--r--src/i_terminate_events.hpp38
-rw-r--r--src/lb.cpp18
-rw-r--r--src/lb.hpp4
-rw-r--r--src/own.hpp16
-rw-r--r--src/pull.cpp7
-rw-r--r--src/pull.hpp6
-rw-r--r--src/push.cpp7
-rw-r--r--src/push.hpp6
-rw-r--r--src/sub.cpp7
-rw-r--r--src/sub.hpp6
-rw-r--r--src/xreq.cpp7
-rw-r--r--src/xreq.hpp6
15 files changed, 32 insertions, 119 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 86a6fbd..961b83e 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -72,7 +72,6 @@ libzmq_la_SOURCES = \
ip.hpp \
i_engine.hpp \
i_poll_events.hpp \
- i_terminate_events.hpp \
kqueue.hpp \
lb.hpp \
likely.hpp \
diff --git a/src/fq.cpp b/src/fq.cpp
index ad70633..2c3a9d9 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -22,9 +22,9 @@
#include "fq.hpp"
#include "pipe.hpp"
#include "err.hpp"
-#include "i_terminate_events.hpp"
+#include "own.hpp"
-zmq::fq_t::fq_t (i_terminate_events *sink_) :
+zmq::fq_t::fq_t (own_t *sink_) :
active (0),
current (0),
more (false),
@@ -47,8 +47,10 @@ void zmq::fq_t::attach (reader_t *pipe_)
active++;
// If we are already terminating, ask the pipe to terminate straight away.
- if (terminating)
+ if (terminating) {
+ sink->register_term_acks (1);
pipe_->terminate ();
+ }
}
void zmq::fq_t::terminated (reader_t *pipe_)
@@ -67,8 +69,8 @@ void zmq::fq_t::terminated (reader_t *pipe_)
}
pipes.erase (pipe_);
- if (terminating && pipes.empty ())
- sink->terminated ();
+ if (terminating)
+ sink->unregister_term_ack ();
}
void zmq::fq_t::terminate ()
@@ -76,11 +78,7 @@ void zmq::fq_t::terminate ()
zmq_assert (!terminating);
terminating = true;
- if (pipes.empty ()) {
- sink->terminated ();
- return;
- }
-
+ sink->register_term_acks (pipes.size ());
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
diff --git a/src/fq.hpp b/src/fq.hpp
index fd853d8..c356b21 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -33,7 +33,7 @@ namespace zmq
{
public:
- fq_t (struct i_terminate_events *sink_);
+ fq_t (class own_t *sink_);
~fq_t ();
void attach (reader_t *pipe_);
@@ -64,7 +64,7 @@ namespace zmq
bool more;
// Object to send events to.
- i_terminate_events *sink;
+ class own_t *sink;
// If true, termination process is already underway.
bool terminating;
diff --git a/src/i_terminate_events.hpp b/src/i_terminate_events.hpp
deleted file mode 100644
index 08599ff..0000000
--- a/src/i_terminate_events.hpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
-#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
-
-namespace zmq
-{
-
- // Algorithms such as fair queueing (fq_t) and load balancing (lb_t)
- // use this interface to communicate termination event to the socket.
-
- struct i_terminate_events
- {
- virtual ~i_terminate_events () {}
-
- virtual void terminated () = 0;
- };
-
-}
-
-#endif
diff --git a/src/lb.cpp b/src/lb.cpp
index 7b3339c..62d0680 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -22,9 +22,9 @@
#include "lb.hpp"
#include "pipe.hpp"
#include "err.hpp"
-#include "i_terminate_events.hpp"
+#include "own.hpp"
-zmq::lb_t::lb_t (i_terminate_events *sink_) :
+zmq::lb_t::lb_t (own_t *sink_) :
active (0),
current (0),
more (false),
@@ -46,8 +46,10 @@ void zmq::lb_t::attach (writer_t *pipe_)
pipes.swap (active, pipes.size () - 1);
active++;
- if (terminating)
+ if (terminating) {
+ sink->register_term_acks (1);
pipe_->terminate ();
+ }
}
void zmq::lb_t::terminate ()
@@ -55,11 +57,7 @@ void zmq::lb_t::terminate ()
zmq_assert (!terminating);
terminating = true;
- if (pipes.empty ()) {
- sink->terminated ();
- return;
- }
-
+ sink->register_term_acks (pipes.size ());
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
@@ -75,8 +73,8 @@ void zmq::lb_t::terminated (writer_t *pipe_)
}
pipes.erase (pipe_);
- if (terminating && pipes.empty ())
- sink->terminated ();
+ if (terminating)
+ sink->unregister_term_ack ();
}
void zmq::lb_t::activated (writer_t *pipe_)
diff --git a/src/lb.hpp b/src/lb.hpp
index ea965f8..29ea343 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -32,7 +32,7 @@ namespace zmq
{
public:
- lb_t (struct i_terminate_events *sink_);
+ lb_t (class own_t *sink_);
~lb_t ();
void attach (writer_t *pipe_);
@@ -61,7 +61,7 @@ namespace zmq
bool more;
// Object to send events to.
- struct i_terminate_events *sink;
+ class own_t *sink;
// If true, termination process is already underway.
bool terminating;
diff --git a/src/own.hpp b/src/own.hpp
index b65177e..6b6f7bf 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -52,6 +52,14 @@ namespace zmq
// before the command is delivered.
void inc_seqnum ();
+ // Use following two functions to wait for arbitrary events before
+ // terminating. Just add number of events to wait for using
+ // register_tem_acks functions. When event occurs, call
+ // remove_term_ack. When number of pending acks reaches zero
+ // object will be deallocated.
+ void register_term_acks (int count_);
+ void unregister_term_ack ();
+
protected:
// Launch the supplied object and become its owner.
@@ -77,14 +85,6 @@ namespace zmq
// steps to the beginning of the termination process.
void process_term ();
- // Use following two functions to wait for arbitrary events before
- // terminating. Just add number of events to wait for using
- // register_tem_acks functions. When event occurs, call
- // remove_term_ack. When number of pending acks reaches zero
- // object will be deallocated.
- void register_term_acks (int count_);
- void unregister_term_ack ();
-
// A place to hook in when phyicallal destruction of the object
// is to be delayed.
virtual void process_destroy ();
diff --git a/src/pull.cpp b/src/pull.cpp
index e7b5239..cbfcdbf 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -43,17 +43,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
void zmq::pull_t::process_term ()
{
- register_term_acks (1);
fq.terminate ();
-
socket_base_t::process_term ();
}
-void zmq::pull_t::terminated ()
-{
- unregister_term_ack ();
-}
-
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
diff --git a/src/pull.hpp b/src/pull.hpp
index 997eebf..1b53e3b 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -20,14 +20,13 @@
#ifndef __ZMQ_PULL_HPP_INCLUDED__
#define __ZMQ_PULL_HPP_INCLUDED__
-#include "i_terminate_events.hpp"
#include "socket_base.hpp"
#include "fq.hpp"
namespace zmq
{
- class pull_t : public socket_base_t, public i_terminate_events
+ class pull_t : public socket_base_t
{
public:
@@ -44,9 +43,6 @@ namespace zmq
private:
- // i_terminate_events interface implementation.
- void terminated ();
-
// Hook into the termination process.
void process_term ();
diff --git a/src/push.cpp b/src/push.cpp
index f587cef..20943fc 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -44,17 +44,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
void zmq::push_t::process_term ()
{
- register_term_acks (1);
lb.terminate ();
-
socket_base_t::process_term ();
}
-void zmq::push_t::terminated ()
-{
- unregister_term_ack ();
-}
-
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
diff --git a/src/push.hpp b/src/push.hpp
index aed2662..29a1a1a 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -20,14 +20,13 @@
#ifndef __ZMQ_PUSH_HPP_INCLUDED__
#define __ZMQ_PUSH_HPP_INCLUDED__
-#include "i_terminate_events.hpp"
#include "socket_base.hpp"
#include "lb.hpp"
namespace zmq
{
- class push_t : public socket_base_t, public i_terminate_events
+ class push_t : public socket_base_t
{
public:
@@ -44,9 +43,6 @@ namespace zmq
private:
- // i_terminate_events interface implementation.
- void terminated ();
-
// Hook into the termination process.
void process_term ();
diff --git a/src/sub.cpp b/src/sub.cpp
index 89df106..bee8a06 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -49,17 +49,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
void zmq::sub_t::process_term ()
{
- register_term_acks (1);
fq.terminate ();
-
socket_base_t::process_term ();
}
-void zmq::sub_t::terminated ()
-{
- unregister_term_ack ();
-}
-
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/sub.hpp b/src/sub.hpp
index 659e04b..06a5333 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -24,13 +24,12 @@
#include "trie.hpp"
#include "socket_base.hpp"
-#include "i_terminate_events.hpp"
#include "fq.hpp"
namespace zmq
{
- class sub_t : public socket_base_t, public i_terminate_events
+ class sub_t : public socket_base_t
{
public:
@@ -48,9 +47,6 @@ namespace zmq
private:
- // i_terminate_events interface implementation.
- void terminated ();
-
// Hook into the termination process.
void process_term ();
diff --git a/src/xreq.cpp b/src/xreq.cpp
index e511660..2373f34 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -45,18 +45,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
void zmq::xreq_t::process_term ()
{
- register_term_acks (2);
fq.terminate ();
lb.terminate ();
-
socket_base_t::process_term ();
}
-void zmq::xreq_t::terminated ()
-{
- unregister_term_ack ();
-}
-
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 9dc10c5..eeb349d 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -21,14 +21,13 @@
#define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp"
-#include "i_terminate_events.hpp"
#include "fq.hpp"
#include "lb.hpp"
namespace zmq
{
- class xreq_t : public socket_base_t, public i_terminate_events
+ class xreq_t : public socket_base_t
{
public:
@@ -47,9 +46,6 @@ namespace zmq
private:
- // i_terminate_events interface implementation.
- void terminated ();
-
// Hook into the termination process.
void process_term ();