summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Lucina <mato@kotelna.sk>2010-08-07 18:24:12 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commitcd12508418530724f10a353fc3a14ab321d0be8e (patch)
tree37a8c0620a698129ae2381782809ea30f58235ce /src
parenteb7b8a413a99b2e43e8feee410f2b860e99e7056 (diff)
zmq_poll(): Rewrite to use ZMQ_FD/ZMQ_EVENTS pt1
Rewrite zmq_poll() to use ZMQ_FD and ZMQ_EVENTS introduced on the wip-shutdown branch. Only do the poll()-based version of zmq_poll (), the select()-based version will not compile at the moment.
Diffstat (limited to 'src')
-rw-r--r--src/zmq.cpp169
1 files changed, 54 insertions, 115 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 6dc577e..d7ce546 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -364,7 +364,6 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
-/*
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
@@ -377,138 +376,81 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
- int npollfds = 0;
- int nsockets = 0;
-
- zmq::app_thread_t *app_thread = NULL;
+ // Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
- // 0MQ sockets.
+ // If the poll item is a 0MQ socket, we poll on the file descriptor
+ // retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
-
- // Get the app_thread the socket is living in. If there are two
- // sockets in the same pollset with different app threads, fail.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- if (app_thread) {
- if (app_thread != s->get_thread ()) {
- free (pollfds);
- errno = EFAULT;
- return -1;
- }
+ size_t zmq_fd_size = sizeof (int);
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
+ &zmq_fd_size) == -1) {
+ free (pollfds);
+ return -1;
}
- else
- app_thread = s->get_thread ();
-
- nsockets++;
- continue;
+ pollfds [i].events = POLLIN;
}
-
- // Raw file descriptors.
- pollfds [npollfds].fd = items_ [i].fd;
- pollfds [npollfds].events =
- (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
- (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
- npollfds++;
+ // Else, the poll item is a raw file descriptor. Just convert the
+ // events to normal POLLIN/POLLOUT for poll ().
+ else {
+ pollfds [i].fd = items_ [i].fd;
+ pollfds [i].events =
+ (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
+ (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
+ }
}
- // If there's at least one 0MQ socket in the pollset we have to poll
- // for 0MQ commands. If ZMQ_POLL was not set, fail.
- if (nsockets) {
- pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
- if (pollfds [npollfds].fd == zmq::retired_fd) {
- free (pollfds);
- errno = ENOTSUP;
- return -1;
- }
- pollfds [npollfds].events = POLLIN;
- npollfds++;
- }
-
- // First iteration just check for events, don't block. Waiting would
- // prevent exiting on any events that may already been signaled on
- // 0MQ sockets.
- int rc = poll (pollfds, npollfds, 0);
- if (rc == -1 && errno == EINTR && timeout_ >= 0) {
- free (pollfds);
- return 0;
- }
- errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
-
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0;
+ // Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
-
- // Process 0MQ commands if needed.
- if (nsockets && pollfds [npollfds -1].revents & POLLIN)
- if (!app_thread->process_commands (false, false)) {
- free (pollfds);
- errno = ETERM;
- return -1;
- }
-
- // Check for the events.
- int pollfd_pos = 0;
- for (int i = 0; i != nitems_; i++) {
-
- // If the poll item is a raw file descriptor, simply convert
- // the events to zmq_pollitem_t-style format.
- if (!items_ [i].socket) {
- items_ [i].revents = 0;
- if (pollfds [pollfd_pos].revents & POLLIN)
- items_ [i].revents |= ZMQ_POLLIN;
- if (pollfds [pollfd_pos].revents & POLLOUT)
- items_ [i].revents |= ZMQ_POLLOUT;
- if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT))
- items_ [i].revents |= ZMQ_POLLERR;
-
- if (items_ [i].revents)
- nevents++;
- pollfd_pos++;
+ int rc = poll (pollfds, nitems_, timeout);
+ if (rc == -1 && errno == EINTR) {
+ if (timeout_ < 0)
continue;
+ // Interrupted, no way to determine how much time is remaining
+ // from timeout so just return for now.
+ else {
+ free (pollfds);
+ return 0;
}
-
- // The poll item is a 0MQ socket.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- items_ [i].revents = 0;
- if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
- items_ [i].revents |= ZMQ_POLLOUT;
- if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
- items_ [i].revents |= ZMQ_POLLIN;
- if (items_ [i].revents)
- nevents++;
}
+ errno_assert (rc >= 0);
+ break;
+ }
- // If there's at least one event, or if we are asked not to block,
- // return immediately.
- if (nevents || !timeout_)
- break;
+ // Check for the events.
+ for (int i = 0; i != nitems_; i++) {
- // Wait for events. Ignore interrupts if there's infinite timeout.
- while (true) {
- rc = poll (pollfds, npollfds, timeout);
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else {
- rc = 0;
- break;
+ items_ [i].revents = 0;
+
+ // The poll item is a 0MQ socket. Retrieve pending events
+ // using the ZMQ_EVENTS socket option.
+ if (items_ [i].socket) {
+ if (pollfds [i].revents & POLLIN) {
+ size_t zmq_events_size = sizeof (uint32_t);
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS,
+ &items_ [i].revents, &zmq_events_size) == -1) {
+ free (pollfds);
+ return -1;
}
}
- errno_assert (rc >= 0);
- break;
}
-
- // If timeout was hit with no events signaled, return zero.
- if (rc == 0)
- break;
+ // Else, the poll item is a raw file descriptor, simply convert
+ // the events to zmq_pollitem_t-style format.
+ else {
+ if (pollfds [i].revents & POLLIN)
+ items_ [i].revents |= ZMQ_POLLIN;
+ if (pollfds [i].revents & POLLOUT)
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if (pollfds [i].revents & ~(POLLIN | POLLOUT))
+ items_ [i].revents |= ZMQ_POLLERR;
+ }
- // If timeout was already applied, we don't want to poll anymore.
- // Setting timeout to zero will cause termination of the function
- // once the events we've got are processed.
- if (timeout > 0)
- timeout = 0;
+ if (items_ [i].revents)
+ nevents++;
}
free (pollfds);
@@ -677,9 +619,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = ENOTSUP;
return -1;
#endif
-*/
-zmq_assert (false);
-return -1;
}
int zmq_errno ()