summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bindings/c/zmq.h49
-rw-r--r--bindings/cpp/zmq.hpp5
-rw-r--r--src/Makefile.am2
-rw-r--r--src/devpoll.cpp41
-rw-r--r--src/devpoll.hpp13
-rw-r--r--src/epoll.cpp16
-rw-r--r--src/epoll.hpp14
-rw-r--r--src/err.cpp43
-rw-r--r--src/err.hpp1
-rw-r--r--src/fd_signaler.hpp2
-rw-r--r--src/i_poller.hpp84
-rw-r--r--src/i_signaler.hpp6
-rw-r--r--src/io_object.cpp2
-rw-r--r--src/io_object.hpp6
-rw-r--r--src/io_thread.cpp41
-rw-r--r--src/io_thread.hpp8
-rw-r--r--src/kqueue.cpp17
-rw-r--r--src/kqueue.hpp13
-rw-r--r--src/p2p.cpp11
-rw-r--r--src/p2p.hpp2
-rw-r--r--src/pipe.cpp11
-rw-r--r--src/pipe.hpp3
-rw-r--r--src/poll.cpp18
-rw-r--r--src/poll.hpp13
-rw-r--r--src/poller.hpp68
-rw-r--r--src/pub.cpp11
-rw-r--r--src/pub.hpp2
-rw-r--r--src/rep.cpp17
-rw-r--r--src/rep.hpp2
-rw-r--r--src/req.cpp13
-rw-r--r--src/req.hpp2
-rw-r--r--src/select.cpp33
-rw-r--r--src/select.hpp13
-rw-r--r--src/socket_base.cpp15
-rw-r--r--src/socket_base.hpp12
-rw-r--r--src/sub.cpp13
-rw-r--r--src/sub.hpp2
-rw-r--r--src/tcp_connecter.cpp10
-rw-r--r--src/tcp_listener.cpp17
-rw-r--r--src/ypipe.hpp24
-rw-r--r--src/ypollset.cpp5
-rw-r--r--src/ypollset.hpp1
-rw-r--r--src/zmq.cpp125
-rw-r--r--src/zmq_engine.cpp6
44 files changed, 555 insertions, 257 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index f42f7f9..40750ec 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -51,6 +51,18 @@ extern "C" {
#ifndef EPROTONOSUPPORT
#define EPROTONOSUPPORT (ZMQ_HAUSNUMERO + 2)
#endif
+#ifndef ENOBUFS
+#define ENOBUFS (ZMQ_HAUSNUMERO + 3)
+#endif
+#ifndef ENETDOWN
+#define ENETDOWN (ZMQ_HAUSNUMERO + 4)
+#endif
+#ifndef EADDRINUSE
+#define EADDRINUSE (ZMQ_HAUSNUMERO + 5)
+#endif
+#ifndef EADDRNOTAVAIL
+#define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6)
+#endif
// Native 0MQ error codes.
#define EMTHREAD (ZMQ_HAUSNUMERO + 50)
@@ -344,8 +356,8 @@ ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags);
// EFSM - function cannot be called at the moment.
ZMQ_EXPORT int zmq_flush (void *s);
-// Send a message from the socket 's'. 'flags' argument can be combination
-// of the flags described above.
+// Receive a message from the socket 's'. 'flags' argument can be combination
+// of the flags described above with the exception of ZMQ_NOFLUSH.
//
// Errors: EAGAIN - message cannot be received at the moment (applies only to
// non-blocking receive).
@@ -354,6 +366,39 @@ ZMQ_EXPORT int zmq_flush (void *s);
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
////////////////////////////////////////////////////////////////////////////////
+// I/O multiplexing.
+////////////////////////////////////////////////////////////////////////////////
+
+#define ZMQ_POLLIN 1
+#define ZMQ_POLLOUT 2
+
+// 'socket' is a 0MQ socket we want to poll on. If set to NULL, native file
+// descriptor (socket) 'fd' will be used instead. 'events' defines event we
+// are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error
+// event does not exist for portability reasons. Errors from native sockets
+// are reported as ZMQ_POLLIN. It's client's responsibilty to identify the
+// error afterwards. 'revents' field is filled in after function returns. It's
+// a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the
+// socket.
+typedef struct
+{
+ void *socket;
+ int fd;
+ short events;
+ short revents;
+} zmq_pollitem_t;
+
+// Polls for the items specified by 'items'. Number of items in the array is
+// determined by 'nitems' argument. Returns number of items signaled, -1
+// in the case of error.
+//
+// Errors: EFAULT - there's a 0MQ socket in the pollset belonging to
+// a different thread.
+// ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag.
+// I/O multiplexing is disabled.
+ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems);
+
+////////////////////////////////////////////////////////////////////////////////
// Helper functions.
////////////////////////////////////////////////////////////////////////////////
diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp
index 5c0ba7c..8a00230 100644
--- a/bindings/cpp/zmq.hpp
+++ b/bindings/cpp/zmq.hpp
@@ -200,6 +200,11 @@ namespace zmq
throw error_t ();
}
+ inline operator void* ()
+ {
+ return ptr;
+ }
+
inline void setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
diff --git a/src/Makefile.am b/src/Makefile.am
index 27784a0..92016e8 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \
ip.hpp \
i_endpoint.hpp \
i_engine.hpp \
- i_poller.hpp \
i_poll_events.hpp \
i_signaler.hpp \
kqueue.hpp \
@@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \
platform.hpp \
poll.hpp \
+ poller.hpp \
p2p.hpp \
pub.hpp \
rep.hpp \
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index 2386034..f28d55e 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_)
zmq_assert (rc == sizeof pfd);
}
-zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_)
+zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
+ i_poll_events *reactor_)
{
assert (!fd_table [fd_].valid);
@@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_)
// Increase the load metric of the thread.
load.add (1);
- handle_t handle;
- handle.fd = fd_;
- return handle;
+ return fd_;
}
void zmq::devpoll_t::rm_fd (handle_t handle_)
{
- assert (fd_table [handle_.fd].valid);
+ assert (fd_table [handle_].valid);
- devpoll_ctl (handle_.fd, POLLREMOVE);
- fd_table [handle_.fd].valid = false;
+ devpoll_ctl (handle_, POLLREMOVE);
+ fd_table [handle_].valid = false;
// Decrease the load metric of the thread.
load.sub (1);
@@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_)
void zmq::devpoll_t::set_pollin (handle_t handle_)
{
- fd_t fd = handle_.fd;
- devpoll_ctl (fd, POLLREMOVE);
- fd_table [fd].events |= POLLIN;
- devpoll_ctl (fd, fd_table [fd].events);
+ devpoll_ctl (handle_, POLLREMOVE);
+ fd_table [handle_].events |= POLLIN;
+ devpoll_ctl (handle_, fd_table [handle_].events);
}
void zmq::devpoll_t::reset_pollin (handle_t handle_)
{
- fd_t fd = handle_.fd;
- devpoll_ctl (fd, POLLREMOVE);
- fd_table [fd].events &= ~((short) POLLIN);
- devpoll_ctl (fd, fd_table [fd].events);
+ devpoll_ctl (handle_, POLLREMOVE);
+ fd_table [handle_].events &= ~((short) POLLIN);
+ devpoll_ctl (handle_, fd_table [handle_].events);
}
void zmq::devpoll_t::set_pollout (handle_t handle_)
{
- fd_t fd = handle_.fd;
- devpoll_ctl (fd, POLLREMOVE);
- fd_table [fd].events |= POLLOUT;
- devpoll_ctl (fd, fd_table [fd].events);
+ devpoll_ctl (handle_, POLLREMOVE);
+ fd_table [handle_].events |= POLLOUT;
+ devpoll_ctl (handle_, fd_table [handle_].events);
}
void zmq::devpoll_t::reset_pollout (handle_t handle_)
{
- fd_t fd = handle_.fd;
- devpoll_ctl (fd, POLLREMOVE);
- fd_table [fd].events &= ~((short) POLLOUT);
- devpoll_ctl (fd, fd_table [fd].events);
+ devpoll_ctl (handle_, POLLREMOVE);
+ fd_table [handle_].events &= ~((short) POLLOUT);
+ devpoll_ctl (handle_, fd_table [handle_].events);
}
void zmq::devpoll_t::add_timer (i_poll_events *events_)
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index 57f0156..b796ba5 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -26,7 +26,6 @@
#include <vector>
-#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
@@ -37,22 +36,24 @@ namespace zmq
// Implements socket polling mechanism using the Solaris-specific
// "/dev/poll" interface.
- class devpoll_t : public i_poller
+ class devpoll_t
{
public:
+ typedef fd_t handle_t;
+
devpoll_t ();
~devpoll_t ();
- // i_poller implementation.
- handle_t add_fd (fd_t fd_, i_poll_events *events_);
+ // "poller" concept.
+ handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (i_poll_events *events_);
- void cancel_timer (i_poll_events *events_);
+ void add_timer (struct i_poll_events *events_);
+ void cancel_timer (struct i_poll_events *events_);
int get_load ();
void start ();
void stop ();
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 5f3bc51..8f576f8 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t ()
delete *it;
}
-zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
+zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
poll_entry_t *pe = new poll_entry_t;
zmq_assert (pe != NULL);
@@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread.
load.add (1);
- handle_t handle;
- handle.ptr = pe;
- return handle;
+ return pe;
}
void zmq::epoll_t::rm_fd (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);
errno_assert (rc != -1);
pe->fd = retired_fd;
@@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
void zmq::epoll_t::set_pollin (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events |= EPOLLIN;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
@@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)
void zmq::epoll_t::reset_pollin (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events &= ~((short) EPOLLIN);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
@@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)
void zmq::epoll_t::set_pollout (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events |= EPOLLOUT;
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
@@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_)
void zmq::epoll_t::reset_pollout (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->ev.events &= ~((short) EPOLLOUT);
int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);
errno_assert (rc != -1);
diff --git a/src/epoll.hpp b/src/epoll.hpp
index 619d4f3..90824bd 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -27,8 +27,6 @@
#include <vector>
#include <sys/epoll.h>
-#include "i_poller.hpp"
-//#include "i_poll_events.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
@@ -39,22 +37,24 @@ namespace zmq
// This class implements socket polling mechanism using the Linux-specific
// epoll mechanism.
- class epoll_t : public i_poller
+ class epoll_t
{
public:
+ typedef void* handle_t;
+
epoll_t ();
~epoll_t ();
- // i_poller implementation.
- handle_t add_fd (fd_t fd_, i_poll_events *events_);
+ // "poller" concept.
+ handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (i_poll_events *events_);
- void cancel_timer (i_poll_events *events_);
+ void add_timer (struct i_poll_events *events_);
+ void cancel_timer (struct i_poll_events *events_);
int get_load ();
void start ();
void stop ();
diff --git a/src/err.cpp b/src/err.cpp
index ef5b987..36cb2fc 100644
--- a/src/err.cpp
+++ b/src/err.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "../bindings/c/zmq.h"
+
#include "err.hpp"
#include "platform.hpp"
@@ -24,8 +26,6 @@
const char *zmq::wsa_error()
{
-
-
int errcode = WSAGetLastError ();
// TODO: This is not a generic way to handle this...
if (errcode == WSAEWOULDBLOCK)
@@ -148,4 +148,43 @@ void zmq::win_error (char *buffer_, size_t buffer_size_)
zmq_assert (rc);
}
+void zmq::wsa_error_to_errno ()
+{
+ int errcode = WSAGetLastError ();
+ switch (errcode) {
+ case WSAEINPROGRESS:
+ errno = EAGAIN;
+ return;
+ case WSAEBADF:
+ errno = EBADF;
+ return;
+ case WSAEINVAL:
+ errno = EINVAL;
+ return;
+ case WSAEMFILE:
+ errno = EMFILE;
+ return;
+ case WSAEFAULT:
+ errno = EFAULT;
+ return;
+ case WSAEPROTONOSUPPORT:
+ errno = EPROTONOSUPPORT;
+ return;
+ case WSAENOBUFS:
+ errno = ENOBUFS;
+ return;
+ case WSAENETDOWN:
+ errno = ENETDOWN;
+ return;
+ case WSAEADDRINUSE:
+ errno = EADDRINUSE;
+ return;
+ case WSAEADDRNOTAVAIL:
+ errno = EADDRNOTAVAIL;
+ return;
+ default:
+ wsa_assert (false);
+ }
+}
+
#endif
diff --git a/src/err.hpp b/src/err.hpp
index c1b2916..fb9195e 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -41,6 +41,7 @@ namespace zmq
const char *wsa_error ();
void win_error (char *buffer_, size_t buffer_size_);
+ void wsa_error_to_errno ();
}
diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp
index e1b56ad..513a1e4 100644
--- a/src/fd_signaler.hpp
+++ b/src/fd_signaler.hpp
@@ -44,8 +44,6 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
-
- // Get the file descriptor associated with the object.
fd_t get_fd ();
private:
diff --git a/src/i_poller.hpp b/src/i_poller.hpp
deleted file mode 100644
index 2665e82..0000000
--- a/src/i_poller.hpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- Copyright (c) 2007-2009 FastMQ Inc.
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__
-#define __ZMQ_I_POLLER_HPP_INCLUDED__
-
-#include "fd.hpp"
-
-namespace zmq
-{
-
- union handle_t
- {
- fd_t fd;
- void *ptr;
- };
-
- // Virtual interface to be used when polling on file descriptors.
-
- struct i_poller
- {
- virtual ~i_poller () {};
-
- // Add file descriptor to the polling set. Return handle
- // representing the descriptor. 'events' interface will be used
- // to invoke callback functions when event occurs.
- virtual handle_t add_fd (fd_t fd_, struct i_poll_events *events_) = 0;
-
- // Remove file descriptor identified by handle from the polling set.
- virtual void rm_fd (handle_t handle_) = 0;
-
- // Start polling for input from socket.
- virtual void set_pollin (handle_t handle_) = 0;
-
- // Stop polling for input from socket.
- virtual void reset_pollin (handle_t handle_) = 0;
-
- // Start polling for availability of the socket for writing.
- virtual void set_pollout (handle_t handle_) = 0;
-
- // Stop polling for availability of the socket for writing.
- virtual void reset_pollout (handle_t handle_) = 0;
-
- // Ask to be notified after some time. Actual interval varies between
- // 0 and max_timer_period ms. Timer is destroyed once it expires or,
- // optionally, when cancel_timer is called.
- virtual void add_timer (struct i_poll_events *events_) = 0;
-
- // Cancel the timer set by add_timer method.
- virtual void cancel_timer (struct i_poll_events *events_) = 0;
-
- // Returns load experienced by the I/O thread. Currently it's number
- // of file descriptors handled by the poller, in the future we may
- // use a metric taking actual traffic on the individual sockets into
- // account.
- virtual int get_load () = 0;
-
- // Start the execution of the underlying I/O thread.
- // This method is called from a foreign thread.
- virtual void start () = 0;
-
- // Ask underlying I/O thread to stop.
- virtual void stop () = 0;
- };
-
-}
-
-#endif
diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp
index a09fe7e..ad04bb5 100644
--- a/src/i_signaler.hpp
+++ b/src/i_signaler.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__
#include "stdint.hpp"
+#include "fd.hpp"
namespace zmq
{
@@ -42,6 +43,11 @@ namespace zmq
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
virtual uint64_t check () = 0;
+
+ // Returns file descriptor that allows waiting for signals. Specific
+ // signalers may not support this functionality. If so, the function
+ // returns retired_fd.
+ virtual fd_t get_fd () = 0;
};
}
diff --git a/src/io_object.cpp b/src/io_object.cpp
index f61e5f0..0cf77fd 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)
poller = io_thread_->get_poller ();
}
-zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_)
+zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_)
{
return poller->add_fd (fd_, this);
}
diff --git a/src/io_object.hpp b/src/io_object.hpp
index e5582db..2ed5e24 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -22,7 +22,7 @@
#include <stddef.h>
-#include "i_poller.hpp"
+#include "poller.hpp"
#include "i_poll_events.hpp"
namespace zmq
@@ -41,6 +41,8 @@ namespace zmq
protected:
+ typedef poller_t::handle_t handle_t;
+
// Derived class can init/swap the underlying I/O thread.
// Caution: Remove all the file descriptors from the old I/O thread
// before swapping to the new one!
@@ -63,7 +65,7 @@ namespace zmq
private:
- struct i_poller *poller;
+ poller_t *poller;
io_object_t (const io_object_t&);
void operator = (const io_object_t&);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index a90876c..6d4710a 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -24,11 +24,6 @@
#include "platform.hpp"
#include "err.hpp"
#include "command.hpp"
-#include "epoll.hpp"
-#include "poll.hpp"
-#include "select.hpp"
-#include "devpoll.hpp"
-#include "kqueue.hpp"
#include "dispatcher.hpp"
#include "simple_semaphore.hpp"
@@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) :
object_t (dispatcher_, thread_slot_)
{
-#if defined ZMQ_FORCE_SELECT
- poller = new select_t;
-#elif defined ZMQ_FORCE_POLL
- poller = new poll_t;
-#elif defined ZMQ_FORCE_EPOLL
- poller = new epoll_t;
-#elif defined ZMQ_FORCE_DEVPOLL
- poller = new devpoll_t;
-#elif defined ZMQ_FORCE_KQUEUE
- poller = new kqueue_t;
-#elif defined ZMQ_HAVE_LINUX
- poller = new epoll_t;
-#elif defined ZMQ_HAVE_WINDOWS
- poller = new select_t;
-#elif defined ZMQ_HAVE_FREEBSD
- poller = new kqueue_t;
-#elif defined ZMQ_HAVE_OPENBSD
- poller = new kqueue_t;
-#elif defined ZMQ_HAVE_SOLARIS
- poller = new devpoll_t;
-#elif defined ZMQ_HAVE_OSX
- poller = new kqueue_t;
-#elif defined ZMQ_HAVE_QNXNTO
- poller = new poll_t;
-#elif defined ZMQ_HAVE_AIX
- poller = new poll_t;
-#elif defined ZMQ_HAVE_HPUX
- poller = new devpoll_t;
-#elif defined ZMQ_HAVE_OPENVMS
- poller = new select_t;
-#else
-#error Unsupported platform
-#endif
+ poller = new poller_t;
zmq_assert (poller);
signaler_handle = poller->add_fd (signaler.get_fd (), this);
@@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event ()
zmq_assert (false);
}
-zmq::i_poller *zmq::io_thread_t::get_poller ()
+zmq::poller_t *zmq::io_thread_t::get_poller ()
{
zmq_assert (poller);
return poller;
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 4015b0c..457cdbf 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -23,7 +23,7 @@
#include <vector>
#include "object.hpp"
-#include "i_poller.hpp"
+#include "poller.hpp"
#include "i_poll_events.hpp"
#include "fd_signaler.hpp"
@@ -59,7 +59,7 @@ namespace zmq
void timer_event ();
// Used by io_objects to retrieve the assciated poller object.
- struct i_poller *get_poller ();
+ poller_t *get_poller ();
// Command handlers.
void process_stop ();
@@ -74,10 +74,10 @@ namespace zmq
fd_signaler_t signaler;
// Handle associated with signaler's file descriptor.
- handle_t signaler_handle;
+ poller_t::handle_t signaler_handle;
// I/O multiplexing is performed using a poller object.
- i_poller *poller;
+ poller_t *poller;
};
}
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index c967c73..f32fa36 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
errno_assert (rc != -1);
}
-zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_)
+zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
+ i_poll_events *reactor_)
{
poll_entry_t *pe = new poll_entry_t;
zmq_assert (pe != NULL);
@@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_)
pe->flag_pollout = 0;
pe->reactor = reactor_;
- handle_t handle;
- handle.ptr = pe;
- return handle;
+ return pe;
}
void zmq::kqueue_t::rm_fd (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
if (pe->flag_pollin)
kevent_delete (pe->fd, EVFILT_READ);
if (pe->flag_pollout)
@@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
void zmq::kqueue_t::set_pollin (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollin = true;
kevent_add (pe->fd, EVFILT_READ, pe);
}
void zmq::kqueue_t::reset_pollin (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollin = false;
kevent_delete (pe->fd, EVFILT_READ);
}
void zmq::kqueue_t::set_pollout (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollout = true;
kevent_add (pe->fd, EVFILT_WRITE, pe);
}
void zmq::kqueue_t::reset_pollout (handle_t handle_)
{
- poll_entry_t *pe = (poll_entry_t*) handle_.ptr;
+ poll_entry_t *pe = (poll_entry_t*) handle_;
pe->flag_pollout = false;
kevent_delete (pe->fd, EVFILT_WRITE);
}
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index eeb6f09..bed9108 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -26,7 +26,6 @@
#include <vector>
-#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
@@ -37,22 +36,24 @@ namespace zmq
// Implements socket polling mechanism using the BSD-specific
// kqueue interface.
- class kqueue_t : public i_poller
+ class kqueue_t
{
public:
+ typedef void* handle_t;
+
kqueue_t ();
~kqueue_t ();
- // i_poller implementation.
- handle_t add_fd (fd_t fd_, i_poll_events *events_);
+ // "poller" concept.
+ handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (i_poll_events *events_);
- void cancel_timer (i_poll_events *events_);
+ void add_timer (struct i_poll_events *events_);
+ void cancel_timer (struct i_poll_events *events_);
int get_load ();
void start ();
void stop ();
diff --git a/src/p2p.cpp b/src/p2p.cpp
index c43b7b4..445ba5b 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
+bool zmq::p2p_t::xhas_in ()
+{
+ zmq_assert (false);
+ return false;
+}
+
+bool zmq::p2p_t::xhas_out ()
+{
+ zmq_assert (false);
+ return false;
+}
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 1c98dd5..1fd7e34 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -42,6 +42,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/pipe.cpp b/src/pipe.cpp
index f8dfcb8..e444520 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -36,6 +36,17 @@ zmq::reader_t::~reader_t ()
{
}
+bool zmq::reader_t::check_read ()
+{
+ // Check if there's an item in the pipe.
+ if (pipe->check_read ())
+ return true;
+
+ // If not, deactivate the pipe.
+ endpoint->kill (this);
+ return false;
+}
+
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
if (!pipe->read (msg_)) {
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 699e3f7..ecbce7d 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -42,6 +42,9 @@ namespace zmq
void set_endpoint (i_endpoint *endpoint_);
+ // Returns true if there is at least one message to read in the pipe.
+ bool check_read ();
+
// Reads a message to the underlying pipe.
bool read (zmq_msg_t *msg_);
diff --git a/src/poll.cpp b/src/poll.cpp
index dd3de43..50a2426 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -58,7 +58,7 @@ zmq::poll_t::~poll_t ()
zmq_assert (load.get () == 0);
}
-zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
+zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
pollfd pfd = {fd_, 0, 0};
pollset.push_back (pfd);
@@ -70,19 +70,17 @@ zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread.
load.add (1);
- handle_t handle;
- handle.fd = fd_;
- return handle;
+ return fd_;
}
void zmq::poll_t::rm_fd (handle_t handle_)
{
- fd_t index = fd_table [handle_.fd].index;
+ fd_t index = fd_table [handle_].index;
assert (index != retired_fd);
// Mark the fd as unused.
pollset [index].fd = retired_fd;
- fd_table [handle_.fd].index = retired_fd;
+ fd_table [handle_].index = retired_fd;
retired = true;
// Decrease the load metric of the thread.
@@ -91,25 +89,25 @@ void zmq::poll_t::rm_fd (handle_t handle_)
void zmq::poll_t::set_pollin (handle_t handle_)
{
- int index = fd_table [handle_.fd].index;
+ int index = fd_table [handle_].index;
pollset [index].events |= POLLIN;
}
void zmq::poll_t::reset_pollin (handle_t handle_)
{
- int index = fd_table [handle_.fd].index;
+ int index = fd_table [handle_].index;
pollset [index].events &= ~((short) POLLIN);
}
void zmq::poll_t::set_pollout (handle_t handle_)
{
- int index = fd_table [handle_.fd].index;
+ int index = fd_table [handle_].index;
pollset [index].events |= POLLOUT;
}
void zmq::poll_t::reset_pollout (handle_t handle_)
{
- int index = fd_table [handle_.fd].index;
+ int index = fd_table [handle_].index;
pollset [index].events &= ~((short) POLLOUT);
}
diff --git a/src/poll.hpp b/src/poll.hpp
index 9fe6067..05732d0 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -31,7 +31,6 @@
#include <stddef.h>
#include <vector>
-#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
@@ -42,22 +41,24 @@ namespace zmq
// Implements socket polling mechanism using the POSIX.1-2001
// poll() system call.
- class poll_t : public i_poller
+ class poll_t
{
public:
+ typedef fd_t handle_t;
+
poll_t ();
~poll_t ();
- // i_poller implementation.
- handle_t add_fd (fd_t fd_, i_poll_events *events_);
+ // "poller" concept.
+ handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (i_poll_events *events_);
- void cancel_timer (i_poll_events *events_);
+ void add_timer (struct i_poll_events *events_);
+ void cancel_timer (struct i_poll_events *events_);
int get_load ();
void start ();
void stop ();
diff --git a/src/poller.hpp b/src/poller.hpp
new file mode 100644
index 0000000..9051514
--- /dev/null
+++ b/src/poller.hpp
@@ -0,0 +1,68 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_POLLER_HPP_INCLUDED__
+#define __ZMQ_POLLER_HPP_INCLUDED__
+
+#include "epoll.hpp"
+#include "poll.hpp"
+#include "select.hpp"
+#include "devpoll.hpp"
+#include "kqueue.hpp"
+
+namespace zmq
+{
+
+#if defined ZMQ_FORCE_SELECT
+ typedef select_t poller_t;
+#elif defined ZMQ_FORCE_POLL
+ typedef poll_t poller_t;
+#elif defined ZMQ_FORCE_EPOLL
+ typedef epoll_t poller_t;
+#elif defined ZMQ_FORCE_DEVPOLL
+ typedef devpoll_t poller_t;
+#elif defined ZMQ_FORCE_KQUEUE
+ typedef kqueue_t poller_t;
+#elif defined ZMQ_HAVE_LINUX
+ typedef epoll_t poller_t;
+#elif defined ZMQ_HAVE_WINDOWS
+ typedef select_t poller_t;
+#elif defined ZMQ_HAVE_FREEBSD
+ typedef kqueue_t poller_t;
+#elif defined ZMQ_HAVE_OPENBSD
+ typedef kqueue_t poller_t;
+#elif defined ZMQ_HAVE_SOLARIS
+ typedef devpoll_t poller_t;
+#elif defined ZMQ_HAVE_OSX
+ typedef kqueue_t poller_t;
+#elif defined ZMQ_HAVE_QNXNTO
+ typedef poll_t poller_t;
+#elif defined ZMQ_HAVE_AIX
+ typedef poll_t poller_t;
+#elif defined ZMQ_HAVE_HPUX
+ typedef devpoll_t poller_t;
+#elif defined ZMQ_HAVE_OPENVMS
+ typedef select_t poller_t;
+#else
+#error Unsupported platform
+#endif
+
+}
+
+#endif
diff --git a/src/pub.cpp b/src/pub.cpp
index 1e66a18..63b235e 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
+bool zmq::pub_t::xhas_in ()
+{
+ return false;
+}
+
+bool zmq::pub_t::xhas_out ()
+{
+ // TODO: Reimplement when queue limits are added.
+ return true;
+}
+
diff --git a/src/pub.hpp b/src/pub.hpp
index 07eb5a1..b3e868d 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/rep.cpp b/src/rep.cpp
index 137c735..e8a9e39 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
+bool zmq::rep_t::xhas_in ()
+{
+ for (int count = active; count != 0; count--) {
+ if (in_pipes [current]->check_read ())
+ return !waiting_for_reply;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
+bool zmq::rep_t::xhas_out ()
+{
+ return waiting_for_reply;
+}
diff --git a/src/rep.hpp b/src/rep.hpp
index 4781213..3e87dc1 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/req.cpp b/src/req.cpp
index 63409ce..93a46e8 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
+bool zmq::req_t::xhas_in ()
+{
+ if (reply_pipe->check_read ())
+ return waiting_for_reply;
+
+ return false;
+}
+
+bool zmq::req_t::xhas_out ()
+{
+ return !waiting_for_reply;
+}
+
diff --git a/src/req.hpp b/src/req.hpp
index 3ae78fd..86554b5 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/select.cpp b/src/select.cpp
index cb17169..3edd4ff 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -59,7 +59,7 @@ zmq::select_t::~select_t ()
zmq_assert (load.get () == 0);
}
-zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
+zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
// Store the file descriptor.
fd_entry_t entry = {fd_, events_};
@@ -75,38 +75,33 @@ zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
// Increase the load metric of the thread.
load.add (1);
- handle_t handle;
- handle.fd = fd_;
- return handle;
+ return fd_;
}
void zmq::select_t::rm_fd (handle_t handle_)
{
- // Get file descriptor.
- fd_t fd = handle_.fd;
-
// Mark the descriptor as retired.
fd_set_t::iterator it;
for (it = fds.begin (); it != fds.end (); it ++)
- if (it->fd == fd)
+ if (it->fd == handle_)
break;
zmq_assert (it != fds.end ());
it->fd = retired_fd;
retired = true;
// Stop polling on the descriptor.
- FD_CLR (fd, &source_set_in);
- FD_CLR (fd, &source_set_out);
- FD_CLR (fd, &source_set_err);
+ FD_CLR (handle_, &source_set_in);
+ FD_CLR (handle_, &source_set_out);
+ FD_CLR (handle_, &source_set_err);
// Discard all events generated on this file descriptor.
- FD_CLR (fd, &readfds);
- FD_CLR (fd, &writefds);
- FD_CLR (fd, &exceptfds);
+ FD_CLR (handle_, &readfds);
+ FD_CLR (handle_, &writefds);
+ FD_CLR (handle_, &exceptfds);
// Adjust the maxfd attribute if we have removed the
// highest-numbered file descriptor.
- if (fd == maxfd) {
+ if (handle_ == maxfd) {
maxfd = retired_fd;
for (fd_set_t::iterator it = fds.begin (); it != fds.end (); it ++)
if (it->fd > maxfd)
@@ -119,22 +114,22 @@ void zmq::select_t::rm_fd (handle_t handle_)
void zmq::select_t::set_pollin (handle_t handle_)
{
- FD_SET (handle_.fd, &source_set_in);
+ FD_SET (handle_, &source_set_in);
}
void zmq::select_t::reset_pollin (handle_t handle_)
{
- FD_CLR (handle_.fd, &source_set_in);
+ FD_CLR (handle_, &source_set_in);
}
void zmq::select_t::set_pollout (handle_t handle_)
{
- FD_SET (handle_.fd, &source_set_out);
+ FD_SET (handle_, &source_set_out);
}
void zmq::select_t::reset_pollout (handle_t handle_)
{
- FD_CLR (handle_.fd, &source_set_out);
+ FD_CLR (handle_, &source_set_out);
}
void zmq::select_t::add_timer (i_poll_events *events_)
diff --git a/src/select.hpp b/src/select.hpp
index c442477..01014d5 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -34,7 +34,6 @@
#include <sys/select.h>
#endif
-#include "i_poller.hpp"
#include "fd.hpp"
#include "thread.hpp"
#include "atomic_counter.hpp"
@@ -45,22 +44,24 @@ namespace zmq
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
- class select_t : public i_poller
+ class select_t
{
public:
+ typedef fd_t handle_t;
+
select_t ();
~select_t ();
- // i_poller implementation.
- handle_t add_fd (fd_t fd_, i_poll_events *events_);
+ // "poller" concept.
+ handle_t add_fd (fd_t fd_, struct i_poll_events *events_);
void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_);
void set_pollout (handle_t handle_);
void reset_pollout (handle_t handle_);
- void add_timer (i_poll_events *events_);
- void cancel_timer (i_poll_events *events_);
+ void add_timer (struct i_poll_events *events_);
+ void cancel_timer (struct i_poll_events *events_);
int get_load ();
void start ();
void stop ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 6763167..6583608 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -364,6 +364,21 @@ int zmq::socket_base_t::close ()
return 0;
}
+zmq::app_thread_t *zmq::socket_base_t::get_thread ()
+{
+ return app_thread;
+}
+
+bool zmq::socket_base_t::has_in ()
+{
+ return xhas_in ();
+}
+
+bool zmq::socket_base_t::has_out ()
+{
+ return xhas_out ();
+}
+
bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index c54efae..49ff5a5 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -54,6 +54,16 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_);
int close ();
+ // This function is used by the polling mechanism to determine
+ // whether the socket belongs to the application thread the poll
+ // is called from.
+ class app_thread_t *get_thread ();
+
+ // These functions are used by the polling mechanism to determine
+ // which events are to be reported from this socket.
+ bool has_in ();
+ bool has_out ();
+
// The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
@@ -88,6 +98,8 @@ namespace zmq
virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
virtual int xflush () = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
+ virtual bool xhas_in () = 0;
+ virtual bool xhas_out () = 0;
// Socket options.
options_t options;
diff --git a/src/sub.cpp b/src/sub.cpp
index 1bfbcd5..a7f9783 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
+
+bool zmq::sub_t::xhas_in ()
+{
+ // TODO: This is more complex as we have to ignore all the messages that
+ // don't fit the filter.
+ zmq_assert (false);
+ return false;
+}
+
+bool zmq::sub_t::xhas_out ()
+{
+ return false;
+}
diff --git a/src/sub.hpp b/src/sub.hpp
index 8691928..fb881dc 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -48,6 +48,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
private:
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 304790d..9bca0f0 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -50,8 +50,10 @@ int zmq::tcp_connecter_t::open ()
// Create the socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- // TODO: Convert error to errno.
- wsa_assert (s != INVALID_SOCKET);
+ if (s == INVALID_SOCKET) {
+ wsa_error_to_errno ();
+ return -1;
+ }
// Set to non-blocking mode.
unsigned long argp = 1;
@@ -78,9 +80,7 @@ int zmq::tcp_connecter_t::open ()
return -1;
}
- // TODO: Convert error to errno.
- wsa_assert (rc == 0);
-
+ wsa_error_to_errno ();
return -1;
}
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 9431ccf..383aebe 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -48,8 +48,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
// Create a listening socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- // TODO: Convert error code to errno.
- wsa_assert (s != INVALID_SOCKET);
+ if (s == INVALID_SOCKET) {
+ wsa_error_to_errno ();
+ return -1;
+ }
// Allow reusing of the address.
int flag = 1;
@@ -65,12 +67,17 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
// TODO: Convert error code to errno.
- wsa_assert (rc != SOCKET_ERROR);
+ if (rc == SOCKET_ERROR) {
+ wsa_error_to_errno ();
+ return -1;
+ }
// Listen for incomming connections.
rc = listen (s, 1);
- // TODO: Convert error code to errno.
- wsa_assert (rc != SOCKET_ERROR);
+ if (rc == SOCKET_ERROR) {
+ wsa_error_to_errno ();
+ return -1;
+ }
return 0;
}
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index 6c51b63..0a9b5d5 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -106,16 +106,12 @@ namespace zmq
return true;
}
- // Reads an item from the pipe. Returns false if there is no value.
- // available.
- inline bool read (T *value_)
+ // Check whether item is available for reading.
+ inline bool check_read ()
{
- // Was the value prefetched already? If so, return it.
- if (&queue.front () != r) {
- *value_ = queue.front ();
- queue.pop ();
+ // Was the value prefetched already? If so, return.
+ if (&queue.front () != r)
return true;
- }
// There's no prefetched value, so let us prefetch more values.
// (Note that D is a template parameter. Becaue of that one of
@@ -166,6 +162,18 @@ namespace zmq
}
// There was at least one value prefetched.
+ return true;
+ }
+
+ // Reads an item from the pipe. Returns false if there is no value.
+ // available.
+ inline bool read (T *value_)
+ {
+ // Try to prefetch a value.
+ if (!check_read ())
+ return false;
+
+ // There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front ();
queue.pop ();
diff --git a/src/ypollset.cpp b/src/ypollset.cpp
index a72eac7..0f0d75f 100644
--- a/src/ypollset.cpp
+++ b/src/ypollset.cpp
@@ -58,3 +58,8 @@ uint64_t zmq::ypollset_t::check ()
{
return (uint64_t) bits.xchg (0);
}
+
+zmq::fd_t zmq::ypollset_t::get_fd ()
+{
+ return retired_fd;
+}
diff --git a/src/ypollset.hpp b/src/ypollset.hpp
index 25eb3e0..810a638 100644
--- a/src/ypollset.hpp
+++ b/src/ypollset.hpp
@@ -42,6 +42,7 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
+ fd_t get_fd ();
private:
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 2dfdd48..7952b61 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -25,11 +25,16 @@
#include <new>
#include "socket_base.hpp"
-#include "err.hpp"
+#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
+#include "err.hpp"
+
+#if defined ZMQ_HAVE_LINUX
+#include <poll.h>
+#endif
#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
@@ -44,6 +49,14 @@ const char *zmq_strerror (int errnum_)
return "Not supported";
case EPROTONOSUPPORT:
return "Protocol not supported";
+ case ENOBUFS:
+ return "No buffer space available";
+ case ENETDOWN:
+ return "Network is down";
+ case EADDRINUSE:
+ return "Address in use";
+ case EADDRNOTAVAIL:
+ return "Address not available";
#endif
case EMTHREAD:
return "Number of preallocated application threads exceeded";
@@ -246,6 +259,116 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
+int zmq_poll (zmq_pollitem_t *items_, int nitems_)
+{
+ // TODO: Replace the polling mechanism by the virtualised framework
+ // used in 0MQ I/O threads. That'll make the thing work on all platforms.
+#if !defined ZMQ_HAVE_LINUX
+ errno = ENOTSUP;
+ return -1;
+#else
+
+ pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
+ zmq_assert (pollfds);
+ int npollfds = 0;
+ int nsockets = 0;
+
+ zmq::app_thread_t *app_thread = NULL;
+
+ for (int i = 0; i != nitems_; i++) {
+
+ // 0MQ sockets.
+ if (items_ [i].socket) {
+
+ // Get the app_thread the socket is living in. If there are two
+ // sockets in the same pollset with different app threads, fail.
+ zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
+ if (app_thread) {
+ if (app_thread != s->get_thread ()) {
+ free (pollfds);
+ errno = EFAULT;
+ return -1;
+ }
+ }
+ else
+ app_thread = s->get_thread ();
+
+ nsockets++;
+ continue;
+ }
+
+ // Raw file descriptors.
+ pollfds [npollfds].fd = items_ [i].fd;
+ pollfds [npollfds].events =
+ (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
+ (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
+ npollfds++;
+ }
+
+ // If there's at least one 0MQ socket in the pollset we have to poll
+ // for 0MQ commands. If ZMQ_POLL was not set, fail.
+ if (nsockets) {
+ pollfds [npollfds].fd = app_thread->get_signaler ()->get_fd ();
+ if (pollfds [npollfds].fd == zmq::retired_fd) {
+ free (pollfds);
+ errno = ENOTSUP;
+ return -1;
+ }
+ pollfds [npollfds].events = POLLIN;
+ npollfds++;
+ }
+
+ int nevents = 0;
+ bool initial = true;
+ while (!nevents) {
+
+ // Wait for activity. In the first iteration just check for events,
+ // don't wait. Waiting would prevent exiting on any events that may
+ // already be signaled on 0MQ sockets.
+ int rc = poll (pollfds, npollfds, initial ? 0 : -1);
+ if (rc == -1 && errno == EINTR)
+ continue;
+ errno_assert (rc >= 0);
+ initial = false;
+
+ // Process 0MQ commands if needed.
+ if (nsockets && pollfds [npollfds -1].revents & POLLIN)
+ app_thread->process_commands (false, false);
+
+ // Check for the events.
+ int pollfd_pos = 0;
+ for (int i = 0; i != nitems_; i++) {
+
+ // If the poll item is a raw file descriptor, simply convert
+ // the events to zmq_pollitem_t-style format.
+ if (!items_ [i].socket) {
+ items_ [i].revents =
+ (pollfds [pollfd_pos].revents & POLLIN ? ZMQ_POLLIN : 0) |
+ (pollfds [pollfd_pos].revents & POLLOUT ? ZMQ_POLLOUT : 0);
+ if (items_ [i].revents)
+ nevents++;
+ pollfd_pos++;
+ continue;
+ }
+
+ // The poll item is a 0MQ socket.
+ zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
+ items_ [i].revents = 0;
+ if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
+ items_ [i].revents |= ZMQ_POLLOUT;
+ if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
+ items_ [i].revents |= ZMQ_POLLIN;
+ if (items_ [i].revents)
+ nevents++;
+ }
+ }
+
+ free (pollfds);
+ return nevents;
+
+#endif
+}
+
#if defined ZMQ_HAVE_WINDOWS
static uint64_t now ()
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index b82af0a..baa0eee 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -137,6 +137,12 @@ void zmq::zmq_engine_t::out_event ()
void zmq::zmq_engine_t::revive ()
{
set_pollout (handle);
+
+ // Speculative write: The assumption is that at the moment new message
+ // was sent by the user the socket is probably available for writing.
+ // Thus we try to write the data to socket avoiding polling for POLLOUT.
+ // Consequently, the latency should be better in request/reply scenarios.
+ out_event ();
}
void zmq::zmq_engine_t::error ()