summaryrefslogtreecommitdiff
path: root/src/poller_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/poller_base.cpp')
-rw-r--r--src/poller_base.cpp64
1 files changed, 55 insertions, 9 deletions
diff --git a/src/poller_base.cpp b/src/poller_base.cpp
index 83e0301..913a8d7 100644
--- a/src/poller_base.cpp
+++ b/src/poller_base.cpp
@@ -27,44 +27,57 @@
#include "devpoll.hpp"
#include "kqueue.hpp"
-xs::poller_base_t *xs::poller_base_t::create ()
+xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_)
{
poller_base_t *result;
#if defined XS_HAVE_SELECT
- result = new (std::nothrow) select_t;
+ result = new (std::nothrow) select_t (ctx_, tid_);
#elif defined XS_HAVE_POLL
- result = new (std::nothrow) poll_t;
+ result = new (std::nothrow) poll_t (ctx_, tid_);
#elif defined XS_HAVE_EPOLL
- result = new (std::nothrow) epoll_t;
+ result = new (std::nothrow) epoll_t (ctx_, tid_);
#elif defined XS_HAVE_DEVPOLL
- result = new (std::nothrow) devpoll_t;
+ result = new (std::nothrow) devpoll_t (ctx_, tid_);
#elif defined XS_HAVE_KQUEUE
- result = new (std::nothrow) kqueue_t;
+ result = new (std::nothrow) kqueue_t (ctx_, tid_);
#endif
alloc_assert (result);
return result;
}
-xs::poller_base_t::poller_base_t ()
+xs::poller_base_t::poller_base_t (xs::ctx_t *ctx_, uint32_t tid_) :
+ object_t (ctx_, tid_)
{
}
xs::poller_base_t::~poller_base_t ()
{
- // Make sure there is no more load on the shutdown.
- xs_assert (get_load () == 0);
}
void xs::poller_base_t::start ()
{
+ mailbox_handle = add_fd (mailbox.get_fd (), this);
+ set_pollin (mailbox_handle);
xstart ();
}
void xs::poller_base_t::stop ()
{
+ // Ask the I/O thread to stop.
+ send_stop ();
+}
+
+void xs::poller_base_t::process_stop ()
+{
+ rm_fd (mailbox_handle);
xstop ();
}
+xs::mailbox_t *xs::poller_base_t::get_mailbox ()
+{
+ return &mailbox;
+}
+
int xs::poller_base_t::get_load ()
{
return load.get ();
@@ -126,3 +139,36 @@ uint64_t xs::poller_base_t::execute_timers ()
// There are no more timers.
return 0;
}
+
+void xs::poller_base_t::in_event (fd_t fd_)
+{
+ // TODO: Do we want to limit number of commands I/O thread can
+ // process in a single go?
+
+ while (true) {
+
+ // Get the next command. If there is none, exit.
+ command_t cmd;
+ int rc = mailbox.recv (&cmd, 0);
+ if (rc != 0 && errno == EINTR)
+ continue;
+ if (rc != 0 && errno == EAGAIN)
+ break;
+ errno_assert (rc == 0);
+
+ // Process the command.
+ cmd.destination->process_command (cmd);
+ }
+}
+
+void xs::poller_base_t::out_event (fd_t fd_)
+{
+ // We are never polling for POLLOUT here. This function is never called.
+ xs_assert (false);
+}
+
+void xs::poller_base_t::timer_event (handle_t handle_)
+{
+ // No timers here. This function is never called.
+ xs_assert (false);
+}