diff options
Diffstat (limited to 'src/tcp_engine.cpp')
-rw-r--r-- | src/tcp_engine.cpp | 198 |
1 files changed, 103 insertions, 95 deletions
diff --git a/src/tcp_engine.cpp b/src/tcp_engine.cpp index 1972809..4cb535a 100644 --- a/src/tcp_engine.cpp +++ b/src/tcp_engine.cpp @@ -41,7 +41,7 @@ #include "err.hpp" zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : - s (retired_fd), + s (fd_), inpos (NULL), insize (0), decoder (in_batch_size, options_.maxmsgsize), @@ -53,17 +53,88 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : options (options_), plugged (false) { - // Initialise the underlying socket. - int rc = open (fd_, options.sndbuf, options.rcvbuf); - zmq_assert (rc == 0); + int rc; + + // Set the socket to the non-blocking mode. +#ifdef ZMQ_HAVE_WINDOWS + u_long nonblock = 1; + rc = ioctlsocket (s, FIONBIO, &nonblock); + wsa_assert (rc != SOCKET_ERROR); +#elif ZMQ_HAVE_OPENVMS + int nonblock = 1; + rc = ioctl (s, FIONBIO, &nonblock); + errno_assert (rc != -1); +#else + int flags = fcntl (s, F_GETFL, 0); + if (flags == -1) + flags = 0; + rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); +#endif + + // Set the socket buffer limits for the underlying socket. + if (options.sndbuf) { + rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, + (char*) &options.sndbuf, sizeof (int)); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + } + if (options.rcvbuf) { + rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, + (char*) &options.rcvbuf, sizeof (int)); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + } + +#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD + // Make sure that SIGPIPE signal is not generated when writing to a + // connection that was already closed by the peer. + int set = 1; + rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); + errno_assert (rc == 0); +#endif + + // Disable Nagle's algorithm. We are doing data batching on 0MQ level, + // so using Nagle wouldn't improve throughput in anyway, but it would + // hurt latency. + int nodelay = 1; + rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay, + sizeof (int)); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + +#ifdef ZMQ_HAVE_OPENVMS + // Disable delayed acknowledgements as they hurt latency is serious manner. + int nodelack = 1; + rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack, + sizeof (int)); + errno_assert (rc != SOCKET_ERROR); +#endif } zmq::tcp_engine_t::~tcp_engine_t () { zmq_assert (!plugged); - if (s != retired_fd) - close (); + if (s != retired_fd) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (s); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = close (s); + errno_assert (rc == 0); +#endif + s = retired_fd; + } } void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_) @@ -234,39 +305,10 @@ void zmq::tcp_engine_t::error () delete this; } -#ifdef ZMQ_HAVE_WINDOWS - -int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) -{ - zmq_assert (s == retired_fd); - s = fd_; - - if (sndbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, - (char*) &sndbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - if (rcvbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, - (char*) &rcvbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - return 0; -} - -int zmq::tcp_engine_t::close () -{ - zmq_assert (s != retired_fd); - int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); - s = retired_fd; - return 0; -} - int zmq::tcp_engine_t::write (const void *data_, size_t size_) { +#ifdef ZMQ_HAVE_WINDOWS + int nbytes = send (s, (char*) data_, (int) size_, 0); // If not a single byte can be written to the socket in non-blocking mode @@ -285,12 +327,33 @@ int zmq::tcp_engine_t::write (const void *data_, size_t size_) return -1; wsa_assert (nbytes != SOCKET_ERROR); + return (size_t) nbytes; + +#else + ssize_t nbytes = send (s, data_, size_, 0); + + // Several errors are OK. When speculative write is being done we may not + // be able to write a single byte from the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR)) + return 0; + + // Signalise peer failure. + if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) + return -1; + + errno_assert (nbytes != -1); return (size_t) nbytes; + +#endif } int zmq::tcp_engine_t::read (void *data_, size_t size_) { +#ifdef ZMQ_HAVE_WINDOWS + int nbytes = recv (s, (char*) data_, (int) size_, 0); // If not a single byte can be read from the socket in non-blocking mode @@ -316,69 +379,13 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_) return -1; return (size_t) nbytes; -} #else -int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) -{ - assert (s == retired_fd); - s = fd_; - - if (sndbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - if (rcvbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int)); - errno_assert (rc == 0); - } - -#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD - int set = 1; - int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); - errno_assert (rc == 0); -#endif - - return 0; -} - -int zmq::tcp_engine_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - if (rc != 0) - return -1; - s = retired_fd; - return 0; -} - -int zmq::tcp_engine_t::write (const void *data_, size_t size_) -{ - ssize_t nbytes = send (s, data_, size_, 0); - - // Several errors are OK. When speculative write is being done we may not - // be able to write a single byte to the socket. Also, SIGSTOP issued - // by a debugging tool can result in EINTR error. - if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) - return 0; - - // Signalise peer failure. - if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) - return -1; - - errno_assert (nbytes != -1); - return (size_t) nbytes; -} - -int zmq::tcp_engine_t::read (void *data_, size_t size_) -{ ssize_t nbytes = recv (s, data_, size_, 0); // Several errors are OK. When speculative read is being done we may not - // be able to read a single byte to the socket. Also, SIGSTOP issued + // be able to read a single byte from the socket. Also, SIGSTOP issued // by a debugging tool can result in EINTR error. if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) @@ -396,6 +403,7 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_) return -1; return (size_t) nbytes; -} #endif +} + |