summaryrefslogtreecommitdiff
path: root/src/zmq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq.cpp')
-rw-r--r--src/zmq.cpp368
1 files changed, 152 insertions, 216 deletions
diff --git a/src/zmq.cpp b/src/zmq.cpp
index f3ccaac..736f764 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -29,7 +29,6 @@
#include "queue.hpp"
#include "streamer.hpp"
#include "socket_base.hpp"
-#include "app_thread.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
@@ -83,8 +82,6 @@ const char *zmq_strerror (int errnum_)
case EINPROGRESS:
return "Operation in progress";
#endif
- case EMTHREAD:
- return "Number of preallocated application threads exceeded";
case EFSM:
return "Operation cannot be accomplished in current state";
case ENOCOMPATPROTO:
@@ -277,7 +274,7 @@ int zmq_term (void *ctx_)
return -1;
}
- int rc = ((zmq::ctx_t*) ctx_)->term ();
+ int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno;
#if defined ZMQ_HAVE_OPENPGM
@@ -378,140 +375,106 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = EFAULT;
return -1;
}
+
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
- int npollfds = 0;
- int nsockets = 0;
-
- zmq::app_thread_t *app_thread = NULL;
+ // Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
- // 0MQ sockets.
+ // If the poll item is a 0MQ socket, we poll on the file descriptor
+ // retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
-
- // Get the app_thread the socket is living in. If there are two
- // sockets in the same pollset with different app threads, fail.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- if (app_thread) {
- if (app_thread != s->get_thread ()) {
- free (pollfds);
- errno = EFAULT;
- return -1;
- }
+ size_t zmq_fd_size = sizeof (zmq::fd_t);
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
+ &zmq_fd_size) == -1) {
+ free (pollfds);
+ return -1;
}
- else
- app_thread = s->get_thread ();
-
- nsockets++;
- continue;
+ pollfds [i].events = items_ [i].events ? POLLIN : 0;
}
-
- // Raw file descriptors.
- pollfds [npollfds].fd = items_ [i].fd;
- pollfds [npollfds].events =
- (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
- (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
- npollfds++;
- }
-
- // If there's at least one 0MQ socket in the pollset we have to poll
- // for 0MQ commands. If ZMQ_POLL was not set, fail.
- if (nsockets) {
- pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
- if (pollfds [npollfds].fd == zmq::retired_fd) {
- free (pollfds);
- errno = ENOTSUP;
- return -1;
+ // Else, the poll item is a raw file descriptor. Just convert the
+ // events to normal POLLIN/POLLOUT for poll ().
+ else {
+ pollfds [i].fd = items_ [i].fd;
+ pollfds [i].events =
+ (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
+ (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
}
- pollfds [npollfds].events = POLLIN;
- npollfds++;
}
- // First iteration just check for events, don't block. Waiting would
- // prevent exiting on any events that may already been signaled on
- // 0MQ sockets.
- int rc = poll (pollfds, npollfds, 0);
- if (rc == -1 && errno == EINTR && timeout_ >= 0) {
- free (pollfds);
- return 0;
- }
- errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
-
+ bool first_pass = true;
int timeout = timeout_ > 0 ? timeout_ / 1000 : -1;
int nevents = 0;
while (true) {
- // Process 0MQ commands if needed.
- if (nsockets && pollfds [npollfds -1].revents & POLLIN)
- if (!app_thread->process_commands (false, false)) {
- free (pollfds);
- errno = ETERM;
- return -1;
+ // Wait for events. Ignore interrupts if there's infinite timeout.
+ while (true) {
+ int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout);
+ if (rc == -1 && errno == EINTR) {
+ if (timeout_ < 0)
+ continue;
+ else {
+ // TODO: Calculate remaining timeout and restart poll ().
+ free (pollfds);
+ return 0;
+ }
}
+ errno_assert (rc >= 0);
+ break;
+ }
// Check for the events.
- int pollfd_pos = 0;
for (int i = 0; i != nitems_; i++) {
- // If the poll item is a raw file descriptor, simply convert
+ items_ [i].revents = 0;
+
+ // The poll item is a 0MQ socket. Retrieve pending events
+ // using the ZMQ_EVENTS socket option.
+ if (items_ [i].socket) {
+ size_t zmq_events_size = sizeof (uint32_t);
+ uint32_t zmq_events;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
+ &zmq_events_size) == -1) {
+ free (pollfds);
+ return -1;
+ }
+ if ((items_ [i].events & ZMQ_POLLOUT) &&
+ (zmq_events & ZMQ_POLLOUT))
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) &&
+ (zmq_events & ZMQ_POLLIN))
+ items_ [i].revents |= ZMQ_POLLIN;
+ }
+ // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
- if (!items_ [i].socket) {
- items_ [i].revents = 0;
- if (pollfds [pollfd_pos].revents & POLLIN)
+ else {
+ if (pollfds [i].revents & POLLIN)
items_ [i].revents |= ZMQ_POLLIN;
- if (pollfds [pollfd_pos].revents & POLLOUT)
+ if (pollfds [i].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT;
- if (pollfds [pollfd_pos].revents & ~(POLLIN | POLLOUT))
+ if (pollfds [i].revents & ~(POLLIN | POLLOUT))
items_ [i].revents |= ZMQ_POLLERR;
-
- if (items_ [i].revents)
- nevents++;
- pollfd_pos++;
- continue;
}
- // The poll item is a 0MQ socket.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- items_ [i].revents = 0;
- if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
- items_ [i].revents |= ZMQ_POLLOUT;
- if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
- items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
- // If there's at least one event, or if we are asked not to block,
- // return immediately.
- if (nevents || !timeout_)
- break;
-
- // Wait for events. Ignore interrupts if there's infinite timeout.
- while (true) {
- rc = poll (pollfds, npollfds, timeout);
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else {
- rc = 0;
- break;
- }
- }
- errno_assert (rc >= 0);
- break;
+ // If there are no events from the first pass (the one with no
+ // timout), do at least the second pass so that we wait.
+ if (first_pass && nevents == 0 && timeout_ != 0) {
+ first_pass = false;
+ continue;
}
-
- // If timeout was hit with no events signaled, return zero.
- if (rc == 0)
- break;
- // If timeout was already applied, we don't want to poll anymore.
- // Setting timeout to zero will cause termination of the function
- // once the events we've got are processed.
- if (timeout > 0)
- timeout = 0;
+ // If timeout is set to infinite and we have to events to return
+ // we can restart the polling.
+ if (timeout == -1 && nevents == 0)
+ continue;
+
+ break;
}
free (pollfds);
@@ -526,161 +489,134 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
fd_set pollset_err;
FD_ZERO (&pollset_err);
- zmq::app_thread_t *app_thread = NULL;
- int nsockets = 0;
- zmq::fd_t maxfd = zmq::retired_fd;
- zmq::fd_t notify_fd = zmq::retired_fd;
+ zmq::fd_t maxfd = 0;
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (nitems_ <= FD_SETSIZE);
+ // Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
- // 0MQ sockets.
+ // If the poll item is a 0MQ socket we are interested in input on the
+ // notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
-
- // Get the app_thread the socket is living in. If there are two
- // sockets in the same pollset with different app threads, fail.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- if (app_thread) {
- if (app_thread != s->get_thread ()) {
- errno = EFAULT;
- return -1;
- }
+ size_t zmq_fd_size = sizeof (zmq::fd_t);
+ zmq::fd_t notify_fd;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
+ &zmq_fd_size) == -1)
+ return -1;
+ if (items_ [i].events) {
+ FD_SET (notify_fd, &pollset_in);
+ if (maxfd < notify_fd)
+ maxfd = notify_fd;
}
- else
- app_thread = s->get_thread ();
-
- nsockets++;
- continue;
}
-
- // Raw file descriptors.
- if (items_ [i].events & ZMQ_POLLIN)
- FD_SET (items_ [i].fd, &pollset_in);
- if (items_ [i].events & ZMQ_POLLOUT)
- FD_SET (items_ [i].fd, &pollset_out);
- if (items_ [i].events & ZMQ_POLLERR)
- FD_SET (items_ [i].fd, &pollset_err);
- if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd)
- maxfd = items_ [i].fd;
- }
-
- // If there's at least one 0MQ socket in the pollset we have to poll
- // for 0MQ commands. If ZMQ_POLL was not set, fail.
- if (nsockets) {
- notify_fd = app_thread->get_signaler ()->get_fd ();
- if (notify_fd == zmq::retired_fd) {
- errno = ENOTSUP;
- return -1;
+ // Else, the poll item is a raw file descriptor. Convert the poll item
+ // events to the appropriate fd_sets.
+ else {
+ if (items_ [i].events & ZMQ_POLLIN)
+ FD_SET (items_ [i].fd, &pollset_in);
+ if (items_ [i].events & ZMQ_POLLOUT)
+ FD_SET (items_ [i].fd, &pollset_out);
+ if (items_ [i].events & ZMQ_POLLERR)
+ FD_SET (items_ [i].fd, &pollset_err);
+ if (maxfd < items_ [i].fd)
+ maxfd = items_ [i].fd;
}
- FD_SET (notify_fd, &pollset_in);
- if (maxfd == zmq::retired_fd || maxfd < notify_fd)
- maxfd = notify_fd;
}
- bool block = (timeout_ < 0);
- timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
+ bool first_pass = true;
timeval zero_timeout = {0, 0};
+ timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
int nevents = 0;
-
- // First iteration just check for events, don't block. Waiting would
- // prevent exiting on any events that may already been signaled on
- // 0MQ sockets.
fd_set inset, outset, errset;
- memcpy (&inset, &pollset_in, sizeof (fd_set));
- memcpy (&outset, &pollset_out, sizeof (fd_set));
- memcpy (&errset, &pollset_err, sizeof (fd_set));
- int rc = select (maxfd, &inset, &outset, &errset, &zero_timeout);
-#if defined ZMQ_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
-#else
- if (rc == -1 && errno == EINTR && timeout_ >= 0)
- return 0;
- errno_assert (rc >= 0 || (rc == -1 && errno == EINTR));
-#endif
while (true) {
- // Process 0MQ commands if needed.
- if (nsockets && FD_ISSET (notify_fd, &inset))
- if (!app_thread->process_commands (false, false)) {
- errno = ETERM;
- return -1;
+ // Wait for events. Ignore interrupts if there's infinite timeout.
+ while (true) {
+ memcpy (&inset, &pollset_in, sizeof (fd_set));
+ memcpy (&outset, &pollset_out, sizeof (fd_set));
+ memcpy (&errset, &pollset_err, sizeof (fd_set));
+ int rc = select (maxfd, &inset, &outset, &errset,
+ first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout));
+#if defined ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ if (rc == -1 && errno == EINTR) {
+ if (timeout_ < 0)
+ continue;
+ else
+ // TODO: Calculate remaining timeout and restart select ().
+ return 0;
}
+ errno_assert (rc >= 0);
+#endif
+ break;
+ }
// Check for the events.
for (int i = 0; i != nitems_; i++) {
- // If the poll item is a raw file descriptor, simply convert
+ items_ [i].revents = 0;
+
+ // The poll item is a 0MQ socket. Retrieve pending events
+ // using the ZMQ_EVENTS socket option.
+ if (items_ [i].socket) {
+ size_t zmq_fd_size = sizeof (zmq::fd_t);
+ zmq::fd_t notify_fd;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
+ &zmq_fd_size) == -1)
+ return -1;
+ if (FD_ISSET (notify_fd, &inset)) {
+ size_t zmq_events_size = sizeof (uint32_t);
+ uint32_t zmq_events;
+ if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
+ &zmq_events_size) == -1)
+ return -1;
+ if ((items_ [i].events & ZMQ_POLLOUT) &&
+ (zmq_events & ZMQ_POLLOUT))
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) &&
+ (zmq_events & ZMQ_POLLIN))
+ items_ [i].revents |= ZMQ_POLLIN;
+ }
+ }
+ // Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
- if (!items_ [i].socket) {
- items_ [i].revents = 0;
+ else {
if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_ [i].fd, &outset))
items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
- if (items_ [i].revents)
- nevents++;
- continue;
}
- // The poll item is a 0MQ socket.
- zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
- items_ [i].revents = 0;
- if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
- items_ [i].revents |= ZMQ_POLLOUT;
- if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
- items_ [i].revents |= ZMQ_POLLIN;
if (items_ [i].revents)
nevents++;
}
- // If there's at least one event, or if we are asked not to block,
- // return immediately.
- if (nevents || (timeout.tv_sec == 0 && timeout.tv_usec == 0))
- break;
-
- // Wait for events. Ignore interrupts if there's infinite timeout.
- while (true) {
- memcpy (&inset, &pollset_in, sizeof (fd_set));
- memcpy (&outset, &pollset_out, sizeof (fd_set));
- memcpy (&errset, &pollset_err, sizeof (fd_set));
- rc = select (maxfd, &inset, &outset, &errset,
- block ? NULL : &timeout);
-#if defined ZMQ_HAVE_WINDOWS
- wsa_assert (rc != SOCKET_ERROR);
-#else
- if (rc == -1 && errno == EINTR) {
- if (timeout_ < 0)
- continue;
- else {
- rc = 0;
- break;
- }
- }
- errno_assert (rc >= 0);
-#endif
- break;
+ // If there are no events from the first pass (the one with no
+ // timout), do at least the second pass so that we wait.
+ if (first_pass && nevents == 0 && timeout_ != 0) {
+ first_pass = false;
+ continue;
}
-
- // If timeout was hit with no events signaled, return zero.
- if (rc == 0)
- break;
- // If timeout was already applied, we don't want to poll anymore.
- // Setting timeout to zero will cause termination of the function
- // once the events we've got are processed.
- if (!block)
- timeout = zero_timeout;
+ // If timeout is set to infinite and we have to events to return
+ // we can restart the polling.
+ if (timeout_ < 0 && nevents == 0)
+ continue;
+
+ break;
}
return nevents;
#else
+ // Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif