summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-10-01 10:56:17 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-10-01 10:56:17 +0200
commitcc631c4c6649b0d67114db13386a949426e35dbf (patch)
treefa558bc758a12d924dd81b3cd3cd27ebd7418aae
parentf2ff2c6e5c4e244dea28e1ac6ec3f886b7ebc356 (diff)
ZMQII-18: Implement I/O multiplexing (first approximation)
-rw-r--r--bindings/c/zmq.h33
-rw-r--r--bindings/cpp/zmq.hpp5
-rw-r--r--src/fd_signaler.hpp2
-rw-r--r--src/i_signaler.hpp6
-rw-r--r--src/p2p.cpp11
-rw-r--r--src/p2p.hpp2
-rw-r--r--src/pub.cpp11
-rw-r--r--src/pub.hpp2
-rw-r--r--src/rep.cpp17
-rw-r--r--src/rep.hpp2
-rw-r--r--src/req.cpp13
-rw-r--r--src/req.hpp2
-rw-r--r--src/socket_base.cpp15
-rw-r--r--src/socket_base.hpp12
-rw-r--r--src/sub.cpp13
-rw-r--r--src/sub.hpp2
-rw-r--r--src/ypollset.cpp6
-rw-r--r--src/ypollset.hpp1
-rw-r--r--src/zmq.cpp117
19 files changed, 269 insertions, 3 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index e939dcd..3eeb3f8 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -354,6 +354,39 @@ ZMQ_EXPORT int zmq_flush (void *s);
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
////////////////////////////////////////////////////////////////////////////////
+// I/O multiplexing.
+////////////////////////////////////////////////////////////////////////////////
+
+#define ZMQ_POLLIN 1
+#define ZMQ_POLLOUT 2
+
+// 'socket' is a 0MQ socket we want to poll on. If set to NULL, native file
+// descriptor (socket) 'fd' will be used instead. 'events' defines event we
+// are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error
+// event does not exist for portability reasons. Errors from native sockets
+// are reported as ZMQ_POLLIN. It's client's responsibilty to identify the
+// error afterwards. 'revents' field is filled in after function returns. It's
+// a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the
+// socket.
+typedef struct
+{
+ void *socket;
+ int fd;
+ short events;
+ short revents;
+} zmq_pollitem_t;
+
+// Polls for the items specified by 'items'. Number of items in the array is
+// determined by 'nitems' argument. Returns number of items signaled, -1
+// in the case of error.
+//
+// Errors: EFAULT - there's a 0MQ socket in the pollset belonging to
+// a different thread.
+// ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag.
+// I/O multiplexing is disabled.
+ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems);
+
+////////////////////////////////////////////////////////////////////////////////
// Helper functions.
////////////////////////////////////////////////////////////////////////////////
diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp
index 5c0ba7c..8a00230 100644
--- a/bindings/cpp/zmq.hpp
+++ b/bindings/cpp/zmq.hpp
@@ -200,6 +200,11 @@ namespace zmq
throw error_t ();
}
+ inline operator void* ()
+ {
+ return ptr;
+ }
+
inline void setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp
index e1b56ad..513a1e4 100644
--- a/src/fd_signaler.hpp
+++ b/src/fd_signaler.hpp
@@ -44,8 +44,6 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
-
- // Get the file descriptor associated with the object.
fd_t get_fd ();
private:
diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp
index a09fe7e..ad04bb5 100644
--- a/src/i_signaler.hpp
+++ b/src/i_signaler.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__
#include "stdint.hpp"
+#include "fd.hpp"
namespace zmq
{
@@ -42,6 +43,11 @@ namespace zmq
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
virtual uint64_t check () = 0;
+
+ // Returns file descriptor that allows waiting for signals. Specific
+ // signalers may not support this functionality. If so, the function
+ // returns retired_fd.
+ virtual fd_t get_fd () = 0;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index c43b7b4..445ba5b 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
+bool zmq::p2p_t::xhas_in ()
+{
+ zmq_assert (false);
+ return false;
+}
+
+bool zmq::p2p_t::xhas_out ()
+{
+ zmq_assert (false);
+ return false;
+}
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 1c98dd5..1fd7e34 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -42,6 +42,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/pub.cpp b/src/pub.cpp
index 1e66a18..63b235e 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
+bool zmq::pub_t::xhas_in ()
+{
+ return false;
+}
+
+bool zmq::pub_t::xhas_out ()
+{
+ // TODO: Reimplement when queue limits are added.
+ return true;
+}
+
diff --git a/src/pub.hpp b/src/pub.hpp
index 07eb5a1..b3e868d 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/rep.cpp b/src/rep.cpp
index 137c735..e8a9e39 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
+bool zmq::rep_t::xhas_in ()
+{
+ for (int count = active; count != 0; count--) {
+ if (in_pipes [current]->check_read ())
+ return !waiting_for_reply;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
+bool zmq::rep_t::xhas_out ()
+{
+ return waiting_for_reply;
+}
diff --git a/src/rep.hpp b/src/rep.hpp
index 4781213..3e87dc1 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/req.cpp b/src/req.cpp
index 63409ce..93a46e8 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
+bool zmq::req_t::xhas_in ()
+{
+ if (reply_pipe->check_read ())
+ return waiting_for_reply;
+
+ return false;
+}
+
+bool zmq::req_t::xhas_out ()
+{
+ return !waiting_for_reply;
+}
+
diff --git a/src/req.hpp b/src/req.hpp
index 3ae78fd..86554b5 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 6763167..6583608 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -364,6 +364,21 @@ int zmq::socket_base_t::close ()
return 0;
}
+zmq::app_thread_t *zmq::socket_base_t::get_thread ()
+{
+ return app_thread;
+}
+
+bool zmq::socket_base_t::has_in ()
+{
+ return xhas_in ();
+}
+
+bool zmq::socket_base_t::has_out ()
+{
+ return xhas_out ();
+}
+
bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index c54efae..49ff5a5 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -54,6 +54,16 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_);
int close ();
+ // This function is used by the polling mechanism to determine
+ // whether the socket belongs to the application thread the poll
+ // is called from.
+ class app_thread_t *get_thread ();
+
+ // These functions are used by the polling mechanism to determine
+ // which events are to be reported from this socket.
+ bool has_in ();
+ bool has_out ();
+
// The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
@@ -88,6 +98,8 @@ namespace zmq
virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
virtual int xflush () = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
+ virtual bool xhas_in () = 0;
+ virtual bool xhas_out () = 0;
// Socket options.
options_t options;
diff --git a/src/sub.cpp b/src/sub.cpp
index 1bfbcd5..a7f9783 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
+
+bool zmq::sub_t::xhas_in ()
+{
+ // TODO: This is more complex as we have to ignore all the messages that
+ // don't fit the filter.
+ zmq_assert (false);
+ return false;
+}
+
+bool zmq::sub_t::xhas_out ()
+{
+ return false;
+}
diff --git a/src/sub.hpp b/src/sub.hpp
index 8691928..fb881dc 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -48,6 +48,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/ypollset.cpp b/src/ypollset.cpp
index a72eac7..4e73361 100644
--- a/src/ypollset.cpp
+++ b/src/ypollset.cpp
@@ -29,6 +29,7 @@ zmq::ypollset_t::~ypollset_t ()
void zmq::ypollset_t::signal (int signal_)
{
+printf ("++signal\n");
zmq_assert (signal_ >= 0 && signal_ < wait_signal);
if (bits.btsr (signal_, wait_signal))
sem.post ();
@@ -58,3 +59,8 @@ uint64_t zmq::ypollset_t::check ()
{
return (uint64_t) bits.xchg (0);
}
+
+zmq::fd_t zmq::ypollset_t::get_fd ()
+{
+ return retired_fd;
+}
diff --git a/src/ypollset.hpp b/src/ypollset.hpp
index 25eb3e0..810a638 100644
--- a/src/ypollset.hpp
+++ b/src/ypollset.hpp
@@ -42,6 +42,7 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
+ fd_t get_fd ();
private:
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 2dfdd48..21ac612 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -25,11 +25,16 @@
#include <new>
#include "socket_base.hpp"
-#include "err.hpp"
+#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
+#include "err.hpp"
+
+#if defined ZMQ_HAVE_LINUX
+#include <poll.h>
+#endif
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
@@ -246,6 +251,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
+int zmq_poll (zmq_pollitem_t *items_, int nitems_)
+{
+ // TODO: Replace the polling mechanism by the virtualised framework
+ // used in 0MQ I/O threads. That'll make the thing work on all platforms.
+#if !defined ZMQ_HAVE_LINUX
+ errno = ENOTSUP;
+ return -1;
+#else
+
+ pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
+ zmq_assert (pollfds);
+ int npollfds = 0;
+ int nsockets = 0;
+
+ zmq::app_thread_t *app_thread = NULL;
+
+ for (int i = 0; i != nitems_; i++) {
+
+ // 0MQ sockets.
+ if (items_ [i].socket) {
+
+ // Get the app_thread the socket is living in. If there are two
+ // sockets in the same pollset with different app threads, fail.
+ zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
+ if (app_thread) {
+ if (app_thread != s->get_thread ()) {
+ free (pollfds);
+ errno = EFAULT;
+ return -1;
+ }
+ }
+ else
+ app_thread = s->get_thread ();
+
+ nsockets++;
+ continue;
+ }
+
+ // Raw file descriptors.
+ pollfds [npollfds].fd = items_ [i].fd;
+ pollfds [npollfds].events =
+ (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
+ (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
+ npollfds++;
+ }
+
+ // If there's at least one 0MQ socket in the pollset we have to poll
+ // for 0MQ commands. If ZMQ_POLL was not set, fail.
+ if (nsockets) {
+ pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
+ if (pollfds [npollfds].fd == zmq::retired_fd) {
+ free (pollfds);
+ errno = ENOTSUP;
+ return -1;
+ }
+ pollfds [npollfds].events = POLLIN;
+ npollfds++;
+ }
+
+ int nevents = 0;
+ bool initial = true;
+ while (!nevents) {
+
+ // Wait for activity. In the first iteration just check for events,
+ // don't wait. Waiting would prevent exiting on any events that may
+ // already be signaled on 0MQ sockets.
+ int rc = poll (pollfds, npollfds, initial ? 0 : -1);
+ if (rc == -1 && errno == EINTR)
+ continue;
+ errno_assert (rc >= 0);
+ initial = false;
+
+ // Process 0MQ commands if needed.
+ if (nsockets && pollfds [npollfds -1].revents & POLLIN)
+ app_thread->process_commands (false, false);
+
+ // Check for the events.
+ int pollfd_pos = 0;
+ for (int i = 0; i != nitems_; i++) {
+
+ // If the poll item is a raw file descriptor, simply convert
+ // the events to zmq_pollitem_t-style format.
+ if (!items_ [i].socket) {
+ items_ [i].revents =
+ (pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) |
+ (pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0);
+ if (items_ [i].revents)
+ nevents++;
+ pollfd_pos++;
+ continue;
+ }
+
+ // The poll item is a 0MQ socket.
+ zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
+ items_ [i].revents = 0;
+ if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
+ items_ [i].revents |= ZMQ_POLLIN;
+ if (items_ [i].revents)
+ nevents++;
+ }
+ }
+
+ free (pollfds);
+ return nevents;
+
+#endif
+}
+
#if defined ZMQ_HAVE_WINDOWS
static uint64_t now ()