From ae567be0c295d9c18da5ba4be4f8403cc844a9a3 Mon Sep 17 00:00:00 2001 From: Ivo Danihelka Date: Thu, 26 Aug 2010 12:14:53 +0200 Subject: improved null checking in zmq_term --- AUTHORS | 1 + src/zmq.cpp | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/AUTHORS b/AUTHORS index 0256523..1279d7f 100644 --- a/AUTHORS +++ b/AUTHORS @@ -16,6 +16,7 @@ Erik Rigtorp Frank Denis George Neill Gonzalo Diethelm +Ivo Danihelka Joe Thornber Jon Dyte Kamil Shakirov diff --git a/src/zmq.cpp b/src/zmq.cpp index 8b21d4b..f3ccaac 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -272,13 +272,14 @@ void *zmq_init (int io_threads_) int zmq_term (void *ctx_) { - int rc = ((zmq::ctx_t*) ctx_)->term (); - int en = errno; - if (!ctx_) { errno = EFAULT; return -1; } + + int rc = ((zmq::ctx_t*) ctx_)->term (); + int en = errno; + #if defined ZMQ_HAVE_OPENPGM // Shut down the OpenPGM library. if (pgm_shutdown () != TRUE) -- cgit v1.2.3 From c2f3b3b4458187085e148850068f9719c2567614 Mon Sep 17 00:00:00 2001 From: Jon Dyte Date: Fri, 27 Aug 2010 06:59:55 +0200 Subject: forwarder and streamer devices handle multi-part messages correctly --- src/forwarder.cpp | 17 ++++++++++++++--- src/queue.cpp | 15 ++++++++------- src/streamer.cpp | 17 ++++++++++++++--- 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/forwarder.cpp b/src/forwarder.cpp index 503868b..d1f324e 100644 --- a/src/forwarder.cpp +++ b/src/forwarder.cpp @@ -21,6 +21,7 @@ #include "forwarder.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,16 +30,26 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { rc = insocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } - rc = outsocket_->send (&msg, 0); - if (rc < 0) { + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); diff --git a/src/queue.cpp b/src/queue.cpp index 311a8c1..36fab07 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -23,6 +23,7 @@ #include "queue.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::queue (class socket_base_t *insocket_, @@ -49,7 +50,7 @@ int zmq::queue (class socket_base_t *insocket_, // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], 2, -1); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -65,7 +66,7 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = insocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -73,14 +74,14 @@ int zmq::queue (class socket_base_t *insocket_, moresz = sizeof (more); rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -96,7 +97,7 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = outsocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); @@ -104,14 +105,14 @@ int zmq::queue (class socket_base_t *insocket_, moresz = sizeof (more); rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (rc < 0) { + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); diff --git a/src/streamer.cpp b/src/streamer.cpp index 9799007..7c03365 100644 --- a/src/streamer.cpp +++ b/src/streamer.cpp @@ -21,6 +21,7 @@ #include "streamer.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,16 +30,26 @@ int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { rc = insocket_->recv (&msg, 0); - if (rc < 0) { + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); } - rc = outsocket_->send (&msg, 0); - if (rc < 0) { + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { if (errno == ETERM) return -1; errno_assert (false); -- cgit v1.2.3 From 10227899b1ccdaecf709fbfc7b765e147baf3080 Mon Sep 17 00:00:00 2001 From: Dhammika Pathirana Date: Fri, 27 Aug 2010 18:06:37 +0200 Subject: assert on malformed messages --- src/zmq_decoder.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 8e335c9..dcf8e76 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -56,6 +56,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // TODO: Handle over-sized message decently. + // There has to be at least one byte (the flags) in the message). + zmq_assert (*tmpbuf > 0); + // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... @@ -74,6 +77,10 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // TODO: Handle over-sized message decently. + // There has to be at least one byte (the flags) in the message). + zmq_assert (size > 0); + + // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... -- cgit v1.2.3 From db7fe858d6356988fb9a9270c235178e523b2370 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 4 Sep 2010 17:12:08 +0200 Subject: Broken device numbering reverted --- include/zmq.h | 64 +++++++++++++++++++++++++++++------------------------------ 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/include/zmq.h b/include/zmq.h index d3a6332..90a73c2 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -152,36 +152,35 @@ ZMQ_EXPORT int zmq_term (void *context); /******************************************************************************/ /* Socket types. */ -#define ZMQ_PAIR 0 -#define ZMQ_PUB 1 -#define ZMQ_SUB 2 -#define ZMQ_REQ 3 -#define ZMQ_REP 4 -#define ZMQ_XREQ 5 -#define ZMQ_XREP 6 -#define ZMQ_PULL 7 -#define ZMQ_PUSH 8 -#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */ -#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */ +#define ZMQ_PAIR 0 +#define ZMQ_PUB 1 +#define ZMQ_SUB 2 +#define ZMQ_REQ 3 +#define ZMQ_REP 4 +#define ZMQ_XREQ 5 +#define ZMQ_XREP 6 +#define ZMQ_PULL 7 +#define ZMQ_PUSH 8 +#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */ +#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */ /* Socket options. */ -#define ZMQ_HWM 1 -/* ZMQ_LWM 2 no longer supported */ -#define ZMQ_SWAP 3 -#define ZMQ_AFFINITY 4 -#define ZMQ_IDENTITY 5 -#define ZMQ_SUBSCRIBE 6 -#define ZMQ_UNSUBSCRIBE 7 -#define ZMQ_RATE 8 +#define ZMQ_HWM 1 +#define ZMQ_SWAP 3 +#define ZMQ_AFFINITY 4 +#define ZMQ_IDENTITY 5 +#define ZMQ_SUBSCRIBE 6 +#define ZMQ_UNSUBSCRIBE 7 +#define ZMQ_RATE 8 #define ZMQ_RECOVERY_IVL 9 -#define ZMQ_MCAST_LOOP 10 -#define ZMQ_SNDBUF 11 -#define ZMQ_RCVBUF 12 -#define ZMQ_RCVMORE 13 +#define ZMQ_MCAST_LOOP 10 +#define ZMQ_SNDBUF 11 +#define ZMQ_RCVBUF 12 +#define ZMQ_RCVMORE 13 /* Send/recv options. */ -#define ZMQ_NOBLOCK 1 -#define ZMQ_SNDMORE 2 +#define ZMQ_NOBLOCK 1 +#define ZMQ_SNDMORE 2 ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT int zmq_close (void *s); @@ -198,9 +197,9 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); /* I/O multiplexing. */ /******************************************************************************/ -#define ZMQ_POLLIN 1 -#define ZMQ_POLLOUT 2 -#define ZMQ_POLLERR 4 +#define ZMQ_POLLIN 1 +#define ZMQ_POLLOUT 2 +#define ZMQ_POLLERR 4 typedef struct { @@ -217,12 +216,12 @@ typedef struct ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /******************************************************************************/ -/* Devices */ +/* Devices - Experimental. */ /******************************************************************************/ -#define ZMQ_QUEUE 1 -#define ZMQ_FORWARDER 2 -#define ZMQ_STREAMER 3 +#define ZMQ_STREAMER 1 +#define ZMQ_FORWARDER 2 +#define ZMQ_QUEUE 3 ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); @@ -233,3 +232,4 @@ ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); #endif #endif + -- cgit v1.2.3