summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:09:58 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:09:58 +0900
commit24cf53ad6eaa0eb9a873b909e932d95417f52d2c (patch)
tree0d2f19a76536d09d0fe6e22874ce572eda70eccd /src
parent0641ffa5e9588dd7daaf389c40a213994fe4b1b1 (diff)
io_thread_t merged with poller_base_t
The relationship of these two classes was 1:1. Thus one of them was obsolete. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/ctx.cpp9
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/epoll.cpp3
-rw-r--r--src/epoll.hpp3
-rw-r--r--src/i_engine.hpp4
-rw-r--r--src/io_object.cpp8
-rw-r--r--src/io_object.hpp6
-rw-r--r--src/io_thread.cpp108
-rw-r--r--src/io_thread.hpp90
-rw-r--r--src/ipc_connecter.cpp4
-rw-r--r--src/ipc_connecter.hpp4
-rw-r--r--src/ipc_listener.cpp6
-rw-r--r--src/ipc_listener.hpp4
-rw-r--r--src/monitor.cpp4
-rw-r--r--src/monitor.hpp4
-rw-r--r--src/object.cpp4
-rw-r--r--src/object.hpp4
-rw-r--r--src/own.cpp4
-rw-r--r--src/own.hpp4
-rw-r--r--src/pair.cpp2
-rw-r--r--src/pair.hpp2
-rw-r--r--src/pgm_receiver.cpp4
-rw-r--r--src/pgm_receiver.hpp6
-rw-r--r--src/pgm_sender.cpp6
-rw-r--r--src/pgm_sender.hpp6
-rw-r--r--src/poller_base.cpp64
-rw-r--r--src/poller_base.hpp26
-rw-r--r--src/pub.cpp2
-rw-r--r--src/pub.hpp4
-rw-r--r--src/pull.cpp2
-rw-r--r--src/pull.hpp4
-rw-r--r--src/push.cpp2
-rw-r--r--src/push.hpp4
-rw-r--r--src/reaper.cpp2
-rw-r--r--src/rep.cpp2
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.cpp2
-rw-r--r--src/req.hpp4
-rw-r--r--src/session_base.cpp6
-rw-r--r--src/session_base.hpp8
-rw-r--r--src/socket_base.cpp6
-rw-r--r--src/stream_engine.cpp4
-rw-r--r--src/stream_engine.hpp4
-rw-r--r--src/sub.cpp2
-rw-r--r--src/sub.hpp4
-rw-r--r--src/tcp_connecter.cpp4
-rw-r--r--src/tcp_connecter.hpp4
-rw-r--r--src/tcp_listener.cpp6
-rw-r--r--src/tcp_listener.hpp4
-rw-r--r--src/xpub.cpp2
-rw-r--r--src/xpub.hpp4
-rw-r--r--src/xrep.cpp2
-rw-r--r--src/xrep.hpp3
-rw-r--r--src/xreq.cpp2
-rw-r--r--src/xreq.hpp4
-rw-r--r--src/xsub.cpp2
-rw-r--r--src/xsub.hpp4
58 files changed, 185 insertions, 319 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index c2c6f65..7d3ee8d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,7 +23,6 @@ libxs_la_SOURCES = \
fd.hpp \
fq.hpp \
io_object.hpp \
- io_thread.hpp \
ip.hpp \
ipc_address.hpp \
ipc_connecter.hpp \
@@ -86,7 +85,6 @@ libxs_la_SOURCES = \
err.cpp \
fq.cpp \
io_object.cpp \
- io_thread.cpp \
ip.cpp \
ipc_address.cpp \
ipc_connecter.cpp \
diff --git a/src/ctx.cpp b/src/ctx.cpp
index a8df950..8b27b31 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -31,7 +31,6 @@
#include "ctx.hpp"
#include "socket_base.hpp"
-#include "io_thread.hpp"
#include "monitor.hpp"
#include "reaper.hpp"
#include "pipe.hpp"
@@ -198,8 +197,8 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
// Create I/O thread objects and launch them.
for (uint32_t i = 2; i != io_thread_count + 2; i++) {
- io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
- alloc_assert (io_thread);
+ poller_base_t *io_thread = poller_base_t::create (this, i);
+ errno_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
@@ -227,7 +226,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
#endif
// Create the monitor object.
- io_thread_t *io_thread = choose_io_thread (0);
+ poller_base_t *io_thread = choose_io_thread (0);
xs_assert (io_thread);
monitor = new (std::nothrow) monitor_t (io_thread);
alloc_assert (monitor);
@@ -302,7 +301,7 @@ void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)
slots [tid_]->send (command_);
}
-xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
+xs::poller_base_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
{
if (io_threads.empty ())
return NULL;
diff --git a/src/ctx.hpp b/src/ctx.hpp
index e912443..c12fffc 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -40,7 +40,7 @@ namespace xs
class object_t;
class monitor_t;
- class io_thread_t;
+ class poller_base_t;
class socket_base_t;
class reaper_t;
@@ -86,7 +86,7 @@ namespace xs
// Returns the I/O thread that is the least busy at the moment.
// Affinity specifies which I/O threads are eligible (0 = all).
// Returns NULL is no I/O thread is available.
- xs::io_thread_t *choose_io_thread (uint64_t affinity_);
+ xs::poller_base_t *choose_io_thread (uint64_t affinity_);
// Returns reaper thread object.
xs::object_t *get_reaper ();
@@ -142,7 +142,7 @@ namespace xs
xs::reaper_t *reaper;
// I/O threads.
- typedef std::vector <xs::io_thread_t*> io_threads_t;
+ typedef std::vector <xs::poller_base_t*> io_threads_t;
io_threads_t io_threads;
// Array of pointers to mailboxes for both application and I/O threads.
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 7208a57..7ca4608 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -34,7 +34,8 @@
#include "config.hpp"
#include "err.hpp"
-xs::epoll_t::epoll_t () :
+xs::epoll_t::epoll_t (xs::ctx_t *ctx_, uint32_t tid_) :
+ poller_base_t (ctx_, tid_),
stopping (false)
{
epoll_fd = epoll_create (1);
diff --git a/src/epoll.hpp b/src/epoll.hpp
index bac555b..232d049 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -36,6 +36,7 @@
namespace xs
{
+ class ctx_t;
struct i_poll_events;
// This class implements socket polling mechanism using the Linux-specific
@@ -45,7 +46,7 @@ namespace xs
{
public:
- epoll_t ();
+ epoll_t (xs::ctx_t *ctx_, uint32_t tid_);
~epoll_t ();
// "poller" concept.
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index fc03f0e..9106411 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -25,7 +25,7 @@
namespace xs
{
- class io_thread_t;
+ class poller_base_t;
// Abstract interface to be implemented by various engines.
@@ -34,7 +34,7 @@ namespace xs
virtual ~i_engine () {}
// Plug the engine to the session.
- virtual void plug (xs::io_thread_t *io_thread_,
+ virtual void plug (xs::poller_base_t *io_thread_,
class session_base_t *session_) = 0;
// Unplug the engine from the session.
diff --git a/src/io_object.cpp b/src/io_object.cpp
index abc8204..0c73571 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -20,10 +20,10 @@
*/
#include "io_object.hpp"
-#include "io_thread.hpp"
+#include "poller_base.hpp"
#include "err.hpp"
-xs::io_object_t::io_object_t (io_thread_t *io_thread_) :
+xs::io_object_t::io_object_t (poller_base_t *io_thread_) :
poller (NULL)
{
if (io_thread_)
@@ -34,13 +34,13 @@ xs::io_object_t::~io_object_t ()
{
}
-void xs::io_object_t::plug (io_thread_t *io_thread_)
+void xs::io_object_t::plug (poller_base_t *io_thread_)
{
xs_assert (io_thread_);
xs_assert (!poller);
// Retrieve the poller from the thread we are running in.
- poller = io_thread_->get_poller ();
+ poller = io_thread_;
}
void xs::io_object_t::unplug ()
diff --git a/src/io_object.hpp b/src/io_object.hpp
index 0abd457..0749ceb 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -30,8 +30,6 @@
namespace xs
{
- class io_thread_t;
-
// Simple base class for objects that live in I/O threads.
// It makes communication with the poller object easier and
// makes defining unneeded event handlers unnecessary.
@@ -40,12 +38,12 @@ namespace xs
{
public:
- io_object_t (xs::io_thread_t *io_thread_ = NULL);
+ io_object_t (xs::poller_base_t *io_thread_ = NULL);
~io_object_t ();
// When migrating an object from one I/O thread to another, first
// unplug it, then migrate it, then plug it to the new thread.
- void plug (xs::io_thread_t *io_thread_);
+ void plug (xs::poller_base_t *io_thread_);
void unplug ();
protected:
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
deleted file mode 100644
index 77cebfe..0000000
--- a/src/io_thread.cpp
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- Copyright (c) 2009-2012 250bpm s.r.o.
- Copyright (c) 2007-2009 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of Crossroads project.
-
- Crossroads is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- Crossroads is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <new>
-
-#include "io_thread.hpp"
-#include "platform.hpp"
-#include "err.hpp"
-#include "ctx.hpp"
-
-xs::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
- object_t (ctx_, tid_)
-{
- poller = poller_base_t::create ();
- xs_assert (poller);
-
- mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
- poller->set_pollin (mailbox_handle);
-}
-
-xs::io_thread_t::~io_thread_t ()
-{
- delete poller;
-}
-
-void xs::io_thread_t::start ()
-{
- // Start the underlying I/O thread.
- poller->start ();
-}
-
-void xs::io_thread_t::stop ()
-{
- send_stop ();
-}
-
-xs::mailbox_t *xs::io_thread_t::get_mailbox ()
-{
- return &mailbox;
-}
-
-int xs::io_thread_t::get_load ()
-{
- return poller->get_load ();
-}
-
-void xs::io_thread_t::in_event (fd_t fd_)
-{
- // TODO: Do we want to limit number of commands I/O thread can
- // process in a single go?
-
- while (true) {
-
- // Get the next command. If there is none, exit.
- command_t cmd;
- int rc = mailbox.recv (&cmd, 0);
- if (rc != 0 && errno == EINTR)
- continue;
- if (rc != 0 && errno == EAGAIN)
- break;
- errno_assert (rc == 0);
-
- // Process the command.
- cmd.destination->process_command (cmd);
- }
-}
-
-void xs::io_thread_t::out_event (fd_t fd_)
-{
- // We are never polling for POLLOUT here. This function is never called.
- xs_assert (false);
-}
-
-void xs::io_thread_t::timer_event (handle_t handle_)
-{
- // No timers here. This function is never called.
- xs_assert (false);
-}
-
-xs::poller_base_t *xs::io_thread_t::get_poller ()
-{
- xs_assert (poller);
- return poller;
-}
-
-void xs::io_thread_t::process_stop ()
-{
- poller->rm_fd (mailbox_handle);
- poller->stop ();
-}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
deleted file mode 100644
index 379f420..0000000
--- a/src/io_thread.hpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- Copyright (c) 2009-2012 250bpm s.r.o.
- Copyright (c) 2007-2009 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of Crossroads project.
-
- Crossroads is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- Crossroads is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __XS_IO_THREAD_HPP_INCLUDED__
-#define __XS_IO_THREAD_HPP_INCLUDED__
-
-#include <vector>
-
-#include "stdint.hpp"
-#include "object.hpp"
-#include "poller_base.hpp"
-#include "mailbox.hpp"
-
-namespace xs
-{
-
- class ctx_t;
-
- // Generic part of the I/O thread. Polling-mechanism-specific features
- // are implemented in separate "polling objects".
-
- class io_thread_t : public object_t, public i_poll_events
- {
- public:
-
- io_thread_t (xs::ctx_t *ctx_, uint32_t tid_);
-
- // Clean-up. If the thread was started, it's neccessary to call 'stop'
- // before invoking destructor. Otherwise the destructor would hang up.
- ~io_thread_t ();
-
- // Launch the physical thread.
- void start ();
-
- // Ask underlying thread to stop.
- void stop ();
-
- // Returns mailbox associated with this I/O thread.
- mailbox_t *get_mailbox ();
-
- // i_poll_events implementation.
- void in_event (fd_t fd_);
- void out_event (fd_t fd_);
- void timer_event (handle_t handle_);
-
- // Used by io_objects to retrieve the assciated poller object.
- poller_base_t *get_poller ();
-
- // Command handlers.
- void process_stop ();
-
- // Returns load experienced by the I/O thread.
- int get_load ();
-
- private:
-
- // I/O thread accesses incoming commands via this mailbox.
- mailbox_t mailbox;
-
- // Handle associated with mailbox' file descriptor.
- handle_t mailbox_handle;
-
- // I/O multiplexing is performed using a poller object.
- poller_base_t *poller;
-
- io_thread_t (const io_thread_t&);
- const io_thread_t &operator = (const io_thread_t&);
- };
-
-}
-
-#endif
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index f6a01c3..b65887a 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -26,7 +26,7 @@
#include <string>
#include "stream_engine.hpp"
-#include "io_thread.hpp"
+#include "poller_base.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "err.hpp"
@@ -37,7 +37,7 @@
#include <sys/socket.h>
#include <sys/un.h>
-xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
+xs::ipc_connecter_t::ipc_connecter_t (class poller_base_t *io_thread_,
class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) :
own_t (io_thread_, options_),
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index 2a8e8e4..46e260c 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -34,7 +34,7 @@
namespace xs
{
- class io_thread_t;
+ class poller_base_t;
class session_base_t;
class ipc_connecter_t : public own_t, public io_object_t
@@ -43,7 +43,7 @@ namespace xs
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
- ipc_connecter_t (xs::io_thread_t *io_thread_,
+ ipc_connecter_t (xs::poller_base_t *io_thread_,
xs::session_base_t *session_, const options_t &options_,
const char *address_, bool delay_);
~ipc_connecter_t ();
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index db49528..bd55cc7 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -28,7 +28,7 @@
#include "stream_engine.hpp"
#include "ipc_address.hpp"
-#include "io_thread.hpp"
+#include "poller_base.hpp"
#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
@@ -39,7 +39,7 @@
#include <fcntl.h>
#include <sys/un.h>
-xs::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
+xs::ipc_listener_t::ipc_listener_t (poller_base_t *io_thread_,
socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
@@ -83,7 +83,7 @@ void xs::ipc_listener_t::in_event (fd_t fd_)
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
+ poller_base_t *io_thread = choose_io_thread (options.affinity);
xs_assert (io_thread);
// Create and launch a session object.
diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp
index b599bff..24b96fb 100644
--- a/src/ipc_listener.hpp
+++ b/src/ipc_listener.hpp
@@ -35,14 +35,14 @@
namespace xs
{
- class io_thread_t;
+ class poller_base_t;
class socket_base_t;
class ipc_listener_t : public own_t, public io_object_t
{
public:
- ipc_listener_t (xs::io_thread_t *io_thread_,
+ ipc_listener_t (xs::poller_base_t *io_thread_,
xs::socket_base_t *socket_, const options_t &options_);
~ipc_listener_t ();
diff --git a/src/monitor.cpp b/src/monitor.cpp
index b29c874..14a31f2 100644
--- a/src/monitor.cpp
+++ b/src/monitor.cpp
@@ -19,12 +19,12 @@
*/
#include "monitor.hpp"
-#include "io_thread.hpp"
+#include "poller_base.hpp"
#include "options.hpp"
#include "random.hpp"
#include "err.hpp"
-xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) :
+xs::monitor_t::monitor_t (xs::poller_base_t *io_thread_) :
own_t (io_thread_, options_t ()),
io_object_t (io_thread_),
timer (NULL)
diff --git a/src/monitor.hpp b/src/monitor.hpp
index 73bc31c..91d441a 100644
--- a/src/monitor.hpp
+++ b/src/monitor.hpp
@@ -30,14 +30,14 @@
namespace xs
{
- class io_thread_t;
+ class poller_base_t;
class socket_base_t;
class monitor_t : public own_t, public io_object_t
{
public:
- monitor_t (xs::io_thread_t *io_thread_);
+ monitor_t (xs::poller_base_t *io_thread_);
~monitor_t ();
void start ();
diff --git a/src/object.cpp b/src/object.cpp
index 2d1f5aa..df279f5 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -26,7 +26,7 @@
#include "ctx.hpp"
#include "err.hpp"
#include "pipe.hpp"
-#include "io_thread.hpp"
+#include "poller_base.hpp"
#include "session_base.hpp"
#include "socket_base.hpp"
@@ -149,7 +149,7 @@ void xs::object_t::destroy_socket (socket_base_t *socket_)
ctx->destroy_socket (socket_);
}
-xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_)
+xs::poller_base_t *xs::object_t::choose_io_thread (uint64_t affinity_)
{
return ctx->choose_io_thread (affinity_);
}
diff --git a/src/object.hpp b/src/object.hpp
index b1bc0c3..14dad49 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -34,7 +34,7 @@ namespace xs
class pipe_t;
class socket_base_t;
class session_base_t;
- class io_thread_t;
+ class poller_base_t;
class own_t;
// Base class for all objects that participate in inter-thread
@@ -62,7 +62,7 @@ namespace xs
void destroy_socket (xs::socket_base_t *socket_);
// Chooses least loaded I/O thread.
- xs::io_thread_t *choose_io_thread (uint64_t affinity_);
+ xs::poller_base_t *choose_io_thread (uint64_t affinity_);
// Logging related functions.
void log (int sid_, const char *text_);
diff --git a/src/own.cpp b/src/own.cpp
index a60d9d0..3047d7b 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -20,7 +20,7 @@
#include "own.hpp"
#include "err.hpp"
-#include "io_thread.hpp"
+#include "poller_base.hpp"
xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
object_t (parent_, tid_),
@@ -32,7 +32,7 @@ xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
{
}
-xs::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
+xs::own_t::own_t (poller_base_t *io_thread_, const options_t &options_) :
object_t (io_thread_),
options (options_),
terminating (false),
diff --git a/src/own.hpp b/src/own.hpp
index d0afd62..de0320a 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -33,7 +33,7 @@ namespace xs
{
class ctx_t;
- class io_thread_t;
+ class poller_base_t;
// Base class for objects forming a part of ownership hierarchy.
// It handles initialisation and destruction of such objects.
@@ -50,7 +50,7 @@ namespace xs
own_t (xs::ctx_t *parent_, uint32_t tid_);
// The object is living within I/O thread.
- own_t (xs::io_thread_t *io_thread_, const options_t &options_);
+ own_t (xs::poller_base_t *io_thread_, const options_t &options_);
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
diff --git a/src/pair.cpp b/src/pair.cpp
index dd14d8d..c35084f 100644
--- a/