diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.in | 9 | ||||
-rw-r--r-- | src/fq.cpp | 7 | ||||
-rw-r--r-- | src/req.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 22 | ||||
-rw-r--r-- | src/tcp_listener.cpp | 3 | ||||
-rw-r--r-- | src/thread.cpp | 2 | ||||
-rw-r--r-- | src/xrep.cpp | 2 | ||||
-rw-r--r-- | src/zmq.cpp | 40 |
8 files changed, 65 insertions, 22 deletions
diff --git a/src/Makefile.in b/src/Makefile.in index a052a08..78e3379 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -214,6 +214,8 @@ OBJDUMP = @OBJDUMP@ OBJEXT = @OBJEXT@ OTOOL = @OTOOL@ OTOOL64 = @OTOOL64@ +OpenPGM_CFLAGS = @OpenPGM_CFLAGS@ +OpenPGM_LIBS = @OpenPGM_LIBS@ PACKAGE = @PACKAGE@ PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@ PACKAGE_NAME = @PACKAGE_NAME@ @@ -222,6 +224,9 @@ PACKAGE_TARNAME = @PACKAGE_TARNAME@ PACKAGE_URL = @PACKAGE_URL@ PACKAGE_VERSION = @PACKAGE_VERSION@ PATH_SEPARATOR = @PATH_SEPARATOR@ +PKG_CONFIG = @PKG_CONFIG@ +PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@ +PKG_CONFIG_PATH = @PKG_CONFIG_PATH@ RANLIB = @RANLIB@ SED = @SED@ SET_MAKE = @SET_MAKE@ @@ -236,8 +241,6 @@ abs_top_srcdir = @abs_top_srcdir@ ac_ct_CC = @ac_ct_CC@ ac_ct_CXX = @ac_ct_CXX@ ac_ct_DUMPBIN = @ac_ct_DUMPBIN@ -ac_zmq_have_asciidoc = @ac_zmq_have_asciidoc@ -ac_zmq_have_xmlto = @ac_zmq_have_xmlto@ am__include = @am__include@ am__leading_dot = @am__leading_dot@ am__quote = @am__quote@ @@ -266,6 +269,8 @@ infodir = @infodir@ install_sh = @install_sh@ libdir = @libdir@ libexecdir = @libexecdir@ +libzmq_have_asciidoc = @libzmq_have_asciidoc@ +libzmq_have_xmlto = @libzmq_have_xmlto@ localedir = @localedir@ localstatedir = @localstatedir@ lt_ECHO = @lt_ECHO@ @@ -56,10 +56,9 @@ void zmq::fq_t::attach (reader_t *pipe_) void zmq::fq_t::terminated (reader_t *pipe_) { - // TODO: This is a problem with session-initiated termination. It breaks - // message atomicity. However, for socket initiated termination it's - // just fine. - zmq_assert (!more || pipes [current] != pipe_); + // Make sure that we are not closing current pipe while + // message is half-read. + zmq_assert (terminating || (!more || pipes [current] != pipe_)); // Remove the pipe from the list; adjust number of active pipes // accordingly. diff --git a/src/req.cpp b/src/req.cpp index f495492..503f221 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -49,7 +49,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) zmq_msg_t prefix; int rc = zmq_msg_init (&prefix); zmq_assert (rc == 0); - prefix.flags = ZMQ_MSG_MORE; + prefix.flags |= ZMQ_MSG_MORE; rc = xreq_t::xsend (&prefix, flags_); if (rc != 0) return rc; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4cefb6f..4317bb0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -87,7 +87,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, break; case ZMQ_XREP: s = new (std::nothrow) xrep_t (parent_, tid_); - break; + break; case ZMQ_PULL: s = new (std::nothrow) pull_t (parent_, tid_); break; @@ -99,7 +99,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, break; case ZMQ_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_); - break; + break; default: errno = EINVAL; return NULL; @@ -334,7 +334,7 @@ int zmq::socket_base_t::bind (const char *addr_) // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. - return connect (addr_); + return connect (addr_); } zmq_assert (false); @@ -458,11 +458,18 @@ int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } + // Check whether message passed to the function is valid. + if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { + errno = EFAULT; + return -1; + } + // Process pending commands, if any. int rc = process_commands (false, true); if (unlikely (rc != 0)) @@ -496,11 +503,18 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { + // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } + // Check whether message passed to the function is valid. + if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { + errno = EFAULT; + return -1; + } + // Get the message. int rc = xrecv (msg_, flags_); int err = errno; @@ -622,7 +636,7 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) session->inc_seqnum (); sessions_sync.unlock (); - return session; + return session; } void zmq::socket_base_t::start_reaping (poller_t *poller_) diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 205ddc1..8de564f 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -310,7 +310,8 @@ zmq::fd_t zmq::tcp_listener_t::accept () #if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX || \ - defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD) + defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD || \ + defined ZMQ_HAVE_CYGWIN) if (sock == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED)) diff --git a/src/thread.cpp b/src/thread.cpp index 12a72e2..7bf9df0 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -47,6 +47,8 @@ void zmq::thread_t::stop () { DWORD rc = WaitForSingleObject (descriptor, INFINITE); win_assert (rc != WAIT_FAILED); + BOOL rc2 = CloseHandle (descriptor); + win_assert (rc2 != 0); } #else diff --git a/src/xrep.cpp b/src/xrep.cpp index 7f0da4d..75dc30e 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -269,7 +269,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) zmq_assert (rc == 0); memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), zmq_msg_size (msg_)); - msg_->flags = ZMQ_MSG_MORE; + msg_->flags |= ZMQ_MSG_MORE; return 0; } diff --git a/src/zmq.cpp b/src/zmq.cpp index 929e51c..61f942d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -81,7 +81,7 @@ const char *zmq_strerror (int errnum_) int zmq_msg_init (zmq_msg_t *msg_) { msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; + msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; msg_->vsm_size = 0; return 0; } @@ -90,7 +90,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) { if (size_ <= ZMQ_MAX_VSM_SIZE) { msg_->content = (zmq::msg_content_t*) ZMQ_VSM; - msg_->flags = 0; + msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; msg_->vsm_size = (uint8_t) size_; } else { @@ -100,8 +100,8 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) errno = ENOMEM; return -1; } - msg_->flags = 0; - + msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; + zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; content->data = (void*) (content + 1); content->size = size_; @@ -117,7 +117,7 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, { msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t)); alloc_assert (msg_->content); - msg_->flags = 0; + msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content; content->data = data_; content->size = size_; @@ -129,6 +129,12 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, int zmq_msg_close (zmq_msg_t *msg_) { + // Check the validity tag. + if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) { + errno = EFAULT; + return -1; + } + // For VSMs and delimiters there are no resources to free. if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER && msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { @@ -148,17 +154,22 @@ int zmq_msg_close (zmq_msg_t *msg_) } } - // As a safety measure, let's make the deallocated message look like - // an empty message. - msg_->content = (zmq::msg_content_t*) ZMQ_VSM; + // Remove the validity tag from the message. msg_->flags = 0; - msg_->vsm_size = 0; return 0; } int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) { +#if 0 + // Check the validity tags. + if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || + (src_->flags | ZMQ_MSG_MASK) != 0xff)) { + errno = EFAULT; + return -1; + } +#endif zmq_msg_close (dest_); *dest_ = *src_; zmq_msg_init (src_); @@ -167,6 +178,13 @@ int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) { + // Check the validity tags. + if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || + (src_->flags | ZMQ_MSG_MASK) != 0xff)) { + errno = EFAULT; + return -1; + } + zmq_msg_close (dest_); // VSMs and delimiters require no special handling. @@ -190,6 +208,8 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) void *zmq_msg_data (zmq_msg_t *msg_) { + zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_data; if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) @@ -200,6 +220,8 @@ void *zmq_msg_data (zmq_msg_t *msg_) size_t zmq_msg_size (zmq_msg_t *msg_) { + zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); + if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) return msg_->vsm_size; if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) |