From 4efe2366d7394e8969fc9aa64c50be6842d8455f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 2 Oct 2009 10:46:36 +0200 Subject: poller is a concept now rather than virtualised class --- src/Makefile.am | 2 +- src/devpoll.cpp | 41 ++++++++++++--------------- src/devpoll.hpp | 13 +++++---- src/epoll.cpp | 16 +++++------ src/epoll.hpp | 14 +++++----- src/i_poller.hpp | 84 ------------------------------------------------------- src/io_object.cpp | 2 +- src/io_object.hpp | 6 ++-- src/io_thread.cpp | 41 ++------------------------- src/io_thread.hpp | 8 +++--- src/kqueue.cpp | 17 ++++++----- src/kqueue.hpp | 13 +++++---- src/poll.cpp | 18 ++++++------ src/poll.hpp | 13 +++++---- src/poller.hpp | 68 ++++++++++++++++++++++++++++++++++++++++++++ src/select.cpp | 33 ++++++++++------------ src/select.hpp | 13 +++++---- 17 files changed, 170 insertions(+), 232 deletions(-) delete mode 100644 src/i_poller.hpp create mode 100644 src/poller.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 27784a0..92016e8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \ ip.hpp \ i_endpoint.hpp \ i_engine.hpp \ - i_poller.hpp \ i_poll_events.hpp \ i_signaler.hpp \ kqueue.hpp \ @@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + poller.hpp \ p2p.hpp \ pub.hpp \ rep.hpp \ diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 2386034..f28d55e 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) zmq_assert (rc == sizeof pfd); } -zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, + i_poll_events *reactor_) { assert (!fd_table [fd_].valid); @@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.fd = fd_; - return handle; + return fd_; } void zmq::devpoll_t::rm_fd (handle_t handle_) { - assert (fd_table [handle_.fd].valid); + assert (fd_table [handle_].valid); - devpoll_ctl (handle_.fd, POLLREMOVE); - fd_table [handle_.fd].valid = false; + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].valid = false; // Decrease the load metric of the thread. load.sub (1); @@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_) void zmq::devpoll_t::set_pollin (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events |= POLLIN; - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLIN; + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::reset_pollin (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events &= ~((short) POLLIN); - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLIN); + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::set_pollout (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events |= POLLOUT; - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLOUT; + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::reset_pollout (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events &= ~((short) POLLOUT); - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLOUT); + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::add_timer (i_poll_events *events_) diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 57f0156..b796ba5 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -26,7 +26,6 @@ #include -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -37,22 +36,24 @@ namespace zmq // Implements socket polling mechanism using the Solaris-specific // "/dev/poll" interface. - class devpoll_t : public i_poller + class devpoll_t { public: + typedef fd_t handle_t; + devpoll_t (); ~devpoll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/epoll.cpp b/src/epoll.cpp index 5f3bc51..8f576f8 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t () delete *it; } -zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) { poll_entry_t *pe = new poll_entry_t; zmq_assert (pe != NULL); @@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.ptr = pe; - return handle; + return pe; } void zmq::epoll_t::rm_fd (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); errno_assert (rc != -1); pe->fd = retired_fd; @@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events |= EPOLLIN; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::reset_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events &= ~((short) EPOLLIN); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_) void zmq::epoll_t::set_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events |= EPOLLOUT; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_) void zmq::epoll_t::reset_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events &= ~((short) EPOLLOUT); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); diff --git a/src/epoll.hpp b/src/epoll.hpp index 619d4f3..90824bd 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -27,8 +27,6 @@ #include #include -#include "i_poller.hpp" -//#include "i_poll_events.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -39,22 +37,24 @@ namespace zmq // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. - class epoll_t : public i_poller + class epoll_t { public: + typedef void* handle_t; + epoll_t (); ~epoll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/i_poller.hpp b/src/i_poller.hpp deleted file mode 100644 index 2665e82..0000000 --- a/src/i_poller.hpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__ -#define __ZMQ_I_POLLER_HPP_INCLUDED__ - -#include "fd.hpp" - -namespace zmq -{ - - union handle_t - { - fd_t fd; - void *ptr; - }; - - // Virtual interface to be used when polling on file descriptors. - - struct i_poller - { - virtual ~i_poller () {}; - - // Add file descriptor to the polling set. Return handle - // representing the descriptor. 'events' interface will be used - // to invoke callback functions when event occurs. - virtual handle_t add_fd (fd_t fd_, struct i_poll_events *events_) = 0; - - // Remove file descriptor identified by handle from the polling set. - virtual void rm_fd (handle_t handle_) = 0; - - // Start polling for input from socket. - virtual void set_pollin (handle_t handle_) = 0; - - // Stop polling for input from socket. - virtual void reset_pollin (handle_t handle_) = 0; - - // Start polling for availability of the socket for writing. - virtual void set_pollout (handle_t handle_) = 0; - - // Stop polling for availability of the socket for writing. - virtual void reset_pollout (handle_t handle_) = 0; - - // Ask to be notified after some time. Actual interval varies between - // 0 and max_timer_period ms. Timer is destroyed once it expires or, - // optionally, when cancel_timer is called. - virtual void add_timer (struct i_poll_events *events_) = 0; - - // Cancel the timer set by add_timer method. - virtual void cancel_timer (struct i_poll_events *events_) = 0; - - // Returns load experienced by the I/O thread. Currently it's number - // of file descriptors handled by the poller, in the future we may - // use a metric taking actual traffic on the individual sockets into - // account. - virtual int get_load () = 0; - - // Start the execution of the underlying I/O thread. - // This method is called from a foreign thread. - virtual void start () = 0; - - // Ask underlying I/O thread to stop. - virtual void stop () = 0; - }; - -} - -#endif diff --git a/src/io_object.cpp b/src/io_object.cpp index f61e5f0..0cf77fd 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) poller = io_thread_->get_poller (); } -zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_) +zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_) { return poller->add_fd (fd_, this); } diff --git a/src/io_object.hpp b/src/io_object.hpp index e5582db..2ed5e24 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -22,7 +22,7 @@ #include -#include "i_poller.hpp" +#include "poller.hpp" #include "i_poll_events.hpp" namespace zmq @@ -41,6 +41,8 @@ namespace zmq protected: + typedef poller_t::handle_t handle_t; + // Derived class can init/swap the underlying I/O thread. // Caution: Remove all the file descriptors from the old I/O thread // before swapping to the new one! @@ -63,7 +65,7 @@ namespace zmq private: - struct i_poller *poller; + poller_t *poller; io_object_t (const io_object_t&); void operator = (const io_object_t&); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index a90876c..6d4710a 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -24,11 +24,6 @@ #include "platform.hpp" #include "err.hpp" #include "command.hpp" -#include "epoll.hpp" -#include "poll.hpp" -#include "select.hpp" -#include "devpoll.hpp" -#include "kqueue.hpp" #include "dispatcher.hpp" #include "simple_semaphore.hpp" @@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, int flags_) : object_t (dispatcher_, thread_slot_) { -#if defined ZMQ_FORCE_SELECT - poller = new select_t; -#elif defined ZMQ_FORCE_POLL - poller = new poll_t; -#elif defined ZMQ_FORCE_EPOLL - poller = new epoll_t; -#elif defined ZMQ_FORCE_DEVPOLL - poller = new devpoll_t; -#elif defined ZMQ_FORCE_KQUEUE - poller = new kqueue_t; -#elif defined ZMQ_HAVE_LINUX - poller = new epoll_t; -#elif defined ZMQ_HAVE_WINDOWS - poller = new select_t; -#elif defined ZMQ_HAVE_FREEBSD - poller = new kqueue_t; -#elif defined ZMQ_HAVE_OPENBSD - poller = new kqueue_t; -#elif defined ZMQ_HAVE_SOLARIS - poller = new devpoll_t; -#elif defined ZMQ_HAVE_OSX - poller = new kqueue_t; -#elif defined ZMQ_HAVE_QNXNTO - poller = new poll_t; -#elif defined ZMQ_HAVE_AIX - poller = new poll_t; -#elif defined ZMQ_HAVE_HPUX - poller = new devpoll_t; -#elif defined ZMQ_HAVE_OPENVMS - poller = new select_t; -#else -#error Unsupported platform -#endif + poller = new poller_t; zmq_assert (poller); signaler_handle = poller->add_fd (signaler.get_fd (), this); @@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event () zmq_assert (false); } -zmq::i_poller *zmq::io_thread_t::get_poller () +zmq::poller_t *zmq::io_thread_t::get_poller () { zmq_assert (poller); return poller; diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 4015b0c..457cdbf 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -23,7 +23,7 @@ #include #include "object.hpp" -#include "i_poller.hpp" +#include "poller.hpp" #include "i_poll_events.hpp" #include "fd_signaler.hpp" @@ -59,7 +59,7 @@ namespace zmq void timer_event (); // Used by io_objects to retrieve the assciated poller object. - struct i_poller *get_poller (); + poller_t *get_poller (); // Command handlers. void process_stop (); @@ -74,10 +74,10 @@ namespace zmq fd_signaler_t signaler; // Handle associated with signaler's file descriptor. - handle_t signaler_handle; + poller_t::handle_t signaler_handle; // I/O multiplexing is performed using a poller object. - i_poller *poller; + poller_t *poller; }; } diff --git a/src/kqueue.cpp b/src/kqueue.cpp index c967c73..f32fa36 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) errno_assert (rc != -1); } -zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, + i_poll_events *reactor_) { poll_entry_t *pe = new poll_entry_t; zmq_assert (pe != NULL); @@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) pe->flag_pollout = 0; pe->reactor = reactor_; - handle_t handle; - handle.ptr = pe; - return handle; + return pe; } void zmq::kqueue_t::rm_fd (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; if (pe->flag_pollin) kevent_delete (pe->fd, EVFILT_READ); if (pe->flag_pollout) @@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) void zmq::kqueue_t::set_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollin = true; kevent_add (pe->fd, EVFILT_READ, pe); } void zmq::kqueue_t::reset_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollin = false; kevent_delete (pe->fd, EVFILT_READ); } void zmq::kqueue_t::set_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollout = true; kevent_add (pe->fd, EVFILT_WRITE, pe); } void zmq::kqueue_t::reset_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollout = false; kevent_delete (pe->fd, EVFILT_WRITE); } diff --git a/src/kqueue.hpp b/src/kqueue.hpp index eeb6f09..bed9108 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -26,7 +26,6 @@ #include -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -37,22 +36,24 @@ namespace zmq // Implements socket polling mechanism using the BSD-specific // kqueue interface. - class kqueue_t : public i_poller + class kqueue_t { public: + typedef void* handle_t; + kqueue_t (); ~kqueue_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/poll.cpp b/src/poll.cpp index dd3de43..50a2426 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -58,7 +58,7 @@ zmq::poll_t::~poll_t () zmq_assert (load.get () == 0); } -zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) { pollfd pfd = {fd_, 0, 0}; pollset.push_back (pfd); @@ -70,19 +70,17 @@ zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.fd = fd_; - return handle; + return fd_; } void zmq::poll_t::rm_fd (handle_t handle_) { - fd_t index = fd_table [handle_.fd].index; + fd_t index = fd_table [handle_].index; assert (index != retired_fd); // Mark the fd as unused. pollset [index].fd = retired_fd; - fd_table [handle_.fd].index = retired_fd; + fd_table [handle_].index = retired_fd; retired = true; // Decrease the load metric of the thread. @@ -91,25 +89,25 @@ void zmq::poll_t::rm_fd (handle_t handle_) void zmq::poll_t::set_pollin (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events |= POLLIN; } void zmq::poll_t::reset_pollin (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events &= ~((short) POLLIN); } void zmq::poll_t::set_pollout (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events |= POLLOUT; } void zmq::poll_t::reset_pollout (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events &= ~((short) POLLOUT); } diff --git a/src/poll.hpp b/src/poll.hpp index 9fe6067..05732d0 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -31,7 +31,6 @@ #include #include -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -42,22 +41,24 @@ namespace zmq // Implements socket polling mechanism using the POSIX.1-2001 // poll() system call. - class poll_t : public i_poller + class poll_t { public: + typedef fd_t handle_t; + poll_t (); ~poll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/poller.hpp b/src/poller.hpp new file mode 100644 index 0000000..9051514 --- /dev/null +++ b/src/poller.hpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_POLLER_HPP_INCLUDED__ +#define __ZMQ_POLLER_HPP_INCLUDED__ + +#include "epoll.hpp" +#include "poll.hpp" +#include "select.hpp" +#include "devpoll.hpp" +#include "kqueue.hpp" + +namespace zmq +{ + +#if defined ZMQ_FORCE_SELECT + typedef select_t poller_t; +#elif defined ZMQ_FORCE_POLL + typedef poll_t poller_t; +#elif defined ZMQ_FORCE_EPOLL + typedef epoll_t poller_t; +#elif defined ZMQ_FORCE_DEVPOLL + typedef devpoll_t poller_t; +#elif defined ZMQ_FORCE_KQUEUE + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_LINUX + typedef epoll_t poller_t; +#elif defined ZMQ_HAVE_WINDOWS + typedef select_t poller_t; +#elif defined ZMQ_HAVE_FREEBSD + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_OPENBSD + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_SOLARIS + typedef devpoll_t poller_t; +#elif defined ZMQ_HAVE_OSX + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_QNXNTO + typedef poll_t poller_t; +#elif defined ZMQ_HAVE_AIX + typedef poll_t poller_t; +#elif defined ZMQ_HAVE_HPUX + typedef devpoll_t poller_t; +#elif defined ZMQ_HAVE_OPENVMS + typedef select_t poller_t; +#else +#error Unsupported platform +#endif + +} + +#endif diff --git a/src/select.cpp b/src/select.cpp index cb17169..3edd4ff 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -59,7 +59,7 @@ zmq::select_t::~select_t () zmq_assert (load.get () == 0); } -zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) { // Store the file descriptor. fd_entry_t entry = {fd_, events_}; @@ -75,38 +75,33 @@ zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.fd = fd_; - return handle; + return fd_; } void zmq::select_t::rm_fd (handle_t handle_) { - // Get file descriptor. - fd_t fd = handle_.fd; - // Mark the descriptor as retired. fd_set_t::iterator it; for (it = fds.begin (); it != fds.end (); it ++) - if (it->fd == fd) + if (it->fd == handle_) break; zmq_assert (it != fds.end ()); it->fd = retired_fd; retired = true; // Stop polling on the descriptor. - FD_CLR (fd, &source_set_in); - FD_CLR (fd, &source_set_out); - FD_CLR (fd, &source_set_err); + FD_CLR (handle_, &source_set_in); + FD_CLR (handle_, &source_set_out); + FD_CLR (handle_, &source_set_err); // Discard all events generated on this file descriptor. - FD_CLR (fd, &readfds); - FD_CLR (fd, &writefds); - FD_CLR (fd, &exceptfds); + FD_CLR (handle_, &readfds); + FD_CLR (handle_, &writefds); + FD_CLR (handle_, &exceptfds); // Adjust the maxfd attribute if we have removed the // highest-numbered file descriptor. - if (fd == maxfd) { + if (handle_ == maxfd) { maxfd = retired_fd; for (fd_set_t::iterator it = fds.begin (); it != fds.end (); it ++) if (it->fd > maxfd) @@ -119,22 +114,22 @@ void zmq::select_t::rm_fd (handle_t handle_) void zmq::select_t::set_pollin (handle_t handle_) { - FD_SET (handle_.fd, &source_set_in); + FD_SET (handle_, &source_set_in); } void zmq::select_t::reset_pollin (handle_t handle_) { - FD_CLR (handle_.fd, &source_set_in); + FD_CLR (handle_, &source_set_in); } void zmq::select_t::set_pollout (handle_t handle_) { - FD_SET (handle_.fd, &source_set_out); + FD_SET (handle_, &source_set_out); } void zmq::select_t::reset_pollout (handle_t handle_) { - FD_CLR (handle_.fd, &source_set_out); + FD_CLR (handle_, &source_set_out); } void zmq::select_t::add_timer (i_poll_events *events_) diff --git a/src/select.hpp b/src/select.hpp index c442477..01014d5 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -34,7 +34,6 @@ #include #endif -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -45,22 +44,24 @@ namespace zmq // Implements socket polling mechanism using POSIX.1-2001 select() // function. - class select_t : public i_poller + class select_t { public: + typedef fd_t handle_t; + select_t (); ~select_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); -- cgit v1.2.3