summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/err.hpp3
-rw-r--r--src/ip.cpp50
-rw-r--r--src/ip.hpp2
-rw-r--r--src/kqueue.cpp4
-rw-r--r--src/options.cpp10
-rw-r--r--src/req.cpp8
-rw-r--r--src/signaler.cpp6
-rw-r--r--src/socket_base.cpp13
-rw-r--r--src/tcp_connecter.cpp15
-rw-r--r--src/tcp_listener.cpp2
-rw-r--r--src/tcp_socket.cpp10
11 files changed, 96 insertions, 27 deletions
diff --git a/src/err.hpp b/src/err.hpp
index b540a5d..9558a10 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -21,6 +21,9 @@
#ifndef __ZMQ_ERR_HPP_INCLUDED__
#define __ZMQ_ERR_HPP_INCLUDED__
+// 0MQ-specific error codes are defined in zmq.h
+#include "../include/zmq.h"
+
#include <assert.h>
#include <errno.h>
#include <string.h>
diff --git a/src/ip.cpp b/src/ip.cpp
index 3ea1f29..206f0e9 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -18,23 +18,33 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "ip.hpp"
+#include "err.hpp"
+#include "platform.hpp"
+#include "stdint.hpp"
#include <stdlib.h>
#include <string.h>
#include <stdlib.h>
#include <string>
-#include "../include/zmq.h"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <unistd.h>
+#endif
-#include "ip.hpp"
-#include "platform.hpp"
-#include "err.hpp"
-#include "stdint.hpp"
+#if defined ZMQ_HAVE_OPENVMS
+#include <ioctl.h>
+#endif
#if defined ZMQ_HAVE_SOLARIS
-
#include <sys/sockio.h>
#include <net/if.h>
-#include <unistd.h>
// On Solaris platform, network interface name can be queried by ioctl.
static int resolve_nic_name (in_addr* addr_, char const *interface_)
@@ -93,9 +103,6 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)
}
#elif defined ZMQ_HAVE_AIX || ZMQ_HAVE_HPUX || ZMQ_HAVE_ANDROID
-
-#include <sys/types.h>
-#include <unistd.h>
#include <sys/ioctl.h>
#include <net/if.h>
@@ -177,6 +184,29 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_)
#endif
+int zmq::open_socket (int domain_, int type_, int protocol_)
+{
+ // Setting this option result in sane behaviour when exec() functions
+ // are used. Old sockets are closed and don't block TCP ports etc.
+#if defined SOCK_CLOEXEC
+ type_ |= SOCK_CLOEXEC;
+#endif
+
+ int s = socket (domain_, type_, protocol_);
+ if (s == -1)
+ return -1;
+
+ // If there's no SOCK_CLOEXEC, let's try the second best option. Note that
+ // race condition can cause socket not to be closed (if fork happens
+ // between socket creation and this point).
+#if !defined SOCK_CLOEXEC && defined FD_CLOEXEC
+ int rc = fcntl (s, F_SETFD, FD_CLOEXEC);
+ errno_assert (rc != -1);
+#endif
+
+ return s;
+}
+
int zmq::resolve_ip_interface (sockaddr_storage* addr_, socklen_t *addr_len_,
char const *interface_)
{
diff --git a/src/ip.hpp b/src/ip.hpp
index ec2db43..1d44325 100644
--- a/src/ip.hpp
+++ b/src/ip.hpp
@@ -49,6 +49,8 @@
namespace zmq
{
+ // Same as socket(2), but allows for transparent tweaking the options.
+ int open_socket (int domain_, int type_, int protocol_);
// Resolves network interface name in <nic-name>:<port> format. Symbol "*"
// (asterisk) resolves to INADDR_ANY (all network interfaces).
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index e28ecd7..f173f84 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -73,7 +73,9 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);
int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
- errno_assert (rc != -1);
+
+ if (rc == -1 && errno != ENOENT)
+ errno_assert (false);
}
zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
diff --git a/src/options.cpp b/src/options.cpp
index 92887ab..952907b 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -19,9 +19,11 @@
*/
#include <string.h>
+#ifndef ZMQ_HAVE_WINDOWS
+#include <sys/stat.h>
+#endif
#include "../include/zmq.h"
-
#include "options.hpp"
#include "err.hpp"
@@ -64,6 +66,12 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
errno = EINVAL;
return -1;
}
+ // Check that SWAP directory (.) is writable
+ struct stat stat_buf;
+ if (stat (".", &stat_buf) || ((stat_buf.st_mode & S_IWRITE) == 0)) {
+ errno = EACCES;
+ return -1;
+ }
swap = *((int64_t*) optval_);
return 0;
diff --git a/src/req.cpp b/src/req.cpp
index 503f221..6a6b6a8 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -84,8 +84,12 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
int rc = xreq_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- zmq_assert (msg_->flags & ZMQ_MSG_MORE);
- zmq_assert (zmq_msg_size (msg_) == 0);
+
+ // TODO: this should also close the connection with the peer
+ if (!(msg_->flags & ZMQ_MSG_MORE) || zmq_msg_size (msg_) != 0) {
+ errno = EAGAIN;
+ return -1;
+ }
message_begins = false;
}
diff --git a/src/signaler.cpp b/src/signaler.cpp
index fa2f123..7b66ea7 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -215,7 +215,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Create listening socket.
SOCKET listener;
- listener = socket (AF_INET, SOCK_STREAM, 0);
+ listener = open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (listener != INVALID_SOCKET);
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
@@ -283,7 +283,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
lcladdr.sin_port = 0;
- int listener = socket (AF_INET, SOCK_STREAM, 0);
+ int listener = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (listener != -1);
int on = 1;
@@ -304,7 +304,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
rc = listen (listener, 1);
errno_assert (rc != -1);
- *w_ = socket (AF_INET, SOCK_STREAM, 0);
+ *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
errno_assert (*w_ != -1);
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 2167b0b..335a858 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -364,10 +364,23 @@ int zmq::socket_base_t::connect (const char *addr_)
if (rc != 0)
return -1;
+ // Checks that protocol is valid and supported on this system
rc = check_protocol (protocol);
if (rc != 0)
return -1;
+ // Parsed address for validation
+ sockaddr_storage addr;
+ socklen_t addr_len;
+
+ if (protocol == "tcp")
+ rc = resolve_ip_hostname (&addr, &addr_len, address.c_str ());
+ else
+ if (protocol == "ipc")
+ rc = resolve_local_path (&addr, &addr_len, address.c_str ());
+ if (rc != 0)
+ return -1;
+
if (protocol == "inproc" || protocol == "sys") {
// TODO: inproc connect is specific with respect to creating pipes
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index d6f73ca..6bc1b2d 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -50,7 +50,7 @@ int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
return resolve_ip_hostname (&addr, &addr_len, addr_);
errno = EPROTONOSUPPORT;
- return -1;
+ return -1;
}
int zmq::tcp_connecter_t::open ()
@@ -58,7 +58,7 @@ int zmq::tcp_connecter_t::open ()
zmq_assert (s == retired_fd);
// Create the socket.
- s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
+ s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == INVALID_SOCKET) {
wsa_error_to_errno ();
return -1;
@@ -88,7 +88,7 @@ int zmq::tcp_connecter_t::open ()
errno = EAGAIN;
return -1;
}
-
+
wsa_error_to_errno ();
return -1;
}
@@ -162,7 +162,8 @@ int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
{
if (strcmp (protocol_, "tcp") == 0)
return resolve_ip_hostname (&addr, &addr_len, addr_);
- else if (strcmp (protocol_, "ipc") == 0)
+ else
+ if (strcmp (protocol_, "ipc") == 0)
return resolve_local_path (&addr, &addr_len, addr_);
errno = EPROTONOSUPPORT;
@@ -177,7 +178,7 @@ int zmq::tcp_connecter_t::open ()
if (AF_UNIX != sa->sa_family) {
// Create the socket.
- s = socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP);
+ s = open_socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
return -1;
@@ -233,13 +234,13 @@ int zmq::tcp_connecter_t::open ()
// Create the socket.
zmq_assert (AF_UNIX == sa->sa_family);
- s = socket (AF_UNIX, SOCK_STREAM, 0);
+ s = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (s == -1)
return -1;
// Set the non-blocking flag.
int flag = fcntl (s, F_GETFL, 0);
- if (flag == -1)
+ if (flag == -1)
flag = 0;
int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 8de564f..4bfaa85 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -176,7 +176,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
return -1;
// Create a listening socket.
- s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
+ s = open_socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
return -1;
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index e7a69e4..07159d8 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -79,7 +79,7 @@ int zmq::tcp_socket_t::write (const void *data, int size)
// we'll get an error (this may happen during the speculative write).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
-
+
// Signalise peer failure.
if (nbytes == -1 && (
WSAGetLastError () == WSAENETDOWN ||
@@ -119,7 +119,7 @@ int zmq::tcp_socket_t::read (void *data, int size)
// Orderly shutdown by the other peer.
if (nbytes == 0)
- return -1;
+ return -1;
return (size_t) nbytes;
}
@@ -200,6 +200,9 @@ int zmq::tcp_socket_t::write (const void *data, int size)
if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
return -1;
+ if (nbytes == 1)
+ fprintf (stderr, "E: unhandled error on send: %d/%s\n",
+ errno, strerror (errno));
errno_assert (nbytes != -1);
return (size_t) nbytes;
}
@@ -220,6 +223,9 @@ int zmq::tcp_socket_t::read (void *data, int size)
errno == ETIMEDOUT || errno == EHOSTUNREACH))
return -1;
+ if (nbytes == 1)
+ fprintf (stderr, "E: unhandled error on recv: %d/%s\n",
+ errno, strerror (errno));
errno_assert (nbytes != -1);
// Orderly shutdown by the other peer.