diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-02-19 15:25:05 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-02-19 15:25:05 +0100 |
commit | aff1f6621ae13083c7f15f7f1f808560254a2dcb (patch) | |
tree | 9e73dd8bd5d38a21e0aa83b4d4bea55b7792b39e /src | |
parent | 75f571c8844231f4172f131e1dd6ba2348eb54e5 (diff) | |
parent | 2a79a943de417679c562cd4a917e1d1bc19b0d25 (diff) |
Merge branch 'master' of git@github.com:sustrik/zeromq2
Diffstat (limited to 'src')
-rw-r--r-- | src/atomic_bitmap.hpp | 16 | ||||
-rw-r--r-- | src/atomic_counter.hpp | 14 | ||||
-rw-r--r-- | src/atomic_ptr.hpp | 14 | ||||
-rw-r--r-- | src/ip.cpp | 11 | ||||
-rw-r--r-- | src/ip.hpp | 10 | ||||
-rw-r--r-- | src/kqueue.cpp | 17 | ||||
-rw-r--r-- | src/kqueue.hpp | 3 | ||||
-rw-r--r-- | src/poll.cpp | 3 | ||||
-rw-r--r-- | src/poll.hpp | 3 | ||||
-rw-r--r-- | src/poller.hpp | 2 | ||||
-rw-r--r-- | src/rep.cpp | 22 | ||||
-rw-r--r-- | src/socket_base.cpp | 28 | ||||
-rw-r--r-- | src/tcp_listener.cpp | 2 | ||||
-rw-r--r-- | src/uuid.cpp | 2 | ||||
-rw-r--r-- | src/uuid.hpp | 4 | ||||
-rw-r--r-- | src/zmq.cpp | 6 |
16 files changed, 93 insertions, 64 deletions
diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp index 6620f06..1aefd27 100644 --- a/src/atomic_bitmap.hpp +++ b/src/atomic_bitmap.hpp @@ -34,8 +34,8 @@ #define ZMQ_ATOMIC_BITMAP_SPARC #elif defined ZMQ_HAVE_WINDOWS #define ZMQ_ATOMIC_BITMAP_WINDOWS -#elif defined ZMQ_HAVE_SOLARIS -#define ZMQ_ATOMIC_BITMAP_SOLARIS +#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_NETBSD) +#define ZMQ_ATOMIC_BITMAP_SYSTEM #else #define ZMQ_ATOMIC_BITMAP_MUTEX #endif @@ -44,7 +44,7 @@ #include "mutex.hpp" #elif defined ZMQ_ATOMIC_BITMAP_WINDOWS #include "windows.hpp" -#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SYSTEM #include <atomic.h> #endif @@ -89,7 +89,7 @@ namespace zmq return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SYSTEM while (true) { bitmap_t oldval = value; bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & @@ -150,7 +150,7 @@ namespace zmq bitmap_t oldval; #if defined ZMQ_ATOMIC_BITMAP_WINDOWS oldval = InterlockedExchange ((volatile LONG*) &value, newval_); -#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SYSTEM oldval = atomic_swap_32 (&value, newval_); #elif defined ZMQ_ATOMIC_BITMAP_X86 oldval = newval_; @@ -201,7 +201,7 @@ namespace zmq newval, oldval) == (LONG) oldval) return oldval; } -#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SYSTEM while (true) { bitmap_t oldval = value; bitmap_t newval = oldval == 0 ? thenval_ : elseval_; @@ -270,8 +270,8 @@ namespace zmq #if defined ZMQ_ATOMIC_BITMAP_WINDOWS #undef ZMQ_ATOMIC_BITMAP_WINDOWS #endif -#if defined ZMQ_ATOMIC_BITMAP_SOLARIS -#undef ZMQ_ATOMIC_BITMAP_SOLARIS +#if defined ZMQ_ATOMIC_BITMAP_SYSTEM +#undef ZMQ_ATOMIC_BITMAP_SYSTEM #endif #if defined ZMQ_ATOMIC_BITMAP_X86 #undef ZMQ_ATOMIC_BITMAP_X86 diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp index 649fdbf..4a77928 100644 --- a/src/atomic_counter.hpp +++ b/src/atomic_counter.hpp @@ -31,8 +31,8 @@ #define ZMQ_ATOMIC_COUNTER_SPARC #elif defined ZMQ_HAVE_WINDOWS #define ZMQ_ATOMIC_COUNTER_WINDOWS -#elif defined ZMQ_HAVE_SOLARIS -#define ZMQ_ATOMIC_COUNTER_SOLARIS +#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_NETBSD) +#define ZMQ_ATOMIC_COUNTER_SYSTEM #else #define ZMQ_ATOMIC_COUNTER_MUTEX #endif @@ -41,7 +41,7 @@ #include "mutex.hpp" #elif defined ZMQ_ATOMIC_COUNTER_WINDOWS #include "windows.hpp" -#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SYSTEM #include <atomic.h> #endif @@ -79,7 +79,7 @@ namespace zmq #if defined ZMQ_ATOMIC_COUNTER_WINDOWS old_value = InterlockedExchangeAdd ((LONG*) &value, increment_); -#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SYSTEM integer_t new_value = atomic_add_32_nv (&value, increment_); old_value = new_value - increment_; #elif defined ZMQ_ATOMIC_COUNTER_X86 @@ -119,7 +119,7 @@ namespace zmq LONG delta = - ((LONG) decrement); integer_t old = InterlockedExchangeAdd ((LONG*) &value, delta); return old - decrement != 0; -#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SYSTEM int32_t delta = - ((int32_t) decrement); integer_t nv = atomic_add_32_nv (&value, delta); return nv != 0; @@ -180,8 +180,8 @@ namespace zmq #if defined ZMQ_ATOMIC_COUNTER_WINDOWS #undef ZMQ_ATOMIC_COUNTER_WINDOWS #endif -#if defined ZMQ_ATOMIC_COUNTER_SOLARIS -#undef ZMQ_ATOMIC_COUNTER_SOLARIS +#if defined ZMQ_ATOMIC_COUNTER_SYSTEM +#undef ZMQ_ATOMIC_COUNTER_SYSTEM #endif #if defined ZMQ_ATOMIC_COUNTER_X86 #undef ZMQ_ATOMIC_COUNTER_X86 diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp index 3735e99..e4a7491 100644 --- a/src/atomic_ptr.hpp +++ b/src/atomic_ptr.hpp @@ -31,8 +31,8 @@ #define ZMQ_ATOMIC_PTR_SPARC #elif defined ZMQ_HAVE_WINDOWS #define ZMQ_ATOMIC_PTR_WINDOWS -#elif defined ZMQ_HAVE_SOLARIS -#define ZMQ_ATOMIC_PTR_SOLARIS +#elif (defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_NETBSD) +#define ZMQ_ATOMIC_PTR_SYSTEM #else #define ZMQ_ATOMIC_PTR_MUTEX #endif @@ -41,7 +41,7 @@ #include "mutex.hpp" #elif defined ZMQ_ATOMIC_PTR_WINDOWS #include "windows.hpp" -#elif defined ZMQ_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SYSTEM #include <atomic.h> #endif @@ -79,7 +79,7 @@ namespace zmq { #if defined ZMQ_ATOMIC_PTR_WINDOWS return (T*) InterlockedExchangePointer (&ptr, val_); -#elif defined ZMQ_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SYSTEM return (T*) atomic_swap_ptr (&ptr, val_); #elif defined ZMQ_ATOMIC_PTR_X86 T *old; @@ -125,7 +125,7 @@ namespace zmq #if defined ZMQ_ATOMIC_PTR_WINDOWS return (T*) InterlockedCompareExchangePointer ( (volatile PVOID*) &ptr, val_, cmp_); -#elif defined ZMQ_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SYSTEM return (T*) atomic_cas_ptr (&ptr, cmp_, val_); #elif defined ZMQ_ATOMIC_PTR_X86 T *old; @@ -173,8 +173,8 @@ namespace zmq #if defined ZMQ_ATOMIC_PTR_WINDOWS #undef ZMQ_ATOMIC_PTR_WINDOWS #endif -#if defined ZMQ_ATOMIC_PTR_SOLARIS -#undef ZMQ_ATOMIC_PTR_SOLARIS +#if defined ZMQ_ATOMIC_PTR_SYSTEM +#undef ZMQ_ATOMIC_PTR_SYSTEM #endif #if defined ZMQ_ATOMIC_PTR_X86 #undef ZMQ_ATOMIC_PTR_X86 @@ -125,7 +125,8 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) #elif ((defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENBSD ||\ - defined ZMQ_HAVE_QNXNTO) && defined ZMQ_HAVE_IFADDRS) + defined ZMQ_HAVE_QNXNTO || defined ZMQ_HAVE_NETBSD)\ + && defined ZMQ_HAVE_IFADDRS) #include <ifaddrs.h> @@ -239,11 +240,7 @@ int zmq::resolve_ip_interface (sockaddr_storage* addr_, socklen_t *addr_len_, // Restrict hostname/service to literals to avoid any DNS lookups or // service-name irregularity due to indeterminate socktype. -#if defined ZMQ_HAVE_OSX - req.ai_flags = AI_PASSIVE | AI_NUMERICHOST; -#else req.ai_flags = AI_PASSIVE | AI_NUMERICHOST | AI_NUMERICSERV; -#endif // Resolve the literal address. Some of the error info is lost in case // of error, however, there's no way to report EAI errors via errno. @@ -292,11 +289,7 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_, // Avoid named services due to unclear socktype, and don't pick IPv6 // addresses if we don't have a local IPv6 address configured. -#if defined ZMQ_HAVE_OSX - req.ai_flags = AI_ADDRCONFIG; -#else req.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG; -#endif // Resolve host name. Some of the error info is lost in case of error, // however, there's no way to report EAI errors via errno. @@ -30,6 +30,16 @@ #include <arpa/inet.h> #include <netinet/in.h> #include <netdb.h> + +// Some platforms (notably Darwin/OSX and NetBSD) do not define all AI_ +// flags for getaddrinfo(). This can be worked around safely by defining +// these to 0. +#ifndef AI_ADDRCONFIG +#define AI_ADDRCONFIG 0 +#endif +#ifndef AI_NUMERICSERV +#define AI_NUMERICSERV 0 +#endif #endif #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS diff --git a/src/kqueue.cpp b/src/kqueue.cpp index bf9c8a2..e1fe2fa 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -19,7 +19,8 @@ #include "platform.hpp" -#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX +#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_OPENBSD ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_NETBSD #include <sys/time.h> #include <sys/types.h> @@ -34,6 +35,14 @@ #include "config.hpp" #include "i_poll_events.hpp" +// NetBSD defines (struct kevent).udata as intptr_t, everyone else +// as void *. +#if defined ZMQ_HAVE_NETBSD +#define kevent_udata_t intptr_t +#else +#define kevent_udata_t void * +#endif + zmq::kqueue_t::kqueue_t () : stopping (false) { @@ -56,7 +65,7 @@ void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) { struct kevent ev; - EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, udata_); + EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t)udata_); int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); errno_assert (rc != -1); } @@ -65,7 +74,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) { struct kevent ev; - EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, NULL); + EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t)NULL); int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); errno_assert (rc != -1); } @@ -212,4 +221,6 @@ void zmq::kqueue_t::worker_routine (void *arg_) ((kqueue_t*) arg_)->loop (); } +// Don't pollute namespace with defines local to this file +#undef kevent_udata_t #endif diff --git a/src/kqueue.hpp b/src/kqueue.hpp index d2dd09a..ac28a7d 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -22,7 +22,8 @@ #include "platform.hpp" -#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX +#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_OPENBSD ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_NETBSD #include <vector> diff --git a/src/poll.cpp b/src/poll.cpp index 889ea4a..4214195 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -22,7 +22,8 @@ #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ - defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ + defined ZMQ_HAVE_NETBSD #include <sys/types.h> #include <sys/time.h> diff --git a/src/poll.hpp b/src/poll.hpp index 5b8e745..f4ae35a 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -25,7 +25,8 @@ #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ - defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ + defined ZMQ_HAVE_NETBSD #include <poll.h> #include <stddef.h> diff --git a/src/poller.hpp b/src/poller.hpp index b769283..49d466b 100644 --- a/src/poller.hpp +++ b/src/poller.hpp @@ -47,6 +47,8 @@ namespace zmq typedef kqueue_t poller_t; #elif defined ZMQ_HAVE_OPENBSD typedef kqueue_t poller_t; +#elif defined ZMQ_HAVE_NETBSD + typedef kqueue_t poller_t; #elif defined ZMQ_HAVE_SOLARIS typedef devpoll_t poller_t; #elif defined ZMQ_HAVE_OSX diff --git a/src/rep.cpp b/src/rep.cpp index 968427d..755d78e 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -89,11 +89,11 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) out_pipes_t::size_type index = out_pipes.index (pipe_); - // TODO: If the connection we've got the request from disconnects, - // there's nowhere to send the reply. DLQ? - if (waiting_for_reply && pipe_ == reply_pipe) { - zmq_assert (false); - } + // If the connection we've got the request from disconnects, + // there's nowhere to send the reply. Forget about the reply pipe. + // Once the reply is sent it will be dropped. + if (waiting_for_reply && pipe_ == reply_pipe) + reply_pipe = NULL; // If corresponding inpipe is still in place simply nullify the pointer // to the outpipe. @@ -150,9 +150,15 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) // overloads the buffer, connection should be torn down. zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_))); - // Push message to the selected pipe. - reply_pipe->write (msg_); - reply_pipe->flush (); + // Push message to the selected pipe. If requester have disconnected + // in the meantime, drop the reply. + if (reply_pipe) { + reply_pipe->write (msg_); + reply_pipe->flush (); + } + else { + zmq_close (msg_); + } waiting_for_reply = false; reply_pipe = NULL; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1607673..871f9e9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -346,6 +346,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { // Get the message. int rc = xrecv (msg_, flags_); + int err = errno; // Once every inbound_poll_rate messages check for signals and process // incoming commands. This happens only if we are not polling altogether @@ -364,29 +365,30 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) if (rc == 0) return 0; + // If we don't have the message, restore the original cause of the problem. + errno = err; + // If the message cannot be fetched immediately, there are two scenarios. // For non-blocking recv, commands are processed in case there's a revive // command already waiting int a command pipe. If it's not, return EAGAIN. - // In blocking scenario, commands are processed over and over again until - // we are able to fetch a message. if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1; app_thread->process_commands (false, false); - rc = xrecv (msg_, flags_); ticks = 0; - } - else { - while (rc != 0) { - if (errno != EAGAIN) - return -1; - app_thread->process_commands (true, false); - rc = xrecv (msg_, flags_); - ticks = 0; - } + return xrecv (msg_, flags_); } - return rc; + // In blocking scenario, commands are processed over and over again until + // we are able to fetch a message. + while (rc != 0) { + if (errno != EAGAIN) + return -1; + app_thread->process_commands (true, false); + rc = xrecv (msg_, flags_); + ticks = 0; + } + return 0; } int zmq::socket_base_t::close () diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 3d15abe..cb84715 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -275,7 +275,7 @@ zmq::fd_t zmq::tcp_listener_t::accept () #if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX || \ - defined ZMQ_HAVE_OPENVMS) + defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD) if (sock == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED)) diff --git a/src/uuid.cpp b/src/uuid.cpp index 6fdb060..fa1cff4 100644 --- a/src/uuid.cpp +++ b/src/uuid.cpp @@ -47,7 +47,7 @@ const char *zmq::uuid_t::to_string () return (char*) uuid_str; } -#elif defined ZMQ_HAVE_FREEBSD +#elif defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD #include <stdlib.h> #include <uuid.h> diff --git a/src/uuid.hpp b/src/uuid.hpp index f565f8d..03bb69d 100644 --- a/src/uuid.hpp +++ b/src/uuid.hpp @@ -23,7 +23,7 @@ #include "platform.hpp" #include "stdint.hpp" -#if defined ZMQ_HAVE_FREEBSD +#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD #include <uuid.h> #elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_OSX #include <uuid/uuid.h> @@ -60,7 +60,7 @@ namespace zmq #endif ::UUID uuid; RPC_CSTR uuid_str; -#elif defined ZMQ_HAVE_FREEBSD +#elif defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD ::uuid_t uuid; char *uuid_str; #elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_OSX diff --git a/src/zmq.cpp b/src/zmq.cpp index e6f1a61..4170b3f 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -37,7 +37,8 @@ #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ - defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ + defined ZMQ_HAVE_NETBSD #include <poll.h> #endif @@ -325,7 +326,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ - defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ + defined ZMQ_HAVE_NETBSD pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); |