diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-02-16 10:10:49 +0900 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-02-16 10:10:49 +0900 |
commit | 50f225a04422abf64545f5eb36592d8c990b0ae4 (patch) | |
tree | 1ad0396c3f32b4e89f80d4abdbf0d9ced6a1e55c /src | |
parent | e5e9852181947a9c41db9ff3c47e94d66a933161 (diff) |
poller_base_t renamed to io_thread_t
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
66 files changed, 204 insertions, 207 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 7d3ee8d..9f11611 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libxs_la_SOURCES = \ fd.hpp \ fq.hpp \ io_object.hpp \ + io_thread.hpp \ ip.hpp \ ipc_address.hpp \ ipc_connecter.hpp \ @@ -45,7 +46,6 @@ libxs_la_SOURCES = \ pipe.hpp \ platform.hpp \ poll.hpp \ - poller_base.hpp \ pair.hpp \ pub.hpp \ pull.hpp \ @@ -85,6 +85,7 @@ libxs_la_SOURCES = \ err.cpp \ fq.cpp \ io_object.cpp \ + io_thread.cpp \ ip.cpp \ ipc_address.cpp \ ipc_connecter.cpp \ @@ -104,7 +105,6 @@ libxs_la_SOURCES = \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ - poller_base.cpp \ pull.cpp \ push.cpp \ reaper.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index 8b27b31..789cdc2 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -197,7 +197,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) // Create I/O thread objects and launch them. for (uint32_t i = 2; i != io_thread_count + 2; i++) { - poller_base_t *io_thread = poller_base_t::create (this, i); + io_thread_t *io_thread = io_thread_t::create (this, i); errno_assert (io_thread); io_threads.push_back (io_thread); slots [i] = io_thread->get_mailbox (); @@ -226,7 +226,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) #endif // Create the monitor object. - poller_base_t *io_thread = choose_io_thread (0); + io_thread_t *io_thread = choose_io_thread (0); xs_assert (io_thread); monitor = new (std::nothrow) monitor_t (io_thread); alloc_assert (monitor); @@ -301,7 +301,7 @@ void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) slots [tid_]->send (command_); } -xs::poller_base_t *xs::ctx_t::choose_io_thread (uint64_t affinity_) +xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_) { if (io_threads.empty ()) return NULL; diff --git a/src/ctx.hpp b/src/ctx.hpp index c12fffc..e912443 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -40,7 +40,7 @@ namespace xs class object_t; class monitor_t; - class poller_base_t; + class io_thread_t; class socket_base_t; class reaper_t; @@ -86,7 +86,7 @@ namespace xs // Returns the I/O thread that is the least busy at the moment. // Affinity specifies which I/O threads are eligible (0 = all). // Returns NULL is no I/O thread is available. - xs::poller_base_t *choose_io_thread (uint64_t affinity_); + xs::io_thread_t *choose_io_thread (uint64_t affinity_); // Returns reaper thread object. xs::object_t *get_reaper (); @@ -142,7 +142,7 @@ namespace xs xs::reaper_t *reaper; // I/O threads. - typedef std::vector <xs::poller_base_t*> io_threads_t; + typedef std::vector <xs::io_thread_t*> io_threads_t; io_threads_t io_threads; // Array of pointers to mailboxes for both application and I/O threads. diff --git a/src/devpoll.cpp b/src/devpoll.cpp index e85b765..936d6c2 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -38,7 +38,7 @@ #include "config.hpp" xs::devpoll_t::devpoll_t (xs::ctx_t *ctx_, uint32_t tid_) : - poller_base_t (ctx_, tid_), + io_thread_t (ctx_, tid_), stopping (false) { devpoll_fd = open ("/dev/poll", O_RDWR); diff --git a/src/devpoll.hpp b/src/devpoll.hpp index b32aa28..3947f3e 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -30,7 +30,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -39,14 +39,14 @@ namespace xs // Implements socket polling mechanism using the "/dev/poll" interface. - class devpoll_t : public poller_base_t + class devpoll_t : public io_thread_t { public: devpoll_t (xs::ctx_t *ctx_, uint32_t tid_); ~devpoll_t (); - // "poller" concept. + // Implementation of virtual functions from io_thread_t. handle_t add_fd (fd_t fd_, xs::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); diff --git a/src/epoll.cpp b/src/epoll.cpp index 7ca4608..aa13b31 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -35,7 +35,7 @@ #include "err.hpp" xs::epoll_t::epoll_t (xs::ctx_t *ctx_, uint32_t tid_) : - poller_base_t (ctx_, tid_), + io_thread_t (ctx_, tid_), stopping (false) { epoll_fd = epoll_create (1); diff --git a/src/epoll.hpp b/src/epoll.hpp index 232d049..ad94120 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -31,7 +31,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -42,14 +42,14 @@ namespace xs // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. - class epoll_t : public poller_base_t + class epoll_t : public io_thread_t { public: epoll_t (xs::ctx_t *ctx_, uint32_t tid_); ~epoll_t (); - // "poller" concept. + // Implementation of virtual functions from io_thread_t. handle_t add_fd (fd_t fd_, xs::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 9106411..fc03f0e 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -25,7 +25,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; // Abstract interface to be implemented by various engines. @@ -34,7 +34,7 @@ namespace xs virtual ~i_engine () {} // Plug the engine to the session. - virtual void plug (xs::poller_base_t *io_thread_, + virtual void plug (xs::io_thread_t *io_thread_, class session_base_t *session_) = 0; // Unplug the engine from the session. diff --git a/src/io_object.cpp b/src/io_object.cpp index 0c73571..c17ff04 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -20,11 +20,11 @@ */ #include "io_object.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "err.hpp" -xs::io_object_t::io_object_t (poller_base_t *io_thread_) : - poller (NULL) +xs::io_object_t::io_object_t (io_thread_t *io_thread_) : + io_thread (NULL) { if (io_thread_) plug (io_thread_); @@ -34,62 +34,59 @@ xs::io_object_t::~io_object_t () { } -void xs::io_object_t::plug (poller_base_t *io_thread_) +void xs::io_object_t::plug (io_thread_t *io_thread_) { xs_assert (io_thread_); - xs_assert (!poller); + xs_assert (!io_thread); - // Retrieve the poller from the thread we are running in. - poller = io_thread_; + // Retrieve the io_thread from the thread we are running in. + io_thread = io_thread_; } void xs::io_object_t::unplug () { - xs_assert (poller); - - // Forget about old poller in preparation to be migrated - // to a different I/O thread. - poller = NULL; + xs_assert (io_thread); + io_thread = NULL; } xs::handle_t xs::io_object_t::add_fd (fd_t fd_) { - return poller->add_fd (fd_, this); + return io_thread->add_fd (fd_, this); } void xs::io_object_t::rm_fd (handle_t handle_) { - poller->rm_fd (handle_); + io_thread->rm_fd (handle_); } void xs::io_object_t::set_pollin (handle_t handle_) { - poller->set_pollin (handle_); + io_thread->set_pollin (handle_); } void xs::io_object_t::reset_pollin (handle_t handle_) { - poller->reset_pollin (handle_); + io_thread->reset_pollin (handle_); } void xs::io_object_t::set_pollout (handle_t handle_) { - poller->set_pollout (handle_); + io_thread->set_pollout (handle_); } void xs::io_object_t::reset_pollout (handle_t handle_) { - poller->reset_pollout (handle_); + io_thread->reset_pollout (handle_); } xs::handle_t xs::io_object_t::add_timer (int timeout_) { - return poller->add_timer (timeout_, this); + return io_thread->add_timer (timeout_, this); } void xs::io_object_t::rm_timer (handle_t handle_) { - poller->rm_timer (handle_); + io_thread->rm_timer (handle_); } void xs::io_object_t::in_event (fd_t fd_) diff --git a/src/io_object.hpp b/src/io_object.hpp index 0749ceb..31f9507 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -25,30 +25,30 @@ #include <stddef.h> #include "stdint.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { // Simple base class for objects that live in I/O threads. - // It makes communication with the poller object easier and + // It makes communication with the io_thread object easier and // makes defining unneeded event handlers unnecessary. class io_object_t : public i_poll_events { public: - io_object_t (xs::poller_base_t *io_thread_ = NULL); + io_object_t (xs::io_thread_t *io_thread_ = NULL); ~io_object_t (); // When migrating an object from one I/O thread to another, first // unplug it, then migrate it, then plug it to the new thread. - void plug (xs::poller_base_t *io_thread_); + void plug (xs::io_thread_t *io_thread_); void unplug (); protected: - // Methods to access underlying poller object. + // Methods to access underlying io_thread object. handle_t add_fd (fd_t fd_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); @@ -65,7 +65,7 @@ namespace xs private: - poller_base_t *poller; + io_thread_t *io_thread; io_object_t (const io_object_t&); const io_object_t &operator = (const io_object_t&); diff --git a/src/poller_base.cpp b/src/io_thread.cpp index 913a8d7..d6dca8c 100644 --- a/src/poller_base.cpp +++ b/src/io_thread.cpp @@ -18,7 +18,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "poller_base.hpp" +#include "io_thread.hpp" #include "err.hpp" #include "select.hpp" @@ -27,9 +27,9 @@ #include "devpoll.hpp" #include "kqueue.hpp" -xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_) +xs::io_thread_t *xs::io_thread_t::create (xs::ctx_t *ctx_, uint32_t tid_) { - poller_base_t *result; + io_thread_t *result; #if defined XS_HAVE_SELECT result = new (std::nothrow) select_t (ctx_, tid_); #elif defined XS_HAVE_POLL @@ -45,45 +45,45 @@ xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_) return result; } -xs::poller_base_t::poller_base_t (xs::ctx_t *ctx_, uint32_t tid_) : +xs::io_thread_t::io_thread_t (xs::ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_) { } -xs::poller_base_t::~poller_base_t () +xs::io_thread_t::~io_thread_t () { } -void xs::poller_base_t::start () +void xs::io_thread_t::start () { mailbox_handle = add_fd (mailbox.get_fd (), this); set_pollin (mailbox_handle); xstart (); } -void xs::poller_base_t::stop () +void xs::io_thread_t::stop () { // Ask the I/O thread to stop. send_stop (); } -void xs::poller_base_t::process_stop () +void xs::io_thread_t::process_stop () { rm_fd (mailbox_handle); xstop (); } -xs::mailbox_t *xs::poller_base_t::get_mailbox () +xs::mailbox_t *xs::io_thread_t::get_mailbox () { return &mailbox; } -int xs::poller_base_t::get_load () +int xs::io_thread_t::get_load () { return load.get (); } -void xs::poller_base_t::adjust_load (int amount_) +void xs::io_thread_t::adjust_load (int amount_) { if (amount_ > 0) load.add (amount_); @@ -91,7 +91,7 @@ void xs::poller_base_t::adjust_load (int amount_) load.sub (-amount_); } -xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_) +xs::handle_t xs::io_thread_t::add_timer (int timeout_, i_poll_events *sink_) { uint64_t expiration = clock.now_ms () + timeout_; timer_info_t info = {sink_, timers_t::iterator ()}; @@ -101,13 +101,13 @@ xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_) return (handle_t) &(it->second); } -void xs::poller_base_t::rm_timer (handle_t handle_) +void xs::io_thread_t::rm_timer (handle_t handle_) { timer_info_t *info = (timer_info_t*) handle_; timers.erase (info->self); } -uint64_t xs::poller_base_t::execute_timers () +uint64_t xs::io_thread_t::execute_timers () { // Fast track. if (timers.empty ()) @@ -140,7 +140,7 @@ uint64_t xs::poller_base_t::execute_timers () return 0; } -void xs::poller_base_t::in_event (fd_t fd_) +void xs::io_thread_t::in_event (fd_t fd_) { // TODO: Do we want to limit number of commands I/O thread can // process in a single go? @@ -161,13 +161,13 @@ void xs::poller_base_t::in_event (fd_t fd_) } } -void xs::poller_base_t::out_event (fd_t fd_) +void xs::io_thread_t::out_event (fd_t fd_) { // We are never polling for POLLOUT here. This function is never called. xs_assert (false); } -void xs::poller_base_t::timer_event (handle_t handle_) +void xs::io_thread_t::timer_event (handle_t handle_) { // No timers here. This function is never called. xs_assert (false); diff --git a/src/poller_base.hpp b/src/io_thread.hpp index 2388fad..689b851 100644 --- a/src/poller_base.hpp +++ b/src/io_thread.hpp @@ -18,8 +18,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __XS_POLLER_BASE_HPP_INCLUDED__ -#define __XS_POLLER_BASE_HPP_INCLUDED__ +#ifndef __XS_IO_THREAD_HPP_INCLUDED__ +#define __XS_IO_THREAD_HPP_INCLUDED__ #include <map> @@ -54,23 +54,23 @@ namespace xs virtual void timer_event (handle_t handle_) = 0; }; - class poller_base_t : public object_t, public i_poll_events + class io_thread_t : public object_t, public i_poll_events { public: - // Create optimal poller mechanism for this environment. - static poller_base_t *create (xs::ctx_t *ctx_, uint32_t tid_); + // Create optimal polling mechanism for this environment. + static io_thread_t *create (xs::ctx_t *ctx_, uint32_t tid_); - virtual ~poller_base_t (); + virtual ~io_thread_t (); - // Returns load of the poller. Note that this function can be + // Returns load of the I/O thread. Note that this function can be // invoked from a different thread! int get_load (); void start (); void stop (); - // Returns mailbox associated with this I/O poller. + // Returns mailbox associated with this I/O thread. mailbox_t *get_mailbox (); virtual handle_t add_fd (fd_t fd_, xs::i_poll_events *events_) = 0; @@ -96,9 +96,9 @@ namespace xs protected: - poller_base_t (xs::ctx_t *ctx_, uint32_t tid_); + io_thread_t (xs::ctx_t *ctx_, uint32_t tid_); - // Called by individual poller implementations to manage the load. + // Called by individual io_thread implementations to manage the load. void adjust_load (int amount_); // Executes any timers that are due. Returns number of milliseconds @@ -121,7 +121,7 @@ namespace xs typedef std::multimap <uint64_t, timer_info_t> timers_t; timers_t timers; - // Load of the poller. Currently the number of file descriptors + // Load of the I/O thread. Currently the number of file descriptors // registered. atomic_counter_t load; @@ -131,8 +131,8 @@ namespace xs // Handle associated with mailbox' file descriptor. handle_t mailbox_handle; - poller_base_t (const poller_base_t&); - const poller_base_t &operator = (const poller_base_t&); + io_thread_t (const io_thread_t&); + const io_thread_t &operator = (const io_thread_t&); }; } diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index b65887a..f6a01c3 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -26,7 +26,7 @@ #include <string> #include "stream_engine.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "platform.hpp" #include "random.hpp" #include "err.hpp" @@ -37,7 +37,7 @@ #include <sys/socket.h> #include <sys/un.h> -xs::ipc_connecter_t::ipc_connecter_t (class poller_base_t *io_thread_, +xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, const options_t &options_, const char *address_, bool wait_) : own_t (io_thread_, options_), diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 46e260c..e598393 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -34,7 +34,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; class session_base_t; class ipc_connecter_t : public own_t, public io_object_t @@ -43,7 +43,7 @@ namespace xs // If 'delay' is true connecter first waits for a while, then starts // connection process. - ipc_connecter_t (xs::poller_base_t *io_thread_, + ipc_connecter_t (xs::io_thread_t *io_thread_, xs::session_base_t *session_, const options_t & |