diff options
| author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:45 +0100 | 
|---|---|---|
| committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:45 +0100 | 
| commit | cbaa7cfa93893876e4fd8794b6ea39f4d245b6b5 (patch) | |
| tree | e15fcee68b93793ef5654e09c214150e9d3ce248 /src | |
| parent | 8e61b98c5e2943b149c825310b24e714a6127072 (diff) | |
Imported Upstream version 2.1.6upstream/2.1.6
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.in | 9 | ||||
| -rw-r--r-- | src/fq.cpp | 7 | ||||
| -rw-r--r-- | src/req.cpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 22 | ||||
| -rw-r--r-- | src/tcp_listener.cpp | 3 | ||||
| -rw-r--r-- | src/thread.cpp | 2 | ||||
| -rw-r--r-- | src/xrep.cpp | 2 | ||||
| -rw-r--r-- | src/zmq.cpp | 40 | 
8 files changed, 65 insertions, 22 deletions
| diff --git a/src/Makefile.in b/src/Makefile.in index a052a08..78e3379 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -214,6 +214,8 @@ OBJDUMP = @OBJDUMP@  OBJEXT = @OBJEXT@  OTOOL = @OTOOL@  OTOOL64 = @OTOOL64@ +OpenPGM_CFLAGS = @OpenPGM_CFLAGS@ +OpenPGM_LIBS = @OpenPGM_LIBS@  PACKAGE = @PACKAGE@  PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@  PACKAGE_NAME = @PACKAGE_NAME@ @@ -222,6 +224,9 @@ PACKAGE_TARNAME = @PACKAGE_TARNAME@  PACKAGE_URL = @PACKAGE_URL@  PACKAGE_VERSION = @PACKAGE_VERSION@  PATH_SEPARATOR = @PATH_SEPARATOR@ +PKG_CONFIG = @PKG_CONFIG@ +PKG_CONFIG_LIBDIR = @PKG_CONFIG_LIBDIR@ +PKG_CONFIG_PATH = @PKG_CONFIG_PATH@  RANLIB = @RANLIB@  SED = @SED@  SET_MAKE = @SET_MAKE@ @@ -236,8 +241,6 @@ abs_top_srcdir = @abs_top_srcdir@  ac_ct_CC = @ac_ct_CC@  ac_ct_CXX = @ac_ct_CXX@  ac_ct_DUMPBIN = @ac_ct_DUMPBIN@ -ac_zmq_have_asciidoc = @ac_zmq_have_asciidoc@ -ac_zmq_have_xmlto = @ac_zmq_have_xmlto@  am__include = @am__include@  am__leading_dot = @am__leading_dot@  am__quote = @am__quote@ @@ -266,6 +269,8 @@ infodir = @infodir@  install_sh = @install_sh@  libdir = @libdir@  libexecdir = @libexecdir@ +libzmq_have_asciidoc = @libzmq_have_asciidoc@ +libzmq_have_xmlto = @libzmq_have_xmlto@  localedir = @localedir@  localstatedir = @localstatedir@  lt_ECHO = @lt_ECHO@ @@ -56,10 +56,9 @@ void zmq::fq_t::attach (reader_t *pipe_)  void zmq::fq_t::terminated (reader_t *pipe_)  { -    //  TODO: This is a problem with session-initiated termination. It breaks -    //  message atomicity. However, for socket initiated termination it's -    //  just fine. -    zmq_assert (!more || pipes [current] != pipe_); +    //  Make sure that we are not closing current pipe while +    //  message is half-read. +    zmq_assert (terminating || (!more || pipes [current] != pipe_));      //  Remove the pipe from the list; adjust number of active pipes      //  accordingly. diff --git a/src/req.cpp b/src/req.cpp index f495492..503f221 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -49,7 +49,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)          zmq_msg_t prefix;          int rc = zmq_msg_init (&prefix);          zmq_assert (rc == 0); -        prefix.flags = ZMQ_MSG_MORE; +        prefix.flags |= ZMQ_MSG_MORE;          rc = xreq_t::xsend (&prefix, flags_);          if (rc != 0)              return rc; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4cefb6f..4317bb0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -87,7 +87,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,          break;      case ZMQ_XREP:          s = new (std::nothrow) xrep_t (parent_, tid_); -        break;      +        break;      case ZMQ_PULL:          s = new (std::nothrow) pull_t (parent_, tid_);          break; @@ -99,7 +99,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,          break;      case ZMQ_XSUB:          s = new (std::nothrow) xsub_t (parent_, tid_); -        break;     +        break;      default:          errno = EINVAL;          return NULL; @@ -334,7 +334,7 @@ int zmq::socket_base_t::bind (const char *addr_)          //  For convenience's sake, bind can be used interchageable with          //  connect for PGM and EPGM transports. -        return connect (addr_);  +        return connect (addr_);      }      zmq_assert (false); @@ -458,11 +458,18 @@ int zmq::socket_base_t::connect (const char *addr_)  int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)  { +    //  Check whether the library haven't been shut down yet.      if (unlikely (ctx_terminated)) {          errno = ETERM;          return -1;      } +    //  Check whether message passed to the function is valid. +    if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { +        errno = EFAULT; +        return -1; +    } +      //  Process pending commands, if any.      int rc = process_commands (false, true);      if (unlikely (rc != 0)) @@ -496,11 +503,18 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)  int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)  { +    //  Check whether the library haven't been shut down yet.      if (unlikely (ctx_terminated)) {          errno = ETERM;          return -1;      } +    //  Check whether message passed to the function is valid. +    if (unlikely ((msg_->flags | ZMQ_MSG_MASK) != 0xff)) { +        errno = EFAULT; +        return -1; +    } +      //  Get the message.      int rc = xrecv (msg_, flags_);      int err = errno; @@ -622,7 +636,7 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)      session->inc_seqnum ();      sessions_sync.unlock (); -    return session;     +    return session;  }  void zmq::socket_base_t::start_reaping (poller_t *poller_) diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 205ddc1..8de564f 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -310,7 +310,8 @@ zmq::fd_t zmq::tcp_listener_t::accept ()  #if (defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD || \       defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX || \ -     defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD) +     defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_NETBSD || \ +     defined ZMQ_HAVE_CYGWIN)      if (sock == -1 &&           (errno == EAGAIN || errno == EWOULDBLOCK ||            errno == EINTR || errno == ECONNABORTED)) diff --git a/src/thread.cpp b/src/thread.cpp index 12a72e2..7bf9df0 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -47,6 +47,8 @@ void zmq::thread_t::stop ()  {      DWORD rc = WaitForSingleObject (descriptor, INFINITE);      win_assert (rc != WAIT_FAILED); +    BOOL rc2 = CloseHandle (descriptor); +    win_assert (rc2 != 0);  }  #else diff --git a/src/xrep.cpp b/src/xrep.cpp index 7f0da4d..75dc30e 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -269,7 +269,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)              zmq_assert (rc == 0);              memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),                  zmq_msg_size (msg_)); -            msg_->flags = ZMQ_MSG_MORE; +            msg_->flags |= ZMQ_MSG_MORE;              return 0;          } diff --git a/src/zmq.cpp b/src/zmq.cpp index 929e51c..61f942d 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -81,7 +81,7 @@ const char *zmq_strerror (int errnum_)  int zmq_msg_init (zmq_msg_t *msg_)  {      msg_->content = (zmq::msg_content_t*) ZMQ_VSM; -    msg_->flags = 0; +    msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;      msg_->vsm_size = 0;      return 0;  } @@ -90,7 +90,7 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)  {      if (size_ <= ZMQ_MAX_VSM_SIZE) {          msg_->content = (zmq::msg_content_t*) ZMQ_VSM; -        msg_->flags = 0; +        msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;          msg_->vsm_size = (uint8_t) size_;      }      else { @@ -100,8 +100,8 @@ int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)              errno = ENOMEM;              return -1;          } -        msg_->flags = 0; -         +        msg_->flags = (unsigned char) ~ZMQ_MSG_MASK; +          zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;          content->data = (void*) (content + 1);          content->size = size_; @@ -117,7 +117,7 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,  {      msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));      alloc_assert (msg_->content); -    msg_->flags = 0; +    msg_->flags = (unsigned char) ~ZMQ_MSG_MASK;      zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;      content->data = data_;      content->size = size_; @@ -129,6 +129,12 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,  int zmq_msg_close (zmq_msg_t *msg_)  { +    //  Check the validity tag. +    if (unlikely (msg_->flags | ZMQ_MSG_MASK) != 0xff) { +        errno = EFAULT; +        return -1; +    } +      //  For VSMs and delimiters there are no resources to free.      if (msg_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&            msg_->content != (zmq::msg_content_t*) ZMQ_VSM) { @@ -148,17 +154,22 @@ int zmq_msg_close (zmq_msg_t *msg_)          }      } -    //  As a safety measure, let's make the deallocated message look like -    //  an empty message. -    msg_->content = (zmq::msg_content_t*) ZMQ_VSM; +    //  Remove the validity tag from the message.      msg_->flags = 0; -    msg_->vsm_size = 0;      return 0;  }  int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)  { +#if 0 +    //  Check the validity tags. +    if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || +          (src_->flags | ZMQ_MSG_MASK) != 0xff)) { +        errno = EFAULT; +        return -1; +    } +#endif      zmq_msg_close (dest_);      *dest_ = *src_;      zmq_msg_init (src_); @@ -167,6 +178,13 @@ int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)  int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)  { +    //  Check the validity tags. +    if (unlikely ((dest_->flags | ZMQ_MSG_MASK) != 0xff || +          (src_->flags | ZMQ_MSG_MASK) != 0xff)) { +        errno = EFAULT; +        return -1; +    } +      zmq_msg_close (dest_);      //  VSMs and delimiters require no special handling. @@ -190,6 +208,8 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)  void *zmq_msg_data (zmq_msg_t *msg_)  { +    zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); +      if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)          return msg_->vsm_data;      if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) @@ -200,6 +220,8 @@ void *zmq_msg_data (zmq_msg_t *msg_)  size_t zmq_msg_size (zmq_msg_t *msg_)  { +    zmq_assert ((msg_->flags | ZMQ_MSG_MASK) == 0xff); +      if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)          return msg_->vsm_size;      if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER) | 
