From e645fc2693acc796304498909786b7b47005b429 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:35 +0100 Subject: Imported Upstream version 2.1.3 --- src/select.cpp | 82 +++++++++++++++++++--------------------------------------- 1 file changed, 26 insertions(+), 56 deletions(-) (limited to 'src/select.cpp') diff --git a/src/select.cpp b/src/select.cpp index 59eb83e..56f9f74 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -1,19 +1,20 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ @@ -54,9 +55,6 @@ zmq::select_t::select_t () : zmq::select_t::~select_t () { worker.stop (); - - // Make sure there are no fds registered on shutdown. - zmq_assert (load.get () == 0); } zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) @@ -77,7 +75,7 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) maxfd = fd_; // Increase the load metric of the thread. - load.add (1); + adjust_load (1); return fd_; } @@ -86,7 +84,7 @@ void zmq::select_t::rm_fd (handle_t handle_) { // Mark the descriptor as retired. fd_set_t::iterator it; - for (it = fds.begin (); it != fds.end (); it ++) + for (it = fds.begin (); it != fds.end (); ++it) if (it->fd == handle_) break; zmq_assert (it != fds.end ()); @@ -107,13 +105,13 @@ void zmq::select_t::rm_fd (handle_t handle_) // highest-numbered file descriptor. if (handle_ == maxfd) { maxfd = retired_fd; - for (fd_set_t::iterator it = fds.begin (); it != fds.end (); it ++) + for (fd_set_t::iterator it = fds.begin (); it != fds.end (); ++it) if (it->fd > maxfd) maxfd = it->fd; } // Decrease the load metric of the thread. - load.sub (1); + adjust_load (-1); } void zmq::select_t::set_pollin (handle_t handle_) @@ -136,23 +134,6 @@ void zmq::select_t::reset_pollout (handle_t handle_) FD_CLR (handle_, &source_set_out); } -void zmq::select_t::add_timer (i_poll_events *events_) -{ - timers.push_back (events_); -} - -void zmq::select_t::cancel_timer (i_poll_events *events_) -{ - timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); - if (it != timers.end ()) - timers.erase (it); -} - -int zmq::select_t::get_load () -{ - return load.get (); -} - void zmq::select_t::start () { worker.start (worker_routine, this); @@ -167,19 +148,19 @@ void zmq::select_t::loop () { while (!stopping) { + // Execute any due timers. + int timeout = (int) execute_timers (); + // Intialise the pollsets. memcpy (&readfds, &source_set_in, sizeof source_set_in); memcpy (&writefds, &source_set_out, sizeof source_set_out); memcpy (&exceptfds, &source_set_err, sizeof source_set_err); - // Compute the timout interval. Select is free to overwrite the - // value so we have to compute it each time anew. - timeval timeout = {max_timer_period / 1000, - (max_timer_period % 1000) * 1000}; - // Wait for events. + struct timeval tv = {(long) (timeout / 1000), + (long) (timeout % 1000 * 1000)}; int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, - timers.empty () ? NULL : &timeout); + timeout ? &tv : NULL); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -189,20 +170,10 @@ void zmq::select_t::loop () errno_assert (rc != -1); #endif - // Handle timer. - if (!rc) { - - // Use local list of timers as timer handlers may fill new timers - // into the original array. - timers_t t; - std::swap (timers, t); - - // Trigger all the timers. - for (timers_t::iterator it = t.begin (); it != t.end (); it ++) - (*it)->timer_event (); - + // If there are no events (i.e. it's a timeout) there's no point + // in checking the pollset. + if (rc == 0) continue; - } for (fd_set_t::size_type i = 0; i < fds.size (); i ++) { if (fds [i].fd == retired_fd) @@ -221,15 +192,8 @@ void zmq::select_t::loop () // Destroy retired event sources. if (retired) { - fd_set_t::iterator it = fds.begin(); - while (it != fds.end()) { - if (it->fd == retired_fd) { - it = fds.erase(it); - } - else { - it++; - } - } + fds.erase (std::remove_if (fds.begin (), fds.end (), + zmq::select_t::is_retired_fd), fds.end ()); retired = false; } } @@ -239,3 +203,9 @@ void zmq::select_t::worker_routine (void *arg_) { ((select_t*) arg_)->loop (); } + +bool zmq::select_t::is_retired_fd (const fd_entry_t &entry) +{ + return (entry.fd == retired_fd); +} + -- cgit v1.2.3