diff options
Diffstat (limited to 'src')
42 files changed, 503 insertions, 255 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 27784a0..92016e8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \ ip.hpp \ i_endpoint.hpp \ i_engine.hpp \ - i_poller.hpp \ i_poll_events.hpp \ i_signaler.hpp \ kqueue.hpp \ @@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + poller.hpp \ p2p.hpp \ pub.hpp \ rep.hpp \ diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 2386034..f28d55e 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) zmq_assert (rc == sizeof pfd); } -zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, + i_poll_events *reactor_) { assert (!fd_table [fd_].valid); @@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.fd = fd_; - return handle; + return fd_; } void zmq::devpoll_t::rm_fd (handle_t handle_) { - assert (fd_table [handle_.fd].valid); + assert (fd_table [handle_].valid); - devpoll_ctl (handle_.fd, POLLREMOVE); - fd_table [handle_.fd].valid = false; + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].valid = false; // Decrease the load metric of the thread. load.sub (1); @@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_) void zmq::devpoll_t::set_pollin (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events |= POLLIN; - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLIN; + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::reset_pollin (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events &= ~((short) POLLIN); - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLIN); + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::set_pollout (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events |= POLLOUT; - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLOUT; + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::reset_pollout (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events &= ~((short) POLLOUT); - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLOUT); + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::add_timer (i_poll_events *events_) diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 57f0156..b796ba5 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -26,7 +26,6 @@ #include <vector> -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -37,22 +36,24 @@ namespace zmq // Implements socket polling mechanism using the Solaris-specific // "/dev/poll" interface. - class devpoll_t : public i_poller + class devpoll_t { public: + typedef fd_t handle_t; + devpoll_t (); ~devpoll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/epoll.cpp b/src/epoll.cpp index 5f3bc51..8f576f8 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t () delete *it; } -zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) { poll_entry_t *pe = new poll_entry_t; zmq_assert (pe != NULL); @@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.ptr = pe; - return handle; + return pe; } void zmq::epoll_t::rm_fd (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); errno_assert (rc != -1); pe->fd = retired_fd; @@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events |= EPOLLIN; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::reset_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events &= ~((short) EPOLLIN); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_) void zmq::epoll_t::set_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events |= EPOLLOUT; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_) void zmq::epoll_t::reset_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events &= ~((short) EPOLLOUT); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); diff --git a/src/epoll.hpp b/src/epoll.hpp index 619d4f3..90824bd 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -27,8 +27,6 @@ #include <vector> #include <sys/epoll.h> -#include "i_poller.hpp" -//#include "i_poll_events.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -39,22 +37,24 @@ namespace zmq // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. - class epoll_t : public i_poller + class epoll_t { public: + typedef void* handle_t; + epoll_t (); ~epoll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/err.cpp b/src/err.cpp index ef5b987..36cb2fc 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "../bindings/c/zmq.h" + #include "err.hpp" #include "platform.hpp" @@ -24,8 +26,6 @@ const char *zmq::wsa_error() { - - int errcode = WSAGetLastError (); // TODO: This is not a generic way to handle this... if (errcode == WSAEWOULDBLOCK) @@ -148,4 +148,43 @@ void zmq::win_error (char *buffer_, size_t buffer_size_) zmq_assert (rc); } +void zmq::wsa_error_to_errno () +{ + int errcode = WSAGetLastError (); + switch (errcode) { + case WSAEINPROGRESS: + errno = EAGAIN; + return; + case WSAEBADF: + errno = EBADF; + return; + case WSAEINVAL: + errno = EINVAL; + return; + case WSAEMFILE: + errno = EMFILE; + return; + case WSAEFAULT: + errno = EFAULT; + return; + case WSAEPROTONOSUPPORT: + errno = EPROTONOSUPPORT; + return; + case WSAENOBUFS: + errno = ENOBUFS; + return; + case WSAENETDOWN: + errno = ENETDOWN; + return; + case WSAEADDRINUSE: + errno = EADDRINUSE; + return; + case WSAEADDRNOTAVAIL: + errno = EADDRNOTAVAIL; + return; + default: + wsa_assert (false); + } +} + #endif diff --git a/src/err.hpp b/src/err.hpp index c1b2916..fb9195e 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -41,6 +41,7 @@ namespace zmq const char *wsa_error (); void win_error (char *buffer_, size_t buffer_size_); + void wsa_error_to_errno (); } diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index e1b56ad..513a1e4 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -44,8 +44,6 @@ namespace zmq void signal (int signal_); uint64_t poll (); uint64_t check (); - - // Get the file descriptor associated with the object. fd_t get_fd (); private: diff --git a/src/i_poller.hpp b/src/i_poller.hpp deleted file mode 100644 index 2665e82..0000000 --- a/src/i_poller.hpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__ -#define __ZMQ_I_POLLER_HPP_INCLUDED__ - -#include "fd.hpp" - -namespace zmq -{ - - union handle_t - { - fd_t fd; - void *ptr; - }; - - // Virtual interface to be used when polling on file descriptors. - - struct i_poller - { - virtual ~i_poller () {}; - - // Add file descriptor to the polling set. Return handle - // representing the descriptor. 'events' interface will be used - // to invoke callback functions when event occurs. - virtual handle_t add_fd (fd_t fd_, struct i_poll_events *events_) = 0; - - // Remove file descriptor identified by handle from the polling set. - virtual void rm_fd (handle_t handle_) = 0; - - // Start polling for input from socket. - virtual void set_pollin (handle_t handle_) = 0; - - // Stop polling for input from socket. - virtual void reset_pollin (handle_t handle_) = 0; - - // Start polling for availability of the socket for writing. - virtual void set_pollout (handle_t handle_) = 0; - - // Stop polling for availability of the socket for writing. - virtual void reset_pollout (handle_t handle_) = 0; - - // Ask to be notified after some time. Actual interval varies between - // 0 and max_timer_period ms. Timer is destroyed once it expires or, - // optionally, when cancel_timer is called. - virtual void add_timer (struct i_poll_events *events_) = 0; - - // Cancel the timer set by add_timer method. - virtual void cancel_timer (struct i_poll_events *events_) = 0; - - // Returns load experienced by the I/O thread. Currently it's number - // of file descriptors handled by the poller, in the future we may - // use a metric taking actual traffic on the individual sockets into - // account. - virtual int get_load () = 0; - - // Start the execution of the underlying I/O thread. - // This method is called from a foreign thread. - virtual void start () = 0; - - // Ask underlying I/O thread to stop. - virtual void stop () = 0; - }; - -} - -#endif diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index a09fe7e..ad04bb5 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -21,6 +21,7 @@ #define __ZMQ_I_SIGNALER_HPP_INCLUDED__ #include "stdint.hpp" +#include "fd.hpp" namespace zmq { @@ -42,6 +43,11 @@ namespace zmq // Same as poll, however, if there is no signal available, // function returns zero immediately instead of waiting for a signal. virtual uint64_t check () = 0; + + // Returns file descriptor that allows waiting for signals. Specific + // signalers may not support this functionality. If so, the function + // returns retired_fd. + virtual fd_t get_fd () = 0; }; } diff --git a/src/io_object.cpp b/src/io_object.cpp index f61e5f0..0cf77fd 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) poller = io_thread_->get_poller (); } -zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_) +zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_) { return poller->add_fd (fd_, this); } diff --git a/src/io_object.hpp b/src/io_object.hpp index e5582db..2ed5e24 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -22,7 +22,7 @@ #include <stddef.h> -#include "i_poller.hpp" +#include "poller.hpp" #include "i_poll_events.hpp" namespace zmq @@ -41,6 +41,8 @@ namespace zmq protected: + typedef poller_t::handle_t handle_t; + // Derived class can init/swap the underlying I/O thread. // Caution: Remove all the file descriptors from the old I/O thread // before swapping to the new one! @@ -63,7 +65,7 @@ namespace zmq private: - struct i_poller *poller; + poller_t *poller; io_object_t (const io_object_t&); void operator = (const io_object_t&); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index a90876c..6d4710a 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -24,11 +24,6 @@ #include "platform.hpp" #include "err.hpp" #include "command.hpp" -#include "epoll.hpp" -#include "poll.hpp" -#include "select.hpp" -#include "devpoll.hpp" -#include "kqueue.hpp" #include "dispatcher.hpp" #include "simple_semaphore.hpp" @@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, int flags_) : object_t (dispatcher_, thread_slot_) { -#if defined ZMQ_FORCE_SELECT - poller = new select_t; -#elif defined ZMQ_FORCE_POLL - poller = new poll_t; -#elif defined ZMQ_FORCE_EPOLL - poller = new epoll_t; -#elif defined ZMQ_FORCE_DEVPOLL - poller = new devpoll_t; -#elif defined ZMQ_FORCE_KQUEUE - poller = new kqueue_t; -#elif defined ZMQ_HAVE_LINUX - poller = new epoll_t; -#elif defined ZMQ_HAVE_WINDOWS - poller = new select_t; -#elif defined ZMQ_HAVE_FREEBSD - poller = new kqueue_t; -#elif defined ZMQ_HAVE_OPENBSD - poller = new kqueue_t; -#elif defined ZMQ_HAVE_SOLARIS - poller = new devpoll_t; -#elif defined ZMQ_HAVE_OSX - poller = new kqueue_t; -#elif defined ZMQ_HAVE_QNXNTO - poller = new poll_t; -#elif defined ZMQ_HAVE_AIX - poller = new poll_t; -#elif defined ZMQ_HAVE_HPUX - poller = new devpoll_t; -#elif defined ZMQ_HAVE_OPENVMS - poller = new select_t; -#else -#error Unsupported platform -#endif + poller = new poller_t; zmq_assert (poller); signaler_handle = poller->add_fd (signaler.get_fd (), this); @@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event () zmq_assert (false); } -zmq::i_poller *zmq::io_thread_t::get_poller () +zmq::poller_t *zmq::io_thread_t::get_poller () { zmq_assert (poller); return poller; diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 4015b0c..457cdbf 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -23,7 +23,7 @@ #include <vector> #include "object.hpp" -#include "i_poller.hpp" +#include "poller.hpp" #include "i_poll_events.hpp" #include "fd_signaler.hpp" @@ -59,7 +59,7 @@ namespace zmq void timer_event (); // Used by io_objects to retrieve the assciated poller object. - struct i_poller *get_poller (); + poller_t *get_poller (); // Command handlers. void process_stop (); @@ -74,10 +74,10 @@ namespace zmq fd_signaler_t signaler; // Handle associated with signaler's file descriptor. - handle_t signaler_handle; + poller_t::handle_t signaler_handle; // I/O multiplexing is performed using a poller object. - i_poller *poller; + poller_t *poller; }; } diff --git a/src/kqueue.cpp b/src/kqueue.cpp index c967c73..f32fa36 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) errno_assert (rc != -1); } -zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, + i_poll_events *reactor_) { poll_entry_t *pe = new poll_entry_t; zmq_assert (pe != NULL); @@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) pe->flag_pollout = 0; pe->reactor = reactor_; - handle_t handle; - handle.ptr = pe; - return handle; + return pe; } void zmq::kqueue_t::rm_fd (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; if (pe->flag_pollin) kevent_delete (pe->fd, EVFILT_READ); if (pe->flag_pollout) @@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) void zmq::kqueue_t::set_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollin = true; kevent_add (pe->fd, EVFILT_READ, pe); } void zmq::kqueue_t::reset_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollin = false; kevent_delete (pe->fd, EVFILT_READ); } void zmq::kqueue_t::set_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollout = true; kevent_add (pe->fd, EVFILT_WRITE, pe); } void zmq::kqueue_t::reset_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollout = false; kevent_delete (pe->fd, EVFILT_WRITE); } diff --git a/src/kqueue.hpp b/src/kqueue.hpp index eeb6f09..bed9108 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -26,7 +26,6 @@ #include <vector> -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -37,22 +36,24 @@ namespace zmq // Implements socket polling mechanism using the BSD-specific // kqueue interface. - class kqueue_t : public i_poller + class kqueue_t { public: + typedef void* handle_t; + kqueue_t (); ~kqueue_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/p2p.cpp b/src/p2p.cpp index c43b7b4..445ba5b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_) return 0; } +bool zmq::p2p_t::xhas_in () +{ + zmq_assert (false); + return false; +} + +bool zmq::p2p_t::xhas_out () +{ + zmq_assert (false); + return false; +} diff --git a/src/p2p.hpp b/src/p2p.hpp index 1c98dd5..1fd7e34 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -42,6 +42,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/pipe.cpp b/src/pipe.cpp index f8dfcb8..e444520 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -36,6 +36,17 @@ zmq::reader_t::~reader_t () { } +bool zmq::reader_t::check_read () +{ + // Check if there's an item in the pipe. + if (pipe->check_read ()) + return true; + + // If not, deactivate the pipe. + endpoint->kill (this); + return false; +} + bool zmq::reader_t::read (zmq_msg_t *msg_) { if (!pipe->read (msg_)) { diff --git a/src/pipe.hpp b/src/pipe.hpp index 699e3f7..ecbce7d 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -42,6 +42,9 @@ namespace zmq void set_endpoint (i_endpoint *endpoint_); + // Returns true if there is at least one message to read in the pipe. + bool check_read (); + // Reads a message to the underlying pipe. bool read (zmq_msg_t *msg_); diff --git a/src/poll.cpp b/src/poll.cpp index dd3de43..50a2426 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -58,7 +58,7 @@ zmq::poll_t::~poll_t () zmq_assert (load.get () == 0); } -zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) { pollfd pfd = {fd_, 0, 0}; pollset.push_back (pfd); @@ -70,19 +70,17 @@ zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.fd = fd_; - return handle; + return fd_; } void zmq::poll_t::rm_fd (handle_t handle_) { - fd_t index = fd_table [handle_.fd].index; + fd_t index = fd_table [handle_].index; assert (index != retired_fd); // Mark the fd as unused. pollset [index].fd = retired_fd; - fd_table [handle_.fd].index = retired_fd; + fd_table [handle_].index = retired_fd; retired = true; // Decrease the load metric of the thread. @@ -91,25 +89,25 @@ void zmq::poll_t::rm_fd (handle_t handle_) void zmq::poll_t::set_pollin (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events |= POLLIN; } void zmq::poll_t::reset_pollin (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events &= ~((short) POLLIN); } void zmq::poll_t::set_pollout (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events |= POLLOUT; } void zmq::poll_t::reset_pollout (handle_t handle_) { - int index = fd_table [handle_.fd].index; + int index = fd_table [handle_].index; pollset [index].events &= ~((short) POLLOUT); } diff --git a/src/poll.hpp b/src/poll.hpp index 9fe6067..05732d0 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -31,7 +31,6 @@ #include <stddef.h> #include <vector> -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -42,22 +41,24 @@ namespace zmq // Implements socket polling mechanism using the POSIX.1-2001 // poll() system call. - class poll_t : public i_poller + class poll_t { public: + typedef fd_t handle_t; + poll_t (); ~poll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/poller.hpp b/src/poller.hpp new file mode 100644 index 0000000..9051514 --- /dev/null +++ b/src/poller.hpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_POLLER_HPP_INCLUDED__ +#define __ZMQ_POLLER_HPP_INCLUDED__ + +#include "epoll.hpp" +#include "poll.hpp" +#include "select.hpp" +#include "devpoll.hpp" +#include "kqueue.hpp" + +namespace zmq +{ + +#if defined ZMQ_FORCE_SELECT + typedef select_t poller_t; +#elif defined ZMQ_FORCE_POLL + typedef poll_t poller_t; +#elif defined ZMQ_FORCE_EPOLL + typedef epoll_t poller_t; +#elif defined ZMQ_FORCE_DEVPOLL + typedef devpoll_t poller_t; +#elif defined ZMQ_FORCE_KQUEUE + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_LINUX + typedef epoll_t poller_t; +#elif defined ZMQ_HAVE_WINDOWS + typedef select_t poller_t; +#elif defined ZMQ_HAVE_FREEBSD + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_OPENBSD + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_SOLARIS + typedef devpoll_t poller_t; +#elif defined ZMQ_HAVE_OSX + typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_QNXNTO + typedef poll_t poller_t; +#elif defined ZMQ_HAVE_AIX + typedef poll_t poller_t; +#elif defined ZMQ_HAVE_HPUX + typedef devpoll_t poller_t; +#elif defined ZMQ_HAVE_OPENVMS + typedef select_t poller_t; +#else +#error Unsupported platform +#endif + +} + +#endif diff --git a/src/pub.cpp b/src/pub.cpp index 1e66a18..63b235e 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_) return -1; } +bool zmq::pub_t::xhas_in () +{ + return false; +} + +bool zmq::pub_t::xhas_out () +{ + // TODO: Reimplement when queue limits are added. + return true; +} + diff --git a/src/pub.hpp b/src/pub.hpp index 07eb5a1..b3e868d 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -43,6 +43,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/rep.cpp b/src/rep.cpp index 137c735..e8a9e39 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) return -1; } +bool zmq::rep_t::xhas_in () +{ + for (int count = active; count != 0; count--) { + if (in_pipes [current]->check_read ()) + return !waiting_for_reply; + current++; + if (current >= active) + current = 0; + } + + return false; +} + +bool zmq::rep_t::xhas_out () +{ + return waiting_for_reply; +} diff --git a/src/rep.hpp b/src/rep.hpp index 4781213..3e87dc1 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -43,6 +43,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/req.cpp b/src/req.cpp index 63409ce..93a46e8 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) return 0; } +bool zmq::req_t::xhas_in () +{ + if (reply_pipe->check_read ()) + return waiting_for_reply; + + return false; +} + +bool zmq::req_t::xhas_out () +{ + return !waiting_for_reply; +} + diff --git a/src/req.hpp b/src/req.hpp index 3ae78fd..86554b5 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -43,6 +43,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/select.cpp b/src/select.cpp index cb17169..3edd4ff 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -59,7 +59,7 @@ zmq::select_t::~select_t () zmq_assert (load.get () == 0); } -zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) { // Store the file descriptor. fd_entry_t entry = {fd_, events_}; @@ -75,38 +75,33 @@ zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.fd = fd_; - return handle; + return fd_; } void zmq::select_t::rm_fd (handle_t handle_) { - // Get file descriptor. - fd_t fd = handle_.fd; - // Mark the descriptor as retired. fd_set_t::iterator it; for (it = fds.begin (); it != fds.end (); it ++) - if (it->fd == fd) + if (it->fd == handle_) break; zmq_assert (it != fds.end ()); it->fd = retired_fd; retired = true; // Stop polling on the descriptor. - FD_CLR (fd, &source_set_in); - FD_CLR (fd, &source_set_out); - FD_CLR (fd, &source_set_err); + FD_CLR (handle_, &source_set_in); + FD_CLR (handle_, &source_set_out); + FD_CLR (handle_, &source_set_err); // Discard all events generated on this file descriptor. - FD_CLR (fd, &readfds); - FD_CLR (fd, &writefds); - FD_CLR (fd, &exceptfds); + FD_CLR (handle_, &readfds); + FD_CLR (handle_, &writefds); + FD_CLR (handle_, &exceptfds); // Adjust the maxfd attribute if we have removed the // highest-numbered file descriptor. - if (fd == maxfd) { + if (handle_ == maxfd) { maxfd = retired_fd; for (fd_set_t::iterator it = fds.begin (); it != fds.end (); it ++) if (it->fd > maxfd) @@ -119,22 +114,22 @@ void zmq::select_t::rm_fd (handle_t handle_) void zmq::select_t::set_pollin (handle_t handle_) { - FD_SET (handle_.fd, &source_set_in); + FD_SET (handle_, &source_set_in); } void zmq::select_t::reset_pollin (handle_t handle_) { - FD_CLR (handle_.fd, &source_set_in); + FD_CLR (handle_, &source_set_in); } void zmq::select_t::set_pollout (handle_t handle_) { - FD_SET (handle_.fd, &source_set_out); + FD_SET (handle_, &source_set_out); } void zmq::select_t::reset_pollout (handle_t handle_) { - FD_CLR (handle_.fd, &source_set_out); + FD_CLR (handle_, &source_set_out); } void zmq::select_t::add_timer (i_poll_events *events_) diff --git a/src/select.hpp b/src/select.hpp index c442477..01014d5 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -34,7 +34,6 @@ #include <sys/select.h> #endif -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -45,22 +44,24 @@ namespace zmq // Implements socket polling mechanism using POSIX.1-2001 select() // function. - class select_t : public i_poller + class select_t { public: + typedef fd_t handle_t; + select_t (); ~select_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6763167..6583608 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -364,6 +364,21 @@ int zmq::socket_base_t::close () return 0; } +zmq::app_thread_t *zmq::socket_base_t::get_thread () +{ + return app_thread; +} + +bool zmq::socket_base_t::has_in () +{ + return xhas_in (); +} + +bool zmq::socket_base_t::has_out () +{ + return xhas_out (); +} + bool zmq::socket_base_t::register_session (const char *name_, session_t *session_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index c54efae..49ff5a5 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -54,6 +54,16 @@ namespace zmq int recv (zmq_msg_t *msg_, int flags_); int close (); + // This function is used by the polling mechanism to determine + // whether the socket belongs to the application thread the poll + // is called from. + class app_thread_t *get_thread (); + + // These functions are used by the polling mechanism to determine + // which events are to be reported from this socket. + bool has_in (); + bool has_out (); + // The list of sessions cannot be accessed via inter-thread // commands as it is unacceptable to wait for the completion of the // action till user application yields control of the application @@ -88,6 +98,8 @@ namespace zmq virtual int xsend (zmq_msg_t *msg_, int options_) = 0; virtual int xflush () = 0; virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; + virtual bool xhas_in () = 0; + virtual bool xhas_out () = 0; // Socket options. options_t options; diff --git a/src/sub.cpp b/src/sub.cpp index 1bfbcd5..a7f9783 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) errno = EAGAIN; return -1; } + +bool zmq::sub_t::xhas_in () +{ + // TODO: This is more complex as we have to ignore all the messages that + // don't fit the filter. + zmq_assert (false); + return false; +} + +bool zmq::sub_t::xhas_out () +{ + return false; +} diff --git a/src/sub.hpp b/src/sub.hpp index 8691928..fb881dc 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -48,6 +48,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 304790d..9bca0f0 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -50,8 +50,10 @@ int zmq::tcp_connecter_t::open () // Create the socket. s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - // TODO: Convert error to errno. - wsa_assert (s != INVALID_SOCKET); + if (s == INVALID_SOCKET) { + wsa_error_to_errno (); + return -1; + } // Set to non-blocking mode. unsigned long argp = 1; @@ -78,9 +80,7 @@ int zmq::tcp_connecter_t::open () return -1; } - // TODO: Convert error to errno. - wsa_assert (rc == 0); - + wsa_error_to_errno (); return -1; } diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 9431ccf..383aebe 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -48,8 +48,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_) // Create a listening socket. s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - // TODO: Convert error code to errno. - wsa_assert (s != INVALID_SOCKET); + if (s == INVALID_SOCKET) { + wsa_error_to_errno (); + return -1; + } // Allow reusing of the address. int flag = 1; @@ -65,12 +67,17 @@ int zmq::tcp_listener_t::set_address (const char *addr_) // Bind the socket to the network interface and port. rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); // TODO: Convert error code to errno. - wsa_assert (rc != SOCKET_ERROR); + if (rc == SOCKET_ERROR) { + wsa_error_to_errno (); + return -1; + } // Listen for incomming connections. rc = listen (s, 1); - // TODO: Convert error code to errno. - wsa_assert (rc != SOCKET_ERROR); + if (rc == SOCKET_ERROR) { + wsa_error_to_errno (); + return -1; + } return 0; } diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 6c51b63..0a9b5d5 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -106,16 +106,12 @@ namespace zmq return true; } - // Reads an item from the pipe. Returns false if there is no value. - // available. - inline bool read (T *value_) + // Check whether item is available for reading. + inline bool check_read () { - // Was the value prefetched already? If so, return it. - if (&queue.front () != r) { - *value_ = queue.front (); - queue.pop (); + // Was the value prefetched already? If so, return. + if (&queue.front () != r) return true; - } // There's no prefetched value, so let us prefetch more values. // (Note that D is a template parameter. Becaue of that one of @@ -166,6 +162,18 @@ namespace zmq } // There was at least one value prefetched. + return true; + } + + // Reads an item from the pipe. Returns false if there is no value. + // available. + inline bool read (T *value_) + { + // Try to prefetch a value. + if (!check_read ()) + return false; + + // There was at least one value prefetched. // Return it to the caller. *value_ = queue.front (); queue.pop (); diff --git a/src/ypollset.cpp b/src/ypollset.cpp index a72eac7..0f0d75f 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -58,3 +58,8 @@ uint64_t zmq::ypollset_t::check () { return (uint64_t) bits.xchg (0); } + +zmq::fd_t zmq::ypollset_t::get_fd () +{ + return retired_fd; +} diff --git a/src/ypollset.hpp b/src/ypollset.hpp index 25eb3e0..810a638 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -42,6 +42,7 @@ namespace zmq void signal (int signal_); uint64_t poll (); uint64_t check (); + fd_t get_fd (); private: diff --git a/src/zmq.cpp b/src/zmq.cpp index 2dfdd48..7952b61 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,11 +25,16 @@ #include <new> #include "socket_base.hpp" -#include "err.hpp" +#include "app_thread.hpp" #include "dispatcher.hpp" #include "msg_content.hpp" #include "platform.hpp" #include "stdint.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_LINUX +#include <poll.h> +#endif #if !defined ZMQ_HAVE_WINDOWS #include <unistd.h> @@ -44,6 +49,14 @@ const char *zmq_strerror (int errnum_) return "Not supported"; case EPROTONOSUPPORT: return "Protocol not supported"; + case ENOBUFS: + return "No buffer space available"; + case ENETDOWN: + return "Network is down"; + case EADDRINUSE: + return "Address in use"; + case EADDRNOTAVAIL: + return "Address not available"; #endif case EMTHREAD: return "Number of preallocated application threads exceeded"; @@ -246,6 +259,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } +int zmq_poll (zmq_pollitem_t *items_, int nitems_) +{ + // TODO: Replace the polling mechanism by the virtualised framework + // used in 0MQ I/O threads. That'll make the thing work on all platforms. +#if !defined ZMQ_HAVE_LINUX + errno = ENOTSUP; + return -1; +#else + + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); + zmq_assert (pollfds); + int npollfds = 0; + int nsockets = 0; + + zmq::app_thread_t *app_thread = NULL; + + for (int i = 0; i != nitems_; i++) { + + // 0MQ sockets. + if (items_ [i].socket) { + + // Get the app_thread the socket is living in. If there are two + // sockets in the same pollset with different app threads, fail. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + if (app_thread) { + if (app_thread != s->get_thread ()) { + free (pollfds); + errno = EFAULT; + return -1; + } + } + else + app_thread = s->get_thread (); + + nsockets++; + continue; + } + + // Raw file descriptors. + pollfds [npollfds].fd = items_ [i].fd; + pollfds [npollfds].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); + npollfds++; + } + + // If there's at least one 0MQ socket in the pollset we have to poll + // for 0MQ commands. If ZMQ_POLL was not set, fail. + if (nsockets) { + pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); + if (pollfds [npollfds].fd == zmq::retired_fd) { + free (pollfds); + errno = ENOTSUP; + return -1; + } + pollfds [npollfds].events = POLLIN; + npollfds++; + } + + int nevents = 0; + bool initial = true; + while (!nevents) { + + // Wait for activity. In the first iteration just check for events, + // don't wait. Waiting would prevent exiting on any events that may + // already be signaled on 0MQ sockets. + int rc = poll (pollfds, npollfds, initial ? 0 : -1); + if (rc == -1 && errno == EINTR) + continue; + errno_assert (rc >= 0); + initial = false; + + // Process 0MQ commands if needed. + if (nsockets && pollfds [npollfds -1].revents & POLLIN) + app_thread->process_commands (false, false); + + // Check for the events. + int pollfd_pos = 0; + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + if (!items_ [i].socket) { + items_ [i].revents = + (pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) | + (pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0); + if (items_ [i].revents) + nevents++; + pollfd_pos++; + continue; + } + + // The poll item is a 0MQ socket. + zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; + items_ [i].revents = 0; + if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) + items_ [i].revents |= ZMQ_POLLIN; + if (items_ [i].revents) + nevents++; + } + } + + free (pollfds); + return nevents; + +#endif +} + #if defined ZMQ_HAVE_WINDOWS static uint64_t now () diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index b82af0a..baa0eee 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -137,6 +137,12 @@ void zmq::zmq_engine_t::out_event () void zmq::zmq_engine_t::revive () { set_pollout (handle); + + // Speculative write: The assumption is that at the moment new message + // was sent by the user the socket is probably available for writing. + // Thus we try to write the data to socket avoiding polling for POLLOUT. + // Consequently, the latency should be better in request/reply scenarios. + out_event (); } void zmq::zmq_engine_t::error () |