From 4ed70a930202b103e7e80b8dc925e0aaa4622595 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 29 Jul 2009 12:07:54 +0200 Subject: initial commit --- src/select.cpp | 236 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 236 insertions(+) create mode 100644 src/select.cpp (limited to 'src/select.cpp') diff --git a/src/select.cpp b/src/select.cpp new file mode 100644 index 0000000..9776db3 --- /dev/null +++ b/src/select.cpp @@ -0,0 +1,236 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "platform.hpp" + +#include +#include + +#ifdef ZS_HAVE_WINDOWS +#include "winsock2.h" +#elif defined ZS_HAVE_HPUX +#include +#include +#include +#elif defined ZS_HAVE_OPENVMS +#include +#include +#else +#include +#endif + +#include "select.hpp" +#include "err.hpp" +#include "config.hpp" +#include "i_poll_events.hpp" + +zs::select_t::select_t () : + maxfd (retired_fd), + retired (false), + stopping (false) +{ + // Clear file descriptor sets. + FD_ZERO (&source_set_in); + FD_ZERO (&source_set_out); + FD_ZERO (&source_set_err); +} + +zs::handle_t zs::select_t::add_fd (fd_t fd_, i_poll_events *events_) +{ + // Store the file descriptor. + fd_entry_t entry = {fd_, events_}; + fds.push_back (entry); + + // Start polling on errors. + FD_SET (fd_, &source_set_err); + + // Adjust maxfd if necessary. + if (fd_ > maxfd) + maxfd = fd_; + + // Increase the load metric of the thread. + load.add (1); + + handle_t handle; + handle.fd = fd_; + return handle; +} + +void zs::select_t::rm_fd (handle_t handle_) +{ + // Get file descriptor. + fd_t fd = handle_.fd; + + // Mark the descriptor as retired. + fd_set_t::iterator it; + for (it = fds.begin (); it != fds.end (); it ++) + if (it->fd == fd) + break; + zs_assert (it != fds.end ()); + it->fd = retired_fd; + retired = true; + + // Stop polling on the descriptor. + FD_CLR (fd, &source_set_in); + FD_CLR (fd, &source_set_out); + FD_CLR (fd, &source_set_err); + + // Discard all events generated on this file descriptor. + FD_CLR (fd, &readfds); + FD_CLR (fd, &writefds); + FD_CLR (fd, &exceptfds); + + // Adjust the maxfd attribute if we have removed the + // highest-numbered file descriptor. + if (fd == maxfd) { + maxfd = retired_fd; + for (fd_set_t::iterator it = fds.begin (); it != fds.end (); it ++) + if (it->fd > maxfd) + maxfd = it->fd; + } + + // Decrease the load metric of the thread. + load.sub (1); +} + +void zs::select_t::set_pollin (handle_t handle_) +{ + FD_SET (handle_.fd, &source_set_in); +} + +void zs::select_t::reset_pollin (handle_t handle_) +{ + FD_CLR (handle_.fd, &source_set_in); +} + +void zs::select_t::set_pollout (handle_t handle_) +{ + FD_SET (handle_.fd, &source_set_out); +} + +void zs::select_t::reset_pollout (handle_t handle_) +{ + FD_CLR (handle_.fd, &source_set_out); +} + +void zs::select_t::add_timer (i_poll_events *events_) +{ + timers.push_back (events_); +} + +void zs::select_t::cancel_timer (i_poll_events *events_) +{ + timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); + if (it != timers.end ()) + timers.erase (it); +} + +int zs::select_t::get_load () +{ + return load.get (); +} + +void zs::select_t::start () +{ + worker.start (worker_routine, this); +} + +void zs::select_t::stop () +{ + stopping = true; +} + +void zs::select_t::join () +{ + worker.stop (); +} + +void zs::select_t::loop () +{ + while (!stopping) { + + // Intialise the pollsets. + memcpy (&readfds, &source_set_in, sizeof source_set_in); + memcpy (&writefds, &source_set_out, sizeof source_set_out); + memcpy (&exceptfds, &source_set_err, sizeof source_set_err); + + // Compute the timout interval. Select is free to overwrite the + // value so we have to compute it each time anew. + timeval timeout = {max_timer_period / 1000, + (max_timer_period % 1000) * 1000}; + + // Wait for events. + int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, + timers.empty () ? NULL : &timeout); + +#ifdef ZS_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + if (rc == -1 && errno == EINTR) + continue; + errno_assert (rc != -1); +#endif + + // Handle timer. + if (!rc) { + + // Use local list of timers as timer handlers may fill new timers + // into the original array. + timers_t t; + std::swap (timers, t); + + // Trigger all the timers. + for (timers_t::iterator it = t.begin (); it != t.end (); it ++) + (*it)->timer_event (); + + continue; + } + + for (fd_set_t::size_type i = 0; i < fds.size (); i ++) { + if (fds [i].fd == retired_fd) + continue; + if (FD_ISSET (fds [i].fd, &exceptfds)) + fds [i].events->in_event (); + if (fds [i].fd == retired_fd) + continue; + if (FD_ISSET (fds [i].fd, &writefds)) + fds [i].events->out_event (); + if (fds [i].fd == retired_fd) + continue; + if (FD_ISSET (fds [i].fd, &readfds)) + fds [i].events->in_event (); + } + + // Destroy retired event sources. + if (retired) { + for (fd_set_t::size_type i = 0; i < fds.size (); i ++) { + if (fds [i].fd == retired_fd) { + fds.erase (fds.begin () + i); + i --; + } + } + retired = false; + } + } +} + +void zs::select_t::worker_routine (void *arg_) +{ + ((select_t*) arg_)->loop (); +} -- cgit v1.2.3