diff options
-rw-r--r-- | src/tcp_connecter.cpp | 20 | ||||
-rw-r--r-- | src/tcp_engine.cpp | 198 | ||||
-rw-r--r-- | src/tcp_engine.hpp | 6 | ||||
-rw-r--r-- | src/tcp_listener.cpp | 42 | ||||
-rw-r--r-- | src/vtcp_connecter.cpp | 27 |
5 files changed, 103 insertions, 190 deletions
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 678d488..30e206a 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -205,12 +205,6 @@ int zmq::tcp_connecter_t::open () int rc = ioctlsocket (s, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, - sizeof (int)); - wsa_assert (rc != SOCKET_ERROR); - // Connect to the remote peer. rc = ::connect (s, (sockaddr*) &addr, addr_len); @@ -301,20 +295,6 @@ int zmq::tcp_connecter_t::open () errno_assert (rc != -1); #endif - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, - sizeof (int)); - errno_assert (rc == 0); - -#ifdef ZMQ_HAVE_OPENVMS - // Disable delayed acknowledgements. - flag = 1; - rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, - sizeof (int)); - errno_assert (rc != SOCKET_ERROR); -#endif - // Connect to the remote peer. rc = ::connect (s, (struct sockaddr*) &addr, addr_len); diff --git a/src/tcp_engine.cpp b/src/tcp_engine.cpp index 1972809..4cb535a 100644 --- a/src/tcp_engine.cpp +++ b/src/tcp_engine.cpp @@ -41,7 +41,7 @@ #include "err.hpp" zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : - s (retired_fd), + s (fd_), inpos (NULL), insize (0), decoder (in_batch_size, options_.maxmsgsize), @@ -53,17 +53,88 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : options (options_), plugged (false) { - // Initialise the underlying socket. - int rc = open (fd_, options.sndbuf, options.rcvbuf); - zmq_assert (rc == 0); + int rc; + + // Set the socket to the non-blocking mode. +#ifdef ZMQ_HAVE_WINDOWS + u_long nonblock = 1; + rc = ioctlsocket (s, FIONBIO, &nonblock); + wsa_assert (rc != SOCKET_ERROR); +#elif ZMQ_HAVE_OPENVMS + int nonblock = 1; + rc = ioctl (s, FIONBIO, &nonblock); + errno_assert (rc != -1); +#else + int flags = fcntl (s, F_GETFL, 0); + if (flags == -1) + flags = 0; + rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); +#endif + + // Set the socket buffer limits for the underlying socket. + if (options.sndbuf) { + rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, + (char*) &options.sndbuf, sizeof (int)); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + } + if (options.rcvbuf) { + rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, + (char*) &options.rcvbuf, sizeof (int)); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + } + +#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD + // Make sure that SIGPIPE signal is not generated when writing to a + // connection that was already closed by the peer. + int set = 1; + rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); + errno_assert (rc == 0); +#endif + + // Disable Nagle's algorithm. We are doing data batching on 0MQ level, + // so using Nagle wouldn't improve throughput in anyway, but it would + // hurt latency. + int nodelay = 1; + rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay, + sizeof (int)); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + +#ifdef ZMQ_HAVE_OPENVMS + // Disable delayed acknowledgements as they hurt latency is serious manner. + int nodelack = 1; + rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack, + sizeof (int)); + errno_assert (rc != SOCKET_ERROR); +#endif } zmq::tcp_engine_t::~tcp_engine_t () { zmq_assert (!plugged); - if (s != retired_fd) - close (); + if (s != retired_fd) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (s); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = close (s); + errno_assert (rc == 0); +#endif + s = retired_fd; + } } void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_) @@ -234,39 +305,10 @@ void zmq::tcp_engine_t::error () delete this; } -#ifdef ZMQ_HAVE_WINDOWS - -int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) -{ - zmq_assert (s == retired_fd); - s = fd_; - - if (sndbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, - (char*) &sndbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - if (rcvbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, - (char*) &rcvbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - return 0; -} - -int zmq::tcp_engine_t::close () -{ - zmq_assert (s != retired_fd); - int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); - s = retired_fd; - return 0; -} - int zmq::tcp_engine_t::write (const void *data_, size_t size_) { +#ifdef ZMQ_HAVE_WINDOWS + int nbytes = send (s, (char*) data_, (int) size_, 0); // If not a single byte can be written to the socket in non-blocking mode @@ -285,12 +327,33 @@ int zmq::tcp_engine_t::write (const void *data_, size_t size_) return -1; wsa_assert (nbytes != SOCKET_ERROR); + return (size_t) nbytes; + +#else + ssize_t nbytes = send (s, data_, size_, 0); + + // Several errors are OK. When speculative write is being done we may not + // be able to write a single byte from the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR)) + return 0; + + // Signalise peer failure. + if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) + return -1; + + errno_assert (nbytes != -1); return (size_t) nbytes; + +#endif } int zmq::tcp_engine_t::read (void *data_, size_t size_) { +#ifdef ZMQ_HAVE_WINDOWS + int nbytes = recv (s, (char*) data_, (int) size_, 0); // If not a single byte can be read from the socket in non-blocking mode @@ -316,69 +379,13 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_) return -1; return (size_t) nbytes; -} #else -int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) -{ - assert (s == retired_fd); - s = fd_; - - if (sndbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - if (rcvbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int)); - errno_assert (rc == 0); - } - -#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD - int set = 1; - int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); - errno_assert (rc == 0); -#endif - - return 0; -} - -int zmq::tcp_engine_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - if (rc != 0) - return -1; - s = retired_fd; - return 0; -} - -int zmq::tcp_engine_t::write (const void *data_, size_t size_) -{ - ssize_t nbytes = send (s, data_, size_, 0); - - // Several errors are OK. When speculative write is being done we may not - // be able to write a single byte to the socket. Also, SIGSTOP issued - // by a debugging tool can result in EINTR error. - if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) - return 0; - - // Signalise peer failure. - if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) - return -1; - - errno_assert (nbytes != -1); - return (size_t) nbytes; -} - -int zmq::tcp_engine_t::read (void *data_, size_t size_) -{ ssize_t nbytes = recv (s, data_, size_, 0); // Several errors are OK. When speculative read is being done we may not - // be able to read a single byte to the socket. Also, SIGSTOP issued + // be able to read a single byte from the socket. Also, SIGSTOP issued // by a debugging tool can result in EINTR error. if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) @@ -396,6 +403,7 @@ int zmq::tcp_engine_t::read (void *data_, size_t size_) return -1; return (size_t) nbytes; -} #endif +} + diff --git a/src/tcp_engine.hpp b/src/tcp_engine.hpp index db17122..6a41883 100644 --- a/src/tcp_engine.hpp +++ b/src/tcp_engine.hpp @@ -56,12 +56,6 @@ namespace zmq // Function to handle network disconnections. void error (); - // Associates a socket with a native socket descriptor. - int open (fd_t fd_, int sndbuf_, int rcvbuf_); - - // Closes the underlying socket. - int close (); - // Writes data to the socket. Returns the number of bytes actually // written (even zero is to be considered to be a success). In case // of error or orderly shutdown by the other peer -1 is returned. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index fbb1b85..d334768 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -133,11 +133,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) (const char*) &flag, sizeof (int)); wsa_assert (rc != SOCKET_ERROR); - // Set the non-blocking flag. - u_long uflag = 1; - rc = ioctlsocket (s, FIONBIO, &uflag); - wsa_assert (rc != SOCKET_ERROR); - // Bind the socket to the network interface and port. rc = bind (s, (struct sockaddr*) &addr, addr_len); if (rc == SOCKET_ERROR) { @@ -182,12 +177,6 @@ zmq::fd_t zmq::tcp_listener_t::accept () int rc = ioctlsocket (sock, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, - sizeof (int)); - wsa_assert (rc != SOCKET_ERROR); - return sock; } @@ -212,19 +201,6 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); errno_assert (rc == 0); - // Set the non-blocking flag. -#ifdef ZMQ_HAVE_OPENVMS - flag = 1; - rc = ioctl (s, FIONBIO, &flag); - errno_assert (rc != -1); -#else - flag = fcntl (s, F_GETFL, 0); - if (flag == -1) - flag = 0; - rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); - errno_assert (rc != -1); -#endif - // Bind the socket to the network interface and port. rc = bind (s, (struct sockaddr*) &addr, addr_len); if (rc != 0) { @@ -369,24 +345,6 @@ zmq::fd_t zmq::tcp_listener_t::accept () errno_assert (rc != -1); #endif - struct sockaddr *sa = (struct sockaddr*) &addr; - if (AF_UNIX != sa->sa_family) { - - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, - sizeof (int)); - errno_assert (rc == 0); - -#ifdef ZMQ_HAVE_OPENVMS - // Disable delayed acknowledgements. - flag = 1; - rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, - sizeof (int)); - errno_assert (rc != SOCKET_ERROR); -#endif - } - return sock; } diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp index ecb8942..2ed4d9f 100644 --- a/src/vtcp_connecter.cpp +++ b/src/vtcp_connecter.cpp @@ -225,33 +225,6 @@ zmq::fd_t zmq::vtcp_connecter_t::connect () return retired_fd; } - // Set to non-blocking mode. -#ifdef ZMQ_HAVE_OPENVMS - int flags = 1; - rc = ioctl (s, FIONBIO, &flags); - errno_assert (rc != -1); -#else - int flags = fcntl (s, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); -#endif - - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, - sizeof (int)); - errno_assert (rc == 0); - -#ifdef ZMQ_HAVE_OPENVMS - // Disable delayed acknowledgements. - flag = 1; - rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, - sizeof (int)); - errno_assert (rc != SOCKET_ERROR); -#endif - fd_t result = s; s = retired_fd; return result; |