summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:10:49 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:10:49 +0900
commit50f225a04422abf64545f5eb36592d8c990b0ae4 (patch)
tree1ad0396c3f32b4e89f80d4abdbf0d9ced6a1e55c /src
parente5e9852181947a9c41db9ff3c47e94d66a933161 (diff)
poller_base_t renamed to io_thread_t
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/ctx.cpp6
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/devpoll.cpp2
-rw-r--r--src/devpoll.hpp6
-rw-r--r--src/epoll.cpp2
-rw-r--r--src/epoll.hpp6
-rw-r--r--src/i_engine.hpp4
-rw-r--r--src/io_object.cpp37
-rw-r--r--src/io_object.hpp12
-rw-r--r--src/io_thread.cpp (renamed from src/poller_base.cpp)34
-rw-r--r--src/io_thread.hpp (renamed from src/poller_base.hpp)26
-rw-r--r--src/ipc_connecter.cpp4
-rw-r--r--src/ipc_connecter.hpp6
-rw-r--r--src/ipc_listener.cpp6
-rw-r--r--src/ipc_listener.hpp4
-rw-r--r--src/kqueue.cpp2
-rw-r--r--src/kqueue.hpp6
-rw-r--r--src/monitor.cpp4
-rw-r--r--src/monitor.hpp6
-rw-r--r--src/object.cpp4
-rw-r--r--src/object.hpp4
-rw-r--r--src/own.cpp4
-rw-r--r--src/own.hpp4
-rw-r--r--src/pair.cpp2
-rw-r--r--src/pair.hpp2
-rw-r--r--src/pgm_receiver.cpp4
-rw-r--r--src/pgm_receiver.hpp6
-rw-r--r--src/pgm_sender.cpp8
-rw-r--r--src/pgm_sender.hpp6
-rw-r--r--src/poll.cpp2
-rw-r--r--src/poll.hpp6
-rw-r--r--src/pub.cpp2
-rw-r--r--src/pub.hpp4
-rw-r--r--src/pull.cpp2
-rw-r--r--src/pull.hpp4
-rw-r--r--src/push.cpp2
-rw-r--r--src/push.hpp4
-rw-r--r--src/reaper.cpp24
-rw-r--r--src/reaper.hpp6
-rw-r--r--src/rep.cpp2
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.cpp2
-rw-r--r--src/req.hpp4
-rw-r--r--src/select.cpp2
-rw-r--r--src/select.hpp6
-rw-r--r--src/session_base.cpp6
-rw-r--r--src/session_base.hpp8
-rw-r--r--src/socket_base.cpp18
-rw-r--r--src/socket_base.hpp14
-rw-r--r--src/stream_engine.cpp8
-rw-r--r--src/stream_engine.hpp4
-rw-r--r--src/sub.cpp2
-rw-r--r--src/sub.hpp4
-rw-r--r--src/tcp_connecter.cpp4
-rw-r--r--src/tcp_connecter.hpp6
-rw-r--r--src/tcp_listener.cpp6
-rw-r--r--src/tcp_listener.hpp4
-rw-r--r--src/xpub.cpp2
-rw-r--r--src/xpub.hpp4
-rw-r--r--src/xrep.cpp2
-rw-r--r--src/xrep.hpp4
-rw-r--r--src/xreq.cpp2
-rw-r--r--src/xreq.hpp4
-rw-r--r--src/xsub.cpp2
-rw-r--r--src/xsub.hpp4
66 files changed, 204 insertions, 207 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 7d3ee8d..9f11611 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,6 +23,7 @@ libxs_la_SOURCES = \
fd.hpp \
fq.hpp \
io_object.hpp \
+ io_thread.hpp \
ip.hpp \
ipc_address.hpp \
ipc_connecter.hpp \
@@ -45,7 +46,6 @@ libxs_la_SOURCES = \
pipe.hpp \
platform.hpp \
poll.hpp \
- poller_base.hpp \
pair.hpp \
pub.hpp \
pull.hpp \
@@ -85,6 +85,7 @@ libxs_la_SOURCES = \
err.cpp \
fq.cpp \
io_object.cpp \
+ io_thread.cpp \
ip.cpp \
ipc_address.cpp \
ipc_connecter.cpp \
@@ -104,7 +105,6 @@ libxs_la_SOURCES = \
pgm_socket.cpp \
pipe.cpp \
poll.cpp \
- poller_base.cpp \
pull.cpp \
push.cpp \
reaper.cpp \
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 8b27b31..789cdc2 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -197,7 +197,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
// Create I/O thread objects and launch them.
for (uint32_t i = 2; i != io_thread_count + 2; i++) {
- poller_base_t *io_thread = poller_base_t::create (this, i);
+ io_thread_t *io_thread = io_thread_t::create (this, i);
errno_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
@@ -226,7 +226,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
#endif
// Create the monitor object.
- poller_base_t *io_thread = choose_io_thread (0);
+ io_thread_t *io_thread = choose_io_thread (0);
xs_assert (io_thread);
monitor = new (std::nothrow) monitor_t (io_thread);
alloc_assert (monitor);
@@ -301,7 +301,7 @@ void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)
slots [tid_]->send (command_);
}
-xs::poller_base_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
+xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
{
if (io_threads.empty ())
return NULL;
diff --git a/src/ctx.hpp b/src/ctx.hpp
index c12fffc..e912443 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -40,7 +40,7 @@ namespace xs
class object_t;
class monitor_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class reaper_t;
@@ -86,7 +86,7 @@ namespace xs
// Returns the I/O thread that is the least busy at the moment.
// Affinity specifies which I/O threads are eligible (0 = all).
// Returns NULL is no I/O thread is available.
- xs::poller_base_t *choose_io_thread (uint64_t affinity_);
+ xs::io_thread_t *choose_io_thread (uint64_t affinity_);
// Returns reaper thread object.
xs::object_t *get_reaper ();
@@ -142,7 +142,7 @@ namespace xs
xs::reaper_t *reaper;
// I/O threads.
- typedef std::vector <xs::poller_base_t*> io_threads_t;
+ typedef std::vector <xs::io_thread_t*> io_threads_t;
io_threads_t io_threads;
// Array of pointers to mailboxes for both application and I/O threads.
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index e85b765..936d6c2 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -38,7 +38,7 @@
#include "config.hpp"
xs::devpoll_t::devpoll_t (xs::ctx_t *ctx_, uint32_t tid_) :
- poller_base_t (ctx_, tid_),
+ io_thread_t (ctx_, tid_),
stopping (false)
{
devpoll_fd = open ("/dev/poll", O_RDWR);
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index b32aa28..3947f3e 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -30,7 +30,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -39,14 +39,14 @@ namespace xs
// Implements socket polling mechanism using the "/dev/poll" interface.
- class devpoll_t : public poller_base_t
+ class devpoll_t : public io_thread_t
{
public:
devpoll_t (xs::ctx_t *ctx_, uint32_t tid_);
~devpoll_t ();
- // "poller" concept.
+ // Implementation of virtual functions from io_thread_t.
handle_t add_fd (fd_t fd_, xs::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 7ca4608..aa13b31 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -35,7 +35,7 @@
#include "err.hpp"
xs::epoll_t::epoll_t (xs::ctx_t *ctx_, uint32_t tid_) :
- poller_base_t (ctx_, tid_),
+ io_thread_t (ctx_, tid_),
stopping (false)
{
epoll_fd = epoll_create (1);
diff --git a/src/epoll.hpp b/src/epoll.hpp
index 232d049..ad94120 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -31,7 +31,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -42,14 +42,14 @@ namespace xs
// This class implements socket polling mechanism using the Linux-specific
// epoll mechanism.
- class epoll_t : public poller_base_t
+ class epoll_t : public io_thread_t
{
public:
epoll_t (xs::ctx_t *ctx_, uint32_t tid_);
~epoll_t ();
- // "poller" concept.
+ // Implementation of virtual functions from io_thread_t.
handle_t add_fd (fd_t fd_, xs::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 9106411..fc03f0e 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -25,7 +25,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
// Abstract interface to be implemented by various engines.
@@ -34,7 +34,7 @@ namespace xs
virtual ~i_engine () {}
// Plug the engine to the session.
- virtual void plug (xs::poller_base_t *io_thread_,
+ virtual void plug (xs::io_thread_t *io_thread_,
class session_base_t *session_) = 0;
// Unplug the engine from the session.
diff --git a/src/io_object.cpp b/src/io_object.cpp
index 0c73571..c17ff04 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -20,11 +20,11 @@
*/
#include "io_object.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "err.hpp"
-xs::io_object_t::io_object_t (poller_base_t *io_thread_) :
- poller (NULL)
+xs::io_object_t::io_object_t (io_thread_t *io_thread_) :
+ io_thread (NULL)
{
if (io_thread_)
plug (io_thread_);
@@ -34,62 +34,59 @@ xs::io_object_t::~io_object_t ()
{
}
-void xs::io_object_t::plug (poller_base_t *io_thread_)
+void xs::io_object_t::plug (io_thread_t *io_thread_)
{
xs_assert (io_thread_);
- xs_assert (!poller);
+ xs_assert (!io_thread);
- // Retrieve the poller from the thread we are running in.
- poller = io_thread_;
+ // Retrieve the io_thread from the thread we are running in.
+ io_thread = io_thread_;
}
void xs::io_object_t::unplug ()
{
- xs_assert (poller);
-
- // Forget about old poller in preparation to be migrated
- // to a different I/O thread.
- poller = NULL;
+ xs_assert (io_thread);
+ io_thread = NULL;
}
xs::handle_t xs::io_object_t::add_fd (fd_t fd_)
{
- return poller->add_fd (fd_, this);
+ return io_thread->add_fd (fd_, this);
}
void xs::io_object_t::rm_fd (handle_t handle_)
{
- poller->rm_fd (handle_);
+ io_thread->rm_fd (handle_);
}
void xs::io_object_t::set_pollin (handle_t handle_)
{
- poller->set_pollin (handle_);
+ io_thread->set_pollin (handle_);
}
void xs::io_object_t::reset_pollin (handle_t handle_)
{
- poller->reset_pollin (handle_);
+ io_thread->reset_pollin (handle_);
}
void xs::io_object_t::set_pollout (handle_t handle_)
{
- poller->set_pollout (handle_);
+ io_thread->set_pollout (handle_);
}
void xs::io_object_t::reset_pollout (handle_t handle_)
{
- poller->reset_pollout (handle_);
+ io_thread->reset_pollout (handle_);
}
xs::handle_t xs::io_object_t::add_timer (int timeout_)
{
- return poller->add_timer (timeout_, this);
+ return io_thread->add_timer (timeout_, this);
}
void xs::io_object_t::rm_timer (handle_t handle_)
{
- poller->rm_timer (handle_);
+ io_thread->rm_timer (handle_);
}
void xs::io_object_t::in_event (fd_t fd_)
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 0749ceb..31f9507 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -25,30 +25,30 @@
#include <stddef.h>
#include "stdint.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
// Simple base class for objects that live in I/O threads.
- // It makes communication with the poller object easier and
+ // It makes communication with the io_thread object easier and
// makes defining unneeded event handlers unnecessary.
class io_object_t : public i_poll_events
{
public:
- io_object_t (xs::poller_base_t *io_thread_ = NULL);
+ io_object_t (xs::io_thread_t *io_thread_ = NULL);
~io_object_t ();
// When migrating an object from one I/O thread to another, first
// unplug it, then migrate it, then plug it to the new thread.
- void plug (xs::poller_base_t *io_thread_);
+ void plug (xs::io_thread_t *io_thread_);
void unplug ();
protected:
- // Methods to access underlying poller object.
+ // Methods to access underlying io_thread object.
handle_t add_fd (fd_t fd_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
@@ -65,7 +65,7 @@ namespace xs
private:
- poller_base_t *poller;
+ io_thread_t *io_thread;
io_object_t (const io_object_t&);
const io_object_t &operator = (const io_object_t&);
diff --git a/src/poller_base.cpp b/src/io_thread.cpp
index 913a8d7..d6dca8c 100644
--- a/src/poller_base.cpp
+++ b/src/io_thread.cpp
@@ -18,7 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "err.hpp"
#include "select.hpp"
@@ -27,9 +27,9 @@
#include "devpoll.hpp"
#include "kqueue.hpp"
-xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_)
+xs::io_thread_t *xs::io_thread_t::create (xs::ctx_t *ctx_, uint32_t tid_)
{
- poller_base_t *result;
+ io_thread_t *result;
#if defined XS_HAVE_SELECT
result = new (std::nothrow) select_t (ctx_, tid_);
#elif defined XS_HAVE_POLL
@@ -45,45 +45,45 @@ xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_)
return result;
}
-xs::poller_base_t::poller_base_t (xs::ctx_t *ctx_, uint32_t tid_) :
+xs::io_thread_t::io_thread_t (xs::ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
{
}
-xs::poller_base_t::~poller_base_t ()
+xs::io_thread_t::~io_thread_t ()
{
}
-void xs::poller_base_t::start ()
+void xs::io_thread_t::start ()
{
mailbox_handle = add_fd (mailbox.get_fd (), this);
set_pollin (mailbox_handle);
xstart ();
}
-void xs::poller_base_t::stop ()
+void xs::io_thread_t::stop ()
{
// Ask the I/O thread to stop.
send_stop ();
}
-void xs::poller_base_t::process_stop ()
+void xs::io_thread_t::process_stop ()
{
rm_fd (mailbox_handle);
xstop ();
}
-xs::mailbox_t *xs::poller_base_t::get_mailbox ()
+xs::mailbox_t *xs::io_thread_t::get_mailbox ()
{
return &mailbox;
}
-int xs::poller_base_t::get_load ()
+int xs::io_thread_t::get_load ()
{
return load.get ();
}
-void xs::poller_base_t::adjust_load (int amount_)
+void xs::io_thread_t::adjust_load (int amount_)
{
if (amount_ > 0)
load.add (amount_);
@@ -91,7 +91,7 @@ void xs::poller_base_t::adjust_load (int amount_)
load.sub (-amount_);
}
-xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_)
+xs::handle_t xs::io_thread_t::add_timer (int timeout_, i_poll_events *sink_)
{
uint64_t expiration = clock.now_ms () + timeout_;
timer_info_t info = {sink_, timers_t::iterator ()};
@@ -101,13 +101,13 @@ xs::handle_t xs::poller_base_t::add_timer (int timeout_, i_poll_events *sink_)
return (handle_t) &(it->second);
}
-void xs::poller_base_t::rm_timer (handle_t handle_)
+void xs::io_thread_t::rm_timer (handle_t handle_)
{
timer_info_t *info = (timer_info_t*) handle_;
timers.erase (info->self);
}
-uint64_t xs::poller_base_t::execute_timers ()
+uint64_t xs::io_thread_t::execute_timers ()
{
// Fast track.
if (timers.empty ())
@@ -140,7 +140,7 @@ uint64_t xs::poller_base_t::execute_timers ()
return 0;
}
-void xs::poller_base_t::in_event (fd_t fd_)
+void xs::io_thread_t::in_event (fd_t fd_)
{
// TODO: Do we want to limit number of commands I/O thread can
// process in a single go?
@@ -161,13 +161,13 @@ void xs::poller_base_t::in_event (fd_t fd_)
}
}
-void xs::poller_base_t::out_event (fd_t fd_)
+void xs::io_thread_t::out_event (fd_t fd_)
{
// We are never polling for POLLOUT here. This function is never called.
xs_assert (false);
}
-void xs::poller_base_t::timer_event (handle_t handle_)
+void xs::io_thread_t::timer_event (handle_t handle_)
{
// No timers here. This function is never called.
xs_assert (false);
diff --git a/src/poller_base.hpp b/src/io_thread.hpp
index 2388fad..689b851 100644
--- a/src/poller_base.hpp
+++ b/src/io_thread.hpp
@@ -18,8 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __XS_POLLER_BASE_HPP_INCLUDED__
-#define __XS_POLLER_BASE_HPP_INCLUDED__
+#ifndef __XS_IO_THREAD_HPP_INCLUDED__
+#define __XS_IO_THREAD_HPP_INCLUDED__
#include <map>
@@ -54,23 +54,23 @@ namespace xs
virtual void timer_event (handle_t handle_) = 0;
};
- class poller_base_t : public object_t, public i_poll_events
+ class io_thread_t : public object_t, public i_poll_events
{
public:
- // Create optimal poller mechanism for this environment.
- static poller_base_t *create (xs::ctx_t *ctx_, uint32_t tid_);
+ // Create optimal polling mechanism for this environment.
+ static io_thread_t *create (xs::ctx_t *ctx_, uint32_t tid_);
- virtual ~poller_base_t ();
+ virtual ~io_thread_t ();
- // Returns load of the poller. Note that this function can be
+ // Returns load of the I/O thread. Note that this function can be
// invoked from a different thread!
int get_load ();
void start ();
void stop ();
- // Returns mailbox associated with this I/O poller.
+ // Returns mailbox associated with this I/O thread.
mailbox_t *get_mailbox ();
virtual handle_t add_fd (fd_t fd_, xs::i_poll_events *events_) = 0;
@@ -96,9 +96,9 @@ namespace xs
protected:
- poller_base_t (xs::ctx_t *ctx_, uint32_t tid_);
+ io_thread_t (xs::ctx_t *ctx_, uint32_t tid_);
- // Called by individual poller implementations to manage the load.
+ // Called by individual io_thread implementations to manage the load.
void adjust_load (int amount_);
// Executes any timers that are due. Returns number of milliseconds
@@ -121,7 +121,7 @@ namespace xs
typedef std::multimap <uint64_t, timer_info_t> timers_t;
timers_t timers;
- // Load of the poller. Currently the number of file descriptors
+ // Load of the I/O thread. Currently the number of file descriptors
// registered.
atomic_counter_t load;
@@ -131,8 +131,8 @@ namespace xs
// Handle associated with mailbox' file descriptor.
handle_t mailbox_handle;
- poller_base_t (const poller_base_t&);
- const poller_base_t &operator = (const poller_base_t&);
+ io_thread_t (const io_thread_t&);
+ const io_thread_t &operator = (const io_thread_t&);
};
}
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index b65887a..f6a01c3 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -26,7 +26,7 @@
#include <string>
#include "stream_engine.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "err.hpp"
@@ -37,7 +37,7 @@
#include <sys/socket.h>
#include <sys/un.h>
-xs::ipc_connecter_t::ipc_connecter_t (class poller_base_t *io_thread_,
+xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) :
own_t (io_thread_, options_),
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index 46e260c..e598393 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -34,7 +34,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class session_base_t;
class ipc_connecter_t : public own_t, public io_object_t
@@ -43,7 +43,7 @@ namespace xs
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
- ipc_connecter_t (xs::poller_base_t *io_thread_,
+ ipc_connecter_t (xs::io_thread_t *io_thread_,
xs::session_base_t *session_, const options_t &options_,
const char *address_, bool delay_);
~ipc_connecter_t ();
@@ -94,7 +94,7 @@ namespace xs
fd_t s;
// Handle corresponding to the listening socket or NULL if the socket
- // is not registered with the poller.
+ // is not registered with the io_thread.
handle_t handle;
// If true, connecter is waiting a while before trying to connect.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index bd55cc7..db49528 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -28,7 +28,7 @@
#include "stream_engine.hpp"
#include "ipc_address.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
@@ -39,7 +39,7 @@
#include <fcntl.h>
#include <sys/un.h>
-xs::ipc_listener_t::ipc_listener_t (poller_base_t *io_thread_,
+xs::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
@@ -83,7 +83,7 @@ void xs::ipc_listener_t::in_event (fd_t fd_)
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
- poller_base_t *io_thread = choose_io_thread (options.affinity);
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
xs_assert (io_thread);
// Create and launch a session object.
diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp
index 24b96fb..b599bff 100644
--- a/src/ipc_listener.hpp
+++ b/src/ipc_listener.hpp
@@ -35,14 +35,14 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class ipc_listener_t : public own_t, public io_object_t
{
public:
- ipc_listener_t (xs::poller_base_t *io_thread_,
+ ipc_listener_t (xs::io_thread_t *io_thread_,
xs::socket_base_t *socket_, const options_t &options_);
~ipc_listener_t ();
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index 6a193df..1a0ee4d 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -45,7 +45,7 @@
#endif
xs::kqueue_t::kqueue_t (xs::ctx_t *ctx_, uint32_t tid_) :
- poller_base_t (ctx_, tid_),
+ io_thread_t (ctx_, tid_),
stopping (false)
{
// Create event queue
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index f3350b9..dea84ac 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -29,7 +29,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -39,14 +39,14 @@ namespace xs
// Implements socket polling mechanism using the BSD-specific
// kqueue interface.
- class kqueue_t : public poller_base_t
+ class kqueue_t : public io_thread_t
{
public:
kqueue_t (xs::ctx_t *ctx_, uint32_t tid_);
~kqueue_t ();
- // "poller" concept.
+ // Implementation of virtual functions from io_thread_t.
handle_t add_fd (fd_t fd_, xs::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
diff --git a/src/monitor.cpp b/src/monitor.cpp
index 14a31f2..b29c874 100644
--- a/src/monitor.cpp
+++ b/src/monitor.cpp
@@ -19,12 +19,12 @@
*/
#include "monitor.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "options.hpp"
#include "random.hpp"
#include "err.hpp"
-xs::monitor_t::monitor_t (xs::poller_base_t *io_thread_) :
+xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) :
own_t (io_thread_, options_t ()),
io_object_t (io_thread_),
timer (NULL)
diff --git a/src/monitor.hpp b/src/monitor.hpp
index 91d441a..db6f74b 100644
--- a/src/monitor.hpp
+++ b/src/monitor.hpp
@@ -30,14 +30,14 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class monitor_t : public own_t, public io_object_t
{
public:
- monitor_t (xs::poller_base_t *io_thread_);
+ monitor_t (xs::io_thread_t *io_thread_);
~monitor_t ();
void start ();
@@ -51,7 +51,7 @@ namespace xs
void process_plug ();
void process_stop ();
- // Events from the poller.
+ // Events from the I/O thread.
void timer_event (handle_t handle_);
// Actual monitoring data to send and the related critical section.
diff --git a/src/object.cpp b/src/object.cpp
index df279f5..2d1f5aa 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -26,7 +26,7 @@
#include "ctx.hpp"
#include "err.hpp"
#include "pipe.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "session_base.hpp"
#include "socket_base.hpp"
@@ -149,7 +149,7 @@ void xs::object_t::destroy_socket (socket_base_t *socket_)
ctx->destroy_socket (socket_);
}
-xs::poller_base_t *xs::object_t::choose_io_thread (uint64_t affinity_)
+xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_)
{
return ctx->choose_io_thread (affinity_);
}
diff --git a/src/object.hpp b/src/object.hpp
index 14dad49..b1bc0c3 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -34,7 +34,7 @@ namespace xs
class pipe_t;
class socket_base_t;
class session_base_t;
- class poller_base_t;
+ class io_thread_t;
class own_t;
// Base class for all objects that participate in inter-thread
@@ -62,7 +62,7 @@ namespace xs
void destroy_socket (xs::socket_base_t *socket_);
// Chooses least loaded I/O thread.
- xs::poller_base_t *choose_io_thread (uint64_t affinity_);
+ xs::io_thread_t *choose_io_thread (uint64_t affinity_);
// Logging related functions.
void log (int sid_, const char *text_);
diff --git a/src/own.cpp b/src/own.cpp
index 3047d7b..a60d9d0 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -20,7 +20,7 @@
#include "own.hpp"
#include "err.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
object_t (parent_, tid_),
@@ -32,7 +32,7 @@ xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
{
}
-xs::own_t::own_t (poller_base_t *io_thread_, const options_t &options_) :
+xs::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
object_t (io_thread_),
options (options_),
terminating (false),
diff --git a/src/own.hpp b/src/own.hpp
index de0320a..d0afd62 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -33,7 +33,7 @@ namespace xs
{
class ctx_t;
- class poller_base_t;
+ class io_thread_t;
// Base class for objects forming a part of ownership hierarchy.
// It handles initialisation and destruction of such objects.
@@ -50,7 +50,7 @@ namespace xs
own_t (xs::ctx_t *parent_, uint32_t tid_);
// The object is living within I/O thread.
- own_t (xs::poller_base_t *io_thread_, const options_t &options_);
+ own_t (xs::io_thread_t *io_thread_, const options_t &options_);
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
diff --git a/src/pair.cpp b/src/pair.cpp
index c35084f..dd14d8d 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -117,7 +117,7 @@ bool xs::pair_t::xhas_out ()
return result;
}
-xs::pair_session_t::pair_session_t (poller_base_t *io_thread_, bool connect_,
+xs::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/pair.hpp b/src/pair.hpp
index 658e8e9..8ae3acf 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -63,7 +63,7 @@ namespace xs
{
public:
- pair_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ pair_session_t (xs::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~pair_session_t ();
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 3b29197..b2cc513 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -37,7 +37,7 @@
#include "wire.hpp"
#include "err.hpp"
-xs::pgm_receiver_t::pgm_receiver_t (class poller_base_t *parent_,
+xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
pgm_socket (true, options_),
@@ -60,7 +60,7 @@ int xs::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_);
}
-void xs::pgm_receiver_t::plug (poller_base_t *io_thread_,
+void xs::pgm_receiver_t::plug (io_thread_t *io_thread_,
session_base_t *session_)
{
// Retrieve PGM fds and start polling.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 952265e..b8390c1 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -43,7 +43,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class session_base_t;
class pgm_receiver_t : public io_object_t, public i_engine
@@ -51,13 +51,13 @@ namespace xs
public:
- pgm_receiver_t (xs::poller_base_t *parent_, const options_t &options_);
+ pgm_receiver_t (xs::io_thread_t *parent_, const options_t &options_);
~pgm_receiver_t ();
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (xs::poller_base_t *io_thread_,
+ void plug (xs::io_thread_t *io_thread_,
xs::session_base_t *session_);
void unplug ();
void terminate ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 20d3dba..fe57d4d 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -37,7 +37,7 @@
#include "wire.hpp"
#include "stdint.hpp"
-xs::pgm_sender_t::pgm_sender_t (poller_base_t *parent_,
+xs::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
encoder (0),
@@ -64,7 +64,7 @@ int xs::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc;
}
-void xs::pgm_sender_t::plug (poller_base_t *io_thread_,
+void xs::pgm_sender_t::plug (io_thread_t *io_thread_,
session_base_t *session_)
{
// Alocate 2 fds for PGM socket.
@@ -75,7 +75,7 @@ void xs::pgm_sender_t::plug (poller_base_t *io_thread_,
encoder.set_session (session_);
- // Fill fds from PGM transport and add them to the poller.
+ // Fill fds from PGM transport and add them to the I/O thread.
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
&rdata_notify_fd, &pending_notify_fd);
@@ -204,7 +204,7 @@ void xs::pgm_sender_t::out_event (fd_t fd_)
void xs::pgm_sender_t::timer_event (handle_t handle_)
{
- // Timer cancels on return by poller_base.
+ // Timer cancels on return by io_thread.
if (handle_ == rx_timer) {
rx_timer = NULL;
in_event (retired_fd);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index be6476b..000f5ba 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -41,7 +41,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class session_base_t;
class pgm_sender_t : public io_object_t, public i_engine
@@ -49,13 +49,13 @@ namespace xs
public:
- pgm_sender_t (xs::poller_base_t *parent_, const options_t &options_);
+ pgm_sender_t (xs::io_thread_t *parent_, const options_t &options_);
~pgm_sender_t ();
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (xs::poller_base_t *io_thread_,
+ void plug (xs::io_thread_t *io_thread_,
xs::session_base_t *session_);
void unplug ();
void terminate ();
diff --git a/src/poll.cpp b/src/poll.cpp
index 622a0ea..2cb7669 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -33,7 +33,7 @@
#include "config.hpp"
xs::poll_t::poll_t (xs::ctx_t *ctx_, uint32_t tid_) :
- poller_base_t (ctx_, tid_),
+ io_thread_t (ctx_, tid_),
retired (false),
stopping (false)
{
diff --git a/src/poll.hpp b/src/poll.hpp
index 3d5157e..ec06671 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -32,7 +32,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -42,14 +42,14 @@ namespace xs
// Implements socket polling mechanism using the POSIX.1-2001
// poll() system call.
- class poll_t : public poller_base_t
+ class poll_t : public io_thread_t
{
public:
poll_t (xs::ctx_t *ctx_, uint32_t tid_);
~poll_t ();
- // "poller" concept.
+ // Implementation of virtual functions from io_thread_t.
handle_t add_fd (fd_t fd_, xs::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
diff --git a/src/pub.cpp b/src/pub.cpp
index db285b8..aa0ed8e 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -44,7 +44,7 @@ bool xs::pub_t::xhas_in ()
return false;
}
-xs::pub_session_t::pub_session_t (poller_base_t *io_thread_, bool connect_,
+xs::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xpub_session_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/pub.hpp b/src/pub.hpp
index 45b510e..49aecc0 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -28,7 +28,7 @@ namespace xs
{
class ctx_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class msg_t;
@@ -53,7 +53,7 @@ namespace xs
{
public:
- pub_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ pub_session_t (xs::io_thread_t *io_thread_, bool connect_,
xs::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~pub_session_t ();
diff --git a/src/pull.cpp b/src/pull.cpp
index 923c85b..e168c03 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -60,7 +60,7 @@ bool xs::pull_t::xhas_in ()
return fq.has_in ();
}
-xs::pull_session_t::pull_session_t (poller_base_t *io_thread_, bool connect_,
+xs::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/pull.hpp b/src/pull.hpp
index a02ef2d..29ca4d9 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -32,7 +32,7 @@ namespace xs
class ctx_t;
class pipe_t;
class msg_t;
- class poller_base_t;
+ class io_thread_t;
class pull_t :
public socket_base_t
@@ -65,7 +65,7 @@ namespace xs
{
public:
- pull_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ pull_session_t (xs::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~pull_session_t ();
diff --git a/src/push.cpp b/src/push.cpp
index ffc2d6e..45f0c62 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -60,7 +60,7 @@ bool xs::push_t::xhas_out ()
return lb.has_out ();
}
-xs::push_session_t::push_session_t (poller_base_t *io_thread_, bool connect_,
+xs::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/push.hpp b/src/push.hpp
index f6102c6..85c2279 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -32,7 +32,7 @@ namespace xs
class ctx_t;
class pipe_t;
class msg_t;
- class poller_base_t;
+ class io_thread_t;
class push_t :
public socket_base_t
@@ -64,7 +64,7 @@ namespace xs
{
public:
- push_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ push_session_t (xs::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~push_session_t ();
diff --git a/src/reaper.cpp b/src/reaper.cpp
index ffce0a9..7ce9e04 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -27,16 +27,16 @@ xs::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
sockets (0),
terminating (false)
{
- poller = poller_base_t::create (ctx_, tid_);
- xs_assert (poller);
+ io_thread = io_thread_t::create (ctx_, tid_);
+ xs_assert (io_thread);
- mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
- poller->set_pollin (mailbox_handle);
+ mailbox_handle = io_thread->add_fd (mailbox.get_fd (), this);
+ io_thread->set_pollin (mailbox_handle);
}
xs::reaper_t::~reaper_t ()
{
- delete poller;
+ delete io_thread;
}
xs::mailbox_t *xs::reaper_t::get_mailbox ()
@@ -47,7 +47,7 @@ xs::mailbox_t *xs::reaper_t::get_mailbox ()
void xs::reaper_t::start ()
{
// Start the thread.
- poller->start ();
+ io_thread->start ();
}
void xs::reaper_t::stop ()
@@ -90,15 +90,15 @@ void xs::reaper_t::process_stop ()
// If there are no sockets being reaped finish immediately.
if (!sockets) {
send_done ();
- poller->rm_fd (mailbox_handle);
- poller->stop ();
+ io_thread->rm_fd (mailbox_handle);
+ io_thread->stop ();
}
}
void xs::reaper_t::process_reap (socket_base_t *socket_)
{
- // Add the socket to the poller.
- socket_->start_reaping (poller);
+ // Add the socket to the I/O thread.
+ socket_->start_reaping (io_thread);
++sockets;
}
@@ -111,7 +111,7 @@ void xs::reaper_t::process_reaped ()
// finish immediately.
if (!sockets && terminating) {
send_done ();
- poller->rm_fd (mailbox_handle);
- poller->stop ();
+ io_thread->rm_fd (mailbox_handle);
+ io_thread->stop ();
}
}
diff --git a/src/reaper.hpp b/src/reaper.hpp
index bc40fac..0e7618b 100644
--- a/src/reaper.hpp
+++ b/src/reaper.hpp
@@ -23,7 +23,7 @@
#include "object.hpp"
#include "mailbox.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -61,8 +61,8 @@ namespace xs
// Handle associated with mailbox' file descriptor.
handle_t mailbox_handle;
- // I/O multiplexing is performed using a poller object.
- poller_base_t *poller;
+ // I/O multiplexing is performed using an io_thread object.
+ io_thread_t *io_thread;
// Number of sockets being reaped at the moment.
int sockets;
diff --git a/src/rep.cpp b/src/rep.cpp
index 5e5e415..6454b57 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -112,7 +112,7 @@ bool xs::rep_t::xhas_out ()
return xrep_t::xhas_out ();
}
-xs::rep_session_t::rep_session_t (poller_base_t *io_thread_, bool connect_,
+xs::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xrep_session_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/rep.hpp b/src/rep.hpp
index 2cd3931..0b608ee 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -29,7 +29,7 @@ namespace xs
class ctx_t;
class msg_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class rep_t : public xrep_t
@@ -64,7 +64,7 @@ namespace xs
{
public:
- rep_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ rep_session_t (xs::io_thread_t *io_thread_, bool connect_,
xs::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~rep_session_t ();
diff --git a/src/req.cpp b/src/req.cpp
index 6e9867c..af6c8cd 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -137,7 +137,7 @@ bool xs::req_t::xhas_out ()
return xreq_t::xhas_out ();
}
-xs::req_session_t::req_session_t (poller_base_t *io_thread_, bool connect_,
+xs::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xreq_session_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/req.hpp b/src/req.hpp
index 8da789f..46015e8 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -31,7 +31,7 @@ namespace xs
class ctx_t;
class msg_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class req_t : public xreq_t
@@ -65,7 +65,7 @@ namespace xs
{
public:
- req_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ req_session_t (xs::io_thread_t *io_thread_, bool connect_,
xs::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~req_session_t ();
diff --git a/src/select.cpp b/src/select.cpp
index 1b63685..af5f0f7 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -44,7 +44,7 @@
#include "config.hpp"
xs::select_t::select_t (xs::ctx_t *ctx_, uint32_t tid_) :
- poller_base_t (ctx_, tid_),
+ io_thread_t (ctx_, tid_),
maxfd (retired_fd),
retired (false),
stopping (false)
diff --git a/src/select.hpp b/src/select.hpp
index 283cc52..b1b96ef 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -42,7 +42,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
namespace xs
{
@@ -52,14 +52,14 @@ namespace xs
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
- class select_t : public poller_base_t
+ class select_t : public io_thread_t
{
public:
select_t (xs::ctx_t *ctx_, uint32_t tid_);
~select_t ();
- // "poller" concept.
+ // Implementation of virtual functions from io_thread_t.
handle_t add_fd (fd_t fd_, xs::i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 168501c..efc4f8b 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -43,7 +43,7 @@
#include "pull.hpp"
#include "pair.hpp"
-xs::session_base_t *xs::session_base_t::create (class poller_base_t *io_thread_,
+xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_)
{
@@ -100,7 +100,7 @@ xs::session_base_t *xs::session_base_t::create (class poller_base_t *io_thread_,
return s;
}
-xs::session_base_t::session_base_t (class poller_base_t *io_thread_,
+xs::session_base_t::session_base_t (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
own_t (io_thread_, options_),
@@ -387,7 +387,7 @@ void xs::session_base_t::start_connecting (bool wait_)
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
- poller_base_t *io_thread = choose_io_thread (options.affinity);
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
xs_assert (io_thread);
// Create the connecter object.
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 6a461d8..45ad8f8 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -33,7 +33,7 @@ namespace xs
{
class pipe_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
struct i_engine;
@@ -45,7 +45,7 @@ namespace xs
public:
// Create a session of the particular type.
- static session_base_t *create (xs::poller_base_t *io_thread_,
+ static session_base_t *create (xs::io_thread_t *io_thread_,
bool connect_, xs::socket_base_t *socket_,
const options_t &options_, const char *protocol_,
const char *address_);
@@ -67,7 +67,7 @@ namespace xs
protected:
- session_base_t (xs::poller_base_t *io_thread_, bool connect_,
+ session_base_t (xs::io_thread_t *io_thread_, bool connect_,
xs::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~session_base_t ();
@@ -116,7 +116,7 @@ namespace xs
// I/O thread the session is living in. It will be used to plug in
// the engines into the same thread.
- xs::poller_base_t *io_thread;
+ xs::io_thread_t *io_thread;
// If true, identity is to be sent/recvd from the network.
bool send_identity;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 492d027..7ab00ea 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -39,7 +39,7 @@
#include "tcp_listener.hpp"
#include "ipc_listener.hpp"
#include "tcp_connecter.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "clock.hpp"
@@ -338,7 +338,7 @@ int xs::socket_base_t::bind (const char *addr_)
// Remaining trasnports require to be run in an I/O thread, so at this
// point we'll choose one.
- poller_base_t *io_thread = choose_io_thread (options.affinity);
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
@@ -449,7 +449,7 @@ int xs::socket_base_t::connect (const char *addr_)
}
// Choose the I/O thread to run the session in.
- poller_base_t *io_thread = choose_io_thread (options.affinity);
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
@@ -667,12 +667,12 @@ bool xs::socket_base_t::has_out ()
return ret;
}
-void xs::socket_base_t::start_reaping (poller_base_t *poller_)
+void xs::socket_base_t::start_reaping (io_thread_t *io_thread_)
{
// Plug the socket to the reaper thread.
- poller = poller_;
- handle = poller->add_fd (mailbox.get_fd (), this);
- poller->set_pollin (handle);
+ io_thread = io_thread_;
+ handle = io_thread->add_fd (mailbox.get_fd (), this);
+ io_thread->set_pollin (handle);
// Initialise the termination and check whether it can be deallocated
// immediately.
@@ -843,8 +843,8 @@ void xs::socket_base_t::check_destroy ()
// If the object was already marked as destroyed, finish the deallocation.
if (destroyed) {
- // Remove the socket from the reaper's poller.
- poller->rm_fd (handle);
+ // Remove the socket from the reaper's I/O thread.
+ io_thread->rm_fd (handle);
// Remove the socket from the context.
destroy_socket (this);
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 8b9a948..dd1fee7 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -28,7 +28,7 @@
#include "own.hpp"
#include "array.hpp"
#include "stdint.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "atomic_counter.hpp"
#include "mailbox.hpp"
#include "stdint.hpp"
@@ -83,12 +83,12 @@ namespace xs
bool has_in ();
bool has_out ();
- // Using this function reaper thread ask the socket to regiter with
- // its poller.
- void start_reaping (poller_base_t *poller_);
+ // Using this function reaper thread asks the socket to regiter with
+ // its I/O thread.
+ void start_reaping (io_thread_t *io_thread_);
// i_poll_events implementation. This interface is used when socket
- // is handled by the poller in the reaper thread.
+ // is handled by the io_thread in the reaper thread.
void in_event (fd_t fd_);
void out_event (fd_t fd_);
void timer_event (handle_t handle_);
@@ -188,8 +188,8 @@ namespace xs
typedef array_t <pipe_t, 3> pipes_t;
pipes_t pipes;
- // Reaper's poller and handle of this socket within it.
- poller_base_t *poller;
+ // Reaper's io_thread and handle of this socket within it.
+ io_thread_t *io_thread;
handle_t handle;
// Timestamp of when commands were processed the last time.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 7d69848..71032f4 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -36,7 +36,7 @@
#include <new>
#include "stream_engine.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
@@ -103,7 +103,7 @@ xs::stream_engine_t::~stream_engine_t ()
}
}
-void xs::stream_engine_t::plug (poller_base_t *io_thread_,
+void xs::stream_engine_t::plug (io_thread_t *io_thread_,
session_base_t *session_)
{
xs_assert (!plugged);
@@ -117,7 +117,7 @@ void xs::stream_engine_t::plug (poller_base_t *io_thread_,
decoder.set_session (session_);
session = session_;
- // Connect to I/O threads poller object.
+ // Connect to the io_thread object.
io_object_t::plug (io_thread_);
handle = add_fd (s);
set_pollin (handle);
@@ -135,7 +135,7 @@ void xs::stream_engine_t::unplug ()
// Cancel all fd subscriptions.
rm_fd (handle);
- // Disconnect from I/O threads poller object.
+ // Disconnect from the io_thread object.
io_object_t::unplug ();
// Disconnect from session object.
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 9060cd7..f246552 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -34,7 +34,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class session_base_t;
// This engine handles any socket with SOCK_STREAM semantics,
@@ -48,7 +48,7 @@ namespace xs
~stream_engine_t ();
// i_engine interface implementation.
- void plug (xs::poller_base_t *io_thread_,
+ void plug (xs::io_thread_t *io_thread_,
xs::session_base_t *session_);
void unplug ();
void terminate ();
diff --git a/src/sub.cpp b/src/sub.cpp
index 08ce368..ad55ea3 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -80,7 +80,7 @@ bool xs::sub_t::xhas_out ()
return false;
}
-xs::sub_session_t::sub_session_t (poller_base_t *io_thread_, bool connect_,
+xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xsub_session_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/sub.hpp b/src/sub.hpp
index 9b38822..70ade88 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -29,7 +29,7 @@ namespace xs
class ctx_t;
class msg_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class sub_t : public xsub_t
@@ -55,7 +55,7 @@ namespace xs
{
public:
- sub_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ sub_session_t (xs::io_thread_t *io_thread_, bool connect_,
xs::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~sub_session_t ();
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 6816e64..5007570 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -24,7 +24,7 @@
#include "tcp_connecter.hpp"
#include "stream_engine.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "err.hpp"
@@ -46,7 +46,7 @@
#endif
#endif
-xs::tcp_connecter_t::tcp_connecter_t (class poller_base_t *io_thread_,
+xs::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) :
own_t (io_thread_, options_),
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index 2b946a1..6790408 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -31,7 +31,7 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class session_base_t;
class tcp_connecter_t : public own_t, public io_object_t
@@ -40,7 +40,7 @@ namespace xs
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
- tcp_connecter_t (xs::poller_base_t *io_thread_,
+ tcp_connecter_t (xs::io_thread_t *io_thread_,
xs::session_base_t *session_, const options_t &options_,
const char *address_, bool delay_);
~tcp_connecter_t ();
@@ -91,7 +91,7 @@ namespace xs
fd_t s;
// Handle corresponding to the listening socket or NULL if the socket
- // is not registered with the poller.
+ // is not registered with the I/O thread.
handle_t handle;
// If true, connecter is waiting a while before trying to connect.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index e8900fc..31a9149 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -26,7 +26,7 @@
#include "platform.hpp"
#include "tcp_listener.hpp"
#include "stream_engine.hpp"
-#include "poller_base.hpp"
+#include "io_thread.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
@@ -48,7 +48,7 @@
#include <ioctl.h>
#endif
-xs::tcp_listener_t::tcp_listener_t (poller_base_t *io_thread_,
+xs::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
@@ -94,7 +94,7 @@ void xs::tcp_listener_t::in_event (fd_t fd_)
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
- poller_base_t *io_thread = choose_io_thread (options.affinity);
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
xs_assert (io_thread);
// Create and launch a session object.
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 22c000f..153f771 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -31,14 +31,14 @@
namespace xs
{
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class tcp_listener_t : public own_t, public io_object_t
{
public:
- tcp_listener_t (xs::poller_base_t *io_thread_,
+ tcp_listener_t (xs::io_thread_t *io_thread_,
xs::socket_base_t *socket_, const options_t &options_);
~tcp_listener_t ();
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 6c0f75b..d27337e 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -182,7 +182,7 @@ void xs::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
}
}
-xs::xpub_session_t::xpub_session_t (poller_base_t *io_thread_, bool connect_,
+xs::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 2c61c63..a26076e 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -37,7 +37,7 @@ namespace xs
class ctx_t;
class msg_t;
class pipe_t;
- class poller_base_t;
+ class io_thread_t;
class xpub_t :
public socket_base_t
@@ -89,7 +89,7 @@ namespace xs
{
public:
- xpub_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ xpub_session_t (xs::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xpub_session_t ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 4bae2d9..051159d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -307,7 +307,7 @@ bool xs::xrep_t::xhas_out ()
return true;
}
-xs::xrep_session_t::xrep_session_t (poller_base_t *io_thread_, bool connect_,
+xs::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/xrep.hpp b/src/xrep.hpp
index e5ab895..4b3192a 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -37,7 +37,7 @@ namespace xs
class ctx_t;
class pipe_t;
- class poller_base_t;
+ class io_thread_t;
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t :
@@ -109,7 +109,7 @@ namespace xs
{
public:
- xrep_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ xrep_session_t (xs::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xrep_session_t ();
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 948ff22..d16f60e 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -115,7 +115,7 @@ void xs::xreq_t::xterminated (pipe_t *pipe_)
lb.terminated (pipe_);
}
-xs::xreq_session_t::xreq_session_t (poller_base_t *io_thread_, bool connect_,
+xs::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 358b10a..ab3a360 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -32,7 +32,7 @@ namespace xs
class ctx_t;
class msg_t;
class pipe_t;
- class poller_base_t;
+ class io_thread_t;
class socket_base_t;
class xreq_t :
@@ -76,7 +76,7 @@ namespace xs
{
public:
- xreq_session_t (xs::poller_base_t *io_thread_, bool connect_,
+ xreq_session_t (xs::io_thread_t *io_thread_, bool connect_,
xs::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xreq_session_t ();
diff --git a/src/xsub.cpp b/src/xsub.cpp
index ee53b2c..80cc205 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -221,7 +221,7 @@ void xs::xsub_t::send_subscription (unsigned char *data_, size_t size_,
xs_assert (sent);
}
-xs::xsub_session_t::xsub_session_t (poller_base_t *io_thread_, bool connect_,
+xs::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 8769706..f9e5210 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -32,7 +32,7 @@ namespace xs
class ctx_t;
class pipe_t;
- class poller_base_t;
+ class io_thread_t;
class xsub_t :
public socket_base_t
@@ -91,7 +91,7 @@ namespace xs
{
public:
- xsub_session_t (class poller_base_t *io_thread_, bool connect_,
+ xsub_session_t (class io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xsub_session_t ();