summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:10:49 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:10:49 +0900
commit50f225a04422abf64545f5eb36592d8c990b0ae4 (patch)
tree1ad0396c3f32b4e89f80d4abdbf0d9ced6a1e55c /src
parente5e9852181947a9c41db9ff3c47e94d66a933161 (diff)
poller_base_t renamed to io_thread_t
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/ctx.cpp6
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/devpoll.cpp2
-rw-r--r--src/devpoll.hpp6
-rw-r--r--src/epoll.cpp2
-rw-r--r--src/epoll.hpp6
-rw-r--r--src/i_engine.hpp4
-rw-r--r--src/io_object.cpp37
-rw-r--r--src/io_object.hpp12
-rw-r--r--src/io_thread.cpp (renamed from src/poller_base.cpp)34
-rw-r--r--src/io_thread.hpp (renamed from src/poller_base.hpp)26
-rw-r--r--src/ipc_connecter.cpp4
-rw-r--r--src/ipc_connecter.hpp6
-rw-r--r--src/ipc_listener.cpp6
-rw-r--r--src/ipc_listener.hpp4
-rw-r--r--src/kqueue.cpp2
-rw-r--r--src/kqueue.hpp6
-rw-r--r--src/monitor.cpp4
-rw-r--r--src/monitor.hpp6
-rw-r--r--src/object.cpp4
-rw-r--r--src/object.hpp4
-rw-r--r--src/own.cpp4
-rw-r--r--src/own.hpp4
-rw-r--r--src/pair.cpp2
-rw-r--r--src/pair.hpp2
-rw-r--r--src/pgm_receiver.cpp4
-rw-r--r--src/pgm_receiver.hpp6
-rw-r--r--src/pgm_sender.cpp8
-rw-r--r--src/pgm_sender.hpp6
-rw-r--r--src/poll.cpp2
-rw-r--r--src/poll.hpp6
-rw-r--r--src/pub.cpp2
-rw-r--r--src/pub.hpp4
-rw-r--r--src/pull.cpp2
-rw-r--r--src/pull.hpp4
-rw-r--r--src/push.cpp2
-rw-r--r--src/push.hpp4
-rw-r--r--src/reaper.cpp24
-rw-r--r--src/reaper.hpp6
-rw-r--r--src/rep.cpp2
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.cpp2
-rw-r--r--src/req.hpp4
-rw-r--r--src/select.cpp2
-rw-r--r--src/select.hpp6
-rw-r--r--src/session_base.cpp6
-rw-r--r--src/session_base.hpp8
-rw-r--r--src/socket_base.cpp18
-rw-r--r--src/socket_base.hpp14
-rw-r--r--src/stream_engine.cpp8
-rw-r--r--src/stream_engine.hpp4
-rw-r--r--src/sub.cpp2
-rw-r--r--src/sub.hpp4
-rw-r--r--src/tcp_connecter.cpp4
-rw-r--r--src/tcp_connecter.hpp6
-rw-r--r--src/tcp_listener.cpp6
-rw-r--r--src/tcp_listener.hpp4
-rw-r--r--src/xpub.cpp2
-rw-r--r--src/xpub.hpp4
-rw-r--r--src/xrep.cpp2
-rw-r--r--src/xrep.hpp4
-rw-r--r--src/xreq.cpp2
-rw-r--r--src/xreq.hpp4
-rw-r--r--src/xsub.cpp2
-rw-r--r--src/xsub.hpp4
66 files changed, 204 insertions, 207 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 7d3ee8d..9f11611 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,6 +23,7 @@ libxs_la_SOURCES = \
fd.hpp \
fq.hpp \
io_object.hpp \
+ io_thread.hpp \
ip.hpp \
ipc_address.hpp \
ipc_connecter.hpp \
@@ -45,7 +46,6 @@ libxs_la_SOURCES = \
pipe.hpp \
platform.hpp \
poll.hpp \
- poller_base.hpp \
pair.hpp \
pub.hpp \
pull.hpp \
@@ -85,6 +85,7 @@ libxs_la_SOURCES = \
err.cpp \
fq.cpp \
io_object.cpp \
+ io_thread.cpp \
ip.cpp \
ipc_address.cpp \
ipc_connecter.cpp \
@@ -104,7 +105,6 @@ libxs_la_SOURCES = \
pgm_socket.cpp \
pipe.cpp \
poll.cpp \
- poller_base.cpp \
pull.cpp \
push.cpp \
reaper.cpp \
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 8b27b31..789cdc2 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -197,7 +197,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
// Create I/O thread objects and launch them.
for (uint32_t i = 2; i != io_thread_count + 2; i++) {
- poller_base_t *io_thread = poller_base_t::create (this, i);
+ io_thread_t *io_thread = io_thread_t::create (this, i);
errno_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
@@ -226,7 +226,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
#endif
// Create the monitor object.
- poller_base_t *io_thread = choose_io_thread (0);
+ io_thread_t *io_thread = choose_io_thread (0);
xs_assert (io_thread);
monitor = new (std::nothrow) monitor_t (io_thread);
alloc_assert (monitor);
@@ -301,7 +301,7 @@ void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)
slots [tid_]->send (command_);
}
-xs::poller_base_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
+xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
{
if (io_threads.empty ())
return NULL;
diff --git a/src/ctx.hpp b/src/ctx.hpp
index c12fffc..e912443 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -40,7 +40,7 @@ namespace xs
class object_t;
class monitor_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class reaper_t;
@@ -86,7 +86,7 @@ namespace xs
// Returns the I/O thread that is the least busy at the moment.
// Affinity specifies which I/O threads are eligible (0 = all).
// Returns NULL is no I/O thread is available.
- xs::poller_base_t *choose_io_thread (uint64_t affinity_);
+ xs::io_thread_t *choose_io_thread (uint64_t affinity_);
// Returns reaper thread object.
xs::object_t *get_reaper ();
@@ -142,7 +142,7 @@ namespace xs
xs::reaper_t *reaper;
// I/O threads.
- typedef std::vector <xs::poller_base_t*> io_threads_t;
+ typedef std::vector <xs::io_thread_t*> io_threads_t;
io_threads_t io_threads;
// Array of pointers to mailboxes for both application and I/O threads.
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index e85b765..936d6c2 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -38,7 +38,7 @@
#include "config.hpp"
xs::devpoll_t::devpoll_t (xs::ctx_t *ctx_, uint32_t tid_) :
- poller_base_t (ctx_, tid_),
+ io_thread_t (ctx_, tid_),
stopping (false)
{
devpoll_fd = open ("/dev/poll", O_RDWR);
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index b32aa28..3947f3e 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -30,7 +30,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -39,14 +39,14 @@ namespace xs
// Implements socket polling mechanism using the "/dev/poll" interface.
- class devpoll_t : public poller_base_t
+ class devpoll_t : public io_thread_t
{
public:
devpoll_t (xs::ctx_t *ctx_, uint32_t tid_);
~devpoll_t ();
- // "poller" concept.
+ // Implementation of virtual functions from io_thread_t.
handle_t add_fd (fd_t fd_, xs::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 7ca4608..aa13b31 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -35,7 +35,7 @@
#include "err.hpp"
xs::epoll_t::epoll_t (xs::ctx_t *ctx_, uint32_t tid_) :
- poller_base_t (ctx_, tid_),
+ io_thread_t (ctx_, tid_),
stopping (false)
{
epoll_fd = epoll_create (1);
diff --git a/src/epoll.hpp b/src/epoll.hpp
index 232d049..ad94120 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -31,7 +31,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -42,14 +42,14 @@ namespace xs
// This class implements socket polling mechanism using the Linux-specific
// epoll mechanism.
- class epoll_t : public poller_base_t
+ class epoll_t : public io_thread_t
{
public:
epoll_t (xs::ctx_t *ctx_, uint32_t tid_);
~epoll_t ();
- // "poller" concept.
+ // Implementation of virtual functions from io_thread_t.
handle_t add_fd (fd_t fd_, xs::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 9106411..fc03f0e 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -25,7 +25,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
// Abstract interface to be implemented by various engines.
@@ -34,7 +34,7 @@ namespace xs
virtual ~i_engine () {}
// Plug the engine to the session.
- virtual void plug (xs::poller_base_t *io_thread_,
+ virtual void plug (xs::io_thread_t *io_thread_,
class session_base_t *session_) = 0;
// Unplug the engine from the session.
diff --git a/src/io_object.cpp b/src/io_object.cpp
index 0c73571..c17ff04 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -20,11 +20,11 @@
*/
#include "io_object.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "err.hpp"
-xs::io_object_t::io_object_t (poller_base_t *io_thread_) :
- poller (NULL)
+xs::io_object_t::io_object_t (io_thread_t *io_thread_) :
+ io_thread (NULL)
{
if (io_thread_)
plug (io_thread_);
@@ -34,62 +34,59 @@ xs::io_object_t::~io_object_t ()
{
}
-void xs::io_object_t::plug (poller_base_t *io_thread_)
+void xs::io_object_t::plug (io_thread_t *io_thread_)
{
xs_assert (io_thread_);
- xs_assert (!poller);
+ xs_assert (!io_thread);
- // Retrieve the poller from the thread we are running in.
- poller = io_thread_;
+ // Retrieve the io_thread from the thread we are running in.
+ io_thread = io_thread_;
}
void xs::io_object_t::unplug ()
{
- xs_assert (poller);
-
- // Forget about old poller in preparation to be migrated
- // to a different I/O thread.
- poller = NULL;
+ xs_assert (io_thread);
+ io_thread = NULL;
}
xs::handle_t xs::io_object_t::add_fd (fd_t fd_)
{
- return poller->add_fd (fd_, this);
+ return io_thread->add_fd (fd_, this);
}
void xs::io_object_t::rm_fd (handle_t handle_)
{
- poller->rm_fd (handle_);
+ io_thread->rm_fd (handle_);
}
void xs::io_object_t::set_pollin (handle_t handle_)
{
- poller->set_pollin (handle_);
+ io_thread->set_pollin (handle_);
}
void xs::io_object_t::reset_pollin (handle_t handle_)
{
- poller->reset_pollin (handle_);
+ io_thread->reset_pollin (handle_);
}
void xs::io_object_t::set_pollout (handle_t handle_)
{
- poller->set_pollout (handle_);
+ io_thread->set_pollout (handle_);
}
void xs::io_object_t::reset_pollout (handle_t handle_)
{
- poller->reset_pollout (handle_);
+ io_thread->reset_pollout (handle_);
}
xs::handle_t xs::io_object_t::add_timer (int timeout_)
{
- return poller->add_timer (timeout_, this);
+ return io_thread->add_timer (timeout_, this);
}
void xs::io_object_t::rm_timer (handle_t handle_)
{
- poller->rm_timer (handle_);
+ io_thread->rm_timer (handle_);
}
void xs::io_object_t::in_event (fd_t fd_)
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 0749ceb..31f9507 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -25,30 +25,30 @@
#include <stddef.h>
#include "stdint.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
// Simple base class for objects that live in I/O threads.
- // It makes communication with the poller object easier and
+ // It makes communication with the io_thread object easier and
// makes defining unneeded event handlers unnecessary.
class io_object_t : public i_poll_events
{
public:
- io_object_t (xs::poller_base_t *io_thread_ = NULL);
+ io_object_t (xs::io_thread_t *io_thread_ = NULL);
~io_object_t ();
// When migrating an object from one I/O thread to another, first
// unplug it, then migrate it, then plug it to the new thread.
- void plug (xs::poller_base_t *io_thread_);
+ void plug (xs::io_thread_t *io_thread_);
void unplug ();
protected:
- // Methods to access underlying poller object.
+ // Methods to access underlying io_thread object.
handle_t add_fd (fd_t fd_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
@@ -65,7 +65,7 @@ namespace xs
private:
- poller_base_t *poller;
+ io_thread_t *io_thread;
io_object_t (const io_object_t&);
const io_object_t &operator = (const io_object_t&);
diff --git a/src/poller_base.cpp b/src/io_thread.cpp
index 913a8d7..d6dca8c 100644
--- a/src/poller_base.cpp
+++ b/src/io_thread.cpp
@@ -18,7 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "err.hpp"
#include "select.hpp"
@@ -27,9 +27,9 @@
#include "devpoll.hpp"
#include "kqueue.hpp"
-xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_)
+xs::io_thread_t *xs::io_thread_t::create (xs::ctx_t *ctx_, uint32_t tid_)
{
- poller_base_t *result;
+ io_thread_t *result;
#if defined XS_HAVE_SELECT
result = new (std::nothrow) select_t (ctx_, tid_);
#elif defined XS_HAVE_POLL
@@ -45,45 +45,45 @@ xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_)
return result;
}
-xs::poller_base_t::poller_base_t (xs::ctx_t *ctx_, uint32_t tid_) :
+xs::io_thread_t::io_thread_t (xs::ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
}
-xs::poller_base_t::~poller_base_t ()
+xs::io_thread_t::~io_thread_t ()
{
}
-void xs::poller_base_t::start ()
+void xs::io_thread_t::start ()
{
mailbox_handle = add_fd (mailbox.get_fd (), this);
set_pollin (mailbox_handle);
xstart ();
}
-void xs::poller_base_t::stop ()
+void xs::io_thread_t::stop ()
{
// Ask the I/O thread to stop.
send_stop ();
}
-void xs::poller_base_t::process_stop ()
+void xs::io_thread_t::process_stop ()
{
rm_fd (mailbox_handle);
xstop ();
}
-xs::mailbox_t *xs::poller_base_t::get_mailbox ()
+xs::mailbox_t *xs::io_thread_t::get_mailbox ()
{
return &mailbox;
}
-int xs::poller_base_t::get_load ()
+int xs::io_thread_t::get_load ()
{
return load.get ();
}
-void xs::poller_base_t::adjust_load (int amount_)
+void xs::io_thread_t::adjust_load (int amount_)
{
if (amount_ > 0)
load.add (amount_);
@@ -91,7 +91,7 @@ void xs::poller_base_t::adjust_load (int amount_)
load.sub (-amount_);
}
-xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_)
+xs::handle_t xs::io_thread_t::add_timer (int timeout_, i_poll_events *sink_)
{
uint64_t expiration = clock.now_ms () + timeout_;
timer_info_t info = {sink_, timers_t::iterator ()};
@@ -101,13 +101,13 @@ xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_)
return (handle_t) &(it->second);
}
-void xs::poller_base_t::rm_timer (handle_t handle_)
+void xs::io_thread_t::rm_timer (handle_t handle_)
{
timer_info_t *info = (timer_info_t*) handle_;
timers.erase (info->self);
}
-uint64_t xs::poller_base_t::execute_timers ()
+uint64_t xs::io_thread_t::execute_timers ()
{
// Fast track.
if (timers.empty ())
@@ -140,7 +140,7 @@ uint64_t xs::poller_base_t::execute_timers ()
return 0;
}
-void xs::poller_base_t::in_event (fd_t fd_)
+void xs::io_thread_t::in_event (fd_t fd_)
{
// TODO: Do we want to limit number of commands I/O thread can
// process in a single go?
@@ -161,13 +161,13 @@ void xs::poller_base_t::in_event (fd_t fd_)
}
}
-void xs::poller_base_t::out_event (fd_t fd_)
+void xs::io_thread_t::out_event (fd_t fd_)
{
// We are never polling for POLLOUT here. This function is never called.
xs_assert (false);
}
-void xs::poller_base_t::timer_event (handle_t handle_)
+void xs::io_thread_t::timer_event (handle_t handle_)
{
// No timers here. This function is never called.
xs_assert (false);
diff --git a/src/poller_base.hpp b/src/io_thread.hpp
index 2388fad..689b851 100644
--- a/src/poller_base.hpp
+++ b/src/io_thread.hpp
@@ -18,8 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __XS_POLLER_BASE_HPP_INCLUDED__
-#define __XS_POLLER_BASE_HPP_INCLUDED__
+#ifndef __XS_IO_THREAD_HPP_INCLUDED__
+#define __XS_IO_THREAD_HPP_INCLUDED__
#include <map>
@@ -54,23 +54,23 @@ namespace xs
virtual void timer_event (handle_t handle_) = 0;
};
- class poller_base_t : public object_t, public i_poll_events
+ class io_thread_t : public object_t, public i_poll_events
{
public:
- // Create optimal poller mechanism for this environment.
- static poller_base_t *create (xs::ctx_t *ctx_, uint32_t tid_);
+ // Create optimal polling mechanism for this environment.
+ static io_thread_t *create (xs::ctx_t *ctx_, uint32_t tid_);
- virtual ~poller_base_t ();
+ virtual ~io_thread_t ();
- // Returns load of the poller. Note that this function can be
+ // Returns load of the I/O thread. Note that this function can be
// invoked from a different thread!
int get_load ();
void start ();
void stop ();
- // Returns mailbox associated with this I/O poller.
+ // Returns mailbox associated with this I/O thread.
mailbox_t *get_mailbox ();
virtual handle_t add_fd (fd_t fd_, xs::i_poll_events *events_) = 0;
@@ -96,9 +96,9 @@ namespace xs
protected:
- poller_base_t (xs::ctx_t *ctx_, uint32_t tid_);
+ io_thread_t (xs::ctx_t *ctx_, uint32_t tid_);
- // Called by individual poller implementations to manage the load.
+ // Called by individual io_thread implementations to manage the load.
void adjust_load (int amount_);
// Executes any timers that are due. Returns number of milliseconds
@@ -121,7 +121,7 @@ namespace xs
typedef std::multimap <uint64_t, timer_info_t> timers_t;
timers_t timers;
- // Load of the poller. Currently the number of file descriptors
+ // Load of the I/O thread. Currently the number of file descriptors
// registered.
atomic_counter_t load;
@@ -131,8 +131,8 @@ namespace xs
// Handle associated with mailbox' file descriptor.
handle_t mailbox_handle;
- poller_base_t (const poller_base_t&);
- const poller_base_t &operator = (const poller_base_t&);
+ io_thread_t (const io_thread_t&);
+ const io_thread_t &operator = (const io_thread_t&);
};
}
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index b65887a..f6a01c3 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -26,7 +26,7 @@
#include <string>
#include "stream_engine.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "err.hpp"
@@ -37,7 +37,7 @@
#include <sys/socket.h>
#include <sys/un.h>
-xs::ipc_connecter_t::ipc_connecter_t (class poller_base_t *io_thread_,
+xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) :
own_t (io_thread_, options_),
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index 46e260c..e598393 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -34,7 +34,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class session_base_t;
class ipc_connecter_t : public own_t, public io_object_t
@@ -43,7 +43,7 @@ namespace xs
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
- ipc_connecter_t (xs::poller_base_t *io_thread_,
+ ipc_connecter_t (xs::io_thread_t *io_thread_,
xs::session_base_t *session_, const options_t &