summaryrefslogtreecommitdiff
path: root/src/tcp_engine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/tcp_engine.cpp')
-rw-r--r--src/tcp_engine.cpp198
1 files changed, 103 insertions, 95 deletions
diff --git a/src/tcp_engine.cpp b/src/tcp_engine.cpp
index 1972809..4cb535a 100644
--- a/src/tcp_engine.cpp
+++ b/src/tcp_engine.cpp
@@ -41,7 +41,7 @@
#include "err.hpp"
zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
- s (retired_fd),
+ s (fd_),
inpos (NULL),
insize (0),
decoder (in_batch_size, options_.maxmsgsize),
@@ -53,17 +53,88 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
options (options_),
plugged (false)
{
- // Initialise the underlying socket.
- int rc = open (fd_, options.sndbuf, options.rcvbuf);
- zmq_assert (rc == 0);
+ int rc;
+
+ // Set the socket to the non-blocking mode.
+#ifdef ZMQ_HAVE_WINDOWS
+ u_long nonblock = 1;
+ rc = ioctlsocket (s, FIONBIO, &nonblock);
+ wsa_assert (rc != SOCKET_ERROR);
+#elif ZMQ_HAVE_OPENVMS
+ int nonblock = 1;
+ rc = ioctl (s, FIONBIO, &nonblock);
+ errno_assert (rc != -1);
+#else
+ int flags = fcntl (s, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc != -1);
+#endif
+
+ // Set the socket buffer limits for the underlying socket.
+ if (options.sndbuf) {
+ rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
+ (char*) &options.sndbuf, sizeof (int));
+#ifdef ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ errno_assert (rc == 0);
+#endif
+ }
+ if (options.rcvbuf) {
+ rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
+ (char*) &options.rcvbuf, sizeof (int));
+#ifdef ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ errno_assert (rc == 0);
+#endif
+ }
+
+#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
+ // Make sure that SIGPIPE signal is not generated when writing to a
+ // connection that was already closed by the peer.
+ int set = 1;
+ rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
+ errno_assert (rc == 0);
+#endif
+
+ // Disable Nagle's algorithm. We are doing data batching on 0MQ level,
+ // so using Nagle wouldn't improve throughput in anyway, but it would
+ // hurt latency.
+ int nodelay = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
+ sizeof (int));
+#ifdef ZMQ_HAVE_WINDOWS
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ errno_assert (rc == 0);
+#endif
+
+#ifdef ZMQ_HAVE_OPENVMS
+ // Disable delayed acknowledgements as they hurt latency is serious manner.
+ int nodelack = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
+ sizeof (int));
+ errno_assert (rc != SOCKET_ERROR);
+#endif
}
zmq::tcp_engine_t::~tcp_engine_t ()
{
zmq_assert (!plugged);
- if (s != retired_fd)
- close ();
+ if (s != retired_fd) {
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = closesocket (s);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ int rc = close (s);
+ errno_assert (rc == 0);
+#endif
+ s = retired_fd;
+ }
}
void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_)
@@ -234,39 +305,10 @@ void zmq::tcp_engine_t::error ()
delete this;
}
-#ifdef ZMQ_HAVE_WINDOWS
-
-int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_)
-{
- zmq_assert (s == retired_fd);
- s = fd_;
-
- if (sndbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
- (char*) &sndbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
- if (rcvbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
- (char*) &rcvbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
- return 0;
-}
-
-int zmq::tcp_engine_t::close ()
-{
- zmq_assert (s != retired_fd);
- int rc = closesocket (s);
- wsa_assert (rc != SOCKET_ERROR);
- s = retired_fd;
- return 0;
-}
-
int zmq::tcp_engine_t::write (const void *data_, size_t size_)
{
+#ifdef ZMQ_HAVE_WINDOWS
+
int nbytes = send (s, (char*) data_, (int) size_, 0);
// If not a single byte can be written to the socket in non-blocking mode
@@ -285,12 +327,33 @@ int zmq::tcp_engine_t::write (const void *data_, size_t size_)
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
+ return (size_t) nbytes;
+
+#else
+ ssize_t nbytes = send (s, data_, size_, 0);
+
+ // Several errors are OK. When speculative write is being done we may not
+ // be able to write a single byte from the socket. Also, SIGSTOP issued
+ // by a debugging tool can result in EINTR error.
+ if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
+ errno == EINTR))
+ return 0;
+
+ // Signalise peer failure.
+ if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
+ return -1;
+
+ errno_assert (nbytes != -1);
return (size_t) nbytes;
+
+#endif
}
int zmq::tcp_engine_t::read (void *data_, size_t size_)
{
+#ifdef ZMQ_HAVE_WINDOWS
+
int nbytes = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode
@@ -316,69 +379,13 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_)
return -1;
return (size_t) nbytes;
-}
#else
-int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_)
-{
- assert (s == retired_fd);
- s = fd_;
-
- if (sndbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
- if (rcvbuf_) {
- int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int));
- errno_assert (rc == 0);
- }
-
-#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
- int set = 1;
- int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
- errno_assert (rc == 0);
-#endif
-
- return 0;
-}
-
-int zmq::tcp_engine_t::close ()
-{
- zmq_assert (s != retired_fd);
- int rc = ::close (s);
- if (rc != 0)
- return -1;
- s = retired_fd;
- return 0;
-}
-
-int zmq::tcp_engine_t::write (const void *data_, size_t size_)
-{
- ssize_t nbytes = send (s, data_, size_, 0);
-
- // Several errors are OK. When speculative write is being done we may not
- // be able to write a single byte to the socket. Also, SIGSTOP issued
- // by a debugging tool can result in EINTR error.
- if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
- errno == EINTR))
- return 0;
-
- // Signalise peer failure.
- if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
- return -1;
-
- errno_assert (nbytes != -1);
- return (size_t) nbytes;
-}
-
-int zmq::tcp_engine_t::read (void *data_, size_t size_)
-{
ssize_t nbytes = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not
- // be able to read a single byte to the socket. Also, SIGSTOP issued
+ // be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
@@ -396,6 +403,7 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_)
return -1;
return (size_t) nbytes;
-}
#endif
+}
+