diff options
| author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:54:31 +0100 | 
|---|---|---|
| committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:54:31 +0100 | 
| commit | 978e33ba253a997b41b331b449b474a5cee7bccc (patch) | |
| tree | 68b0709e1ebb04bf7fd102a7783e3f93dd52cc8c /src | |
| parent | 75af6aed482ab16997c1388fe801f74d11ec12a4 (diff) | |
Imported Upstream version 2.1.10upstream/2.1.10
Diffstat (limited to 'src')
| -rw-r--r-- | src/err.hpp | 3 | ||||
| -rw-r--r-- | src/ip.cpp | 50 | ||||
| -rw-r--r-- | src/ip.hpp | 2 | ||||
| -rw-r--r-- | src/kqueue.cpp | 4 | ||||
| -rw-r--r-- | src/options.cpp | 10 | ||||
| -rw-r--r-- | src/req.cpp | 8 | ||||
| -rw-r--r-- | src/signaler.cpp | 6 | ||||
| -rw-r--r-- | src/socket_base.cpp | 13 | ||||
| -rw-r--r-- | src/tcp_connecter.cpp | 15 | ||||
| -rw-r--r-- | src/tcp_listener.cpp | 2 | ||||
| -rw-r--r-- | src/tcp_socket.cpp | 10 | 
11 files changed, 96 insertions, 27 deletions
| diff --git a/src/err.hpp b/src/err.hpp index b540a5d..9558a10 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -21,6 +21,9 @@  #ifndef __ZMQ_ERR_HPP_INCLUDED__  #define __ZMQ_ERR_HPP_INCLUDED__ +//  0MQ-specific error codes are defined in zmq.h +#include "../include/zmq.h" +  #include <assert.h>  #include <errno.h>  #include <string.h> @@ -18,23 +18,33 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include "ip.hpp" +#include "err.hpp" +#include "platform.hpp" +#include "stdint.hpp"  #include <stdlib.h>  #include <string.h>  #include <stdlib.h>  #include <string> -#include "../include/zmq.h" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <fcntl.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <unistd.h> +#endif -#include "ip.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "stdint.hpp" +#if defined ZMQ_HAVE_OPENVMS +#include <ioctl.h> +#endif  #if defined ZMQ_HAVE_SOLARIS -  #include <sys/sockio.h>  #include <net/if.h> -#include <unistd.h>  //  On Solaris platform, network interface name can be queried by ioctl.  static int resolve_nic_name (in_addr* addr_, char const *interface_) @@ -93,9 +103,6 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)  }  #elif defined ZMQ_HAVE_AIX || ZMQ_HAVE_HPUX || ZMQ_HAVE_ANDROID - -#include <sys/types.h> -#include <unistd.h>  #include <sys/ioctl.h>  #include <net/if.h> @@ -177,6 +184,29 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)  #endif +int zmq::open_socket (int domain_, int type_, int protocol_) +{ +    //  Setting this option result in sane behaviour when exec() functions +    //  are used. Old sockets are closed and don't block TCP ports etc. +#if defined SOCK_CLOEXEC +    type_ |= SOCK_CLOEXEC; +#endif + +    int s = socket (domain_, type_, protocol_); +    if (s == -1) +        return -1; + +    //  If there's no SOCK_CLOEXEC, let's try the second best option. Note that +    //  race condition can cause socket not to be closed (if fork happens +    //  between socket creation and this point). +#if !defined SOCK_CLOEXEC && defined FD_CLOEXEC +    int rc = fcntl (s, F_SETFD, FD_CLOEXEC); +    errno_assert (rc != -1); +#endif + +    return s; +} +  int zmq::resolve_ip_interface (sockaddr_storage* addr_, socklen_t *addr_len_,      char const *interface_)  { @@ -49,6 +49,8 @@  namespace zmq  { +    //  Same as socket(2), but allows for transparent tweaking the options. +    int open_socket (int domain_, int type_, int protocol_);      //  Resolves network interface name in <nic-name>:<port> format. Symbol "*"      //  (asterisk) resolves to INADDR_ANY (all network interfaces). diff --git a/src/kqueue.cpp b/src/kqueue.cpp index e28ecd7..f173f84 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -73,7 +73,9 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)      EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);      int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); -    errno_assert (rc != -1); + +    if (rc == -1 && errno != ENOENT) +        errno_assert (false);  }  zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, diff --git a/src/options.cpp b/src/options.cpp index 92887ab..952907b 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -19,9 +19,11 @@  */  #include <string.h> +#ifndef ZMQ_HAVE_WINDOWS +#include <sys/stat.h> +#endif  #include "../include/zmq.h" -  #include "options.hpp"  #include "err.hpp" @@ -64,6 +66,12 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,              errno = EINVAL;              return -1;          } +        //  Check that SWAP directory (.) is writable +        struct stat stat_buf; +        if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWRITE) == 0)) { +            errno = EACCES; +            return -1; +        }          swap = *((int64_t*) optval_);          return 0; diff --git a/src/req.cpp b/src/req.cpp index 503f221..6a6b6a8 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -84,8 +84,12 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)          int rc = xreq_t::xrecv (msg_, flags_);          if (rc != 0)              return rc; -        zmq_assert (msg_->flags & ZMQ_MSG_MORE); -        zmq_assert (zmq_msg_size (msg_) == 0); + +        // TODO: this should also close the connection with the peer +        if (!(msg_->flags & ZMQ_MSG_MORE) || zmq_msg_size (msg_) != 0) { +            errno = EAGAIN; +            return -1; +        }          message_begins = false;      } diff --git a/src/signaler.cpp b/src/signaler.cpp index fa2f123..7b66ea7 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -215,7 +215,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)      //  Create listening socket.      SOCKET listener; -    listener = socket (AF_INET, SOCK_STREAM, 0); +    listener = open_socket (AF_INET, SOCK_STREAM, 0);      wsa_assert (listener != INVALID_SOCKET);      //  Set SO_REUSEADDR and TCP_NODELAY on listening socket. @@ -283,7 +283,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)      lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);      lcladdr.sin_port = 0; -    int listener = socket (AF_INET, SOCK_STREAM, 0); +    int listener = open_socket (AF_INET, SOCK_STREAM, 0);      errno_assert (listener != -1);      int on = 1; @@ -304,7 +304,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)      rc = listen (listener, 1);      errno_assert (rc != -1); -    *w_ = socket (AF_INET, SOCK_STREAM, 0); +    *w_ = open_socket (AF_INET, SOCK_STREAM, 0);      errno_assert (*w_ != -1);      rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2167b0b..335a858 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -364,10 +364,23 @@ int zmq::socket_base_t::connect (const char *addr_)      if (rc != 0)          return -1; +    //  Checks that protocol is valid and supported on this system      rc = check_protocol (protocol);      if (rc != 0)          return -1; +    //  Parsed address for validation +    sockaddr_storage addr; +    socklen_t addr_len; + +    if (protocol == "tcp") +        rc = resolve_ip_hostname (&addr, &addr_len, address.c_str ()); +    else +    if (protocol == "ipc") +        rc = resolve_local_path (&addr, &addr_len, address.c_str ()); +    if (rc != 0) +        return -1; +      if (protocol == "inproc" || protocol == "sys") {          //  TODO: inproc connect is specific with respect to creating pipes diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index d6f73ca..6bc1b2d 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -50,7 +50,7 @@ int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)          return resolve_ip_hostname (&addr, &addr_len, addr_);      errno = EPROTONOSUPPORT; -    return -1;     +    return -1;  }  int zmq::tcp_connecter_t::open () @@ -58,7 +58,7 @@ int zmq::tcp_connecter_t::open ()      zmq_assert (s == retired_fd);      //  Create the socket. -    s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); +    s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);      if (s == INVALID_SOCKET) {          wsa_error_to_errno ();          return -1; @@ -88,7 +88,7 @@ int zmq::tcp_connecter_t::open ()          errno = EAGAIN;          return -1;      } -     +      wsa_error_to_errno ();      return -1;  } @@ -162,7 +162,8 @@ int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)  {      if (strcmp (protocol_, "tcp") == 0)          return resolve_ip_hostname (&addr, &addr_len, addr_); -    else if (strcmp (protocol_, "ipc") == 0) +    else +    if (strcmp (protocol_, "ipc") == 0)          return resolve_local_path (&addr, &addr_len, addr_);      errno = EPROTONOSUPPORT; @@ -177,7 +178,7 @@ int zmq::tcp_connecter_t::open ()      if (AF_UNIX != sa->sa_family) {          //  Create the socket. -        s = socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP); +        s = open_socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP);          if (s == -1)              return -1; @@ -233,13 +234,13 @@ int zmq::tcp_connecter_t::open ()          //  Create the socket.          zmq_assert (AF_UNIX == sa->sa_family); -        s = socket (AF_UNIX, SOCK_STREAM, 0); +        s = open_socket (AF_UNIX, SOCK_STREAM, 0);          if (s == -1)              return -1;          //  Set the non-blocking flag.          int flag = fcntl (s, F_GETFL, 0); -        if (flag == -1)  +        if (flag == -1)              flag = 0;          int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);          errno_assert (rc != -1); diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 8de564f..4bfaa85 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -176,7 +176,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,              return -1;          //  Create a listening socket. -        s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); +        s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);          if (s == -1)              return -1; diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index e7a69e4..07159d8 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -79,7 +79,7 @@ int zmq::tcp_socket_t::write (const void *data, int size)      //  we'll get an error (this may happen during the speculative write).      if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)          return 0; -		 +      //  Signalise peer failure.      if (nbytes == -1 && (            WSAGetLastError () == WSAENETDOWN || @@ -119,7 +119,7 @@ int zmq::tcp_socket_t::read (void *data, int size)      //  Orderly shutdown by the other peer.      if (nbytes == 0) -        return -1;  +        return -1;      return (size_t) nbytes;  } @@ -200,6 +200,9 @@ int zmq::tcp_socket_t::write (const void *data, int size)      if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))          return -1; +    if (nbytes == 1) +        fprintf (stderr, "E: unhandled error on send: %d/%s\n", +                 errno, strerror (errno));      errno_assert (nbytes != -1);      return (size_t) nbytes;  } @@ -220,6 +223,9 @@ int zmq::tcp_socket_t::read (void *data, int size)            errno == ETIMEDOUT || errno == EHOSTUNREACH))          return -1; +    if (nbytes == 1) +        fprintf (stderr, "E: unhandled error on recv: %d/%s\n", +                 errno, strerror (errno));      errno_assert (nbytes != -1);      //  Orderly shutdown by the other peer. | 
