summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-01-23 08:53:45 +0100
committerMartin Lucina <martin@lucina.net>2012-01-23 08:53:45 +0100
commitcbaa7cfa93893876e4fd8794b6ea39f4d245b6b5 (patch)
treee15fcee68b93793ef5654e09c214150e9d3ce248 /src
parent8e61b98c5e2943b149c825310b24e714a6127072 (diff)
Imported Upstream version 2.1.6upstream/2.1.6
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.in9
-rw-r--r--src/fq.cpp7
-rw-r--r--src/req.cpp2
-rw-r--r--src/socket_base.cpp22
-rw-r--r--src/tcp_listener.cpp3
-rw-r--r--src/thread.cpp2
-rw-r--r--src/xrep.cpp2
-rw-r--r--src/zmq.cpp40
8 files changed, 65 insertions, 22 deletions
diff --git a/src/Makefile.in b/src/Makefile.in
index a052a08..78e3379 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -214,6 +214,8 @@ OBJDUMP = @OBJDUMP@
OBJEXT = @OBJEXT@
OTOOL = @OTOOL@
OTOOL64 = @OTOOL64@
+OpenPGM_CFLAGS = @OpenPGM_CFLAGS@
+OpenPGM_LIBS = @OpenPGM_LIBS@
PACKAGE = @PACKAGE@
PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
PACKAGE_NAME = @PACKAGE_NAME@
@@ -222,6 +224,9 @@ PACKAGE_TARNAME = @PACKAGE_TARNAME@
PACKAGE_URL = @PACKAGE_URL@
PACKAGE_VERSION = @PACKAGE_VERSION@
PATH_SEPARATOR = @PATH_SEPARATOR@
+PKG_CONFIG = @PKG_CONFIG@
+PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@
+PKG_CONFIG_PATH = @PKG_CONFIG_PATH@
RANLIB = @RANLIB@
SED = @SED@
SET_MAKE = @SET_MAKE@
@@ -236,8 +241,6 @@ abs_top_srcdir = @abs_top_srcdir@
ac_ct_CC = @ac_ct_CC@
ac_ct_CXX = @ac_ct_CXX@
ac_ct_DUMPBIN = @ac_ct_DUMPBIN@
-ac_zmq_have_asciidoc = @ac_zmq_have_asciidoc@
-ac_zmq_have_xmlto = @ac_zmq_have_xmlto@
am__include = @am__include@
am__leading_dot = @am__leading_dot@
am__quote = @am__quote@
@@ -266,6 +269,8 @@ infodir = @infodir@
install_sh = @install_sh@
libdir = @libdir@
libexecdir = @libexecdir@
+libzmq_have_asciidoc = @libzmq_have_asciidoc@
+libzmq_have_xmlto = @libzmq_have_xmlto@
localedir = @localedir@
localstatedir = @localstatedir@
lt_ECHO = @lt_ECHO@
diff --git a/src/fq.cpp b/src/fq.cpp
index 36fd435..a1952d8 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -56,10 +56,9 @@ void zmq::fq_t::attach (reader_t *pipe_)
void zmq::fq_t::terminated (reader_t *pipe_)
{
- // TODO: This is a problem with session-initiated termination. It breaks
- // message atomicity. However, for socket initiated termination it's
- // just fine.
- zmq_assert (!more || pipes [current] != pipe_);
+ // Make sure that we are not closing current pipe while
+ // message is half-read.
+ zmq_assert (terminating || (!more || pipes [current] != pipe_));
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
diff --git a/src/req.cpp b/src/req.cpp
index f495492..503f221 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -49,7 +49,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
zmq_msg_t prefix;
int rc = zmq_msg_init (&prefix);
zmq_assert (rc == 0);
- prefix.flags = ZMQ_MSG_MORE;
+ prefix.flags |= ZMQ_MSG_MORE;
rc = xreq_t::xsend (&prefix, flags_);
if (rc != 0)
return rc;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 4cefb6f..4317bb0 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -87,7 +87,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
break;
case ZMQ_XREP:
s = new (std::nothrow) xrep_t (parent_, tid_);
- break;
+ break;
case ZMQ_PULL:
s = new (std::nothrow) pull_t (parent_, tid_);
break;
@@ -99,7 +99,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
break;
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
- break;
+ break;
default:
errno = EINVAL;
return NULL;
@@ -334,7 +334,7 @@ int zmq::socket_base_t::bind (const char *addr_)
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
- return connect (addr_);
+ return connect (addr_);
}
zmq_assert (false);
@@ -458,11 +458,18 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
+ // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
+ // Check whether message passed to the function is valid.
+ if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ errno = EFAULT;
+ return -1;
+ }
+
// Process pending commands, if any.
int rc = process_commands (false, true);
if (unlikely (rc != 0))
@@ -496,11 +503,18 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
+ // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
+ // Check whether message passed to the function is valid.
+ if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ errno = EFAULT;
+ return -1;
+ }
+
// Get the message.
int rc = xrecv (msg_, flags_);
int err = errno;
@@ -622,7 +636,7 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
session->inc_seqnum ();
sessions_sync.unlock ();
- return session;
+ return session;
}
void zmq::socket_base_t::start_reaping (poller_t *poller_)
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 205ddc1..8de564f 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -310,7 +310,8 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
#if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX || \
- defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD)
+ defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD || \
+ defined ZMQ_HAVE_CYGWIN)
if (sock == -1 &&
(errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR || errno == ECONNABORTED))
diff --git a/src/thread.cpp b/src/thread.cpp
index 12a72e2..7bf9df0 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -47,6 +47,8 @@ void zmq::thread_t::stop ()
{
DWORD rc = WaitForSingleObject (descriptor, INFINITE);
win_assert (rc != WAIT_FAILED);
+ BOOL rc2 = CloseHandle (descriptor);
+ win_assert (rc2 != 0);
}
#else
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 7f0da4d..75dc30e 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -269,7 +269,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
zmq_msg_size (msg_));
- msg_->flags = ZMQ_MSG_MORE;
+ msg_->flags |= ZMQ_MSG_MORE;
return 0;
}
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 929e51c..61f942d 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -81,7 +81,7 @@ const char *zmq_strerror (int errnum_)
int zmq_msg_init (zmq_msg_t *msg_)
{
msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = 0;
+ msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
msg_->vsm_size = 0;
return 0;
}
@@ -90,7 +90,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
if (size_ <= ZMQ_MAX_VSM_SIZE) {
msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
- msg_->flags = 0;
+ msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
msg_->vsm_size = (uint8_t) size_;
}
else {
@@ -100,8 +100,8 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
errno = ENOMEM;
return -1;
}
- msg_->flags = 0;
-
+ msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
+
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = (void*) (content + 1);
content->size = size_;
@@ -117,7 +117,7 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
{
msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
alloc_assert (msg_->content);
- msg_->flags = 0;
+ msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = data_;
content->size = size_;
@@ -129,6 +129,12 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
int zmq_msg_close (zmq_msg_t *msg_)
{
+ // Check the validity tag.
+ if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) {
+ errno = EFAULT;
+ return -1;
+ }
+
// For VSMs and delimiters there are no resources to free.
if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
msg_->content != (zmq::msg_content_t*) ZMQ_VSM) {
@@ -148,17 +154,22 @@ int zmq_msg_close (zmq_msg_t *msg_)
}
}
- // As a safety measure, let's make the deallocated message look like
- // an empty message.
- msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
+ // Remove the validity tag from the message.
msg_->flags = 0;
- msg_->vsm_size = 0;
return 0;
}
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
+#if 0
+ // Check the validity tags.
+ if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
+ (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ errno = EFAULT;
+ return -1;
+ }
+#endif
zmq_msg_close (dest_);
*dest_ = *src_;
zmq_msg_init (src_);
@@ -167,6 +178,13 @@ int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
+ // Check the validity tags.
+ if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff ||
+ (src_->flags | ZMQ_MSG_MASK) != 0xff)) {
+ errno = EFAULT;
+ return -1;
+ }
+
zmq_msg_close (dest_);
// VSMs and delimiters require no special handling.
@@ -190,6 +208,8 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
void *zmq_msg_data (zmq_msg_t *msg_)
{
+ zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_data;
if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
@@ -200,6 +220,8 @@ void *zmq_msg_data (zmq_msg_t *msg_)
size_t zmq_msg_size (zmq_msg_t *msg_)
{
+ zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff);
+
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_size;
if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)