diff options
author | Martin Lucina <mato@kotelna.sk> | 2011-03-28 10:39:51 +0200 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:37 +0100 |
commit | 3e20cb1b8a2b1ca222011df37334e5f4f88dd565 (patch) | |
tree | 4a753775186bc7f583f1ceb3f9aa675b6f110596 /src/zmq.cpp | |
parent | 3f0085ddbef1a44b6bb7a0b23af497d56e0025fa (diff) | |
parent | e645fc2693acc796304498909786b7b47005b429 (diff) |
Imported Debian patch 2.1.3-1debian/2.1.3-1
Diffstat (limited to 'src/zmq.cpp')
-rw-r--r-- | src/zmq.cpp | 708 |
1 files changed, 354 insertions, 354 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp index 5770e04..929e51c 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1,22 +1,37 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "platform.hpp" + +// On AIX, poll.h has to be included before zmq.h to get consistent +// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' +// instead of 'events' and 'revents' and defines macros to map from POSIX-y +// names to AIX-specific names). +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ + defined ZMQ_HAVE_NETBSD +#include <poll.h> +#endif + #include "../include/zmq.h" #include "../include/zmq_utils.h" @@ -25,34 +40,30 @@ #include <stdlib.h> #include <new> -#include "forwarder.hpp" -#include "queue.hpp" -#include "streamer.hpp" +#include "device.hpp" #include "socket_base.hpp" -#include "app_thread.hpp" #include "msg_content.hpp" -#include "platform.hpp" #include "stdint.hpp" #include "config.hpp" +#include "likely.hpp" +#include "clock.hpp" #include "ctx.hpp" #include "err.hpp" #include "fd.hpp" -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ - defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ - defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ - defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ - defined ZMQ_HAVE_NETBSD -#include <poll.h> -#endif - #if !defined ZMQ_HAVE_WINDOWS #include <unistd.h> -#include <sys/time.h> #endif #if defined ZMQ_HAVE_OPENPGM +#define __PGM_WININT_H__ #include <pgm/pgm.h> + +// TODO: OpenPGM redefines bool -- remove this once OpenPGM is fixed. +#if defined bool +#undef bool +#endif + #endif void zmq_version (int *major_, int *minor_, int *patch_) @@ -64,43 +75,7 @@ void zmq_version (int *major_, int *minor_, int *patch_) const char *zmq_strerror (int errnum_) { - switch (errnum_) { -#if defined ZMQ_HAVE_WINDOWS - case ENOTSUP: - return "Not supported"; - case EPROTONOSUPPORT: - return "Protocol not supported"; - case ENOBUFS: - return "No buffer space available"; - case ENETDOWN: - return "Network is down"; - case EADDRINUSE: - return "Address in use"; - case EADDRNOTAVAIL: - return "Address not available"; - case ECONNREFUSED: - return "Connection refused"; - case EINPROGRESS: - return "Operation in progress"; -#endif - case EMTHREAD: - return "Number of preallocated application threads exceeded"; - case EFSM: - return "Operation cannot be accomplished in current state"; - case ENOCOMPATPROTO: - return "The protocol is not compatible with the socket type"; - case ETERM: - return "Context was terminated"; - default: -#if defined _MSC_VER -#pragma warning (push) -#pragma warning (disable:4996) -#endif - return strerror (errnum_); -#if defined _MSC_VER -#pragma warning (pop) -#endif - } + return zmq::errno_to_string (errnum_); } int zmq_msg_init (zmq_msg_t *msg_) @@ -141,7 +116,7 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, zmq_free_fn *ffn_, void *hint_) { msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); - zmq_assert (msg_->content); + alloc_assert (msg_->content); msg_->flags = 0; zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; content->data = data_; @@ -155,24 +130,30 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, int zmq_msg_close (zmq_msg_t *msg_) { // For VSMs and delimiters there are no resources to free. - if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER || - msg_->content == (zmq::msg_content_t*) ZMQ_VSM) - return 0; + if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && + msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { - // If the content is not shared, or if it is shared and the reference. - // count has dropped to zero, deallocate it. - zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; - if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { + // If the content is not shared, or if it is shared and the reference. + // count has dropped to zero, deallocate it. + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; + if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) { - // We used "placement new" operator to initialize the reference. - // counter so we call its destructor now. - content->refcnt.~atomic_counter_t (); + // We used "placement new" operator to initialize the reference. + // counter so we call its destructor now. + content->refcnt.~atomic_counter_t (); - if (content->ffn) - content->ffn (content->data, content->hint); - free (content); + if (content->ffn) + content->ffn (content->data, content->hint); + free (content); + } } + // As a safety measure, let's make the deallocated message look like + // an empty message. + msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + msg_->flags = 0; + msg_->vsm_size = 0; + return 0; } @@ -235,38 +216,47 @@ void *zmq_init (int io_threads_) } #if defined ZMQ_HAVE_OPENPGM - // Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus, - // let's fail if it was initialised beforehand. - zmq_assert (!pgm_supported ()); // Init PGM transport. Ensure threading and timer are enabled. Find PGM // protocol ID. Note that if you want to use gettimeofday and sleep for // openPGM timing, set environment variables PGM_TIMER to "GTOD" and // PGM_SLEEP to "USLEEP". - GError *pgm_error = NULL; - int rc = pgm_init (&pgm_error); - if (rc != TRUE) { - if (pgm_error->domain == PGM_IF_ERROR && ( - pgm_error->code == PGM_IF_ERROR_INVAL || - pgm_error->code == PGM_IF_ERROR_XDEV || - pgm_error->code == PGM_IF_ERROR_NODEV || - pgm_error->code == PGM_IF_ERROR_NOTUNIQ || - pgm_error->code == PGM_IF_ERROR_ADDRFAMILY || - pgm_error->code == PGM_IF_ERROR_FAMILY || - pgm_error->code == PGM_IF_ERROR_NODATA || - pgm_error->code == PGM_IF_ERROR_NONAME || - pgm_error->code == PGM_IF_ERROR_SERVICE)) { - g_error_free (pgm_error); + pgm_error_t *pgm_error = NULL; + const bool ok = pgm_init (&pgm_error); + if (ok != TRUE) { + + // Invalid parameters don't set pgm_error_t + zmq_assert (pgm_error != NULL); + if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && ( + pgm_error->code == PGM_ERROR_FAILED)) { + + // Failed to access RTC or HPET device. + pgm_error_free (pgm_error); errno = EINVAL; return NULL; } + + // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg. zmq_assert (false); } #endif +#ifdef ZMQ_HAVE_WINDOWS + // Intialise Windows sockets. Note that WSAStartup can be called multiple + // times given that WSACleanup will be called for each WSAStartup. + // We do this before the ctx constructor since its embedded mailbox_t + // object needs Winsock to be up and running. + WORD version_requested = MAKEWORD (2, 2); + WSADATA wsa_data; + int rc = WSAStartup (version_requested, &wsa_data); + zmq_assert (rc == 0); + zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && + HIBYTE (wsa_data.wVersion) == 2); +#endif + // Create 0MQ context. zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); - zmq_assert (ctx); + alloc_assert (ctx); return (void*) ctx; } @@ -277,9 +267,15 @@ int zmq_term (void *ctx_) return -1; } - int rc = ((zmq::ctx_t*) ctx_)->term (); + int rc = ((zmq::ctx_t*) ctx_)->terminate (); int en = errno; +#ifdef ZMQ_HAVE_WINDOWS + // On Windows, uninitialise socket layer. + rc = WSACleanup (); + wsa_assert (rc != SOCKET_ERROR); +#endif + #if defined ZMQ_HAVE_OPENPGM // Shut down the OpenPGM library. if (pgm_shutdown () != TRUE) @@ -366,158 +362,203 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } -int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) -{ -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ +#if defined ZMQ_FORCE_SELECT +#define ZMQ_POLL_BASED_ON_SELECT +#elif defined ZMQ_FORCE_POLL +#define ZMQ_POLL_BASED_ON_POLL +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ defined ZMQ_HAVE_NETBSD +#define ZMQ_POLL_BASED_ON_POLL +#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +#define ZMQ_POLL_BASED_ON_SELECT +#endif + +int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) +{ +#if defined ZMQ_POLL_BASED_ON_POLL + if (unlikely (nitems_ < 0)) { + errno = EINVAL; + return -1; + } + if (unlikely (nitems_ == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE); + return 0; +#else + return usleep (timeout_); +#endif + } if (!items_) { errno = EFAULT; return -1; } - pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); - zmq_assert (pollfds); - int npollfds = 0; - int nsockets = 0; - zmq::app_thread_t *app_thread = NULL; + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + + pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); + alloc_assert (pollfds); + // Build pollset for poll () system call. for (int i = 0; i != nitems_; i++) { - // 0MQ sockets. + // If the poll item is a 0MQ socket, we poll on the file descriptor + // retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - - // Get the app_thread the socket is living in. If there are two - // sockets in the same pollset with different app threads, fail. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - if (app_thread) { - if (app_thread != s->get_thread ()) { - free (pollfds); - errno = EFAULT; - return -1; - } + size_t zmq_fd_size = sizeof (zmq::fd_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, + &zmq_fd_size) == -1) { + free (pollfds); + return -1; } - else - app_thread = s->get_thread (); - - nsockets++; - continue; + pollfds [i].events = items_ [i].events ? POLLIN : 0; } - - // Raw file descriptors. - pollfds [npollfds].fd = items_ [i].fd; - pollfds [npollfds].events = - (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | - (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); - npollfds++; - } - - // If there's at least one 0MQ socket in the pollset we have to poll - // for 0MQ commands. If ZMQ_POLL was not set, fail. - if (nsockets) { - pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd (); - if (pollfds [npollfds].fd == zmq::retired_fd) { - free (pollfds); - errno = ENOTSUP; - return -1; + // Else, the poll item is a raw file descriptor. Just convert the + // events to normal POLLIN/POLLOUT for poll (). + else { + pollfds [i].fd = items_ [i].fd; + pollfds [i].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); } - pollfds [npollfds].events = POLLIN; - npollfds++; - } - - // First iteration just check for events, don't block. Waiting would - // prevent exiting on any events that may already been signaled on - // 0MQ sockets. - int rc = poll (pollfds, npollfds, 0); - if (rc == -1 && errno == EINTR && timeout_ >= 0) { - free (pollfds); - return 0; } - errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); - int timeout = timeout_ > 0 ? timeout_ / 1000 : -1; + bool first_pass = true; int nevents = 0; while (true) { - // Process 0MQ commands if needed. - if (nsockets && pollfds [npollfds -1].revents & POLLIN) - if (!app_thread->process_commands (false, false)) { + // Compute the timeout for the subsequent poll. + int timeout; + if (first_pass) + timeout = 0; + else if (timeout_ < 0) + timeout = -1; + else + timeout = end - now; + + // Wait for events. + while (true) { + int rc = poll (pollfds, nitems_, timeout); + if (rc == -1 && errno == EINTR) { free (pollfds); - errno = ETERM; return -1; } + errno_assert (rc >= 0); + break; + } // Check for the events. - int pollfd_pos = 0; for (int i = 0; i != nitems_; i++) { - // If the poll item is a raw file descriptor, simply convert + items_ [i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) { + free (pollfds); + return -1; + } + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; + } + // Else, the poll item is a raw file descriptor, simply convert // the events to zmq_pollitem_t-style format. - if (!items_ [i].socket) { - items_ [i].revents = 0; - if (pollfds [pollfd_pos].revents & POLLIN) + else { + if (pollfds [i].revents & POLLIN) items_ [i].revents |= ZMQ_POLLIN; - if (pollfds [pollfd_pos].revents & POLLOUT) + if (pollfds [i].revents & POLLOUT) items_ [i].revents |= ZMQ_POLLOUT; - if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT)) + if (pollfds [i].revents & ~(POLLIN | POLLOUT)) items_ [i].revents |= ZMQ_POLLERR; - - if (items_ [i].revents) - nevents++; - pollfd_pos++; - continue; } - // The poll item is a 0MQ socket. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - items_ [i].revents = 0; - if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) - items_ [i].revents |= ZMQ_POLLIN; if (items_ [i].revents) nevents++; } - // If there's at least one event, or if we are asked not to block, - // return immediately. - if (nevents || !timeout_) + // If timout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) break; - // Wait for events. Ignore interrupts if there's infinite timeout. - while (true) { - rc = poll (pollfds, npollfds, timeout); - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - rc = 0; - break; - } - } - errno_assert (rc >= 0); + // If there are events to return, we can exit immediately. + if (nevents) break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + continue; } - - // If timeout was hit with no events signaled, return zero. - if (rc == 0) - break; - // If timeout was already applied, we don't want to poll anymore. - // Setting timeout to zero will cause termination of the function - // once the events we've got are processed. - if (timeout > 0) - timeout = 0; + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + (timeout_ / 1000); + if (now == end) + break; + first_pass = false; + continue; + } + + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; } free (pollfds); return nevents; -#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +#elif defined ZMQ_POLL_BASED_ON_SELECT + + if (unlikely (nitems_ < 0)) { + errno = EINVAL; + return -1; + } + if (unlikely (nitems_ == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE); + return 0; +#else + return usleep (timeout_); +#endif + } + + if (!items_) { + errno = EFAULT; + return -1; + } + + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + zmq_assert (nitems_ <= FD_SETSIZE); fd_set pollset_in; FD_ZERO (&pollset_in); @@ -526,166 +567,163 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) fd_set pollset_err; FD_ZERO (&pollset_err); - zmq::app_thread_t *app_thread = NULL; - int nsockets = 0; - zmq::fd_t maxfd = zmq::retired_fd; - zmq::fd_t notify_fd = zmq::retired_fd; - - // Ensure we do not attempt to select () on more than FD_SETSIZE - // file descriptors. - zmq_assert (nitems_ <= FD_SETSIZE); + zmq::fd_t maxfd = 0; + // Build the fd_sets for passing to select (). for (int i = 0; i != nitems_; i++) { - // 0MQ sockets. + // If the poll item is a 0MQ socket we are interested in input on the + // notification file descriptor retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - - // Get the app_thread the socket is living in. If there are two - // sockets in the same pollset with different app threads, fail. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - if (app_thread) { - if (app_thread != s->get_thread ()) { - errno = EFAULT; - return -1; - } + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; + if (items_ [i].events) { + FD_SET (notify_fd, &pollset_in); + if (maxfd < notify_fd) + maxfd = notify_fd; } - else - app_thread = s->get_thread (); - - nsockets++; - continue; } - - // Raw file descriptors. - if (items_ [i].events & ZMQ_POLLIN) - FD_SET (items_ [i].fd, &pollset_in); - if (items_ [i].events & ZMQ_POLLOUT) - FD_SET (items_ [i].fd, &pollset_out); - if (items_ [i].events & ZMQ_POLLERR) - FD_SET (items_ [i].fd, &pollset_err); - if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd) - maxfd = items_ [i].fd; - } - - // If there's at least one 0MQ socket in the pollset we have to poll - // for 0MQ commands. If ZMQ_POLL was not set, fail. - if (nsockets) { - notify_fd = app_thread->get_signaler ()->get_fd (); - if (notify_fd == zmq::retired_fd) { - errno = ENOTSUP; - return -1; + // Else, the poll item is a raw file descriptor. Convert the poll item + // events to the appropriate fd_sets. + else { + if (items_ [i].events & ZMQ_POLLIN) + FD_SET (items_ [i].fd, &pollset_in); + if (items_ [i].events & ZMQ_POLLOUT) + FD_SET (items_ [i].fd, &pollset_out); + if (items_ [i].events & ZMQ_POLLERR) + FD_SET (items_ [i].fd, &pollset_err); + if (maxfd < items_ [i].fd) + maxfd = items_ [i].fd; } - FD_SET (notify_fd, &pollset_in); - if (maxfd == zmq::retired_fd || maxfd < notify_fd) - maxfd = notify_fd; } - bool block = (timeout_ < 0); - timeval timeout = {timeout_ / 1000000, timeout_ % 1000000}; - timeval zero_timeout = {0, 0}; + bool first_pass = true; int nevents = 0; - - // First iteration just check for events, don't block. Waiting would - // prevent exiting on any events that may already been signaled on - // 0MQ sockets. fd_set inset, outset, errset; - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); - int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout); -#if defined ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - if (rc == -1 && errno == EINTR && timeout_ >= 0) - return 0; - errno_assert (rc >= 0 || (rc == -1 && errno == EINTR)); -#endif while (true) { - // Process 0MQ commands if needed. - if (nsockets && FD_ISSET (notify_fd, &inset)) - if (!app_thread->process_commands (false, false)) { - errno = ETERM; + // Compute the timeout for the subsequent poll. + timeval timeout; + timeval *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + ptimeout = &timeout; + } + else if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = (long) ((end - now) / 1000); + timeout.tv_usec = (long) ((end - now) % 1000 * 1000); + ptimeout = &timeout; + } + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (&inset, &pollset_in, sizeof (fd_set)); + memcpy (&outset, &pollset_out, sizeof (fd_set)); + memcpy (&errset, &pollset_err, sizeof (fd_set)); + int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); +#if defined ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + if (rc == -1 && errno == EINTR) return -1; - } + errno_assert (rc >= 0); +#endif + break; + } // Check for the events. for (int i = 0; i != nitems_; i++) { - // If the poll item is a raw file descriptor, simply convert + items_ [i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) + return -1; + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; + } + // Else, the poll item is a raw file descriptor, simply convert // the events to zmq_pollitem_t-style format. - if (!items_ [i].socket) { - items_ [i].revents = 0; + else { if (FD_ISSET (items_ [i].fd, &inset)) items_ [i].revents |= ZMQ_POLLIN; if (FD_ISSET (items_ [i].fd, &outset)) items_ [i].revents |= ZMQ_POLLOUT; if (FD_ISSET (items_ [i].fd, &errset)) items_ [i].revents |= ZMQ_POLLERR; - if (items_ [i].revents) - nevents++; - continue; } - // The poll item is a 0MQ socket. - zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket; - items_ [i].revents = 0; - if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ()) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ()) - items_ [i].revents |= ZMQ_POLLIN; if (items_ [i].revents) nevents++; } - // If there's at least one event, or if we are asked not to block, - // return immediately. - if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0)) + // If timout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) break; - // Wait for events. Ignore interrupts if there's infinite timeout. - while (true) { - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); - rc = select (maxfd, &inset, &outset, &errset, - block ? NULL : &timeout); -#if defined ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - if (rc == -1 && errno == EINTR) { - if (timeout_ < 0) - continue; - else { - rc = 0; - break; - } - } - errno_assert (rc >= 0); -#endif + // If there are events to return, we can exit immediately. + if (nevents) break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + continue; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + (timeout_ / 1000); + if (now == end) + break; + first_pass = false; + continue; } - - // If timeout was hit with no events signaled, return zero. - if (rc == 0) - break; - // If timeout was already applied, we don't want to poll anymore. - // Setting timeout to zero will cause termination of the function - // once the events we've got are processed. - if (!block) - timeout = zero_timeout; + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; } return nevents; #else + // Exotic platforms that support neither poll() nor select(). errno = ENOTSUP; return -1; #endif } +#if defined ZMQ_POLL_BASED_ON_SELECT +#undef ZMQ_POLL_BASED_ON_SELECT +#endif +#if defined ZMQ_POLL_BASED_ON_POLL +#undef ZMQ_POLL_BASED_ON_POLL +#endif + int zmq_errno () { return errno; @@ -697,80 +735,42 @@ int zmq_device (int device_, void *insocket_, void *outsocket_) errno = EFAULT; return -1; } - switch (device_) { - case ZMQ_FORWARDER: - return zmq::forwarder ((zmq::socket_base_t*) insocket_, - (zmq::socket_base_t*) outsocket_); - case ZMQ_QUEUE: - return zmq::queue ((zmq::socket_base_t*) insocket_, - (zmq::socket_base_t*) outsocket_); - case ZMQ_STREAMER: - return zmq::streamer ((zmq::socket_base_t*) insocket_, - (zmq::socket_base_t*) outsocket_); - default: - return EINVAL; + + if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE && + device_ != ZMQ_STREAMER) { + errno = EINVAL; + return -1; } + + return zmq::device ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); } //////////////////////////////////////////////////////////////////////////////// // 0MQ utils - to be used by perf tests //////////////////////////////////////////////////////////////////////////////// -#if defined ZMQ_HAVE_WINDOWS - -static uint64_t now () -{ - // Get the high resolution counter's accuracy. - LARGE_INTEGER ticksPerSecond; - QueryPerformanceFrequency (&ticksPerSecond); - - // What time is it? - LARGE_INTEGER tick; - QueryPerformanceCounter (&tick); - - // Convert the tick number into the number of seconds - // since the system was started. - double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000); - return (uint64_t) (tick.QuadPart / ticks_div); -} - void zmq_sleep (int seconds_) { +#if defined ZMQ_HAVE_WINDOWS Sleep (seconds_ * 1000); -} - #else - -static uint64_t now () -{ - struct timeval tv; - int rc; - - rc = gettimeofday (&tv, NULL); - assert (rc == 0); - return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec); -} - -void zmq_sleep (int seconds_) -{ sleep (seconds_); -} - #endif +} void *zmq_stopwatch_start () { uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t)); - assert (watch); - *watch = now (); + alloc_assert (watch); + *watch = zmq::clock_t::now_us (); return (void*) watch; } unsigned long zmq_stopwatch_stop (void *watch_) { - uint64_t end = now (); + uint64_t end = zmq::clock_t::now_us (); uint64_t start = *(uint64_t*) watch_; free (watch_); return (unsigned long) (end - start); } - |