diff options
Diffstat (limited to 'src/poller_base.cpp')
-rw-r--r-- | src/poller_base.cpp | 64 |
1 files changed, 55 insertions, 9 deletions
diff --git a/src/poller_base.cpp b/src/poller_base.cpp index 83e0301..913a8d7 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -27,44 +27,57 @@ #include "devpoll.hpp" #include "kqueue.hpp" -xs::poller_base_t *xs::poller_base_t::create () +xs::poller_base_t *xs::poller_base_t::create (xs::ctx_t *ctx_, uint32_t tid_) { poller_base_t *result; #if defined XS_HAVE_SELECT - result = new (std::nothrow) select_t; + result = new (std::nothrow) select_t (ctx_, tid_); #elif defined XS_HAVE_POLL - result = new (std::nothrow) poll_t; + result = new (std::nothrow) poll_t (ctx_, tid_); #elif defined XS_HAVE_EPOLL - result = new (std::nothrow) epoll_t; + result = new (std::nothrow) epoll_t (ctx_, tid_); #elif defined XS_HAVE_DEVPOLL - result = new (std::nothrow) devpoll_t; + result = new (std::nothrow) devpoll_t (ctx_, tid_); #elif defined XS_HAVE_KQUEUE - result = new (std::nothrow) kqueue_t; + result = new (std::nothrow) kqueue_t (ctx_, tid_); #endif alloc_assert (result); return result; } -xs::poller_base_t::poller_base_t () +xs::poller_base_t::poller_base_t (xs::ctx_t *ctx_, uint32_t tid_) : + object_t (ctx_, tid_) { } xs::poller_base_t::~poller_base_t () { - // Make sure there is no more load on the shutdown. - xs_assert (get_load () == 0); } void xs::poller_base_t::start () { + mailbox_handle = add_fd (mailbox.get_fd (), this); + set_pollin (mailbox_handle); xstart (); } void xs::poller_base_t::stop () { + // Ask the I/O thread to stop. + send_stop (); +} + +void xs::poller_base_t::process_stop () +{ + rm_fd (mailbox_handle); xstop (); } +xs::mailbox_t *xs::poller_base_t::get_mailbox () +{ + return &mailbox; +} + int xs::poller_base_t::get_load () { return load.get (); @@ -126,3 +139,36 @@ uint64_t xs::poller_base_t::execute_timers () // There are no more timers. return 0; } + +void xs::poller_base_t::in_event (fd_t fd_) +{ + // TODO: Do we want to limit number of commands I/O thread can + // process in a single go? + + while (true) { + + // Get the next command. If there is none, exit. + command_t cmd; + int rc = mailbox.recv (&cmd, 0); + if (rc != 0 && errno == EINTR) + continue; + if (rc != 0 && errno == EAGAIN) + break; + errno_assert (rc == 0); + + // Process the command. + cmd.destination->process_command (cmd); + } +} + +void xs::poller_base_t::out_event (fd_t fd_) +{ + // We are never polling for POLLOUT here. This function is never called. + xs_assert (false); +} + +void xs::poller_base_t::timer_event (handle_t handle_) +{ + // No timers here. This function is never called. + xs_assert (false); +} |