From 978e33ba253a997b41b331b449b474a5cee7bccc Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:54:31 +0100 Subject: Imported Upstream version 2.1.10 --- src/err.hpp | 3 +++ src/ip.cpp | 50 ++++++++++++++++++++++++++++++++++++++++---------- src/ip.hpp | 2 ++ src/kqueue.cpp | 4 +++- src/options.cpp | 10 +++++++++- src/req.cpp | 8 ++++++-- src/signaler.cpp | 6 +++--- src/socket_base.cpp | 13 +++++++++++++ src/tcp_connecter.cpp | 15 ++++++++------- src/tcp_listener.cpp | 2 +- src/tcp_socket.cpp | 10 ++++++++-- 11 files changed, 96 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/err.hpp b/src/err.hpp index b540a5d..9558a10 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -21,6 +21,9 @@ #ifndef __ZMQ_ERR_HPP_INCLUDED__ #define __ZMQ_ERR_HPP_INCLUDED__ +// 0MQ-specific error codes are defined in zmq.h +#include "../include/zmq.h" + #include #include #include diff --git a/src/ip.cpp b/src/ip.cpp index 3ea1f29..206f0e9 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -18,23 +18,33 @@ along with this program. If not, see . */ +#include "ip.hpp" +#include "err.hpp" +#include "platform.hpp" +#include "stdint.hpp" #include #include #include #include -#include "../include/zmq.h" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#endif -#include "ip.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "stdint.hpp" +#if defined ZMQ_HAVE_OPENVMS +#include +#endif #if defined ZMQ_HAVE_SOLARIS - #include #include -#include // On Solaris platform, network interface name can be queried by ioctl. static int resolve_nic_name (in_addr* addr_, char const *interface_) @@ -93,9 +103,6 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) } #elif defined ZMQ_HAVE_AIX || ZMQ_HAVE_HPUX || ZMQ_HAVE_ANDROID - -#include -#include #include #include @@ -177,6 +184,29 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) #endif +int zmq::open_socket (int domain_, int type_, int protocol_) +{ + // Setting this option result in sane behaviour when exec() functions + // are used. Old sockets are closed and don't block TCP ports etc. +#if defined SOCK_CLOEXEC + type_ |= SOCK_CLOEXEC; +#endif + + int s = socket (domain_, type_, protocol_); + if (s == -1) + return -1; + + // If there's no SOCK_CLOEXEC, let's try the second best option. Note that + // race condition can cause socket not to be closed (if fork happens + // between socket creation and this point). +#if !defined SOCK_CLOEXEC && defined FD_CLOEXEC + int rc = fcntl (s, F_SETFD, FD_CLOEXEC); + errno_assert (rc != -1); +#endif + + return s; +} + int zmq::resolve_ip_interface (sockaddr_storage* addr_, socklen_t *addr_len_, char const *interface_) { diff --git a/src/ip.hpp b/src/ip.hpp index ec2db43..1d44325 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -49,6 +49,8 @@ namespace zmq { + // Same as socket(2), but allows for transparent tweaking the options. + int open_socket (int domain_, int type_, int protocol_); // Resolves network interface name in : format. Symbol "*" // (asterisk) resolves to INADDR_ANY (all network interfaces). diff --git a/src/kqueue.cpp b/src/kqueue.cpp index e28ecd7..f173f84 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -73,7 +73,9 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0); int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); - errno_assert (rc != -1); + + if (rc == -1 && errno != ENOENT) + errno_assert (false); } zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, diff --git a/src/options.cpp b/src/options.cpp index 92887ab..952907b 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -19,9 +19,11 @@ */ #include +#ifndef ZMQ_HAVE_WINDOWS +#include +#endif #include "../include/zmq.h" - #include "options.hpp" #include "err.hpp" @@ -64,6 +66,12 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } + // Check that SWAP directory (.) is writable + struct stat stat_buf; + if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWRITE) == 0)) { + errno = EACCES; + return -1; + } swap = *((int64_t*) optval_); return 0; diff --git a/src/req.cpp b/src/req.cpp index 503f221..6a6b6a8 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -84,8 +84,12 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) int rc = xreq_t::xrecv (msg_, flags_); if (rc != 0) return rc; - zmq_assert (msg_->flags & ZMQ_MSG_MORE); - zmq_assert (zmq_msg_size (msg_) == 0); + + // TODO: this should also close the connection with the peer + if (!(msg_->flags & ZMQ_MSG_MORE) || zmq_msg_size (msg_) != 0) { + errno = EAGAIN; + return -1; + } message_begins = false; } diff --git a/src/signaler.cpp b/src/signaler.cpp index fa2f123..7b66ea7 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -215,7 +215,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) // Create listening socket. SOCKET listener; - listener = socket (AF_INET, SOCK_STREAM, 0); + listener = open_socket (AF_INET, SOCK_STREAM, 0); wsa_assert (listener != INVALID_SOCKET); // Set SO_REUSEADDR and TCP_NODELAY on listening socket. @@ -283,7 +283,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); lcladdr.sin_port = 0; - int listener = socket (AF_INET, SOCK_STREAM, 0); + int listener = open_socket (AF_INET, SOCK_STREAM, 0); errno_assert (listener != -1); int on = 1; @@ -304,7 +304,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) rc = listen (listener, 1); errno_assert (rc != -1); - *w_ = socket (AF_INET, SOCK_STREAM, 0); + *w_ = open_socket (AF_INET, SOCK_STREAM, 0); errno_assert (*w_ != -1); rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2167b0b..335a858 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -364,10 +364,23 @@ int zmq::socket_base_t::connect (const char *addr_) if (rc != 0) return -1; + // Checks that protocol is valid and supported on this system rc = check_protocol (protocol); if (rc != 0) return -1; + // Parsed address for validation + sockaddr_storage addr; + socklen_t addr_len; + + if (protocol == "tcp") + rc = resolve_ip_hostname (&addr, &addr_len, address.c_str ()); + else + if (protocol == "ipc") + rc = resolve_local_path (&addr, &addr_len, address.c_str ()); + if (rc != 0) + return -1; + if (protocol == "inproc" || protocol == "sys") { // TODO: inproc connect is specific with respect to creating pipes diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index d6f73ca..6bc1b2d 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -50,7 +50,7 @@ int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_) return resolve_ip_hostname (&addr, &addr_len, addr_); errno = EPROTONOSUPPORT; - return -1; + return -1; } int zmq::tcp_connecter_t::open () @@ -58,7 +58,7 @@ int zmq::tcp_connecter_t::open () zmq_assert (s == retired_fd); // Create the socket. - s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); + s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); if (s == INVALID_SOCKET) { wsa_error_to_errno (); return -1; @@ -88,7 +88,7 @@ int zmq::tcp_connecter_t::open () errno = EAGAIN; return -1; } - + wsa_error_to_errno (); return -1; } @@ -162,7 +162,8 @@ int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_) { if (strcmp (protocol_, "tcp") == 0) return resolve_ip_hostname (&addr, &addr_len, addr_); - else if (strcmp (protocol_, "ipc") == 0) + else + if (strcmp (protocol_, "ipc") == 0) return resolve_local_path (&addr, &addr_len, addr_); errno = EPROTONOSUPPORT; @@ -177,7 +178,7 @@ int zmq::tcp_connecter_t::open () if (AF_UNIX != sa->sa_family) { // Create the socket. - s = socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP); + s = open_socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP); if (s == -1) return -1; @@ -233,13 +234,13 @@ int zmq::tcp_connecter_t::open () // Create the socket. zmq_assert (AF_UNIX == sa->sa_family); - s = socket (AF_UNIX, SOCK_STREAM, 0); + s = open_socket (AF_UNIX, SOCK_STREAM, 0); if (s == -1) return -1; // Set the non-blocking flag. int flag = fcntl (s, F_GETFL, 0); - if (flag == -1) + if (flag == -1) flag = 0; int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); errno_assert (rc != -1); diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 8de564f..4bfaa85 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -176,7 +176,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, return -1; // Create a listening socket. - s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); + s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); if (s == -1) return -1; diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index e7a69e4..07159d8 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -79,7 +79,7 @@ int zmq::tcp_socket_t::write (const void *data, int size) // we'll get an error (this may happen during the speculative write). if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) return 0; - + // Signalise peer failure. if (nbytes == -1 && ( WSAGetLastError () == WSAENETDOWN || @@ -119,7 +119,7 @@ int zmq::tcp_socket_t::read (void *data, int size) // Orderly shutdown by the other peer. if (nbytes == 0) - return -1; + return -1; return (size_t) nbytes; } @@ -200,6 +200,9 @@ int zmq::tcp_socket_t::write (const void *data, int size) if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) return -1; + if (nbytes == 1) + fprintf (stderr, "E: unhandled error on send: %d/%s\n", + errno, strerror (errno)); errno_assert (nbytes != -1); return (size_t) nbytes; } @@ -220,6 +223,9 @@ int zmq::tcp_socket_t::read (void *data, int size) errno == ETIMEDOUT || errno == EHOSTUNREACH)) return -1; + if (nbytes == 1) + fprintf (stderr, "E: unhandled error on recv: %d/%s\n", + errno, strerror (errno)); errno_assert (nbytes != -1); // Orderly shutdown by the other peer. -- cgit v1.2.3