diff options
44 files changed, 555 insertions, 257 deletions
| diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index f42f7f9..40750ec 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -51,6 +51,18 @@ extern "C" {  #ifndef EPROTONOSUPPORT  #define EPROTONOSUPPORT (ZMQ_HAUSNUMERO + 2)  #endif +#ifndef ENOBUFS +#define ENOBUFS (ZMQ_HAUSNUMERO + 3) +#endif +#ifndef ENETDOWN +#define ENETDOWN (ZMQ_HAUSNUMERO + 4) +#endif +#ifndef EADDRINUSE +#define EADDRINUSE (ZMQ_HAUSNUMERO + 5) +#endif +#ifndef EADDRNOTAVAIL +#define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6) +#endif  //  Native 0MQ error codes.  #define EMTHREAD (ZMQ_HAUSNUMERO + 50) @@ -344,8 +356,8 @@ ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags);  //          EFSM - function cannot be called at the moment.   ZMQ_EXPORT int zmq_flush (void *s); -//  Send a message from the socket 's'. 'flags' argument can be combination -//  of the flags described above. +//  Receive a message from the socket 's'. 'flags' argument can be combination +//  of the flags described above with the exception of ZMQ_NOFLUSH.  //  //  Errors: EAGAIN - message cannot be received at the moment (applies only to  //                   non-blocking receive). @@ -354,6 +366,39 @@ ZMQ_EXPORT int zmq_flush (void *s);  ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);  //////////////////////////////////////////////////////////////////////////////// +//  I/O multiplexing. +//////////////////////////////////////////////////////////////////////////////// + +#define ZMQ_POLLIN 1 +#define ZMQ_POLLOUT 2 + +//  'socket' is a 0MQ socket we want to poll on. If set to NULL, native file +//  descriptor (socket) 'fd' will be used instead. 'events' defines event we +//  are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error +//  event does not exist for portability reasons. Errors from native sockets +//  are reported as ZMQ_POLLIN. It's client's responsibilty to identify the +//  error afterwards. 'revents' field is filled in after function returns. It's +//  a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the +//  socket. +typedef struct +{ +    void *socket; +    int fd; +    short events; +    short revents; +} zmq_pollitem_t; + +//  Polls for the items specified by 'items'. Number of items in the array is +//  determined by 'nitems' argument. Returns number of items signaled, -1 +//  in the case of error. +// +//  Errors: EFAULT - there's a 0MQ socket in the pollset belonging to +//                   a different thread. +//          ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag. +//                    I/O multiplexing is disabled. +ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems); + +////////////////////////////////////////////////////////////////////////////////  //  Helper functions.  //////////////////////////////////////////////////////////////////////////////// diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp index 5c0ba7c..8a00230 100644 --- a/bindings/cpp/zmq.hpp +++ b/bindings/cpp/zmq.hpp @@ -200,6 +200,11 @@ namespace zmq                  throw error_t ();          } +        inline operator void* () +        { +            return ptr; +        } +          inline void setsockopt (int option_, const void *optval_,              size_t optvallen_)          { diff --git a/src/Makefile.am b/src/Makefile.am index 27784a0..92016e8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -85,7 +85,6 @@ libzmq_la_SOURCES = $(pgm_sources) \      ip.hpp \      i_endpoint.hpp \      i_engine.hpp \ -    i_poller.hpp \      i_poll_events.hpp \      i_signaler.hpp \      kqueue.hpp \ @@ -100,6 +99,7 @@ libzmq_la_SOURCES = $(pgm_sources) \      pipe.hpp \      platform.hpp \      poll.hpp \ +    poller.hpp \      p2p.hpp \      pub.hpp \      rep.hpp \ diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 2386034..f28d55e 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -69,7 +69,8 @@ void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_)      zmq_assert (rc == sizeof pfd);  } -zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, +    i_poll_events *reactor_)  {      assert (!fd_table [fd_].valid); @@ -84,17 +85,15 @@ zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_)      //  Increase the load metric of the thread.      load.add (1); -    handle_t handle; -    handle.fd = fd_; -    return handle; +    return fd_;  }  void zmq::devpoll_t::rm_fd (handle_t handle_)  { -    assert (fd_table [handle_.fd].valid); +    assert (fd_table [handle_].valid); -    devpoll_ctl (handle_.fd, POLLREMOVE); -    fd_table [handle_.fd].valid = false; +    devpoll_ctl (handle_, POLLREMOVE); +    fd_table [handle_].valid = false;      //  Decrease the load metric of the thread.      load.sub (1); @@ -102,34 +101,30 @@ void zmq::devpoll_t::rm_fd (handle_t handle_)  void zmq::devpoll_t::set_pollin (handle_t handle_)  { -    fd_t fd = handle_.fd; -    devpoll_ctl (fd, POLLREMOVE); -    fd_table [fd].events |= POLLIN; -    devpoll_ctl (fd, fd_table [fd].events); +    devpoll_ctl (handle_, POLLREMOVE); +    fd_table [handle_].events |= POLLIN; +    devpoll_ctl (handle_, fd_table [handle_].events);  }  void zmq::devpoll_t::reset_pollin (handle_t handle_)  { -    fd_t fd = handle_.fd; -    devpoll_ctl (fd, POLLREMOVE); -    fd_table [fd].events &= ~((short) POLLIN); -    devpoll_ctl (fd, fd_table [fd].events); +    devpoll_ctl (handle_, POLLREMOVE); +    fd_table [handle_].events &= ~((short) POLLIN); +    devpoll_ctl (handle_, fd_table [handle_].events);  }  void zmq::devpoll_t::set_pollout (handle_t handle_)  { -    fd_t fd = handle_.fd; -    devpoll_ctl (fd, POLLREMOVE); -    fd_table [fd].events |= POLLOUT; -    devpoll_ctl (fd, fd_table [fd].events); +    devpoll_ctl (handle_, POLLREMOVE); +    fd_table [handle_].events |= POLLOUT; +    devpoll_ctl (handle_, fd_table [handle_].events);  }  void zmq::devpoll_t::reset_pollout (handle_t handle_)  { -    fd_t fd = handle_.fd; -    devpoll_ctl (fd, POLLREMOVE); -    fd_table [fd].events &= ~((short) POLLOUT); -    devpoll_ctl (fd, fd_table [fd].events); +    devpoll_ctl (handle_, POLLREMOVE); +    fd_table [handle_].events &= ~((short) POLLOUT); +    devpoll_ctl (handle_, fd_table [handle_].events);  }  void zmq::devpoll_t::add_timer (i_poll_events *events_) diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 57f0156..b796ba5 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -26,7 +26,6 @@  #include <vector> -#include "i_poller.hpp"  #include "fd.hpp"  #include "thread.hpp"  #include "atomic_counter.hpp" @@ -37,22 +36,24 @@ namespace zmq      //  Implements socket polling mechanism using the Solaris-specific      //  "/dev/poll" interface. -    class devpoll_t : public i_poller +    class devpoll_t      {      public: +        typedef fd_t handle_t; +          devpoll_t ();          ~devpoll_t (); -        //  i_poller implementation. -        handle_t add_fd (fd_t fd_, i_poll_events *events_); +        //  "poller" concept. +        handle_t add_fd (fd_t fd_, struct i_poll_events *events_);          void rm_fd (handle_t handle_);          void set_pollin (handle_t handle_);          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (i_poll_events *events_); -        void cancel_timer (i_poll_events *events_); +        void add_timer (struct i_poll_events *events_); +        void cancel_timer (struct i_poll_events *events_);          int get_load ();          void start ();          void stop (); diff --git a/src/epoll.cpp b/src/epoll.cpp index 5f3bc51..8f576f8 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -52,7 +52,7 @@ zmq::epoll_t::~epoll_t ()          delete *it;  } -zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)  {      poll_entry_t *pe = new poll_entry_t;      zmq_assert (pe != NULL); @@ -72,14 +72,12 @@ zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)      //  Increase the load metric of the thread.      load.add (1); -    handle_t handle; -    handle.ptr = pe; -    return handle; +    return pe;  }  void zmq::epoll_t::rm_fd (handle_t handle_)  { -    poll_entry_t *pe = (poll_entry_t*) handle_.ptr; +    poll_entry_t *pe = (poll_entry_t*) handle_;      int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev);      errno_assert (rc != -1);      pe->fd = retired_fd; @@ -91,7 +89,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)  void zmq::epoll_t::set_pollin (handle_t handle_)  { -    poll_entry_t *pe = (poll_entry_t*) handle_.ptr; +    poll_entry_t *pe = (poll_entry_t*) handle_;      pe->ev.events |= EPOLLIN;      int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);      errno_assert (rc != -1); @@ -99,7 +97,7 @@ void zmq::epoll_t::set_pollin (handle_t handle_)  void zmq::epoll_t::reset_pollin (handle_t handle_)  { -    poll_entry_t *pe = (poll_entry_t*) handle_.ptr; +    poll_entry_t *pe = (poll_entry_t*) handle_;      pe->ev.events &= ~((short) EPOLLIN);      int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);      errno_assert (rc != -1); @@ -107,7 +105,7 @@ void zmq::epoll_t::reset_pollin (handle_t handle_)  void zmq::epoll_t::set_pollout (handle_t handle_)  { -    poll_entry_t *pe = (poll_entry_t*) handle_.ptr; +    poll_entry_t *pe = (poll_entry_t*) handle_;      pe->ev.events |= EPOLLOUT;      int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);      errno_assert (rc != -1); @@ -115,7 +113,7 @@ void zmq::epoll_t::set_pollout (handle_t handle_)  void zmq::epoll_t::reset_pollout (handle_t handle_)  { -    poll_entry_t *pe = (poll_entry_t*) handle_.ptr; +    poll_entry_t *pe = (poll_entry_t*) handle_;      pe->ev.events &= ~((short) EPOLLOUT);      int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev);      errno_assert (rc != -1); diff --git a/src/epoll.hpp b/src/epoll.hpp index 619d4f3..90824bd 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -27,8 +27,6 @@  #include <vector>  #include <sys/epoll.h> -#include "i_poller.hpp" -//#include "i_poll_events.hpp"  #include "fd.hpp"  #include "thread.hpp"  #include "atomic_counter.hpp" @@ -39,22 +37,24 @@ namespace zmq      //  This class implements socket polling mechanism using the Linux-specific      //  epoll mechanism. -    class epoll_t : public i_poller +    class epoll_t      {      public: +        typedef void* handle_t; +          epoll_t ();          ~epoll_t (); -        //  i_poller implementation. -        handle_t add_fd (fd_t fd_, i_poll_events *events_); +        //  "poller" concept. +        handle_t add_fd (fd_t fd_, struct i_poll_events *events_);          void rm_fd (handle_t handle_);          void set_pollin (handle_t handle_);          void reset_pollin (handle_t handle_);          void set_pollout (handle_t handle_);          void reset_pollout (handle_t handle_); -        void add_timer (i_poll_events *events_); -        void cancel_timer (i_poll_events *events_); +        void add_timer (struct i_poll_events *events_); +        void cancel_timer (struct i_poll_events *events_);          int get_load ();          void start ();          void stop (); diff --git a/src/err.cpp b/src/err.cpp index ef5b987..36cb2fc 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -17,6 +17,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include "../bindings/c/zmq.h" +  #include "err.hpp"  #include "platform.hpp" @@ -24,8 +26,6 @@  const char *zmq::wsa_error()  { - -      int errcode = WSAGetLastError ();      //  TODO: This is not a generic way to handle this...      if (errcode == WSAEWOULDBLOCK) @@ -148,4 +148,43 @@ void zmq::win_error (char *buffer_, size_t buffer_size_)      zmq_assert (rc);  } +void zmq::wsa_error_to_errno () +{ +    int errcode = WSAGetLastError (); +    switch (errcode) { +    case WSAEINPROGRESS: +        errno = EAGAIN; +        return; +    case WSAEBADF: +        errno = EBADF; +        return; +    case WSAEINVAL: +        errno = EINVAL; +        return; +    case WSAEMFILE: +        errno = EMFILE; +        return; +    case WSAEFAULT: +        errno = EFAULT; +        return; +    case WSAEPROTONOSUPPORT: +        errno = EPROTONOSUPPORT; +        return; +    case WSAENOBUFS: +        errno = ENOBUFS; +        return; +    case WSAENETDOWN: +        errno = ENETDOWN; +        return; +    case WSAEADDRINUSE: +        errno = EADDRINUSE; +        return; +    case WSAEADDRNOTAVAIL: +        errno = EADDRNOTAVAIL; +        return; +    default: +        wsa_assert (false); +    } +} +  #endif diff --git a/src/err.hpp b/src/err.hpp index c1b2916..fb9195e 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -41,6 +41,7 @@ namespace zmq      const char *wsa_error ();      void win_error (char *buffer_, size_t buffer_size_); +    void wsa_error_to_errno ();  } diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index e1b56ad..513a1e4 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -44,8 +44,6 @@ namespace zmq          void signal (int signal_);          uint64_t poll ();          uint64_t check (); - -        //  Get the file descriptor associated with the object.          fd_t get_fd ();      private: diff --git a/src/i_poller.hpp b/src/i_poller.hpp deleted file mode 100644 index 2665e82..0000000 --- a/src/i_poller.hpp +++ /dev/null @@ -1,84 +0,0 @@ -/* -    Copyright (c) 2007-2009 FastMQ Inc. - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__ -#define __ZMQ_I_POLLER_HPP_INCLUDED__ - -#include "fd.hpp" - -namespace zmq -{ - -    union handle_t -    { -        fd_t fd; -        void *ptr; -    }; - -    //  Virtual interface to be used when polling on file descriptors. - -    struct i_poller -    { -        virtual ~i_poller () {}; - -        //  Add file descriptor to the polling set. Return handle -        //  representing the descriptor. 'events' interface will be used -        //  to invoke callback functions when event occurs. -        virtual handle_t add_fd (fd_t fd_, struct i_poll_events *events_) = 0; - -        //  Remove file descriptor identified by handle from the polling set. -        virtual void rm_fd (handle_t handle_) = 0; - -        //  Start polling for input from socket. -        virtual void set_pollin (handle_t handle_) = 0; - -        //  Stop polling for input from socket. -        virtual void reset_pollin (handle_t handle_) = 0; - -        //  Start polling for availability of the socket for writing. -        virtual void set_pollout (handle_t handle_) = 0; - -        //  Stop polling for availability of the socket for writing. -        virtual void reset_pollout (handle_t handle_) = 0; - -        //  Ask to be notified after some time. Actual interval varies between -        //  0 and max_timer_period ms. Timer is destroyed once it expires or, -        //  optionally, when cancel_timer is called. -        virtual void add_timer (struct i_poll_events *events_) = 0; - -        //  Cancel the timer set by add_timer method. -        virtual void cancel_timer (struct i_poll_events *events_) = 0; - -        //  Returns load experienced by the I/O thread. Currently it's number -        //  of file descriptors handled by the poller, in the future we may -        //  use a metric taking actual traffic on the individual sockets into -        //  account. -        virtual int get_load () = 0; - -        //  Start the execution of the underlying I/O thread. -        //  This method is called from a foreign thread. -        virtual void start () = 0; - -        //  Ask underlying I/O thread to stop. -        virtual void stop () = 0; -    }; - -} - -#endif diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index a09fe7e..ad04bb5 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -21,6 +21,7 @@  #define __ZMQ_I_SIGNALER_HPP_INCLUDED__  #include "stdint.hpp" +#include "fd.hpp"  namespace zmq  { @@ -42,6 +43,11 @@ namespace zmq          //  Same as poll, however, if there is no signal available,          //  function returns zero immediately instead of waiting for a signal.          virtual uint64_t check () = 0; + +        //  Returns file descriptor that allows waiting for signals. Specific +        //  signalers may not support this functionality. If so, the function +        //  returns retired_fd. +        virtual fd_t get_fd () = 0;      };  } diff --git a/src/io_object.cpp b/src/io_object.cpp index f61e5f0..0cf77fd 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -36,7 +36,7 @@ void zmq::io_object_t::set_io_thread (io_thread_t *io_thread_)      poller = io_thread_->get_poller ();  } -zmq::handle_t zmq::io_object_t::add_fd (fd_t fd_) +zmq::io_object_t::handle_t zmq::io_object_t::add_fd (fd_t fd_)  {      return poller->add_fd (fd_, this);  } diff --git a/src/io_object.hpp b/src/io_object.hpp index e5582db..2ed5e24 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -22,7 +22,7 @@  #include <stddef.h> -#include "i_poller.hpp" +#include "poller.hpp"  #include "i_poll_events.hpp"  namespace zmq @@ -41,6 +41,8 @@ namespace zmq      protected: +        typedef poller_t::handle_t handle_t; +          //  Derived class can init/swap the underlying I/O thread.          //  Caution: Remove all the file descriptors from the old I/O thread          //  before swapping to the new one! @@ -63,7 +65,7 @@ namespace zmq      private: -        struct i_poller *poller; +        poller_t *poller;          io_object_t (const io_object_t&);          void operator = (const io_object_t&); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index a90876c..6d4710a 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -24,11 +24,6 @@  #include "platform.hpp"  #include "err.hpp"  #include "command.hpp" -#include "epoll.hpp" -#include "poll.hpp" -#include "select.hpp" -#include "devpoll.hpp" -#include "kqueue.hpp"  #include "dispatcher.hpp"  #include "simple_semaphore.hpp" @@ -36,39 +31,7 @@ zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,        int flags_) :      object_t (dispatcher_, thread_slot_)  { -#if defined ZMQ_FORCE_SELECT -    poller = new select_t; -#elif defined ZMQ_FORCE_POLL -    poller = new poll_t; -#elif defined ZMQ_FORCE_EPOLL -    poller = new epoll_t; -#elif defined ZMQ_FORCE_DEVPOLL -    poller = new devpoll_t; -#elif defined ZMQ_FORCE_KQUEUE -    poller = new kqueue_t; -#elif defined ZMQ_HAVE_LINUX -    poller = new epoll_t; -#elif defined ZMQ_HAVE_WINDOWS -    poller = new select_t; -#elif defined ZMQ_HAVE_FREEBSD -    poller = new kqueue_t; -#elif defined ZMQ_HAVE_OPENBSD -    poller = new kqueue_t; -#elif defined ZMQ_HAVE_SOLARIS -    poller = new devpoll_t; -#elif defined ZMQ_HAVE_OSX -    poller = new kqueue_t; -#elif defined ZMQ_HAVE_QNXNTO -    poller = new poll_t; -#elif defined ZMQ_HAVE_AIX -    poller = new poll_t; -#elif defined ZMQ_HAVE_HPUX -    poller = new devpoll_t; -#elif defined ZMQ_HAVE_OPENVMS -    poller = new select_t; -#else -#error Unsupported platform -#endif +    poller = new poller_t;      zmq_assert (poller);      signaler_handle = poller->add_fd (signaler.get_fd (), this); @@ -134,7 +97,7 @@ void zmq::io_thread_t::timer_event ()      zmq_assert (false);  } -zmq::i_poller *zmq::io_thread_t::get_poller () +zmq::poller_t *zmq::io_thread_t::get_poller ()  {      zmq_assert (poller);      return poller; diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 4015b0c..457cdbf 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -23,7 +23,7 @@  #include <vector>  #include "object.hpp" -#include "i_poller.hpp" +#include "poller.hpp"  #include "i_poll_events.hpp"  #include "fd_signaler.hpp" @@ -59,7 +59,7 @@ namespace zmq          void timer_event ();          //  Used by io_objects to retrieve the assciated poller object. -        struct i_poller *get_poller (); +        poller_t *get_poller ();          //  Command handlers.          void process_stop (); @@ -74,10 +74,10 @@ namespace zmq          fd_signaler_t signaler;          //  Handle associated with signaler's file descriptor. -        handle_t signaler_handle; +        poller_t::handle_t signaler_handle;          //  I/O multiplexing is performed using a poller object. -        i_poller *poller; +        poller_t *poller;      };  } diff --git a/src/kqueue.cpp b/src/kqueue.cpp index c967c73..f32fa36 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -68,7 +68,8 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)      errno_assert (rc != -1);  } -zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, +    i_poll_events *reactor_)  {      poll_entry_t *pe = new poll_entry_t;      zmq_assert (pe != NULL); @@ -78,14 +79,12 @@ zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_)      pe->flag_pollout = 0;      pe->reactor = reactor_; -    handle_t handle; -    handle.ptr = pe; -    return handle; +    return pe;  }  void zmq::kqueue_t::rm_fd (handle_t handle_)  { -    poll_entry_t *pe = (poll_entry_t*) handle_.ptr; +    poll_entry_t *pe = (poll_entry_t*) handle_;      if (pe->flag_pollin)          kevent_delete (pe->fd, EVFILT_READ);      if (pe->flag_pollout) @@ -96,28 +95,28 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)  void zmq::kqueue_t::set_pollin (handle_t handle_) | 
