summaryrefslogtreecommitdiff
path: root/src/zmq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq.cpp')
-rw-r--r--src/zmq.cpp708
1 files changed, 354 insertions, 354 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 5770e04..929e51c 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -1,22 +1,37 @@
/*
- Copyright (c) 2007-2010 iMatix Corporation
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
+ the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
+ GNU Lesser General Public License for more details.
- You should have received a copy of the Lesser GNU General Public License
+ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "platform.hpp"
+
+// On AIX, poll.h has to be included before zmq.h to get consistent
+// definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
+// instead of 'events' and 'revents' and defines macros to map from POSIX-y
+// names to AIX-specific names).
+#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
+ defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
+ defined ZMQ_HAVE_NETBSD
+#include <poll.h>
+#endif
+
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
@@ -25,34 +40,30 @@
#include <stdlib.h>
#include <new>
-#include "forwarder.hpp"
-#include "queue.hpp"
-#include "streamer.hpp"
+#include "device.hpp"
#include "socket_base.hpp"
-#include "app_thread.hpp"
#include "msg_content.hpp"
-#include "platform.hpp"
#include "stdint.hpp"
#include "config.hpp"
+#include "likely.hpp"
+#include "clock.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "fd.hpp"
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
- defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
- defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
- defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
- defined ZMQ_HAVE_NETBSD
-#include <poll.h>
-#endif
-
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
-#include <sys/time.h>
#endif
#if defined ZMQ_HAVE_OPENPGM
+#define __PGM_WININT_H__
#include <pgm/pgm.h>
+
+// TODO: OpenPGM redefines bool -- remove this once OpenPGM is fixed.
+#if defined bool
+#undef bool
+#endif
+
#endif
void zmq_version (int *major_, int *minor_, int *patch_)
@@ -64,43 +75,7 @@ void zmq_version (int *major_, int *minor_, int *patch_)
const char *zmq_strerror (int errnum_)
{
- switch (errnum_) {
-#if defined ZMQ_HAVE_WINDOWS
- case ENOTSUP:
- return "Not supported";
- case EPROTONOSUPPORT:
- return "Protocol not supported";
- case ENOBUFS:
- return "No buffer space available";
- case ENETDOWN:
- return "Network is down";
- case EADDRINUSE:
- return "Address in use";
- case EADDRNOTAVAIL:
- return "Address not available";
- case ECONNREFUSED:
- return "Connection refused";
- case EINPROGRESS:
- return "Operation in progress";
-#endif
- case EMTHREAD:
- return "Number of preallocated application threads exceeded";
- case EFSM:
- return "Operation cannot be accomplished in current state";
- case ENOCOMPATPROTO:
- return "The protocol is not compatible with the socket type";
- case ETERM:
- return "Context was terminated";
- default:
-#if defined _MSC_VER
-#pragma warning (push)
-#pragma warning (disable:4996)
-#endif
- return strerror (errnum_);
-#if defined _MSC_VER
-#pragma warning (pop)
-#endif
- }
+ return zmq::errno_to_string (errnum_);
}
int zmq_msg_init (zmq_msg_t *msg_)
@@ -141,7 +116,7 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
zmq_free_fn *ffn_, void *hint_)
{
msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
- zmq_assert (msg_->content);
+ alloc_assert (msg_->content);
msg_->flags = 0;
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = data_;
@@ -155,24 +130,30 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
int zmq_msg_close (zmq_msg_t *msg_)
{
// For VSMs and delimiters there are no resources to free.
- if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
- msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
- return 0;
+ if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
+ msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {
- // If the content is not shared, or if it is shared and the reference.
- // count has dropped to zero, deallocate it.
- zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
- if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
+ // If the content is not shared, or if it is shared and the reference.
+ // count has dropped to zero, deallocate it.
+ zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
+ if (!(msg_->flags & ZMQ_MSG_SHARED) || !content->refcnt.sub (1)) {
- // We used "placement new" operator to initialize the reference.
- // counter so we call its destructor now.
- content->refcnt.~atomic_counter_t ();
+ // We used "placement new" operator to initialize the reference.
+ // counter so we call its destructor now.
+ content->refcnt.~atomic_counter_t ();
- if (content->ffn)
- content->ffn (content->data, content->hint);
- free (content);
+ if (content->ffn)
+ content->ffn (content->data, content->hint);
+ free (content);
+ }
}
+ // As a safety measure, let's make the deallocated message look like
+ // an empty message.
+ msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
+ msg_->flags = 0;
+ msg_->vsm_size = 0;
+
return 0;
}
@@ -235,38 +216,47 @@ void *zmq_init (int io_threads_)
}
#if defined ZMQ_HAVE_OPENPGM
- // Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus,
- // let's fail if it was initialised beforehand.
- zmq_assert (!pgm_supported ());
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
// protocol ID. Note that if you want to use gettimeofday and sleep for
// openPGM timing, set environment variables PGM_TIMER to "GTOD" and
// PGM_SLEEP to "USLEEP".
- GError *pgm_error = NULL;
- int rc = pgm_init (&pgm_error);
- if (rc != TRUE) {
- if (pgm_error->domain == PGM_IF_ERROR && (
- pgm_error->code == PGM_IF_ERROR_INVAL ||
- pgm_error->code == PGM_IF_ERROR_XDEV ||
- pgm_error->code == PGM_IF_ERROR_NODEV ||
- pgm_error->code == PGM_IF_ERROR_NOTUNIQ ||
- pgm_error->code == PGM_IF_ERROR_ADDRFAMILY ||
- pgm_error->code == PGM_IF_ERROR_FAMILY ||
- pgm_error->code == PGM_IF_ERROR_NODATA ||
- pgm_error->code == PGM_IF_ERROR_NONAME ||
- pgm_error->code == PGM_IF_ERROR_SERVICE)) {
- g_error_free (pgm_error);
+ pgm_error_t *pgm_error = NULL;
+ const bool ok = pgm_init (&pgm_error);
+ if (ok != TRUE) {
+
+ // Invalid parameters don't set pgm_error_t
+ zmq_assert (pgm_error != NULL);
+ if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
+ pgm_error->code == PGM_ERROR_FAILED)) {
+
+ // Failed to access RTC or HPET device.
+ pgm_error_free (pgm_error);
errno = EINVAL;
return NULL;
}
+
+ // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
zmq_assert (false);
}
#endif
+#ifdef ZMQ_HAVE_WINDOWS
+ // Intialise Windows sockets. Note that WSAStartup can be called multiple
+ // times given that WSACleanup will be called for each WSAStartup.
+ // We do this before the ctx constructor since its embedded mailbox_t
+ // object needs Winsock to be up and running.
+ WORD version_requested = MAKEWORD (2, 2);
+ WSADATA wsa_data;
+ int rc = WSAStartup (version_requested, &wsa_data);
+ zmq_assert (rc == 0);
+ zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
+ HIBYTE (wsa_data.wVersion) == 2);
+#endif
+
// Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
- zmq_assert (ctx);
+ alloc_assert (ctx);
return (void*) ctx;
}
@@ -277,9 +267,15 @@ int zmq_term (void *ctx_)
return -1;
}
- int rc = ((zmq::ctx_t*) ctx_)->term ();
+ int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno;
+#ifdef ZMQ_HAVE_WINDOWS
+ // On Windows, uninitialise socket layer.
+ rc = WSACleanup ();
+ wsa_assert (rc != SOCKET_ERROR);
+#endif
+
#if defined ZMQ_HAVE_OPENPGM
// Shut down the OpenPGM library.
if (pgm_shutdown () != TRUE)
@@ -366,158 +362,203 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
-int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
-{
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+#if defined ZMQ_FORCE_SELECT
+#define ZMQ_POLL_BASED_ON_SELECT
+#elif defined ZMQ_FORCE_POLL
+#define ZMQ_POLL_BASED_ON_POLL
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
defined ZMQ_HAVE_NETBSD
+#define ZMQ_POLL_BASED_ON_POLL
+#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#define ZMQ_POLL_BASED_ON_SELECT
+#endif
+
+int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
+{
+#if defined ZMQ_POLL_BASED_ON_POLL
+ if (unlikely (nitems_ < 0)) {
+ errno = EINVAL;
+ return -1;
+ }
+ if (unlikely (nitems_ == 0)) {
+ if (timeout_ == 0)
+ return 0;
+#if defined ZMQ_HAVE_WINDOWS
+ Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
+ return 0;
+#else
+ return usleep (timeout_);
+#endif
+ }
if (!items_) {
errno = EFAULT;
return -1;
}
- pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
- zmq_assert (pollfds);
- int npollfds = 0;
- int nsockets = 0;
- zmq::app_thread_t *app_thread = NULL;
+ zmq::clock_t clock;
+ uint64_t now = 0;
+ uint64_t end = 0;
+
+ pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
+ alloc_assert (pollfds);
+ // Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
- // 0MQ sockets.
+ // If the poll item is a 0MQ socket, we poll on the file descriptor
+ // retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
-
- // Get the app_thread the socket is living in. If there are two
- // sockets in the same pollset with different app threads, fail.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- if (app_thread) {
- if (app_thread != s->get_thread ()) {
- free (pollfds);
- errno = EFAULT;
- return -1;
- }
+ size_t zmq_fd_size = sizeof (zmq::fd_t);
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
+ &zmq_fd_size) == -1) {
+ free (pollfds);
+ return -1;
}
- else
- app_thread = s->get_thread ();
-
- nsockets++;
- continue;
+ pollfds [i].events = items_ [i].events ? POLLIN : 0;
}
-
- // Raw file descriptors.
- pollfds [npollfds].fd = items_ [i].fd;
- pollfds [npollfds].events =
- (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
- (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
- npollfds++;
- }
-
- // If there's at least one 0MQ socket in the pollset we have to poll
- // for 0MQ commands. If ZMQ_POLL was not set, fail.
- if (nsockets) {
- pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
- if (pollfds [npollfds].fd == zmq::retired_fd) {
- free (pollfds);
- errno = ENOTSUP;
- return -1;
+ // Else, the poll item is a raw file descriptor. Just convert the
+ // events to normal POLLIN/POLLOUT for poll ().
+ else {
+ pollfds [i].fd = items_ [i].fd;
+ pollfds [i].events =
+ (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
+ (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
}
- pollfds [npollfds].events = POLLIN;
- npollfds++;
- }
-
- // First iteration just check for events, don't block. Waiting would
- // prevent exiting on any events that may already been signaled on
- // 0MQ sockets.
- int rc = poll (pollfds, npollfds, 0);
- if (rc == -1 && errno == EINTR && timeout_ >= 0) {
- free (pollfds);
- return 0;
}
- errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
- int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
+ bool first_pass = true;
int nevents = 0;
while (true) {
- // Process 0MQ commands if needed.
- if (nsockets && pollfds [npollfds -1].revents & POLLIN)
- if (!app_thread->process_commands (false, false)) {
+ // Compute the timeout for the subsequent poll.
+ int timeout;
+ if (first_pass)
+ timeout = 0;
+ else if (timeout_ < 0)
+ timeout = -1;
+ else
+ timeout = end - now;
+
+ // Wait for events.
+ while (true) {
+ int rc = poll (pollfds, nitems_, timeout);
+ if (rc == -1 && errno == EINTR) {
free (pollfds);
- errno = ETERM;
return -1;
}
+ errno_assert (rc >= 0);
+ break;
+ }
// Check for the events.
- int pollfd_pos = 0;
for (int i = 0; i != nitems_; i++) {
- // If the poll item is a raw file descriptor, simply convert
+ items_ [i].revents = 0;
+
+ // The poll item is a 0MQ socket. Retrieve pending events
+ // using the ZMQ_EVENTS socket option.
+ if (items_ [i].socket) {
+ size_t zmq_events_size = sizeof (uint32_t);
+ uint32_t zmq_events;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
+ &zmq_events_size) == -1) {
+ free (pollfds);
+ return -1;
+ }
+ if ((items_ [i].events & ZMQ_POLLOUT) &&
+ (zmq_events & ZMQ_POLLOUT))
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) &&
+ (zmq_events & ZMQ_POLLIN))
+ items_ [i].revents |= ZMQ_POLLIN;
+ }
+ // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
- if (!items_ [i].socket) {
- items_ [i].revents = 0;
- if (pollfds [pollfd_pos].revents & POLLIN)
+ else {
+ if (pollfds [i].revents & POLLIN)
items_ [i].revents |= ZMQ_POLLIN;
- if (pollfds [pollfd_pos].revents & POLLOUT)
+ if (pollfds [i].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT;
- if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT))
+ if (pollfds [i].revents & ~(POLLIN | POLLOUT))
items_ [i].revents |= ZMQ_POLLERR;
-
- if (items_ [i].revents)
- nevents++;
- pollfd_pos++;
- continue;
}
- // The poll item is a 0MQ socket.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- items_ [i].revents = 0;
- if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
- items_ [i].revents |= ZMQ_POLLOUT;
- if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
- items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
- // If there's at least one event, or if we are asked not to block,
- // return immediately.
- if (nevents || !timeout_)
+ // If timout is zero, exit immediately whether there are events or not.
+ if (timeout_ == 0)
break;
- // Wait for events. Ignore interrupts if there's infinite timeout.
- while (true) {
- rc = poll (pollfds, npollfds, timeout);
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else {
- rc = 0;
- break;
- }
- }
- errno_assert (rc >= 0);
+ // If there are events to return, we can exit immediately.
+ if (nevents)
break;
+
+ // At this point we are meant to wait for events but there are none.
+ // If timeout is infinite we can just loop until we get some events.
+ if (timeout_ < 0) {
+ if (first_pass)
+ first_pass = false;
+ continue;
}
-
- // If timeout was hit with no events signaled, return zero.
- if (rc == 0)
- break;
- // If timeout was already applied, we don't want to poll anymore.
- // Setting timeout to zero will cause termination of the function
- // once the events we've got are processed.
- if (timeout > 0)
- timeout = 0;
+ // The timeout is finite and there are no events. In the first pass
+ // we get a timestamp of when the polling have begun. (We assume that
+ // first pass have taken negligible time). We also compute the time
+ // when the polling should time out.
+ if (first_pass) {
+ now = clock.now_ms ();
+ end = now + (timeout_ / 1000);
+ if (now == end)
+ break;
+ first_pass = false;
+ continue;
+ }
+
+ // Find out whether timeout have expired.
+ now = clock.now_ms ();
+ if (now >= end)
+ break;
}
free (pollfds);
return nevents;
-#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#elif defined ZMQ_POLL_BASED_ON_SELECT
+
+ if (unlikely (nitems_ < 0)) {
+ errno = EINVAL;
+ return -1;
+ }
+ if (unlikely (nitems_ == 0)) {
+ if (timeout_ == 0)
+ return 0;
+#if defined ZMQ_HAVE_WINDOWS
+ Sleep (timeout_ > 0 ? timeout_ / 1000 : INFINITE);
+ return 0;
+#else
+ return usleep (timeout_);
+#endif
+ }
+
+ if (!items_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ zmq::clock_t clock;
+ uint64_t now = 0;
+ uint64_t end = 0;
+
+ // Ensure we do not attempt to select () on more than FD_SETSIZE
+ // file descriptors.
+ zmq_assert (nitems_ <= FD_SETSIZE);
fd_set pollset_in;
FD_ZERO (&pollset_in);
@@ -526,166 +567,163 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
fd_set pollset_err;
FD_ZERO (&pollset_err);
- zmq::app_thread_t *app_thread = NULL;
- int nsockets = 0;
- zmq::fd_t maxfd = zmq::retired_fd;
- zmq::fd_t notify_fd = zmq::retired_fd;
-
- // Ensure we do not attempt to select () on more than FD_SETSIZE
- // file descriptors.
- zmq_assert (nitems_ <= FD_SETSIZE);
+ zmq::fd_t maxfd = 0;
+ // Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
- // 0MQ sockets.
+ // If the poll item is a 0MQ socket we are interested in input on the
+ // notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
-
- // Get the app_thread the socket is living in. If there are two
- // sockets in the same pollset with different app threads, fail.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- if (app_thread) {
- if (app_thread != s->get_thread ()) {
- errno = EFAULT;
- return -1;
- }
+ size_t zmq_fd_size = sizeof (zmq::fd_t);
+ zmq::fd_t notify_fd;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
+ &zmq_fd_size) == -1)
+ return -1;
+ if (items_ [i].events) {
+ FD_SET (notify_fd, &pollset_in);
+ if (maxfd < notify_fd)
+ maxfd = notify_fd;
}
- else
- app_thread = s->get_thread ();
-
- nsockets++;
- continue;
}
-
- // Raw file descriptors.
- if (items_ [i].events & ZMQ_POLLIN)
- FD_SET (items_ [i].fd, &pollset_in);
- if (items_ [i].events & ZMQ_POLLOUT)
- FD_SET (items_ [i].fd, &pollset_out);
- if (items_ [i].events & ZMQ_POLLERR)
- FD_SET (items_ [i].fd, &pollset_err);
- if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd)
- maxfd = items_ [i].fd;
- }
-
- // If there's at least one 0MQ socket in the pollset we have to poll
- // for 0MQ commands. If ZMQ_POLL was not set, fail.
- if (nsockets) {
- notify_fd = app_thread->get_signaler ()->get_fd ();
- if (notify_fd == zmq::retired_fd) {
- errno = ENOTSUP;
- return -1;
+ // Else, the poll item is a raw file descriptor. Convert the poll item
+ // events to the appropriate fd_sets.
+ else {
+ if (items_ [i].events & ZMQ_POLLIN)
+ FD_SET (items_ [i].fd, &pollset_in);
+ if (items_ [i].events & ZMQ_POLLOUT)
+ FD_SET (items_ [i].fd, &pollset_out);
+ if (items_ [i].events & ZMQ_POLLERR)
+ FD_SET (items_ [i].fd, &pollset_err);
+ if (maxfd < items_ [i].fd)
+ maxfd = items_ [i].fd;
}
- FD_SET (notify_fd, &pollset_in);
- if (maxfd == zmq::retired_fd || maxfd < notify_fd)
- maxfd = notify_fd;
}
- bool block = (timeout_ < 0);
- timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
- timeval zero_timeout = {0, 0};
+ bool first_pass = true;
int nevents = 0;
-
- // First iteration just check for events, don't block. Waiting would
- // prevent exiting on any events that may already been signaled on
- // 0MQ sockets.
fd_set inset, outset, errset;
- memcpy (&inset, &pollset_in, sizeof (fd_set));
- memcpy (&outset, &pollset_out, sizeof (fd_set));
- memcpy (&errset, &pollset_err, sizeof (fd_set));
- int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout);
-#if defined ZMQ_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
-#else
- if (rc == -1 && errno == EINTR && timeout_ >= 0)
- return 0;
- errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
-#endif
while (true) {
- // Process 0MQ commands if needed.
- if (nsockets && FD_ISSET (notify_fd, &inset))
- if (!app_thread->process_commands (false, false)) {
- errno = ETERM;
+ // Compute the timeout for the subsequent poll.
+ timeval timeout;
+ timeval *ptimeout;
+ if (first_pass) {
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 0;
+ ptimeout = &timeout;
+ }
+ else if (timeout_ < 0)
+ ptimeout = NULL;
+ else {
+ timeout.tv_sec = (long) ((end - now) / 1000);
+ timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
+ ptimeout = &timeout;
+ }
+
+ // Wait for events. Ignore interrupts if there's infinite timeout.
+ while (true) {
+ memcpy (&inset, &pollset_in, sizeof (fd_set));
+ memcpy (&outset, &pollset_out, sizeof (fd_set));
+ memcpy (&errset, &pollset_err, sizeof (fd_set));
+ int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
+#if defined ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ if (rc == -1 && errno == EINTR)
return -1;
- }
+ errno_assert (rc >= 0);
+#endif
+ break;
+ }
// Check for the events.
for (int i = 0; i != nitems_; i++) {
- // If the poll item is a raw file descriptor, simply convert
+ items_ [i].revents = 0;
+
+ // The poll item is a 0MQ socket. Retrieve pending events
+ // using the ZMQ_EVENTS socket option.
+ if (items_ [i].socket) {
+ size_t zmq_events_size = sizeof (uint32_t);
+ uint32_t zmq_events;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
+ &zmq_events_size) == -1)
+ return -1;
+ if ((items_ [i].events & ZMQ_POLLOUT) &&
+ (zmq_events & ZMQ_POLLOUT))
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) &&
+ (zmq_events & ZMQ_POLLIN))
+ items_ [i].revents |= ZMQ_POLLIN;
+ }
+ // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
- if (!items_ [i].socket) {
- items_ [i].revents = 0;
+ else {
if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_ [i].fd, &outset))
items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
- if (items_ [i].revents)
- nevents++;
- continue;
}
- // The poll item is a 0MQ socket.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- items_ [i].revents = 0;
- if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
- items_ [i].revents |= ZMQ_POLLOUT;
- if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
- items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
- // If there's at least one event, or if we are asked not to block,
- // return immediately.
- if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0))
+ // If timout is zero, exit immediately whether there are events or not.
+ if (timeout_ == 0)
break;
- // Wait for events. Ignore interrupts if there's infinite timeout.
- while (true) {
- memcpy (&inset, &pollset_in, sizeof (fd_set));
- memcpy (&outset, &pollset_out, sizeof (fd_set));
- memcpy (&errset, &pollset_err, sizeof (fd_set));
- rc = select (maxfd, &inset, &outset, &errset,
- block ? NULL : &timeout);
-#if defined ZMQ_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
-#else
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else {
- rc = 0;
- break;
- }
- }
- errno_assert (rc >= 0);
-#endif
+ // If there are events to return, we can exit immediately.
+ if (nevents)
break;
+
+ // At this point we are meant to wait for events but there are none.
+ // If timeout is infinite we can just loop until we get some events.
+ if (timeout_ < 0) {
+ if (first_pass)
+ first_pass = false;
+ continue;
+ }
+
+ // The timeout is finite and there are no events. In the first pass
+ // we get a timestamp of when the polling have begun. (We assume that
+ // first pass have taken negligible time). We also compute the time
+ // when the polling should time out.
+ if (first_pass) {
+ now = clock.now_ms ();
+ end = now + (timeout_ / 1000);
+ if (now == end)
+ break;
+ first_pass = false;
+ continue;
}
-
- // If timeout was hit with no events signaled, return zero.
- if (rc == 0)
- break;
- // If timeout was already applied, we don't want to poll anymore.
- // Setting timeout to zero will cause termination of the function
- // once the events we've got are processed.
- if (!block)
- timeout = zero_timeout;
+ // Find out whether timeout have expired.
+ now = clock.now_ms ();
+ if (now >= end)
+ break;
}
return nevents;
#else
+ // Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif
}
+#if defined ZMQ_POLL_BASED_ON_SELECT
+#undef ZMQ_POLL_BASED_ON_SELECT
+#endif
+#if defined ZMQ_POLL_BASED_ON_POLL
+#undef ZMQ_POLL_BASED_ON_POLL
+#endif
+
int zmq_errno ()
{
return errno;
@@ -697,80 +735,42 @@ int zmq_device (int device_, void *insocket_, void *outsocket_)
errno = EFAULT;
return -1;
}
- switch (device_) {
- case ZMQ_FORWARDER:
- return zmq::forwarder ((zmq::socket_base_t*) insocket_,
- (zmq::socket_base_t*) outsocket_);
- case ZMQ_QUEUE:
- return zmq::queue ((zmq::socket_base_t*) insocket_,
- (zmq::socket_base_t*) outsocket_);
- case ZMQ_STREAMER:
- return zmq::streamer ((zmq::socket_base_t*) insocket_,
- (zmq::socket_base_t*) outsocket_);
- default:
- return EINVAL;
+
+ if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
+ device_ != ZMQ_STREAMER) {
+ errno = EINVAL;
+ return -1;
}
+
+ return zmq::device ((zmq::socket_base_t*) insocket_,
+ (zmq::socket_base_t*) outsocket_);
}
////////////////////////////////////////////////////////////////////////////////
// 0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////
-#if defined ZMQ_HAVE_WINDOWS
-
-static uint64_t now ()
-{
- // Get the high resolution counter's accuracy.
- LARGE_INTEGER ticksPerSecond;
- QueryPerformanceFrequency (&ticksPerSecond);
-
- // What time is it?
- LARGE_INTEGER tick;
- QueryPerformanceCounter (&tick);
-
- // Convert the tick number into the number of seconds
- // since the system was started.
- double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000);
- return (uint64_t) (tick.QuadPart / ticks_div);
-}
-
void zmq_sleep (int seconds_)
{
+#if defined ZMQ_HAVE_WINDOWS
Sleep (seconds_ * 1000);
-}
-
#else
-
-static uint64_t now ()
-{
- struct timeval tv;
- int rc;
-
- rc = gettimeofday (&tv, NULL);
- assert (rc == 0);
- return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
-}
-
-void zmq_sleep (int seconds_)
-{
sleep (seconds_);
-}
-
#endif
+}
void *zmq_stopwatch_start ()
{
uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
- assert (watch);
- *watch = now ();
+ alloc_assert (watch);
+ *watch = zmq::clock_t::now_us ();
return (void*) watch;
}
unsigned long zmq_stopwatch_stop (void *watch_)
{
- uint64_t end = now ();
+ uint64_t end = zmq::clock_t::now_us ();
uint64_t start = *(uint64_t*) watch_;
free (watch_);
return (unsigned long) (end - start);
}
-