summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/tcp_connecter.cpp20
-rw-r--r--src/tcp_engine.cpp198
-rw-r--r--src/tcp_engine.hpp6
-rw-r--r--src/tcp_listener.cpp42
-rw-r--r--src/vtcp_connecter.cpp27
5 files changed, 103 insertions, 190 deletions
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 678d488..30e206a 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -205,12 +205,6 @@ int zmq::tcp_connecter_t::open ()
int rc = ioctlsocket (s, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
- sizeof (int));
- wsa_assert (rc != SOCKET_ERROR);
-
// Connect to the remote peer.
rc = ::connect (s, (sockaddr*) &addr, addr_len);
@@ -301,20 +295,6 @@ int zmq::tcp_connecter_t::open ()
errno_assert (rc != -1);
#endif
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
- sizeof (int));
- errno_assert (rc == 0);
-
-#ifdef ZMQ_HAVE_OPENVMS
- // Disable delayed acknowledgements.
- flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
- sizeof (int));
- errno_assert (rc != SOCKET_ERROR);
-#endif
-
// Connect to the remote peer.
rc = ::connect (s, (struct sockaddr*) &addr, addr_len);
diff --git a/src/tcp_engine.cpp b/src/tcp_engine.cpp
index 1972809..4cb535a 100644
--- a/src/tcp_engine.cpp
+++ b/src/tcp_engine.cpp
@@ -41,7 +41,7 @@
#include "err.hpp"
zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
- s (retired_fd),
+ s (fd_),
inpos (NULL),
insize (0),
decoder (in_batch_size, options_.maxmsgsize),
@@ -53,17 +53,88 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
options (options_),
plugged (false)
{
- // Initialise the underlying socket.
- int rc = open (fd_, options.sndbuf, options.rcvbuf);
- zmq_assert (rc == 0);
+ int rc;
+
+ // Set the socket to the non-blocking mode.
+#ifdef ZMQ_HAVE_WINDOWS
+ u_long nonblock = 1;
+ rc = ioctlsocket (s, FIONBIO, &nonblock);
+ wsa_assert (rc != SOCKET_ERROR);
+#elif ZMQ_HAVE_OPENVMS
+ int nonblock = 1;
+ rc = ioctl (s, FIONBIO, &nonblock);
+ errno_assert (rc != -1);
+#else
+ int flags = fcntl (s, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc != -1);
+#endif
+
+ // Set the socket buffer limits for the underlying socket.
+ if (options.sndbuf) {
+ rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
+ (char*) &options.sndbuf, sizeof (int));
+#ifdef ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ errno_assert (rc == 0);
+#endif
+ }
+ if (options.rcvbuf) {
+ rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
+ (char*) &options.rcvbuf, sizeof (int));
+#ifdef ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ errno_assert (rc == 0);
+#endif
+ }
+
+#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
+ // Make sure that SIGPIPE signal is not generated when writing to a
+ // connection that was already closed by the peer.
+ int set = 1;
+ rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
+ errno_assert (rc == 0);
+#endif
+
+ // Disable Nagle's algorithm. We are doing data batching on 0MQ level,
+ // so using Nagle wouldn't improve throughput in anyway, but it would
+ // hurt latency.
+ int nodelay = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
+ sizeof (int));
+#ifdef ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ errno_assert (rc == 0);
+#endif
+
+#ifdef ZMQ_HAVE_OPENVMS
+ // Disable delayed acknowledgements as they hurt latency is serious manner.
+ int nodelack = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
+ sizeof (int));
+ errno_assert (rc != SOCKET_ERROR);
+#endif
}
zmq::tcp_engine_t::~tcp_engine_t ()
{
zmq_assert (!plugged);
- if (s != retired_fd)
- close ();
+ if (s != retired_fd) {
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = closesocket (s);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = close (s);
+ errno_assert (rc == 0);
+#endif
+ s = retired_fd;
+ }
}
void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_)
@@ -234,39 +305,10 @@ void zmq::tcp_engine_t::error ()
delete this;
}
-#ifdef ZMQ_HAVE_WINDOWS
-
-int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_)
-{
- zmq_assert (s == retired_fd);
- s = fd_;
-
- if (sndbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
- (char*) &sndbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
- if (rcvbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
- (char*) &rcvbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
- return 0;
-}
-
-int zmq::tcp_engine_t::close ()
-{
- zmq_assert (s != retired_fd);
- int rc = closesocket (s);
- wsa_assert (rc != SOCKET_ERROR);
- s = retired_fd;
- return 0;
-}
-
int zmq::tcp_engine_t::write (const void *data_, size_t size_)
{
+#ifdef ZMQ_HAVE_WINDOWS
+
int nbytes = send (s, (char*) data_, (int) size_, 0);
// If not a single byte can be written to the socket in non-blocking mode
@@ -285,12 +327,33 @@ int zmq::tcp_engine_t::write (const void *data_, size_t size_)
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
+ return (size_t) nbytes;
+
+#else
+ ssize_t nbytes = send (s, data_, size_, 0);
+
+ // Several errors are OK. When speculative write is being done we may not
+ // be able to write a single byte from the socket. Also, SIGSTOP issued
+ // by a debugging tool can result in EINTR error.
+ if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
+ errno == EINTR))
+ return 0;
+
+ // Signalise peer failure.
+ if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
+ return -1;
+
+ errno_assert (nbytes != -1);
return (size_t) nbytes;
+
+#endif
}
int zmq::tcp_engine_t::read (void *data_, size_t size_)
{
+#ifdef ZMQ_HAVE_WINDOWS
+
int nbytes = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode
@@ -316,69 +379,13 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_)
return -1;
return (size_t) nbytes;
-}
#else
-int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_)
-{
- assert (s == retired_fd);
- s = fd_;
-
- if (sndbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
- if (rcvbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
-#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
- int set = 1;
- int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
- errno_assert (rc == 0);
-#endif
-
- return 0;
-}
-
-int zmq::tcp_engine_t::close ()
-{
- zmq_assert (s != retired_fd);
- int rc = ::close (s);
- if (rc != 0)
- return -1;
- s = retired_fd;
- return 0;
-}
-
-int zmq::tcp_engine_t::write (const void *data_, size_t size_)
-{
- ssize_t nbytes = send (s, data_, size_, 0);
-
- // Several errors are OK. When speculative write is being done we may not
- // be able to write a single byte to the socket. Also, SIGSTOP issued
- // by a debugging tool can result in EINTR error.
- if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
- errno == EINTR))
- return 0;
-
- // Signalise peer failure.
- if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
- return -1;
-
- errno_assert (nbytes != -1);
- return (size_t) nbytes;
-}
-
-int zmq::tcp_engine_t::read (void *data_, size_t size_)
-{
ssize_t nbytes = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not
- // be able to read a single byte to the socket. Also, SIGSTOP issued
+ // be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
@@ -396,6 +403,7 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_)
return -1;
return (size_t) nbytes;
-}
#endif
+}
+
diff --git a/src/tcp_engine.hpp b/src/tcp_engine.hpp
index db17122..6a41883 100644
--- a/src/tcp_engine.hpp
+++ b/src/tcp_engine.hpp
@@ -56,12 +56,6 @@ namespace zmq
// Function to handle network disconnections.
void error ();
- // Associates a socket with a native socket descriptor.
- int open (fd_t fd_, int sndbuf_, int rcvbuf_);
-
- // Closes the underlying socket.
- int close ();
-
// Writes data to the socket. Returns the number of bytes actually
// written (even zero is to be considered to be a success). In case
// of error or orderly shutdown by the other peer -1 is returned.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index fbb1b85..d334768 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -133,11 +133,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
(const char*) &flag, sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
- // Set the non-blocking flag.
- u_long uflag = 1;
- rc = ioctlsocket (s, FIONBIO, &uflag);
- wsa_assert (rc != SOCKET_ERROR);
-
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc == SOCKET_ERROR) {
@@ -182,12 +177,6 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
int rc = ioctlsocket (sock, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
- sizeof (int));
- wsa_assert (rc != SOCKET_ERROR);
-
return sock;
}
@@ -212,19 +201,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
errno_assert (rc == 0);
- // Set the non-blocking flag.
-#ifdef ZMQ_HAVE_OPENVMS
- flag = 1;
- rc = ioctl (s, FIONBIO, &flag);
- errno_assert (rc != -1);
-#else
- flag = fcntl (s, F_GETFL, 0);
- if (flag == -1)
- flag = 0;
- rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
- errno_assert (rc != -1);
-#endif
-
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, addr_len);
if (rc != 0) {
@@ -369,24 +345,6 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
errno_assert (rc != -1);
#endif
- struct sockaddr *sa = (struct sockaddr*) &addr;
- if (AF_UNIX != sa->sa_family) {
-
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
- sizeof (int));
- errno_assert (rc == 0);
-
-#ifdef ZMQ_HAVE_OPENVMS
- // Disable delayed acknowledgements.
- flag = 1;
- rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
- sizeof (int));
- errno_assert (rc != SOCKET_ERROR);
-#endif
- }
-
return sock;
}
diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp
index ecb8942..2ed4d9f 100644
--- a/src/vtcp_connecter.cpp
+++ b/src/vtcp_connecter.cpp
@@ -225,33 +225,6 @@ zmq::fd_t zmq::vtcp_connecter_t::connect ()
return retired_fd;
}
- // Set to non-blocking mode.
-#ifdef ZMQ_HAVE_OPENVMS
- int flags = 1;
- rc = ioctl (s, FIONBIO, &flags);
- errno_assert (rc != -1);
-#else
- int flags = fcntl (s, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
- rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
-#endif
-
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
- sizeof (int));
- errno_assert (rc == 0);
-
-#ifdef ZMQ_HAVE_OPENVMS
- // Disable delayed acknowledgements.
- flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
- sizeof (int));
- errno_assert (rc != SOCKET_ERROR);
-#endif
-
fd_t result = s;
s = retired_fd;
return result;