diff options
Diffstat (limited to 'src/epoll.cpp')
-rw-r--r-- | src/epoll.cpp | 214 |
1 files changed, 214 insertions, 0 deletions
diff --git a/src/epoll.cpp b/src/epoll.cpp new file mode 100644 index 0000000..a9780d2 --- /dev/null +++ b/src/epoll.cpp @@ -0,0 +1,214 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#ifdef ZS_HAVE_LINUX + +#include <sys/epoll.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <algorithm> + +#include "epoll.hpp" +#include "err.hpp" +#include "config.hpp" +#include "i_poll_events.hpp" + +zs::epoll_t::epoll_t () : + stopping (false) +{ + epoll_fd = epoll_create (1); + errno_assert (epoll_fd != -1); +} + +zs::epoll_t::~epoll_t () +{ + close (epoll_fd); + + for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++) + delete *it; +} + +zs::handle_t zs::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) +{ + poll_entry_t *pe = new poll_entry_t; + zs_assert (pe != NULL); + + // The memset is not actually needed. It's here to prevent debugging + // tools to complain about using uninitialised memory. + memset (pe, 0, sizeof (poll_entry_t)); + + pe->fd = fd_; + pe->ev.events = 0; + pe->ev.data.ptr = pe; + pe->events = events_; + + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev); + errno_assert (rc != -1); + + // Increase the load metric of the thread. + load.add (1); + + handle_t handle; + handle.ptr = pe; + return handle; +} + +void zs::epoll_t::rm_fd (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); + errno_assert (rc != -1); + pe->fd = retired_fd; + retired.push_back (pe); + + // Decrease the load metric of the thread. + load.sub (1); +} + +void zs::epoll_t::set_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + pe->ev.events |= EPOLLIN; + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zs::epoll_t::reset_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + pe->ev.events &= ~((short) EPOLLIN); + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zs::epoll_t::set_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + pe->ev.events |= EPOLLOUT; + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zs::epoll_t::reset_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + pe->ev.events &= ~((short) EPOLLOUT); + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zs::epoll_t::add_timer (i_poll_events *events_) +{ + timers.push_back (events_); +} + +void zs::epoll_t::cancel_timer (i_poll_events *events_) +{ + timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); + if (it == timers.end ()) + return; + timers.erase (it); +} + +int zs::epoll_t::get_load () +{ + return load.get (); +} + +void zs::epoll_t::start () +{ + worker.start (worker_routine, this); +} + +void zs::epoll_t::stop () +{ + stopping = true; +} + +void zs::epoll_t::join () +{ + worker.stop (); +} + +void zs::epoll_t::loop () +{ + epoll_event ev_buf [max_io_events]; + + while (!stopping) { + + // Wait for events. + int n; + while (true) { + n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, + timers.empty () ? -1 : max_timer_period); + if (!(n == -1 && errno == EINTR)) { + errno_assert (n != -1); + break; + } + } + + // Handle timer. + if (!n) { + + // Use local list of timers as timer handlers may fill new timers + // into the original array. + timers_t t; + std::swap (timers, t); + + // Trigger all the timers. + for (timers_t::iterator it = t.begin (); it != t.end (); it ++) + (*it)->timer_event (); + + continue; + } + + for (int i = 0; i < n; i ++) { + poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr); + + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].events & (EPOLLERR | EPOLLHUP)) + pe->events->in_event (); + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].events & EPOLLOUT) + pe->events->out_event (); + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].events & EPOLLIN) + pe->events->in_event (); + } + + // Destroy retired event sources. + for (retired_t::iterator it = retired.begin (); it != retired.end (); + it ++) + delete *it; + retired.clear (); + } +} + +void zs::epoll_t::worker_routine (void *arg_) +{ + ((epoll_t*) arg_)->loop (); +} + +#endif |