From 4c6d07d3668558d910c9b1d19d52ccdeacc90574 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 18 Sep 2010 07:37:36 +0200 Subject: single term ack counting mechanism for every socket (no separate mechanisms for fq_t and lb_t) --- src/Makefile.am | 1 - src/fq.cpp | 18 ++++++++---------- src/fq.hpp | 4 ++-- src/i_terminate_events.hpp | 38 -------------------------------------- src/lb.cpp | 18 ++++++++---------- src/lb.hpp | 4 ++-- src/own.hpp | 16 ++++++++-------- src/pull.cpp | 7 ------- src/pull.hpp | 6 +----- src/push.cpp | 7 ------- src/push.hpp | 6 +----- src/sub.cpp | 7 ------- src/sub.hpp | 6 +----- src/xreq.cpp | 7 ------- src/xreq.hpp | 6 +----- 15 files changed, 32 insertions(+), 119 deletions(-) delete mode 100644 src/i_terminate_events.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 86a6fbd..961b83e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -72,7 +72,6 @@ libzmq_la_SOURCES = \ ip.hpp \ i_engine.hpp \ i_poll_events.hpp \ - i_terminate_events.hpp \ kqueue.hpp \ lb.hpp \ likely.hpp \ diff --git a/src/fq.cpp b/src/fq.cpp index ad70633..2c3a9d9 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -22,9 +22,9 @@ #include "fq.hpp" #include "pipe.hpp" #include "err.hpp" -#include "i_terminate_events.hpp" +#include "own.hpp" -zmq::fq_t::fq_t (i_terminate_events *sink_) : +zmq::fq_t::fq_t (own_t *sink_) : active (0), current (0), more (false), @@ -47,8 +47,10 @@ void zmq::fq_t::attach (reader_t *pipe_) active++; // If we are already terminating, ask the pipe to terminate straight away. - if (terminating) + if (terminating) { + sink->register_term_acks (1); pipe_->terminate (); + } } void zmq::fq_t::terminated (reader_t *pipe_) @@ -67,8 +69,8 @@ void zmq::fq_t::terminated (reader_t *pipe_) } pipes.erase (pipe_); - if (terminating && pipes.empty ()) - sink->terminated (); + if (terminating) + sink->unregister_term_ack (); } void zmq::fq_t::terminate () @@ -76,11 +78,7 @@ void zmq::fq_t::terminate () zmq_assert (!terminating); terminating = true; - if (pipes.empty ()) { - sink->terminated (); - return; - } - + sink->register_term_acks (pipes.size ()); for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); } diff --git a/src/fq.hpp b/src/fq.hpp index fd853d8..c356b21 100644 --- a/src/fq.hpp +++ b/src/fq.hpp @@ -33,7 +33,7 @@ namespace zmq { public: - fq_t (struct i_terminate_events *sink_); + fq_t (class own_t *sink_); ~fq_t (); void attach (reader_t *pipe_); @@ -64,7 +64,7 @@ namespace zmq bool more; // Object to send events to. - i_terminate_events *sink; + class own_t *sink; // If true, termination process is already underway. bool terminating; diff --git a/src/i_terminate_events.hpp b/src/i_terminate_events.hpp deleted file mode 100644 index 08599ff..0000000 --- a/src/i_terminate_events.hpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__ -#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__ - -namespace zmq -{ - - // Algorithms such as fair queueing (fq_t) and load balancing (lb_t) - // use this interface to communicate termination event to the socket. - - struct i_terminate_events - { - virtual ~i_terminate_events () {} - - virtual void terminated () = 0; - }; - -} - -#endif diff --git a/src/lb.cpp b/src/lb.cpp index 7b3339c..62d0680 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -22,9 +22,9 @@ #include "lb.hpp" #include "pipe.hpp" #include "err.hpp" -#include "i_terminate_events.hpp" +#include "own.hpp" -zmq::lb_t::lb_t (i_terminate_events *sink_) : +zmq::lb_t::lb_t (own_t *sink_) : active (0), current (0), more (false), @@ -46,8 +46,10 @@ void zmq::lb_t::attach (writer_t *pipe_) pipes.swap (active, pipes.size () - 1); active++; - if (terminating) + if (terminating) { + sink->register_term_acks (1); pipe_->terminate (); + } } void zmq::lb_t::terminate () @@ -55,11 +57,7 @@ void zmq::lb_t::terminate () zmq_assert (!terminating); terminating = true; - if (pipes.empty ()) { - sink->terminated (); - return; - } - + sink->register_term_acks (pipes.size ()); for (pipes_t::size_type i = 0; i != pipes.size (); i++) pipes [i]->terminate (); } @@ -75,8 +73,8 @@ void zmq::lb_t::terminated (writer_t *pipe_) } pipes.erase (pipe_); - if (terminating && pipes.empty ()) - sink->terminated (); + if (terminating) + sink->unregister_term_ack (); } void zmq::lb_t::activated (writer_t *pipe_) diff --git a/src/lb.hpp b/src/lb.hpp index ea965f8..29ea343 100644 --- a/src/lb.hpp +++ b/src/lb.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - lb_t (struct i_terminate_events *sink_); + lb_t (class own_t *sink_); ~lb_t (); void attach (writer_t *pipe_); @@ -61,7 +61,7 @@ namespace zmq bool more; // Object to send events to. - struct i_terminate_events *sink; + class own_t *sink; // If true, termination process is already underway. bool terminating; diff --git a/src/own.hpp b/src/own.hpp index b65177e..6b6f7bf 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -52,6 +52,14 @@ namespace zmq // before the command is delivered. void inc_seqnum (); + // Use following two functions to wait for arbitrary events before + // terminating. Just add number of events to wait for using + // register_tem_acks functions. When event occurs, call + // remove_term_ack. When number of pending acks reaches zero + // object will be deallocated. + void register_term_acks (int count_); + void unregister_term_ack (); + protected: // Launch the supplied object and become its owner. @@ -77,14 +85,6 @@ namespace zmq // steps to the beginning of the termination process. void process_term (); - // Use following two functions to wait for arbitrary events before - // terminating. Just add number of events to wait for using - // register_tem_acks functions. When event occurs, call - // remove_term_ack. When number of pending acks reaches zero - // object will be deallocated. - void register_term_acks (int count_); - void unregister_term_ack (); - // A place to hook in when phyicallal destruction of the object // is to be delayed. virtual void process_destroy (); diff --git a/src/pull.cpp b/src/pull.cpp index e7b5239..cbfcdbf 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -43,17 +43,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, void zmq::pull_t::process_term () { - register_term_acks (1); fq.terminate (); - socket_base_t::process_term (); } -void zmq::pull_t::terminated () -{ - unregister_term_ack (); -} - int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); diff --git a/src/pull.hpp b/src/pull.hpp index 997eebf..1b53e3b 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -20,14 +20,13 @@ #ifndef __ZMQ_PULL_HPP_INCLUDED__ #define __ZMQ_PULL_HPP_INCLUDED__ -#include "i_terminate_events.hpp" #include "socket_base.hpp" #include "fq.hpp" namespace zmq { - class pull_t : public socket_base_t, public i_terminate_events + class pull_t : public socket_base_t { public: @@ -44,9 +43,6 @@ namespace zmq private: - // i_terminate_events interface implementation. - void terminated (); - // Hook into the termination process. void process_term (); diff --git a/src/push.cpp b/src/push.cpp index f587cef..20943fc 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -44,17 +44,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_, void zmq::push_t::process_term () { - register_term_acks (1); lb.terminate (); - socket_base_t::process_term (); } -void zmq::push_t::terminated () -{ - unregister_term_ack (); -} - int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) { return lb.send (msg_, flags_); diff --git a/src/push.hpp b/src/push.hpp index aed2662..29a1a1a 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -20,14 +20,13 @@ #ifndef __ZMQ_PUSH_HPP_INCLUDED__ #define __ZMQ_PUSH_HPP_INCLUDED__ -#include "i_terminate_events.hpp" #include "socket_base.hpp" #include "lb.hpp" namespace zmq { - class push_t : public socket_base_t, public i_terminate_events + class push_t : public socket_base_t { public: @@ -44,9 +43,6 @@ namespace zmq private: - // i_terminate_events interface implementation. - void terminated (); - // Hook into the termination process. void process_term (); diff --git a/src/sub.cpp b/src/sub.cpp index 89df106..bee8a06 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -49,17 +49,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, void zmq::sub_t::process_term () { - register_term_acks (1); fq.terminate (); - socket_base_t::process_term (); } -void zmq::sub_t::terminated () -{ - unregister_term_ack (); -} - int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { diff --git a/src/sub.hpp b/src/sub.hpp index 659e04b..06a5333 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -24,13 +24,12 @@ #include "trie.hpp" #include "socket_base.hpp" -#include "i_terminate_events.hpp" #include "fq.hpp" namespace zmq { - class sub_t : public socket_base_t, public i_terminate_events + class sub_t : public socket_base_t { public: @@ -48,9 +47,6 @@ namespace zmq private: - // i_terminate_events interface implementation. - void terminated (); - // Hook into the termination process. void process_term (); diff --git a/src/xreq.cpp b/src/xreq.cpp index e511660..2373f34 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -45,18 +45,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, void zmq::xreq_t::process_term () { - register_term_acks (2); fq.terminate (); lb.terminate (); - socket_base_t::process_term (); } -void zmq::xreq_t::terminated () -{ - unregister_term_ack (); -} - int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { return lb.send (msg_, flags_); diff --git a/src/xreq.hpp b/src/xreq.hpp index 9dc10c5..eeb349d 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -21,14 +21,13 @@ #define __ZMQ_XREQ_HPP_INCLUDED__ #include "socket_base.hpp" -#include "i_terminate_events.hpp" #include "fq.hpp" #include "lb.hpp" namespace zmq { - class xreq_t : public socket_base_t, public i_terminate_events + class xreq_t : public socket_base_t { public: @@ -47,9 +46,6 @@ namespace zmq private: - // i_terminate_events interface implementation. - void terminated (); - // Hook into the termination process. void process_term (); -- cgit v1.2.3