diff options
author | malosek <malosek@fastmq.com> | 2009-10-05 10:22:31 +0200 |
---|---|---|
committer | malosek <malosek@fastmq.com> | 2009-10-05 10:22:31 +0200 |
commit | d57ee0984ac3f8712063a7f83d7200be25ca5513 (patch) | |
tree | a956443e70c48ebd21242c11cc015db61c53c682 /src | |
parent | ff65e26ce7567ea6a907e566f8530f4988231d68 (diff) | |
parent | 4efe2366d7394e8969fc9aa64c50be6842d8455f (diff) |
Merge branch 'master' of git@github.com:sustrik/zeromq2
Diffstat (limited to 'src')
42 files changed, 503 insertions, 255 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 27784a0..92016e8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \ ip.hpp \ i_endpoint.hpp \ i_engine.hpp \ - i_poller.hpp \ i_poll_events.hpp \ i_signaler.hpp \ kqueue.hpp \ @@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ pipe.hpp \ platform.hpp \ poll.hpp \ + poller.hpp \ p2p.hpp \ pub.hpp \ rep.hpp \ diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 2386034..f28d55e 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) zmq_assert (rc == sizeof pfd); } -zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, + i_poll_events *reactor_) { assert (!fd_table [fd_].valid); @@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.fd = fd_; - return handle; + return fd_; } void zmq::devpoll_t::rm_fd (handle_t handle_) { - assert (fd_table [handle_.fd].valid); + assert (fd_table [handle_].valid); - devpoll_ctl (handle_.fd, POLLREMOVE); - fd_table [handle_.fd].valid = false; + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].valid = false; // Decrease the load metric of the thread. load.sub (1); @@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_) void zmq::devpoll_t::set_pollin (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events |= POLLIN; - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLIN; + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::reset_pollin (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events &= ~((short) POLLIN); - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLIN); + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::set_pollout (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events |= POLLOUT; - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLOUT; + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::reset_pollout (handle_t handle_) { - fd_t fd = handle_.fd; - devpoll_ctl (fd, POLLREMOVE); - fd_table [fd].events &= ~((short) POLLOUT); - devpoll_ctl (fd, fd_table [fd].events); + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLOUT); + devpoll_ctl (handle_, fd_table [handle_].events); } void zmq::devpoll_t::add_timer (i_poll_events *events_) diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 57f0156..b796ba5 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -26,7 +26,6 @@ #include <vector> -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -37,22 +36,24 @@ namespace zmq // Implements socket polling mechanism using the Solaris-specific // "/dev/poll" interface. - class devpoll_t : public i_poller + class devpoll_t { public: + typedef fd_t handle_t; + devpoll_t (); ~devpoll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/epoll.cpp b/src/epoll.cpp index 5f3bc51..8f576f8 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t () delete *it; } -zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) { poll_entry_t *pe = new poll_entry_t; zmq_assert (pe != NULL); @@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) // Increase the load metric of the thread. load.add (1); - handle_t handle; - handle.ptr = pe; - return handle; + return pe; } void zmq::epoll_t::rm_fd (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); errno_assert (rc != -1); pe->fd = retired_fd; @@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_) void zmq::epoll_t::set_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events |= EPOLLIN; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_) void zmq::epoll_t::reset_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events &= ~((short) EPOLLIN); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_) void zmq::epoll_t::set_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events |= EPOLLOUT; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); @@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_) void zmq::epoll_t::reset_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->ev.events &= ~((short) EPOLLOUT); int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); errno_assert (rc != -1); diff --git a/src/epoll.hpp b/src/epoll.hpp index 619d4f3..90824bd 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -27,8 +27,6 @@ #include <vector> #include <sys/epoll.h> -#include "i_poller.hpp" -//#include "i_poll_events.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -39,22 +37,24 @@ namespace zmq // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. - class epoll_t : public i_poller + class epoll_t { public: + typedef void* handle_t; + epoll_t (); ~epoll_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/err.cpp b/src/err.cpp index ef5b987..36cb2fc 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "../bindings/c/zmq.h" + #include "err.hpp" #include "platform.hpp" @@ -24,8 +26,6 @@ const char *zmq::wsa_error() { - - int errcode = WSAGetLastError (); // TODO: This is not a generic way to handle this... if (errcode == WSAEWOULDBLOCK) @@ -148,4 +148,43 @@ void zmq::win_error (char *buffer_, size_t buffer_size_) zmq_assert (rc); } +void zmq::wsa_error_to_errno () +{ + int errcode = WSAGetLastError (); + switch (errcode) { + case WSAEINPROGRESS: + errno = EAGAIN; + return; + case WSAEBADF: + errno = EBADF; + return; + case WSAEINVAL: + errno = EINVAL; + return; + case WSAEMFILE: + errno = EMFILE; + return; + case WSAEFAULT: + errno = EFAULT; + return; + case WSAEPROTONOSUPPORT: + errno = EPROTONOSUPPORT; + return; + case WSAENOBUFS: + errno = ENOBUFS; + return; + case WSAENETDOWN: + errno = ENETDOWN; + return; + case WSAEADDRINUSE: + errno = EADDRINUSE; + return; + case WSAEADDRNOTAVAIL: + errno = EADDRNOTAVAIL; + return; + default: + wsa_assert (false); + } +} + #endif diff --git a/src/err.hpp b/src/err.hpp index c1b2916..fb9195e 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -41,6 +41,7 @@ namespace zmq const char *wsa_error (); void win_error (char *buffer_, size_t buffer_size_); + void wsa_error_to_errno (); } diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index e1b56ad..513a1e4 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -44,8 +44,6 @@ namespace zmq void signal (int signal_); uint64_t poll (); uint64_t check (); - - // Get the file descriptor associated with the object. fd_t get_fd (); private: diff --git a/src/i_poller.hpp b/src/i_poller.hpp deleted file mode 100644 index 2665e82..0000000 --- a/src/i_poller.hpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__ -#define __ZMQ_I_POLLER_HPP_INCLUDED__ - -#include "fd.hpp" - -namespace zmq -{ - - union handle_t - { - fd_t fd; - void *ptr; - }; - - // Virtual interface to be used when polling on file descriptors. - - struct i_poller - { - virtual ~i_poller () {}; - - // Add file descriptor to the polling set. Return handle - // representing the descriptor. 'events' interface will be used - // to invoke callback functions when event occurs. - virtual handle_t add_fd (fd_t fd_, struct i_poll_events *events_) = 0; - - // Remove file descriptor identified by handle from the polling set. - virtual void rm_fd (handle_t handle_) = 0; - - // Start polling for input from socket. - virtual void set_pollin (handle_t handle_) = 0; - - // Stop polling for input from socket. - virtual void reset_pollin (handle_t handle_) = 0; - - // Start polling for availability of the socket for writing. - virtual void set_pollout (handle_t handle_) = 0; - - // Stop polling for availability of the socket for writing. - virtual void reset_pollout (handle_t handle_) = 0; - - // Ask to be notified after some time. Actual interval varies between - // 0 and max_timer_period ms. Timer is destroyed once it expires or, - // optionally, when cancel_timer is called. - virtual void add_timer (struct i_poll_events *events_) = 0; - - // Cancel the timer set by add_timer method. - virtual void cancel_timer (struct i_poll_events *events_) = 0; - - // Returns load experienced by the I/O thread. Currently it's number - // of file descriptors handled by the poller, in the future we may - // use a metric taking actual traffic on the individual sockets into - // account. - virtual int get_load () = 0; - - // Start the execution of the underlying I/O thread. - // This method is called from a foreign thread. - virtual void start () = 0; - - // Ask underlying I/O thread to stop. - virtual void stop () = 0; - }; - -} - -#endif diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index a09fe7e..ad04bb5 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -21,6 +21,7 @@ #define __ZMQ_I_SIGNALER_HPP_INCLUDED__ #include "stdint.hpp" +#include "fd.hpp" namespace zmq { @@ -42,6 +43,11 @@ namespace zmq // Same as poll, however, if there is no signal available, // function returns zero immediately instead of waiting for a signal. virtual uint64_t check () = 0; + + // Returns file descriptor that allows waiting for signals. Specific + // signalers may not support this functionality. If so, the function + // returns retired_fd. + virtual fd_t get_fd () = 0; }; } diff --git a/src/io_object.cpp b/src/io_object.cpp index f61e5f0..0cf77fd 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_) poller = io_thread_->get_poller (); } -zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_) +zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_) { return poller->add_fd (fd_, this); } diff --git a/src/io_object.hpp b/src/io_object.hpp index e5582db..2ed5e24 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -22,7 +22,7 @@ #include <stddef.h> -#include "i_poller.hpp" +#include "poller.hpp" #include "i_poll_events.hpp" namespace zmq @@ -41,6 +41,8 @@ namespace zmq protected: + typedef poller_t::handle_t handle_t; + // Derived class can init/swap the underlying I/O thread. // Caution: Remove all the file descriptors from the old I/O thread // before swapping to the new one! @@ -63,7 +65,7 @@ namespace zmq private: - struct i_poller *poller; + poller_t *poller; io_object_t (const io_object_t&); void operator = (const io_object_t&); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index a90876c..6d4710a 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -24,11 +24,6 @@ #include "platform.hpp" #include "err.hpp" #include "command.hpp" -#include "epoll.hpp" -#include "poll.hpp" -#include "select.hpp" -#include "devpoll.hpp" -#include "kqueue.hpp" #include "dispatcher.hpp" #include "simple_semaphore.hpp" @@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, int flags_) : object_t (dispatcher_, thread_slot_) { -#if defined ZMQ_FORCE_SELECT - poller = new select_t; -#elif defined ZMQ_FORCE_POLL - poller = new poll_t; -#elif defined ZMQ_FORCE_EPOLL - poller = new epoll_t; -#elif defined ZMQ_FORCE_DEVPOLL - poller = new devpoll_t; -#elif defined ZMQ_FORCE_KQUEUE - poller = new kqueue_t; -#elif defined ZMQ_HAVE_LINUX - poller = new epoll_t; -#elif defined ZMQ_HAVE_WINDOWS - poller = new select_t; -#elif defined ZMQ_HAVE_FREEBSD - poller = new kqueue_t; -#elif defined ZMQ_HAVE_OPENBSD - poller = new kqueue_t; -#elif defined ZMQ_HAVE_SOLARIS - poller = new devpoll_t; -#elif defined ZMQ_HAVE_OSX - poller = new kqueue_t; -#elif defined ZMQ_HAVE_QNXNTO - poller = new poll_t; -#elif defined ZMQ_HAVE_AIX - poller = new poll_t; -#elif defined ZMQ_HAVE_HPUX - poller = new devpoll_t; -#elif defined ZMQ_HAVE_OPENVMS - poller = new select_t; -#else -#error Unsupported platform -#endif + poller = new poller_t; zmq_assert (poller); signaler_handle = poller->add_fd (signaler.get_fd (), this); @@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event () zmq_assert (false); } -zmq::i_poller *zmq::io_thread_t::get_poller () +zmq::poller_t *zmq::io_thread_t::get_poller () { zmq_assert (poller); return poller; diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 4015b0c..457cdbf 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -23,7 +23,7 @@ #include <vector> #include "object.hpp" -#include "i_poller.hpp" +#include "poller.hpp" #include "i_poll_events.hpp" #include "fd_signaler.hpp" @@ -59,7 +59,7 @@ namespace zmq void timer_event (); // Used by io_objects to retrieve the assciated poller object. - struct i_poller *get_poller (); + poller_t *get_poller (); // Command handlers. void process_stop (); @@ -74,10 +74,10 @@ namespace zmq fd_signaler_t signaler; // Handle associated with signaler's file descriptor. - handle_t signaler_handle; + poller_t::handle_t signaler_handle; // I/O multiplexing is performed using a poller object. - i_poller *poller; + poller_t *poller; }; } diff --git a/src/kqueue.cpp b/src/kqueue.cpp index c967c73..f32fa36 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) errno_assert (rc != -1); } -zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, + i_poll_events *reactor_) { poll_entry_t *pe = new poll_entry_t; zmq_assert (pe != NULL); @@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) pe->flag_pollout = 0; pe->reactor = reactor_; - handle_t handle; - handle.ptr = pe; - return handle; + return pe; } void zmq::kqueue_t::rm_fd (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; if (pe->flag_pollin) kevent_delete (pe->fd, EVFILT_READ); if (pe->flag_pollout) @@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_) void zmq::kqueue_t::set_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollin = true; kevent_add (pe->fd, EVFILT_READ, pe); } void zmq::kqueue_t::reset_pollin (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollin = false; kevent_delete (pe->fd, EVFILT_READ); } void zmq::kqueue_t::set_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollout = true; kevent_add (pe->fd, EVFILT_WRITE, pe); } void zmq::kqueue_t::reset_pollout (handle_t handle_) { - poll_entry_t *pe = (poll_entry_t*) handle_.ptr; + poll_entry_t *pe = (poll_entry_t*) handle_; pe->flag_pollout = false; kevent_delete (pe->fd, EVFILT_WRITE); } diff --git a/src/kqueue.hpp b/src/kqueue.hpp index eeb6f09..bed9108 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -26,7 +26,6 @@ #include <vector> -#include "i_poller.hpp" #include "fd.hpp" #include "thread.hpp" #include "atomic_counter.hpp" @@ -37,22 +36,24 @@ namespace zmq // Implements socket polling mechanism using the BSD-specific // kqueue interface. - class kqueue_t : public i_poller + class kqueue_t { public: + typedef void* handle_t; + kqueue_t (); ~kqueue_t (); - // i_poller implementation. - handle_t add_fd (fd_t fd_, i_poll_events *events_); + // "poller" concept. + handle_t add_fd (fd_t fd_, struct i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); void set_pollout (handle_t handle_); void reset_pollout (handle_t handle_); - void add_timer (i_poll_events *events_); - void cancel_timer (i_poll_events *events_); + void add_timer (struct i_poll_events *events_); + void cancel_timer (struct i_poll_events *events_); int get_load (); void start (); void stop (); diff --git a/src/p2p.cpp b/src/p2p.cpp index c43b7b4..445ba5b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_) return 0; } +bool zmq::p2p_t::xhas_in () +{ + zmq_assert (false); + return false; +} + +bool zmq::p2p_t::xhas_out () +{ + zmq_assert (false); + return false; +} diff --git a/src/p2p.hpp b/src/p2p.hpp index 1c98dd5..1fd7e34 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -42,6 +42,8 @@ namespace zmq int xsend (zmq_msg_t *msg_, int flags_); int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); private: diff --git a/src/pipe.cpp b/src/pipe.cpp index f8dfcb8..e444520 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -36,6 +36,17 @@ zmq::reader_t::~reader_t () { } +bool zmq::reader_t::check_read () +{ + // Check if there's an item in the pipe. + if (pipe->check_read ()) + return true; + + // If not, deactivate the pipe. + endpoint->kill (this); + return false; +} + bool zmq::reader_t::read (zmq_msg_t *msg_) { if (!pipe->read (msg_)) { diff --git a/src/pipe.hpp b/src/pipe.hpp index 699e3f7..ecbce7d 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -42,6 +42,9 @@ namespace zmq void set_endpoint (i_endpoint *endpoint_); + // Returns true if there is at least one message to read in the pipe. + bool check_read (); |