From 50f225a04422abf64545f5eb36592d8c990b0ae4 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:10:49 +0900 Subject: poller_base_t renamed to io_thread_t Signed-off-by: Martin Sustrik --- src/Makefile.am | 4 +- src/ctx.cpp | 6 +- src/ctx.hpp | 6 +- src/devpoll.cpp | 2 +- src/devpoll.hpp | 6 +- src/epoll.cpp | 2 +- src/epoll.hpp | 6 +- src/i_engine.hpp | 4 +- src/io_object.cpp | 37 +++++------ src/io_object.hpp | 12 ++-- src/io_thread.cpp | 174 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/io_thread.hpp | 140 ++++++++++++++++++++++++++++++++++++++++ src/ipc_connecter.cpp | 4 +- src/ipc_connecter.hpp | 6 +- src/ipc_listener.cpp | 6 +- src/ipc_listener.hpp | 4 +- src/kqueue.cpp | 2 +- src/kqueue.hpp | 6 +- src/monitor.cpp | 4 +- src/monitor.hpp | 6 +- src/object.cpp | 4 +- src/object.hpp | 4 +- src/own.cpp | 4 +- src/own.hpp | 4 +- src/pair.cpp | 2 +- src/pair.hpp | 2 +- src/pgm_receiver.cpp | 4 +- src/pgm_receiver.hpp | 6 +- src/pgm_sender.cpp | 8 +-- src/pgm_sender.hpp | 6 +- src/poll.cpp | 2 +- src/poll.hpp | 6 +- src/poller_base.cpp | 174 -------------------------------------------------- src/poller_base.hpp | 140 ---------------------------------------- src/pub.cpp | 2 +- src/pub.hpp | 4 +- src/pull.cpp | 2 +- src/pull.hpp | 4 +- src/push.cpp | 2 +- src/push.hpp | 4 +- src/reaper.cpp | 24 +++---- src/reaper.hpp | 6 +- src/rep.cpp | 2 +- src/rep.hpp | 4 +- src/req.cpp | 2 +- src/req.hpp | 4 +- src/select.cpp | 2 +- src/select.hpp | 6 +- src/session_base.cpp | 6 +- src/session_base.hpp | 8 +-- src/socket_base.cpp | 18 +++--- src/socket_base.hpp | 14 ++-- src/stream_engine.cpp | 8 +-- src/stream_engine.hpp | 4 +- src/sub.cpp | 2 +- src/sub.hpp | 4 +- src/tcp_connecter.cpp | 4 +- src/tcp_connecter.hpp | 6 +- src/tcp_listener.cpp | 6 +- src/tcp_listener.hpp | 4 +- src/xpub.cpp | 2 +- src/xpub.hpp | 4 +- src/xrep.cpp | 2 +- src/xrep.hpp | 4 +- src/xreq.cpp | 2 +- src/xreq.hpp | 4 +- src/xsub.cpp | 2 +- src/xsub.hpp | 4 +- 68 files changed, 488 insertions(+), 491 deletions(-) create mode 100644 src/io_thread.cpp create mode 100644 src/io_thread.hpp delete mode 100644 src/poller_base.cpp delete mode 100644 src/poller_base.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 7d3ee8d..9f11611 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libxs_la_SOURCES = \ fd.hpp \ fq.hpp \ io_object.hpp \ + io_thread.hpp \ ip.hpp \ ipc_address.hpp \ ipc_connecter.hpp \ @@ -45,7 +46,6 @@ libxs_la_SOURCES = \ pipe.hpp \ platform.hpp \ poll.hpp \ - poller_base.hpp \ pair.hpp \ pub.hpp \ pull.hpp \ @@ -85,6 +85,7 @@ libxs_la_SOURCES = \ err.cpp \ fq.cpp \ io_object.cpp \ + io_thread.cpp \ ip.cpp \ ipc_address.cpp \ ipc_connecter.cpp \ @@ -104,7 +105,6 @@ libxs_la_SOURCES = \ pgm_socket.cpp \ pipe.cpp \ poll.cpp \ - poller_base.cpp \ pull.cpp \ push.cpp \ reaper.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index 8b27b31..789cdc2 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -197,7 +197,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) // Create I/O thread objects and launch them. for (uint32_t i = 2; i != io_thread_count + 2; i++) { - poller_base_t *io_thread = poller_base_t::create (this, i); + io_thread_t *io_thread = io_thread_t::create (this, i); errno_assert (io_thread); io_threads.push_back (io_thread); slots [i] = io_thread->get_mailbox (); @@ -226,7 +226,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) #endif // Create the monitor object. - poller_base_t *io_thread = choose_io_thread (0); + io_thread_t *io_thread = choose_io_thread (0); xs_assert (io_thread); monitor = new (std::nothrow) monitor_t (io_thread); alloc_assert (monitor); @@ -301,7 +301,7 @@ void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_) slots [tid_]->send (command_); } -xs::poller_base_t *xs::ctx_t::choose_io_thread (uint64_t affinity_) +xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_) { if (io_threads.empty ()) return NULL; diff --git a/src/ctx.hpp b/src/ctx.hpp index c12fffc..e912443 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -40,7 +40,7 @@ namespace xs class object_t; class monitor_t; - class poller_base_t; + class io_thread_t; class socket_base_t; class reaper_t; @@ -86,7 +86,7 @@ namespace xs // Returns the I/O thread that is the least busy at the moment. // Affinity specifies which I/O threads are eligible (0 = all). // Returns NULL is no I/O thread is available. - xs::poller_base_t *choose_io_thread (uint64_t affinity_); + xs::io_thread_t *choose_io_thread (uint64_t affinity_); // Returns reaper thread object. xs::object_t *get_reaper (); @@ -142,7 +142,7 @@ namespace xs xs::reaper_t *reaper; // I/O threads. - typedef std::vector io_threads_t; + typedef std::vector io_threads_t; io_threads_t io_threads; // Array of pointers to mailboxes for both application and I/O threads. diff --git a/src/devpoll.cpp b/src/devpoll.cpp index e85b765..936d6c2 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -38,7 +38,7 @@ #include "config.hpp" xs::devpoll_t::devpoll_t (xs::ctx_t *ctx_, uint32_t tid_) : - poller_base_t (ctx_, tid_), + io_thread_t (ctx_, tid_), stopping (false) { devpoll_fd = open ("/dev/poll", O_RDWR); diff --git a/src/devpoll.hpp b/src/devpoll.hpp index b32aa28..3947f3e 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -30,7 +30,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -39,14 +39,14 @@ namespace xs // Implements socket polling mechanism using the "/dev/poll" interface. - class devpoll_t : public poller_base_t + class devpoll_t : public io_thread_t { public: devpoll_t (xs::ctx_t *ctx_, uint32_t tid_); ~devpoll_t (); - // "poller" concept. + // Implementation of virtual functions from io_thread_t. handle_t add_fd (fd_t fd_, xs::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); diff --git a/src/epoll.cpp b/src/epoll.cpp index 7ca4608..aa13b31 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -35,7 +35,7 @@ #include "err.hpp" xs::epoll_t::epoll_t (xs::ctx_t *ctx_, uint32_t tid_) : - poller_base_t (ctx_, tid_), + io_thread_t (ctx_, tid_), stopping (false) { epoll_fd = epoll_create (1); diff --git a/src/epoll.hpp b/src/epoll.hpp index 232d049..ad94120 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -31,7 +31,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -42,14 +42,14 @@ namespace xs // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. - class epoll_t : public poller_base_t + class epoll_t : public io_thread_t { public: epoll_t (xs::ctx_t *ctx_, uint32_t tid_); ~epoll_t (); - // "poller" concept. + // Implementation of virtual functions from io_thread_t. handle_t add_fd (fd_t fd_, xs::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 9106411..fc03f0e 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -25,7 +25,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; // Abstract interface to be implemented by various engines. @@ -34,7 +34,7 @@ namespace xs virtual ~i_engine () {} // Plug the engine to the session. - virtual void plug (xs::poller_base_t *io_thread_, + virtual void plug (xs::io_thread_t *io_thread_, class session_base_t *session_) = 0; // Unplug the engine from the session. diff --git a/src/io_object.cpp b/src/io_object.cpp index 0c73571..c17ff04 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -20,11 +20,11 @@ */ #include "io_object.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "err.hpp" -xs::io_object_t::io_object_t (poller_base_t *io_thread_) : - poller (NULL) +xs::io_object_t::io_object_t (io_thread_t *io_thread_) : + io_thread (NULL) { if (io_thread_) plug (io_thread_); @@ -34,62 +34,59 @@ xs::io_object_t::~io_object_t () { } -void xs::io_object_t::plug (poller_base_t *io_thread_) +void xs::io_object_t::plug (io_thread_t *io_thread_) { xs_assert (io_thread_); - xs_assert (!poller); + xs_assert (!io_thread); - // Retrieve the poller from the thread we are running in. - poller = io_thread_; + // Retrieve the io_thread from the thread we are running in. + io_thread = io_thread_; } void xs::io_object_t::unplug () { - xs_assert (poller); - - // Forget about old poller in preparation to be migrated - // to a different I/O thread. - poller = NULL; + xs_assert (io_thread); + io_thread = NULL; } xs::handle_t xs::io_object_t::add_fd (fd_t fd_) { - return poller->add_fd (fd_, this); + return io_thread->add_fd (fd_, this); } void xs::io_object_t::rm_fd (handle_t handle_) { - poller->rm_fd (handle_); + io_thread->rm_fd (handle_); } void xs::io_object_t::set_pollin (handle_t handle_) { - poller->set_pollin (handle_); + io_thread->set_pollin (handle_); } void xs::io_object_t::reset_pollin (handle_t handle_) { - poller->reset_pollin (handle_); + io_thread->reset_pollin (handle_); } void xs::io_object_t::set_pollout (handle_t handle_) { - poller->set_pollout (handle_); + io_thread->set_pollout (handle_); } void xs::io_object_t::reset_pollout (handle_t handle_) { - poller->reset_pollout (handle_); + io_thread->reset_pollout (handle_); } xs::handle_t xs::io_object_t::add_timer (int timeout_) { - return poller->add_timer (timeout_, this); + return io_thread->add_timer (timeout_, this); } void xs::io_object_t::rm_timer (handle_t handle_) { - poller->rm_timer (handle_); + io_thread->rm_timer (handle_); } void xs::io_object_t::in_event (fd_t fd_) diff --git a/src/io_object.hpp b/src/io_object.hpp index 0749ceb..31f9507 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -25,30 +25,30 @@ #include #include "stdint.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { // Simple base class for objects that live in I/O threads. - // It makes communication with the poller object easier and + // It makes communication with the io_thread object easier and // makes defining unneeded event handlers unnecessary. class io_object_t : public i_poll_events { public: - io_object_t (xs::poller_base_t *io_thread_ = NULL); + io_object_t (xs::io_thread_t *io_thread_ = NULL); ~io_object_t (); // When migrating an object from one I/O thread to another, first // unplug it, then migrate it, then plug it to the new thread. - void plug (xs::poller_base_t *io_thread_); + void plug (xs::io_thread_t *io_thread_); void unplug (); protected: - // Methods to access underlying poller object. + // Methods to access underlying io_thread object. handle_t add_fd (fd_t fd_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); @@ -65,7 +65,7 @@ namespace xs private: - poller_base_t *poller; + io_thread_t *io_thread; io_object_t (const io_object_t&); const io_object_t &operator = (const io_object_t&); diff --git a/src/io_thread.cpp b/src/io_thread.cpp new file mode 100644 index 0000000..d6dca8c --- /dev/null +++ b/src/io_thread.cpp @@ -0,0 +1,174 @@ +/* + Copyright (c) 2010-2012 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads project. + + Crossroads is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "io_thread.hpp" +#include "err.hpp" + +#include "select.hpp" +#include "poll.hpp" +#include "epoll.hpp" +#include "devpoll.hpp" +#include "kqueue.hpp" + +xs::io_thread_t *xs::io_thread_t::create (xs::ctx_t *ctx_, uint32_t tid_) +{ + io_thread_t *result; +#if defined XS_HAVE_SELECT + result = new (std::nothrow) select_t (ctx_, tid_); +#elif defined XS_HAVE_POLL + result = new (std::nothrow) poll_t (ctx_, tid_); +#elif defined XS_HAVE_EPOLL + result = new (std::nothrow) epoll_t (ctx_, tid_); +#elif defined XS_HAVE_DEVPOLL + result = new (std::nothrow) devpoll_t (ctx_, tid_); +#elif defined XS_HAVE_KQUEUE + result = new (std::nothrow) kqueue_t (ctx_, tid_); +#endif + alloc_assert (result); + return result; +} + +xs::io_thread_t::io_thread_t (xs::ctx_t *ctx_, uint32_t tid_) : + object_t (ctx_, tid_) +{ +} + +xs::io_thread_t::~io_thread_t () +{ +} + +void xs::io_thread_t::start () +{ + mailbox_handle = add_fd (mailbox.get_fd (), this); + set_pollin (mailbox_handle); + xstart (); +} + +void xs::io_thread_t::stop () +{ + // Ask the I/O thread to stop. + send_stop (); +} + +void xs::io_thread_t::process_stop () +{ + rm_fd (mailbox_handle); + xstop (); +} + +xs::mailbox_t *xs::io_thread_t::get_mailbox () +{ + return &mailbox; +} + +int xs::io_thread_t::get_load () +{ + return load.get (); +} + +void xs::io_thread_t::adjust_load (int amount_) +{ + if (amount_ > 0) + load.add (amount_); + else if (amount_ < 0) + load.sub (-amount_); +} + +xs::handle_t xs::io_thread_t::add_timer (int timeout_, i_poll_events *sink_) +{ + uint64_t expiration = clock.now_ms () + timeout_; + timer_info_t info = {sink_, timers_t::iterator ()}; + timers_t::iterator it = timers.insert ( + timers_t::value_type (expiration, info)); + it->second.self = it; + return (handle_t) &(it->second); +} + +void xs::io_thread_t::rm_timer (handle_t handle_) +{ + timer_info_t *info = (timer_info_t*) handle_; + timers.erase (info->self); +} + +uint64_t xs::io_thread_t::execute_timers () +{ + // Fast track. + if (timers.empty ()) + return 0; + + // Get the current time. + uint64_t current = clock.now_ms (); + + // Execute the timers that are already due. + timers_t::iterator it = timers.begin (); + while (it != timers.end ()) { + + // If we have to wait to execute the item, same will be true about + // all the following items (multimap is sorted). Thus we can stop + // checking the subsequent timers and return the time to wait for + // the next timer (at least 1ms). + if (it->first > current) + return it->first - current; + + // Trigger the timer. + it->second.sink->timer_event ((handle_t) &it->second); + + // Remove it from the list of active timers. + timers_t::iterator o = it; + ++it; + timers.erase (o); + } + + // There are no more timers. + return 0; +} + +void xs::io_thread_t::in_event (fd_t fd_) +{ + // TODO: Do we want to limit number of commands I/O thread can + // process in a single go? + + while (true) { + + // Get the next command. If there is none, exit. + command_t cmd; + int rc = mailbox.recv (&cmd, 0); + if (rc != 0 && errno == EINTR) + continue; + if (rc != 0 && errno == EAGAIN) + break; + errno_assert (rc == 0); + + // Process the command. + cmd.destination->process_command (cmd); + } +} + +void xs::io_thread_t::out_event (fd_t fd_) +{ + // We are never polling for POLLOUT here. This function is never called. + xs_assert (false); +} + +void xs::io_thread_t::timer_event (handle_t handle_) +{ + // No timers here. This function is never called. + xs_assert (false); +} diff --git a/src/io_thread.hpp b/src/io_thread.hpp new file mode 100644 index 0000000..689b851 --- /dev/null +++ b/src/io_thread.hpp @@ -0,0 +1,140 @@ +/* + Copyright (c) 2010-2012 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads project. + + Crossroads is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __XS_IO_THREAD_HPP_INCLUDED__ +#define __XS_IO_THREAD_HPP_INCLUDED__ + +#include + +#include "fd.hpp" +#include "clock.hpp" +#include "object.hpp" +#include "mailbox.hpp" +#include "atomic_counter.hpp" + +namespace xs +{ + + class ctx_t; + + // Handle of a file descriptor within a pollset. + typedef void* handle_t; + + // Virtual interface to be exposed by object that want to be notified + // about events on file descriptors. + + struct i_poll_events + { + virtual ~i_poll_events () {} + + // Called by I/O thread when file descriptor is ready for reading. + virtual void in_event (fd_t fd_) = 0; + + // Called by I/O thread when file descriptor is ready for writing. + virtual void out_event (fd_t fd_) = 0; + + // Called when timer expires. + virtual void timer_event (handle_t handle_) = 0; + }; + + class io_thread_t : public object_t, public i_poll_events + { + public: + + // Create optimal polling mechanism for this environment. + static io_thread_t *create (xs::ctx_t *ctx_, uint32_t tid_); + + virtual ~io_thread_t (); + + // Returns load of the I/O thread. Note that this function can be + // invoked from a different thread! + int get_load (); + + void start (); + void stop (); + + // Returns mailbox associated with this I/O thread. + mailbox_t *get_mailbox (); + + virtual handle_t add_fd (fd_t fd_, xs::i_poll_events *events_) = 0; + virtual void rm_fd (handle_t handle_) = 0; + virtual void set_pollin (handle_t handle_) = 0; + virtual void reset_pollin (handle_t handle_) = 0; + virtual void set_pollout (handle_t handle_) = 0; + virtual void reset_pollout (handle_t handle_) = 0; + virtual void xstart () = 0; + virtual void xstop () = 0; + + // Add a timeout to expire in timeout_ milliseconds. After the + // expiration timer_event on sink_ object will be called. + handle_t add_timer (int timeout_, xs::i_poll_events *sink_); + + // Cancel the timer identified by the handle. + void rm_timer (handle_t handle_); + + // i_poll_events implementation. + void in_event (fd_t fd_); + void out_event (fd_t fd_); + void timer_event (handle_t handle_); + + protected: + + io_thread_t (xs::ctx_t *ctx_, uint32_t tid_); + + // Called by individual io_thread implementations to manage the load. + void adjust_load (int amount_); + + // Executes any timers that are due. Returns number of milliseconds + // to wait to match the next timer or 0 meaning "no timers". + uint64_t execute_timers (); + + private: + + void process_stop (); + + // Clock instance private to this I/O thread. + clock_t clock; + + // List of active timers. + struct timer_info_t + { + xs::i_poll_events *sink; + std::multimap ::iterator self; + }; + typedef std::multimap timers_t; + timers_t timers; + + // Load of the I/O thread. Currently the number of file descriptors + // registered. + atomic_counter_t load; + + // I/O thread accesses incoming commands via this mailbox. + mailbox_t mailbox; + + // Handle associated with mailbox' file descriptor. + handle_t mailbox_handle; + + io_thread_t (const io_thread_t&); + const io_thread_t &operator = (const io_thread_t&); + }; + +} + +#endif diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index b65887a..f6a01c3 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -26,7 +26,7 @@ #include #include "stream_engine.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "platform.hpp" #include "random.hpp" #include "err.hpp" @@ -37,7 +37,7 @@ #include #include -xs::ipc_connecter_t::ipc_connecter_t (class poller_base_t *io_thread_, +xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, const options_t &options_, const char *address_, bool wait_) : own_t (io_thread_, options_), diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 46e260c..e598393 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -34,7 +34,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; class session_base_t; class ipc_connecter_t : public own_t, public io_object_t @@ -43,7 +43,7 @@ namespace xs // If 'delay' is true connecter first waits for a while, then starts // connection process. - ipc_connecter_t (xs::poller_base_t *io_thread_, + ipc_connecter_t (xs::io_thread_t *io_thread_, xs::session_base_t *session_, const options_t &options_, const char *address_, bool delay_); ~ipc_connecter_t (); @@ -94,7 +94,7 @@ namespace xs fd_t s; // Handle corresponding to the listening socket or NULL if the socket - // is not registered with the poller. + // is not registered with the io_thread. handle_t handle; // If true, connecter is waiting a while before trying to connect. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index bd55cc7..db49528 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -28,7 +28,7 @@ #include "stream_engine.hpp" #include "ipc_address.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "session_base.hpp" #include "config.hpp" #include "err.hpp" @@ -39,7 +39,7 @@ #include #include -xs::ipc_listener_t::ipc_listener_t (poller_base_t *io_thread_, +xs::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_, socket_base_t *socket_, const options_t &options_) : own_t (io_thread_, options_), io_object_t (io_thread_), @@ -83,7 +83,7 @@ void xs::ipc_listener_t::in_event (fd_t fd_) // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. - poller_base_t *io_thread = choose_io_thread (options.affinity); + io_thread_t *io_thread = choose_io_thread (options.affinity); xs_assert (io_thread); // Create and launch a session object. diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index 24b96fb..b599bff 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -35,14 +35,14 @@ namespace xs { - class poller_base_t; + class io_thread_t; class socket_base_t; class ipc_listener_t : public own_t, public io_object_t { public: - ipc_listener_t (xs::poller_base_t *io_thread_, + ipc_listener_t (xs::io_thread_t *io_thread_, xs::socket_base_t *socket_, const options_t &options_); ~ipc_listener_t (); diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 6a193df..1a0ee4d 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -45,7 +45,7 @@ #endif xs::kqueue_t::kqueue_t (xs::ctx_t *ctx_, uint32_t tid_) : - poller_base_t (ctx_, tid_), + io_thread_t (ctx_, tid_), stopping (false) { // Create event queue diff --git a/src/kqueue.hpp b/src/kqueue.hpp index f3350b9..dea84ac 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -29,7 +29,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -39,14 +39,14 @@ namespace xs // Implements socket polling mechanism using the BSD-specific // kqueue interface. - class kqueue_t : public poller_base_t + class kqueue_t : public io_thread_t { public: kqueue_t (xs::ctx_t *ctx_, uint32_t tid_); ~kqueue_t (); - // "poller" concept. + // Implementation of virtual functions from io_thread_t. handle_t add_fd (fd_t fd_, xs::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); diff --git a/src/monitor.cpp b/src/monitor.cpp index 14a31f2..b29c874 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -19,12 +19,12 @@ */ #include "monitor.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "options.hpp" #include "random.hpp" #include "err.hpp" -xs::monitor_t::monitor_t (xs::poller_base_t *io_thread_) : +xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) : own_t (io_thread_, options_t ()), io_object_t (io_thread_), timer (NULL) diff --git a/src/monitor.hpp b/src/monitor.hpp index 91d441a..db6f74b 100644 --- a/src/monitor.hpp +++ b/src/monitor.hpp @@ -30,14 +30,14 @@ namespace xs { - class poller_base_t; + class io_thread_t; class socket_base_t; class monitor_t : public own_t, public io_object_t { public: - monitor_t (xs::poller_base_t *io_thread_); + monitor_t (xs::io_thread_t *io_thread_); ~monitor_t (); void start (); @@ -51,7 +51,7 @@ namespace xs void process_plug (); void process_stop (); - // Events from the poller. + // Events from the I/O thread. void timer_event (handle_t handle_); // Actual monitoring data to send and the related critical section. diff --git a/src/object.cpp b/src/object.cpp index df279f5..2d1f5aa 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -26,7 +26,7 @@ #include "ctx.hpp" #include "err.hpp" #include "pipe.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "session_base.hpp" #include "socket_base.hpp" @@ -149,7 +149,7 @@ void xs::object_t::destroy_socket (socket_base_t *socket_) ctx->destroy_socket (socket_); } -xs::poller_base_t *xs::object_t::choose_io_thread (uint64_t affinity_) +xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_) { return ctx->choose_io_thread (affinity_); } diff --git a/src/object.hpp b/src/object.hpp index 14dad49..b1bc0c3 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -34,7 +34,7 @@ namespace xs class pipe_t; class socket_base_t; class session_base_t; - class poller_base_t; + class io_thread_t; class own_t; // Base class for all objects that participate in inter-thread @@ -62,7 +62,7 @@ namespace xs void destroy_socket (xs::socket_base_t *socket_); // Chooses least loaded I/O thread. - xs::poller_base_t *choose_io_thread (uint64_t affinity_); + xs::io_thread_t *choose_io_thread (uint64_t affinity_); // Logging related functions. void log (int sid_, const char *text_); diff --git a/src/own.cpp b/src/own.cpp index 3047d7b..a60d9d0 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -20,7 +20,7 @@ #include "own.hpp" #include "err.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) : object_t (parent_, tid_), @@ -32,7 +32,7 @@ xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) : { } -xs::own_t::own_t (poller_base_t *io_thread_, const options_t &options_) : +xs::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) : object_t (io_thread_), options (options_), terminating (false), diff --git a/src/own.hpp b/src/own.hpp index de0320a..d0afd62 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -33,7 +33,7 @@ namespace xs { class ctx_t; - class poller_base_t; + class io_thread_t; // Base class for objects forming a part of ownership hierarchy. // It handles initialisation and destruction of such objects. @@ -50,7 +50,7 @@ namespace xs own_t (xs::ctx_t *parent_, uint32_t tid_); // The object is living within I/O thread. - own_t (xs::poller_base_t *io_thread_, const options_t &options_); + own_t (xs::io_thread_t *io_thread_, const options_t &options_); // When another owned object wants to send command to this object // it calls this function to let it know it should not shut down diff --git a/src/pair.cpp b/src/pair.cpp index c35084f..dd14d8d 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -117,7 +117,7 @@ bool xs::pair_t::xhas_out () return result; } -xs::pair_session_t::pair_session_t (poller_base_t *io_thread_, bool connect_, +xs::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/pair.hpp b/src/pair.hpp index 658e8e9..8ae3acf 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -63,7 +63,7 @@ namespace xs { public: - pair_session_t (xs::poller_base_t *io_thread_, bool connect_, + pair_session_t (xs::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~pair_session_t (); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 3b29197..b2cc513 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -37,7 +37,7 @@ #include "wire.hpp" #include "err.hpp" -xs::pgm_receiver_t::pgm_receiver_t (class poller_base_t *parent_, +xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, const options_t &options_) : io_object_t (parent_), pgm_socket (true, options_), @@ -60,7 +60,7 @@ int xs::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) return pgm_socket.init (udp_encapsulation_, network_); } -void xs::pgm_receiver_t::plug (poller_base_t *io_thread_, +void xs::pgm_receiver_t::plug (io_thread_t *io_thread_, session_base_t *session_) { // Retrieve PGM fds and start polling. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 952265e..b8390c1 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -43,7 +43,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; class session_base_t; class pgm_receiver_t : public io_object_t, public i_engine @@ -51,13 +51,13 @@ namespace xs public: - pgm_receiver_t (xs::poller_base_t *parent_, const options_t &options_); + pgm_receiver_t (xs::io_thread_t *parent_, const options_t &options_); ~pgm_receiver_t (); int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (xs::poller_base_t *io_thread_, + void plug (xs::io_thread_t *io_thread_, xs::session_base_t *session_); void unplug (); void terminate (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 20d3dba..fe57d4d 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -37,7 +37,7 @@ #include "wire.hpp" #include "stdint.hpp" -xs::pgm_sender_t::pgm_sender_t (poller_base_t *parent_, +xs::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_) : io_object_t (parent_), encoder (0), @@ -64,7 +64,7 @@ int xs::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) return rc; } -void xs::pgm_sender_t::plug (poller_base_t *io_thread_, +void xs::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) { // Alocate 2 fds for PGM socket. @@ -75,7 +75,7 @@ void xs::pgm_sender_t::plug (poller_base_t *io_thread_, encoder.set_session (session_); - // Fill fds from PGM transport and add them to the poller. + // Fill fds from PGM transport and add them to the I/O thread. pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd, &pending_notify_fd); @@ -204,7 +204,7 @@ void xs::pgm_sender_t::out_event (fd_t fd_) void xs::pgm_sender_t::timer_event (handle_t handle_) { - // Timer cancels on return by poller_base. + // Timer cancels on return by io_thread. if (handle_ == rx_timer) { rx_timer = NULL; in_event (retired_fd); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index be6476b..000f5ba 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -41,7 +41,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; class session_base_t; class pgm_sender_t : public io_object_t, public i_engine @@ -49,13 +49,13 @@ namespace xs public: - pgm_sender_t (xs::poller_base_t *parent_, const options_t &options_); + pgm_sender_t (xs::io_thread_t *parent_, const options_t &options_); ~pgm_sender_t (); int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (xs::poller_base_t *io_thread_, + void plug (xs::io_thread_t *io_thread_, xs::session_base_t *session_); void unplug (); void terminate (); diff --git a/src/poll.cpp b/src/poll.cpp index 622a0ea..2cb7669 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -33,7 +33,7 @@ #include "config.hpp" xs::poll_t::poll_t (xs::ctx_t *ctx_, uint32_t tid_) : - poller_base_t (ctx_, tid_), + io_thread_t (ctx_, tid_), retired (false), stopping (false) { diff --git a/src/poll.hpp b/src/poll.hpp index 3d5157e..ec06671 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -32,7 +32,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -42,14 +42,14 @@ namespace xs // Implements socket polling mechanism using the POSIX.1-2001 // poll() system call. - class poll_t : public poller_base_t + class poll_t : public io_thread_t { public: poll_t (xs::ctx_t *ctx_, uint32_t tid_); ~poll_t (); - // "poller" concept. + // Implementation of virtual functions from io_thread_t. handle_t add_fd (fd_t fd_, xs::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); diff --git a/src/poller_base.cpp b/src/poller_base.cpp deleted file mode 100644 index 913a8d7..0000000 --- a/src/poller_base.cpp +++ /dev/null @@ -1,174 +0,0 @@ -/* - Copyright (c) 2010-2012 250bpm s.r.o. - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads project. - - Crossroads is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "poller_base.hpp" -#include "err.hpp" - -#include "select.hpp" -#include "poll.hpp" -#include "epoll.hpp" -#include "devpoll.hpp" -#include "kqueue.hpp" - -xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_) -{ - poller_base_t *result; -#if defined XS_HAVE_SELECT - result = new (std::nothrow) select_t (ctx_, tid_); -#elif defined XS_HAVE_POLL - result = new (std::nothrow) poll_t (ctx_, tid_); -#elif defined XS_HAVE_EPOLL - result = new (std::nothrow) epoll_t (ctx_, tid_); -#elif defined XS_HAVE_DEVPOLL - result = new (std::nothrow) devpoll_t (ctx_, tid_); -#elif defined XS_HAVE_KQUEUE - result = new (std::nothrow) kqueue_t (ctx_, tid_); -#endif - alloc_assert (result); - return result; -} - -xs::poller_base_t::poller_base_t (xs::ctx_t *ctx_, uint32_t tid_) : - object_t (ctx_, tid_) -{ -} - -xs::poller_base_t::~poller_base_t () -{ -} - -void xs::poller_base_t::start () -{ - mailbox_handle = add_fd (mailbox.get_fd (), this); - set_pollin (mailbox_handle); - xstart (); -} - -void xs::poller_base_t::stop () -{ - // Ask the I/O thread to stop. - send_stop (); -} - -void xs::poller_base_t::process_stop () -{ - rm_fd (mailbox_handle); - xstop (); -} - -xs::mailbox_t *xs::poller_base_t::get_mailbox () -{ - return &mailbox; -} - -int xs::poller_base_t::get_load () -{ - return load.get (); -} - -void xs::poller_base_t::adjust_load (int amount_) -{ - if (amount_ > 0) - load.add (amount_); - else if (amount_ < 0) - load.sub (-amount_); -} - -xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_) -{ - uint64_t expiration = clock.now_ms () + timeout_; - timer_info_t info = {sink_, timers_t::iterator ()}; - timers_t::iterator it = timers.insert ( - timers_t::value_type (expiration, info)); - it->second.self = it; - return (handle_t) &(it->second); -} - -void xs::poller_base_t::rm_timer (handle_t handle_) -{ - timer_info_t *info = (timer_info_t*) handle_; - timers.erase (info->self); -} - -uint64_t xs::poller_base_t::execute_timers () -{ - // Fast track. - if (timers.empty ()) - return 0; - - // Get the current time. - uint64_t current = clock.now_ms (); - - // Execute the timers that are already due. - timers_t::iterator it = timers.begin (); - while (it != timers.end ()) { - - // If we have to wait to execute the item, same will be true about - // all the following items (multimap is sorted). Thus we can stop - // checking the subsequent timers and return the time to wait for - // the next timer (at least 1ms). - if (it->first > current) - return it->first - current; - - // Trigger the timer. - it->second.sink->timer_event ((handle_t) &it->second); - - // Remove it from the list of active timers. - timers_t::iterator o = it; - ++it; - timers.erase (o); - } - - // There are no more timers. - return 0; -} - -void xs::poller_base_t::in_event (fd_t fd_) -{ - // TODO: Do we want to limit number of commands I/O thread can - // process in a single go? - - while (true) { - - // Get the next command. If there is none, exit. - command_t cmd; - int rc = mailbox.recv (&cmd, 0); - if (rc != 0 && errno == EINTR) - continue; - if (rc != 0 && errno == EAGAIN) - break; - errno_assert (rc == 0); - - // Process the command. - cmd.destination->process_command (cmd); - } -} - -void xs::poller_base_t::out_event (fd_t fd_) -{ - // We are never polling for POLLOUT here. This function is never called. - xs_assert (false); -} - -void xs::poller_base_t::timer_event (handle_t handle_) -{ - // No timers here. This function is never called. - xs_assert (false); -} diff --git a/src/poller_base.hpp b/src/poller_base.hpp deleted file mode 100644 index 2388fad..0000000 --- a/src/poller_base.hpp +++ /dev/null @@ -1,140 +0,0 @@ -/* - Copyright (c) 2010-2012 250bpm s.r.o. - Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads project. - - Crossroads is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __XS_POLLER_BASE_HPP_INCLUDED__ -#define __XS_POLLER_BASE_HPP_INCLUDED__ - -#include - -#include "fd.hpp" -#include "clock.hpp" -#include "object.hpp" -#include "mailbox.hpp" -#include "atomic_counter.hpp" - -namespace xs -{ - - class ctx_t; - - // Handle of a file descriptor within a pollset. - typedef void* handle_t; - - // Virtual interface to be exposed by object that want to be notified - // about events on file descriptors. - - struct i_poll_events - { - virtual ~i_poll_events () {} - - // Called by I/O thread when file descriptor is ready for reading. - virtual void in_event (fd_t fd_) = 0; - - // Called by I/O thread when file descriptor is ready for writing. - virtual void out_event (fd_t fd_) = 0; - - // Called when timer expires. - virtual void timer_event (handle_t handle_) = 0; - }; - - class poller_base_t : public object_t, public i_poll_events - { - public: - - // Create optimal poller mechanism for this environment. - static poller_base_t *create (xs::ctx_t *ctx_, uint32_t tid_); - - virtual ~poller_base_t (); - - // Returns load of the poller. Note that this function can be - // invoked from a different thread! - int get_load (); - - void start (); - void stop (); - - // Returns mailbox associated with this I/O poller. - mailbox_t *get_mailbox (); - - virtual handle_t add_fd (fd_t fd_, xs::i_poll_events *events_) = 0; - virtual void rm_fd (handle_t handle_) = 0; - virtual void set_pollin (handle_t handle_) = 0; - virtual void reset_pollin (handle_t handle_) = 0; - virtual void set_pollout (handle_t handle_) = 0; - virtual void reset_pollout (handle_t handle_) = 0; - virtual void xstart () = 0; - virtual void xstop () = 0; - - // Add a timeout to expire in timeout_ milliseconds. After the - // expiration timer_event on sink_ object will be called. - handle_t add_timer (int timeout_, xs::i_poll_events *sink_); - - // Cancel the timer identified by the handle. - void rm_timer (handle_t handle_); - - // i_poll_events implementation. - void in_event (fd_t fd_); - void out_event (fd_t fd_); - void timer_event (handle_t handle_); - - protected: - - poller_base_t (xs::ctx_t *ctx_, uint32_t tid_); - - // Called by individual poller implementations to manage the load. - void adjust_load (int amount_); - - // Executes any timers that are due. Returns number of milliseconds - // to wait to match the next timer or 0 meaning "no timers". - uint64_t execute_timers (); - - private: - - void process_stop (); - - // Clock instance private to this I/O thread. - clock_t clock; - - // List of active timers. - struct timer_info_t - { - xs::i_poll_events *sink; - std::multimap ::iterator self; - }; - typedef std::multimap timers_t; - timers_t timers; - - // Load of the poller. Currently the number of file descriptors - // registered. - atomic_counter_t load; - - // I/O thread accesses incoming commands via this mailbox. - mailbox_t mailbox; - - // Handle associated with mailbox' file descriptor. - handle_t mailbox_handle; - - poller_base_t (const poller_base_t&); - const poller_base_t &operator = (const poller_base_t&); - }; - -} - -#endif diff --git a/src/pub.cpp b/src/pub.cpp index db285b8..aa0ed8e 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -44,7 +44,7 @@ bool xs::pub_t::xhas_in () return false; } -xs::pub_session_t::pub_session_t (poller_base_t *io_thread_, bool connect_, +xs::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : xpub_session_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/pub.hpp b/src/pub.hpp index 45b510e..49aecc0 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -28,7 +28,7 @@ namespace xs { class ctx_t; - class poller_base_t; + class io_thread_t; class socket_base_t; class msg_t; @@ -53,7 +53,7 @@ namespace xs { public: - pub_session_t (xs::poller_base_t *io_thread_, bool connect_, + pub_session_t (xs::io_thread_t *io_thread_, bool connect_, xs::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~pub_session_t (); diff --git a/src/pull.cpp b/src/pull.cpp index 923c85b..e168c03 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -60,7 +60,7 @@ bool xs::pull_t::xhas_in () return fq.has_in (); } -xs::pull_session_t::pull_session_t (poller_base_t *io_thread_, bool connect_, +xs::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/pull.hpp b/src/pull.hpp index a02ef2d..29ca4d9 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -32,7 +32,7 @@ namespace xs class ctx_t; class pipe_t; class msg_t; - class poller_base_t; + class io_thread_t; class pull_t : public socket_base_t @@ -65,7 +65,7 @@ namespace xs { public: - pull_session_t (xs::poller_base_t *io_thread_, bool connect_, + pull_session_t (xs::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~pull_session_t (); diff --git a/src/push.cpp b/src/push.cpp index ffc2d6e..45f0c62 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -60,7 +60,7 @@ bool xs::push_t::xhas_out () return lb.has_out (); } -xs::push_session_t::push_session_t (poller_base_t *io_thread_, bool connect_, +xs::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/push.hpp b/src/push.hpp index f6102c6..85c2279 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -32,7 +32,7 @@ namespace xs class ctx_t; class pipe_t; class msg_t; - class poller_base_t; + class io_thread_t; class push_t : public socket_base_t @@ -64,7 +64,7 @@ namespace xs { public: - push_session_t (xs::poller_base_t *io_thread_, bool connect_, + push_session_t (xs::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~push_session_t (); diff --git a/src/reaper.cpp b/src/reaper.cpp index ffce0a9..7ce9e04 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -27,16 +27,16 @@ xs::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : sockets (0), terminating (false) { - poller = poller_base_t::create (ctx_, tid_); - xs_assert (poller); + io_thread = io_thread_t::create (ctx_, tid_); + xs_assert (io_thread); - mailbox_handle = poller->add_fd (mailbox.get_fd (), this); - poller->set_pollin (mailbox_handle); + mailbox_handle = io_thread->add_fd (mailbox.get_fd (), this); + io_thread->set_pollin (mailbox_handle); } xs::reaper_t::~reaper_t () { - delete poller; + delete io_thread; } xs::mailbox_t *xs::reaper_t::get_mailbox () @@ -47,7 +47,7 @@ xs::mailbox_t *xs::reaper_t::get_mailbox () void xs::reaper_t::start () { // Start the thread. - poller->start (); + io_thread->start (); } void xs::reaper_t::stop () @@ -90,15 +90,15 @@ void xs::reaper_t::process_stop () // If there are no sockets being reaped finish immediately. if (!sockets) { send_done (); - poller->rm_fd (mailbox_handle); - poller->stop (); + io_thread->rm_fd (mailbox_handle); + io_thread->stop (); } } void xs::reaper_t::process_reap (socket_base_t *socket_) { - // Add the socket to the poller. - socket_->start_reaping (poller); + // Add the socket to the I/O thread. + socket_->start_reaping (io_thread); ++sockets; } @@ -111,7 +111,7 @@ void xs::reaper_t::process_reaped () // finish immediately. if (!sockets && terminating) { send_done (); - poller->rm_fd (mailbox_handle); - poller->stop (); + io_thread->rm_fd (mailbox_handle); + io_thread->stop (); } } diff --git a/src/reaper.hpp b/src/reaper.hpp index bc40fac..0e7618b 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -23,7 +23,7 @@ #include "object.hpp" #include "mailbox.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -61,8 +61,8 @@ namespace xs // Handle associated with mailbox' file descriptor. handle_t mailbox_handle; - // I/O multiplexing is performed using a poller object. - poller_base_t *poller; + // I/O multiplexing is performed using an io_thread object. + io_thread_t *io_thread; // Number of sockets being reaped at the moment. int sockets; diff --git a/src/rep.cpp b/src/rep.cpp index 5e5e415..6454b57 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -112,7 +112,7 @@ bool xs::rep_t::xhas_out () return xrep_t::xhas_out (); } -xs::rep_session_t::rep_session_t (poller_base_t *io_thread_, bool connect_, +xs::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : xrep_session_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/rep.hpp b/src/rep.hpp index 2cd3931..0b608ee 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -29,7 +29,7 @@ namespace xs class ctx_t; class msg_t; - class poller_base_t; + class io_thread_t; class socket_base_t; class rep_t : public xrep_t @@ -64,7 +64,7 @@ namespace xs { public: - rep_session_t (xs::poller_base_t *io_thread_, bool connect_, + rep_session_t (xs::io_thread_t *io_thread_, bool connect_, xs::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~rep_session_t (); diff --git a/src/req.cpp b/src/req.cpp index 6e9867c..af6c8cd 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -137,7 +137,7 @@ bool xs::req_t::xhas_out () return xreq_t::xhas_out (); } -xs::req_session_t::req_session_t (poller_base_t *io_thread_, bool connect_, +xs::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : xreq_session_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/req.hpp b/src/req.hpp index 8da789f..46015e8 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -31,7 +31,7 @@ namespace xs class ctx_t; class msg_t; - class poller_base_t; + class io_thread_t; class socket_base_t; class req_t : public xreq_t @@ -65,7 +65,7 @@ namespace xs { public: - req_session_t (xs::poller_base_t *io_thread_, bool connect_, + req_session_t (xs::io_thread_t *io_thread_, bool connect_, xs::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~req_session_t (); diff --git a/src/select.cpp b/src/select.cpp index 1b63685..af5f0f7 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -44,7 +44,7 @@ #include "config.hpp" xs::select_t::select_t (xs::ctx_t *ctx_, uint32_t tid_) : - poller_base_t (ctx_, tid_), + io_thread_t (ctx_, tid_), maxfd (retired_fd), retired (false), stopping (false) diff --git a/src/select.hpp b/src/select.hpp index 283cc52..b1b96ef 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -42,7 +42,7 @@ #include "fd.hpp" #include "thread.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" namespace xs { @@ -52,14 +52,14 @@ namespace xs // Implements socket polling mechanism using POSIX.1-2001 select() // function. - class select_t : public poller_base_t + class select_t : public io_thread_t { public: select_t (xs::ctx_t *ctx_, uint32_t tid_); ~select_t (); - // "poller" concept. + // Implementation of virtual functions from io_thread_t. handle_t add_fd (fd_t fd_, xs::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); diff --git a/src/session_base.cpp b/src/session_base.cpp index 168501c..efc4f8b 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -43,7 +43,7 @@ #include "pull.hpp" #include "pair.hpp" -xs::session_base_t *xs::session_base_t::create (class poller_base_t *io_thread_, +xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) { @@ -100,7 +100,7 @@ xs::session_base_t *xs::session_base_t::create (class poller_base_t *io_thread_, return s; } -xs::session_base_t::session_base_t (class poller_base_t *io_thread_, +xs::session_base_t::session_base_t (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : own_t (io_thread_, options_), @@ -387,7 +387,7 @@ void xs::session_base_t::start_connecting (bool wait_) // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. - poller_base_t *io_thread = choose_io_thread (options.affinity); + io_thread_t *io_thread = choose_io_thread (options.affinity); xs_assert (io_thread); // Create the connecter object. diff --git a/src/session_base.hpp b/src/session_base.hpp index 6a461d8..45ad8f8 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -33,7 +33,7 @@ namespace xs { class pipe_t; - class poller_base_t; + class io_thread_t; class socket_base_t; struct i_engine; @@ -45,7 +45,7 @@ namespace xs public: // Create a session of the particular type. - static session_base_t *create (xs::poller_base_t *io_thread_, + static session_base_t *create (xs::io_thread_t *io_thread_, bool connect_, xs::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); @@ -67,7 +67,7 @@ namespace xs protected: - session_base_t (xs::poller_base_t *io_thread_, bool connect_, + session_base_t (xs::io_thread_t *io_thread_, bool connect_, xs::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~session_base_t (); @@ -116,7 +116,7 @@ namespace xs // I/O thread the session is living in. It will be used to plug in // the engines into the same thread. - xs::poller_base_t *io_thread; + xs::io_thread_t *io_thread; // If true, identity is to be sent/recvd from the network. bool send_identity; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 492d027..7ab00ea 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -39,7 +39,7 @@ #include "tcp_listener.hpp" #include "ipc_listener.hpp" #include "tcp_connecter.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "session_base.hpp" #include "config.hpp" #include "clock.hpp" @@ -338,7 +338,7 @@ int xs::socket_base_t::bind (const char *addr_) // Remaining trasnports require to be run in an I/O thread, so at this // point we'll choose one. - poller_base_t *io_thread = choose_io_thread (options.affinity); + io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; return -1; @@ -449,7 +449,7 @@ int xs::socket_base_t::connect (const char *addr_) } // Choose the I/O thread to run the session in. - poller_base_t *io_thread = choose_io_thread (options.affinity); + io_thread_t *io_thread = choose_io_thread (options.affinity); if (!io_thread) { errno = EMTHREAD; return -1; @@ -667,12 +667,12 @@ bool xs::socket_base_t::has_out () return ret; } -void xs::socket_base_t::start_reaping (poller_base_t *poller_) +void xs::socket_base_t::start_reaping (io_thread_t *io_thread_) { // Plug the socket to the reaper thread. - poller = poller_; - handle = poller->add_fd (mailbox.get_fd (), this); - poller->set_pollin (handle); + io_thread = io_thread_; + handle = io_thread->add_fd (mailbox.get_fd (), this); + io_thread->set_pollin (handle); // Initialise the termination and check whether it can be deallocated // immediately. @@ -843,8 +843,8 @@ void xs::socket_base_t::check_destroy () // If the object was already marked as destroyed, finish the deallocation. if (destroyed) { - // Remove the socket from the reaper's poller. - poller->rm_fd (handle); + // Remove the socket from the reaper's I/O thread. + io_thread->rm_fd (handle); // Remove the socket from the context. destroy_socket (this); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 8b9a948..dd1fee7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -28,7 +28,7 @@ #include "own.hpp" #include "array.hpp" #include "stdint.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "atomic_counter.hpp" #include "mailbox.hpp" #include "stdint.hpp" @@ -83,12 +83,12 @@ namespace xs bool has_in (); bool has_out (); - // Using this function reaper thread ask the socket to regiter with - // its poller. - void start_reaping (poller_base_t *poller_); + // Using this function reaper thread asks the socket to regiter with + // its I/O thread. + void start_reaping (io_thread_t *io_thread_); // i_poll_events implementation. This interface is used when socket - // is handled by the poller in the reaper thread. + // is handled by the io_thread in the reaper thread. void in_event (fd_t fd_); void out_event (fd_t fd_); void timer_event (handle_t handle_); @@ -188,8 +188,8 @@ namespace xs typedef array_t pipes_t; pipes_t pipes; - // Reaper's poller and handle of this socket within it. - poller_base_t *poller; + // Reaper's io_thread and handle of this socket within it. + io_thread_t *io_thread; handle_t handle; // Timestamp of when commands were processed the last time. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 7d69848..71032f4 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -36,7 +36,7 @@ #include #include "stream_engine.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "session_base.hpp" #include "config.hpp" #include "err.hpp" @@ -103,7 +103,7 @@ xs::stream_engine_t::~stream_engine_t () } } -void xs::stream_engine_t::plug (poller_base_t *io_thread_, +void xs::stream_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) { xs_assert (!plugged); @@ -117,7 +117,7 @@ void xs::stream_engine_t::plug (poller_base_t *io_thread_, decoder.set_session (session_); session = session_; - // Connect to I/O threads poller object. + // Connect to the io_thread object. io_object_t::plug (io_thread_); handle = add_fd (s); set_pollin (handle); @@ -135,7 +135,7 @@ void xs::stream_engine_t::unplug () // Cancel all fd subscriptions. rm_fd (handle); - // Disconnect from I/O threads poller object. + // Disconnect from the io_thread object. io_object_t::unplug (); // Disconnect from session object. diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 9060cd7..f246552 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -34,7 +34,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; class session_base_t; // This engine handles any socket with SOCK_STREAM semantics, @@ -48,7 +48,7 @@ namespace xs ~stream_engine_t (); // i_engine interface implementation. - void plug (xs::poller_base_t *io_thread_, + void plug (xs::io_thread_t *io_thread_, xs::session_base_t *session_); void unplug (); void terminate (); diff --git a/src/sub.cpp b/src/sub.cpp index 08ce368..ad55ea3 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -80,7 +80,7 @@ bool xs::sub_t::xhas_out () return false; } -xs::sub_session_t::sub_session_t (poller_base_t *io_thread_, bool connect_, +xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : xsub_session_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/sub.hpp b/src/sub.hpp index 9b38822..70ade88 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -29,7 +29,7 @@ namespace xs class ctx_t; class msg_t; - class poller_base_t; + class io_thread_t; class socket_base_t; class sub_t : public xsub_t @@ -55,7 +55,7 @@ namespace xs { public: - sub_session_t (xs::poller_base_t *io_thread_, bool connect_, + sub_session_t (xs::io_thread_t *io_thread_, bool connect_, xs::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~sub_session_t (); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 6816e64..5007570 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -24,7 +24,7 @@ #include "tcp_connecter.hpp" #include "stream_engine.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "platform.hpp" #include "random.hpp" #include "err.hpp" @@ -46,7 +46,7 @@ #endif #endif -xs::tcp_connecter_t::tcp_connecter_t (class poller_base_t *io_thread_, +xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, const options_t &options_, const char *address_, bool wait_) : own_t (io_thread_, options_), diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 2b946a1..6790408 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -31,7 +31,7 @@ namespace xs { - class poller_base_t; + class io_thread_t; class session_base_t; class tcp_connecter_t : public own_t, public io_object_t @@ -40,7 +40,7 @@ namespace xs // If 'delay' is true connecter first waits for a while, then starts // connection process. - tcp_connecter_t (xs::poller_base_t *io_thread_, + tcp_connecter_t (xs::io_thread_t *io_thread_, xs::session_base_t *session_, const options_t &options_, const char *address_, bool delay_); ~tcp_connecter_t (); @@ -91,7 +91,7 @@ namespace xs fd_t s; // Handle corresponding to the listening socket or NULL if the socket - // is not registered with the poller. + // is not registered with the I/O thread. handle_t handle; // If true, connecter is waiting a while before trying to connect. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index e8900fc..31a9149 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -26,7 +26,7 @@ #include "platform.hpp" #include "tcp_listener.hpp" #include "stream_engine.hpp" -#include "poller_base.hpp" +#include "io_thread.hpp" #include "session_base.hpp" #include "config.hpp" #include "err.hpp" @@ -48,7 +48,7 @@ #include #endif -xs::tcp_listener_t::tcp_listener_t (poller_base_t *io_thread_, +xs::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, socket_base_t *socket_, const options_t &options_) : own_t (io_thread_, options_), io_object_t (io_thread_), @@ -94,7 +94,7 @@ void xs::tcp_listener_t::in_event (fd_t fd_) // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. - poller_base_t *io_thread = choose_io_thread (options.affinity); + io_thread_t *io_thread = choose_io_thread (options.affinity); xs_assert (io_thread); // Create and launch a session object. diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 22c000f..153f771 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -31,14 +31,14 @@ namespace xs { - class poller_base_t; + class io_thread_t; class socket_base_t; class tcp_listener_t : public own_t, public io_object_t { public: - tcp_listener_t (xs::poller_base_t *io_thread_, + tcp_listener_t (xs::io_thread_t *io_thread_, xs::socket_base_t *socket_, const options_t &options_); ~tcp_listener_t (); diff --git a/src/xpub.cpp b/src/xpub.cpp index 6c0f75b..d27337e 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -182,7 +182,7 @@ void xs::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, } } -xs::xpub_session_t::xpub_session_t (poller_base_t *io_thread_, bool connect_, +xs::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/xpub.hpp b/src/xpub.hpp index 2c61c63..a26076e 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -37,7 +37,7 @@ namespace xs class ctx_t; class msg_t; class pipe_t; - class poller_base_t; + class io_thread_t; class xpub_t : public socket_base_t @@ -89,7 +89,7 @@ namespace xs { public: - xpub_session_t (xs::poller_base_t *io_thread_, bool connect_, + xpub_session_t (xs::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xpub_session_t (); diff --git a/src/xrep.cpp b/src/xrep.cpp index 4bae2d9..051159d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -307,7 +307,7 @@ bool xs::xrep_t::xhas_out () return true; } -xs::xrep_session_t::xrep_session_t (poller_base_t *io_thread_, bool connect_, +xs::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/xrep.hpp b/src/xrep.hpp index e5ab895..4b3192a 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -37,7 +37,7 @@ namespace xs class ctx_t; class pipe_t; - class poller_base_t; + class io_thread_t; // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. class xrep_t : @@ -109,7 +109,7 @@ namespace xs { public: - xrep_session_t (xs::poller_base_t *io_thread_, bool connect_, + xrep_session_t (xs::io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xrep_session_t (); diff --git a/src/xreq.cpp b/src/xreq.cpp index 948ff22..d16f60e 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -115,7 +115,7 @@ void xs::xreq_t::xterminated (pipe_t *pipe_) lb.terminated (pipe_); } -xs::xreq_session_t::xreq_session_t (poller_base_t *io_thread_, bool connect_, +xs::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/xreq.hpp b/src/xreq.hpp index 358b10a..ab3a360 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -32,7 +32,7 @@ namespace xs class ctx_t; class msg_t; class pipe_t; - class poller_base_t; + class io_thread_t; class socket_base_t; class xreq_t : @@ -76,7 +76,7 @@ namespace xs { public: - xreq_session_t (xs::poller_base_t *io_thread_, bool connect_, + xreq_session_t (xs::io_thread_t *io_thread_, bool connect_, xs::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xreq_session_t (); diff --git a/src/xsub.cpp b/src/xsub.cpp index ee53b2c..80cc205 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -221,7 +221,7 @@ void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_, xs_assert (sent); } -xs::xsub_session_t::xsub_session_t (poller_base_t *io_thread_, bool connect_, +xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_) : session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/xsub.hpp b/src/xsub.hpp index 8769706..f9e5210 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -32,7 +32,7 @@ namespace xs class ctx_t; class pipe_t; - class poller_base_t; + class io_thread_t; class xsub_t : public socket_base_t @@ -91,7 +91,7 @@ namespace xs { public: - xsub_session_t (class poller_base_t *io_thread_, bool connect_, + xsub_session_t (class io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xsub_session_t (); -- cgit v1.2.3