diff options
| -rw-r--r-- | src/Makefile.am | 10 | ||||
| -rw-r--r-- | src/session.cpp | 4 | ||||
| -rw-r--r-- | src/socket_base.cpp | 6 | ||||
| -rw-r--r-- | src/tcp_connecter.cpp | 182 | ||||
| -rw-r--r-- | src/tcp_connecter.hpp | 68 | ||||
| -rw-r--r-- | src/tcp_engine.cpp | 401 | ||||
| -rw-r--r-- | src/tcp_engine.hpp (renamed from src/zmq_engine.hpp) | 39 | ||||
| -rw-r--r-- | src/tcp_listener.cpp | 127 | ||||
| -rw-r--r-- | src/tcp_listener.hpp | 36 | ||||
| -rw-r--r-- | src/tcp_socket.cpp | 230 | ||||
| -rw-r--r-- | src/tcp_socket.hpp | 74 | ||||
| -rw-r--r-- | src/zmq_connecter.cpp | 161 | ||||
| -rw-r--r-- | src/zmq_connecter.hpp | 92 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 224 | ||||
| -rw-r--r-- | src/zmq_listener.cpp | 84 | ||||
| -rw-r--r-- | src/zmq_listener.hpp | 67 | 
16 files changed, 727 insertions, 1078 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index b02e302..1354174 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -60,8 +60,8 @@ libzmq_la_SOURCES = \      stdint.hpp \      sub.hpp \      tcp_connecter.hpp \ +    tcp_engine.hpp \      tcp_listener.hpp \ -    tcp_socket.hpp \      thread.hpp \      trie.hpp \      windows.hpp \ @@ -72,9 +72,6 @@ libzmq_la_SOURCES = \      xsub.hpp \      ypipe.hpp \      yqueue.hpp \ -    zmq_connecter.hpp \ -    zmq_engine.hpp \ -    zmq_listener.hpp \      clock.cpp \      ctx.cpp \      decoder.cpp \ @@ -116,8 +113,8 @@ libzmq_la_SOURCES = \      socket_base.cpp \      sub.cpp \      tcp_connecter.cpp \ +    tcp_engine.cpp \      tcp_listener.cpp \ -    tcp_socket.cpp \      thread.cpp \      trie.cpp \      xpub.cpp \ @@ -125,9 +122,6 @@ libzmq_la_SOURCES = \      xreq.cpp \      xsub.cpp \      zmq.cpp \ -    zmq_connecter.cpp \ -    zmq_engine.cpp \ -    zmq_listener.cpp \      zmq_utils.cpp  if ON_MINGW diff --git a/src/session.cpp b/src/session.cpp index ed9b44e..d14b58c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -24,7 +24,7 @@  #include "err.hpp"  #include "pipe.hpp"  #include "likely.hpp" -#include "zmq_connecter.hpp" +#include "tcp_connecter.hpp"  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp" @@ -307,7 +307,7 @@ void zmq::session_t::start_connecting (bool wait_)      //  Both TCP and IPC transports are using the same infrastructure.      if (protocol == "tcp" || protocol == "ipc") { -        zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( +        tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (              io_thread, this, options, protocol.c_str (), address.c_str (),              wait_);          alloc_assert (connecter); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2ec3998..b97e3a7 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -34,8 +34,8 @@  #endif  #include "socket_base.hpp" -#include "zmq_listener.hpp" -#include "zmq_connecter.hpp" +#include "tcp_listener.hpp" +#include "tcp_connecter.hpp"  #include "io_thread.hpp"  #include "session.hpp"  #include "config.hpp" @@ -348,7 +348,7 @@ int zmq::socket_base_t::bind (const char *addr_)          }          //  Create and run the listener. -        zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( +        tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (              io_thread, this, options);          alloc_assert (listener);          int rc = listener->set_address (protocol.c_str(), address.c_str ()); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 01d80cc..678d488 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -18,30 +18,168 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include <string.h> - +#include <new>  #include <string>  #include "tcp_connecter.hpp" +#include "tcp_engine.hpp" +#include "io_thread.hpp"  #include "platform.hpp"  #include "ip.hpp"  #include "err.hpp" -#ifdef ZMQ_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <netinet/in.h> +#include <netdb.h> +#include <fcntl.h> +#ifdef ZMQ_HAVE_OPENVMS +#include <ioctl.h> +#endif +#endif -zmq::tcp_connecter_t::tcp_connecter_t () : -    s (retired_fd) +zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, +      class session_t *session_, const options_t &options_, +      const char *protocol_, const char *address_, bool wait_) : +    own_t (io_thread_, options_), +    io_object_t (io_thread_), +    s (retired_fd), +    handle_valid (false), +    wait (wait_), +    session (session_), +    current_reconnect_ivl(options.reconnect_ivl)  {      memset (&addr, 0, sizeof (addr));      addr_len = 0; + +    int rc = set_address (protocol_, address_); +    zmq_assert (rc == 0); //TODO: take care ENOMEM, EINVAL  }  zmq::tcp_connecter_t::~tcp_connecter_t ()  { +    if (wait) +        cancel_timer (reconnect_timer_id); +    if (handle_valid) +        rm_fd (handle); +      if (s != retired_fd)          close ();  } +void zmq::tcp_connecter_t::process_plug () +{ +    if (wait) +        add_reconnect_timer(); +    else +        start_connecting (); +} + +void zmq::tcp_connecter_t::in_event () +{ +    //  We are not polling for incomming data, so we are actually called +    //  because of error here. However, we can get error on out event as well +    //  on some platforms, so we'll simply handle both events in the same way. +    out_event (); +} + +void zmq::tcp_connecter_t::out_event () +{ +    fd_t fd = connect (); +    rm_fd (handle); +    handle_valid = false; + +    //  Handle the error condition by attempt to reconnect. +    if (fd == retired_fd) { +        close (); +        wait = true; +        add_reconnect_timer(); +        return; +    } + +    //  Create the engine object for this connection. +    tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); +    alloc_assert (engine); + +    //  Attach the engine to the corresponding session object. +    send_attach (session, engine); + +    //  Shut the connecter down. +    terminate (); +} + +void zmq::tcp_connecter_t::timer_event (int id_) +{ +    zmq_assert (id_ == reconnect_timer_id); +    wait = false; +    start_connecting (); +} + +void zmq::tcp_connecter_t::start_connecting () +{ +    //  Open the connecting socket. +    int rc = open (); + +    //  Connect may succeed in synchronous manner. +    if (rc == 0) { +        handle = add_fd (s); +        handle_valid = true; +        out_event (); +        return; +    } + +    //  Connection establishment may be dealyed. Poll for its completion. +    else if (rc == -1 && errno == EAGAIN) { +        handle = add_fd (s); +        handle_valid = true; +        set_pollout (handle); +        return; +    } + +    //  Handle any other error condition by eventual reconnect. +    wait = true; +    add_reconnect_timer(); +} + +void zmq::tcp_connecter_t::add_reconnect_timer() +{ +    add_timer (get_new_reconnect_ivl(), reconnect_timer_id); +} + +int zmq::tcp_connecter_t::get_new_reconnect_ivl () +{ +#if defined ZMQ_HAVE_WINDOWS +    int pid = (int) GetCurrentProcessId (); +#else +    int pid = (int) getpid (); +#endif + +    //  The new interval is the current interval + random value. +    int this_interval = current_reconnect_ivl + +        ((pid * 13) % options.reconnect_ivl); + +    //  Only change the current reconnect interval  if the maximum reconnect +    //  interval was set and if it's larger than the reconnect interval. +    if (options.reconnect_ivl_max > 0 &&  +        options.reconnect_ivl_max > options.reconnect_ivl) { + +        //  Calculate the next interval +        current_reconnect_ivl = current_reconnect_ivl * 2; +        if(current_reconnect_ivl >= options.reconnect_ivl_max) { +            current_reconnect_ivl = options.reconnect_ivl_max; +        }    +    } +    return this_interval; +} + +#ifdef ZMQ_HAVE_WINDOWS +  int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)  {      if (strcmp (protocol_, "tcp") == 0) @@ -100,11 +238,6 @@ int zmq::tcp_connecter_t::close ()      return 0;  } -zmq::fd_t zmq::tcp_connecter_t::get_fd () -{ -    return s; -} -  zmq::fd_t zmq::tcp_connecter_t::connect ()  {      //  Nonblocking connect have finished. Check whether an error occured. @@ -132,30 +265,6 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()  #else -#include <unistd.h> -#include <sys/socket.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> -#include <netinet/in.h> -#include <netdb.h> -#include <fcntl.h> - -#ifdef ZMQ_HAVE_OPENVMS -#include <ioctl.h> -#endif - -zmq::tcp_connecter_t::tcp_connecter_t () : -    s (retired_fd) -{ -    memset (&addr, 0, sizeof (addr)); -} - -zmq::tcp_connecter_t::~tcp_connecter_t () -{ -    if (s != retired_fd) -        close (); -} -  int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)  {      if (strcmp (protocol_, "tcp") == 0) @@ -271,11 +380,6 @@ int zmq::tcp_connecter_t::close ()      return 0;  } -zmq::fd_t zmq::tcp_connecter_t::get_fd () -{ -    return s; -} -  zmq::fd_t zmq::tcp_connecter_t::connect ()  {      //  Following code should handle both Berkeley-derived socket diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 06641e5..2168ca7 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -21,28 +21,50 @@  #ifndef __ZMQ_TCP_CONNECTER_HPP_INCLUDED__  #define __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ -#include "platform.hpp"  #include "fd.hpp" - -#ifdef ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include <sys/types.h> -#include <sys/socket.h> -#endif +#include "ip.hpp" +#include "own.hpp" +#include "io_object.hpp" +#include "stdint.hpp"  namespace zmq  { -    //  The class encapsulating simple TCP listening socket. - -    class tcp_connecter_t +    class tcp_connecter_t : public own_t, public io_object_t      {      public: -        tcp_connecter_t (); +        //  If 'wait' is true connecter first waits for a while, then starts +        //  connection process. +        tcp_connecter_t (class io_thread_t *io_thread_, +            class session_t *session_, const options_t &options_, +            const char *protocol_, const char *address_, bool delay_);          ~tcp_connecter_t (); +    private: + +        //  ID of the timer used to delay the reconnection. +        enum {reconnect_timer_id = 1}; + +        //  Handlers for incoming commands. +        void process_plug (); + +        //  Handlers for I/O events. +        void in_event (); +        void out_event (); +        void timer_event (int id_); + +        //  Internal function to start the actual connection establishment. +        void start_connecting (); + +        //  Internal function to add a reconnect timer +        void add_reconnect_timer(); + +        //  Internal function to return a reconnect backoff delay. +        //  Will modify the current_reconnect_ivl used for next call +        //  Returns the currently used interval +        int get_new_reconnect_ivl (); +          //  Set address to connect to.          int set_address (const char *protocol, const char *addr_); @@ -55,16 +77,10 @@ namespace zmq          //  Close the connecting socket.          int close (); -        //  Get the file descriptor to poll on to get notified about -        //  connection success. -        fd_t get_fd (); -          //  Get the file descriptor of newly created connection. Returns          //  retired_fd if the connection was unsuccessfull.          fd_t connect (); -    private: -          //  Address to connect to.          sockaddr_storage addr;          socklen_t addr_len; @@ -72,6 +88,22 @@ namespace zmq          //  Underlying socket.          fd_t s; +        //  Handle corresponding to the listening socket. +        handle_t handle; + +        //  If true file descriptor is registered with the poller and 'handle' +        //  contains valid value. +        bool handle_valid; + +        //  If true, connecter is waiting a while before trying to connect. +        bool wait; + +        //  Reference to the session we belong to. +        class session_t *session; + +        //  Current reconnect ivl, updated for backoff strategy +        int current_reconnect_ivl; +          tcp_connecter_t (const tcp_connecter_t&);          const tcp_connecter_t &operator = (const tcp_connecter_t&);      }; diff --git a/src/tcp_engine.cpp b/src/tcp_engine.cpp new file mode 100644 index 0000000..1972809 --- /dev/null +++ b/src/tcp_engine.cpp @@ -0,0 +1,401 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <netinet/in.h> +#include <netdb.h> +#include <fcntl.h> +#endif + +#include <string.h> +#include <new> + +#include "tcp_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "config.hpp" +#include "err.hpp" + +zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : +    s (retired_fd), +    inpos (NULL), +    insize (0), +    decoder (in_batch_size, options_.maxmsgsize), +    outpos (NULL), +    outsize (0), +    encoder (out_batch_size), +    session (NULL), +    leftover_session (NULL), +    options (options_), +    plugged (false) +{ +    //  Initialise the underlying socket. +    int rc = open (fd_, options.sndbuf, options.rcvbuf); +    zmq_assert (rc == 0); +} + +zmq::tcp_engine_t::~tcp_engine_t () +{ +    zmq_assert (!plugged); + +    if (s != retired_fd) +        close (); +} + +void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_) +{ +    zmq_assert (!plugged); +    plugged = true; +    leftover_session = NULL; + +    //  Connect to session object. +    zmq_assert (!session); +    zmq_assert (session_); +    encoder.set_session (session_); +    decoder.set_session (session_); +    session = session_; + +    //  Connect to I/O threads poller object. +    io_object_t::plug (io_thread_); +    handle = add_fd (s); +    set_pollin (handle); +    set_pollout (handle); + +    //  Flush all the data that may have been already received downstream. +    in_event (); +} + +void zmq::tcp_engine_t::unplug () +{ +    zmq_assert (plugged); +    plugged = false; + +    //  Cancel all fd subscriptions. +    rm_fd (handle); + +    //  Disconnect from I/O threads poller object. +    io_object_t::unplug (); + +    //  Disconnect from session object. +    encoder.set_session (NULL); +    decoder.set_session (NULL); +    leftover_session = session; +    session = NULL; +} + +void zmq::tcp_engine_t::terminate () +{ +    unplug (); +    delete this; +} + +void zmq::tcp_engine_t::in_event () +{ +    bool disconnection = false; + +    //  If there's no data to process in the buffer... +    if (!insize) { + +        //  Retrieve the buffer and read as much data as possible. +        //  Note that buffer can be arbitrarily large. However, we assume +        //  the underlying TCP layer has fixed buffer size and thus the +        //  number of bytes read will be always limited. +        decoder.get_buffer (&inpos, &insize); +        insize = read (inpos, insize); + +        //  Check whether the peer has closed the connection. +        if (insize == (size_t) -1) { +            insize = 0; +            disconnection = true; +        } +    } + +    //  Push the data to the decoder. +    size_t processed = decoder.process_buffer (inpos, insize); + +    if (unlikely (processed == (size_t) -1)) { +        disconnection = true; +    } +    else { + +        //  Stop polling for input if we got stuck. +        if (processed < insize) { + +            //  This may happen if queue limits are in effect. +            if (plugged) +                reset_pollin (handle); +        } + +        //  Adjust the buffer. +        inpos += processed; +        insize -= processed; +    } + +    //  Flush all messages the decoder may have produced. +    //  If IO handler has unplugged engine, flush transient IO handler. +    if (unlikely (!plugged)) { +        zmq_assert (leftover_session); +        leftover_session->flush (); +    } else { +        session->flush (); +    } + +    if (session && disconnection) +        error (); +} + +void zmq::tcp_engine_t::out_event () +{ +    //  If write buffer is empty, try to read new data from the encoder. +    if (!outsize) { + +        outpos = NULL; +        encoder.get_data (&outpos, &outsize); + +        //  If IO handler has unplugged engine, flush transient IO handler. +        if (unlikely (!plugged)) { +            zmq_assert (leftover_session); +            leftover_session->flush (); +            return; +        } + +        //  If there is no data to send, stop polling for output. +        if (outsize == 0) { +            reset_pollout (handle); +            return; +        } +    } + +    //  If there are any data to write in write buffer, write as much as +    //  possible to the socket. Note that amount of data to write can be +    //  arbitratily large. However, we assume that underlying TCP layer has +    //  limited transmission buffer and thus the actual number of bytes +    //  written should be reasonably modest. +    int nbytes = write (outpos, outsize); + +    //  Handle problems with the connection. +    if (nbytes == -1) { +        error (); +        return; +    } + +    outpos += nbytes; +    outsize -= nbytes; +} + +void zmq::tcp_engine_t::activate_out () +{ +    set_pollout (handle); + +    //  Speculative write: The assumption is that at the moment new message +    //  was sent by the user the socket is probably available for writing. +    //  Thus we try to write the data to socket avoiding polling for POLLOUT. +    //  Consequently, the latency should be better in request/reply scenarios. +    out_event (); +} + +void zmq::tcp_engine_t::activate_in () +{ +    set_pollin (handle); + +    //  Speculative read. +    in_event (); +} + +void zmq::tcp_engine_t::error () +{ +    zmq_assert (session); +    session->detach (); +    unplug (); +    delete this; +} + +#ifdef ZMQ_HAVE_WINDOWS + +int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) +{ +    zmq_assert (s == retired_fd); +    s = fd_; + +    if (sndbuf_) { +        int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, +            (char*) &sndbuf_, sizeof (int)); +        errno_assert (rc == 0); +    } + +    if (rcvbuf_) { +        int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, +            (char*) &rcvbuf_, sizeof (int)); +        errno_assert (rc == 0); +    } + +    return 0; +} + +int zmq::tcp_engine_t::close () +{ +    zmq_assert (s != retired_fd); +    int rc = closesocket (s); +    wsa_assert (rc != SOCKET_ERROR); +    s = retired_fd; +    return 0; +} + +int zmq::tcp_engine_t::write (const void *data_, size_t size_) +{ +    int nbytes = send (s, (char*) data_, (int) size_, 0); + +    //  If not a single byte can be written to the socket in non-blocking mode +    //  we'll get an error (this may happen during the speculative write). +    if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) +        return 0; +		 +    //  Signalise peer failure. +    if (nbytes == -1 && ( +          WSAGetLastError () == WSAENETDOWN || +          WSAGetLastError () == WSAENETRESET || +          WSAGetLastError () == WSAEHOSTUNREACH || +          WSAGetLastError () == WSAECONNABORTED || +          WSAGetLastError () == WSAETIMEDOUT || +          WSAGetLastError () == WSAECONNRESET)) +        return -1; + +    wsa_assert (nbytes != SOCKET_ERROR); + +    return (size_t) nbytes; +} + +int zmq::tcp_engine_t::read (void *data_, size_t size_) +{ +    int nbytes = recv (s, (char*) data_, (int) size_, 0); + +    //  If not a single byte can be read from the socket in non-blocking mode +    //  we'll get an error (this may happen during the speculative read). +    if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) +        return 0; + +    //  Connection failure. +    if (nbytes == -1 && ( +          WSAGetLastError () == WSAENETDOWN || +          WSAGetLastError () == WSAENETRESET || +          WSAGetLastError () == WSAECONNABORTED || +          WSAGetLastError () == WSAETIMEDOUT || +          WSAGetLastError () == WSAECONNRESET || +          WSAGetLastError () == WSAECONNREFUSED || +          WSAGetLastError () == WSAENOTCONN)) +        return -1; + +    wsa_assert (nbytes != SOCKET_ERROR); + +    //  Orderly shutdown by the other peer. +    if (nbytes == 0) +        return -1;  + +    return (size_t) nbytes; +} + +#else + +int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) +{ +    assert (s == retired_fd); +    s = fd_; + +    if (sndbuf_) { +        int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int)); +        errno_assert (rc == 0); +    } + +    if (rcvbuf_) { +        int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int)); +        errno_assert (rc == 0); +    } + +#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD +    int set = 1; +    int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); +    errno_assert (rc == 0); +#endif + +    return 0; +} + +int zmq::tcp_engine_t::close () +{ +    zmq_assert (s != retired_fd); +    int rc = ::close (s); +    if (rc != 0) +        return -1; +    s = retired_fd; +    return 0; +} + +int zmq::tcp_engine_t::write (const void *data_, size_t size_) +{ +    ssize_t nbytes = send (s, data_, size_, 0); + +    //  Several errors are OK. When speculative write is being done we may not +    //  be able to write a single byte to the socket. Also, SIGSTOP issued +    //  by a debugging tool can result in EINTR error. +    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || +          errno == EINTR)) +        return 0; + +    //  Signalise peer failure. +    if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) +        return -1; + +    errno_assert (nbytes != -1); +    return (size_t) nbytes; +} + +int zmq::tcp_engine_t::read (void *data_, size_t size_) +{ +    ssize_t nbytes = recv (s, data_, size_, 0); + +    //  Several errors are OK. When speculative read is being done we may not +    //  be able to read a single byte to the socket. Also, SIGSTOP issued +    //  by a debugging tool can result in EINTR error. +    if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || +          errno == EINTR)) +        return 0; + +    //  Signalise peer failure. +    if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED || +          errno == ETIMEDOUT || errno == EHOSTUNREACH)) +        return -1; + +    errno_assert (nbytes != -1); + +    //  Orderly shutdown by the peer. +    if (nbytes == 0) +        return -1; + +    return (size_t) nbytes; +} + +#endif diff --git a/src/zmq_engine.hpp b/src/tcp_engine.hpp index ad4bc8a..db17122 100644 --- a/src/zmq_engine.hpp +++ b/src/tcp_engine.hpp @@ -18,16 +18,14 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ -#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ +#ifndef __ZMQ_TCP_ENGINE_HPP_INCLUDED__ +#define __ZMQ_TCP_ENGINE_HPP_INCLUDED__  #include <stddef.h> -#include <string> - +#include "fd.hpp"  #include "i_engine.hpp"  #include "io_object.hpp" -#include "tcp_socket.hpp"  #include "encoder.hpp"  #include "decoder.hpp"  #include "options.hpp" @@ -35,12 +33,12 @@  namespace zmq  { -    class zmq_engine_t : public io_object_t, public i_engine +    class tcp_engine_t : public io_object_t, public i_engine      {      public: -        zmq_engine_t (fd_t fd_, const options_t &options_); -        ~zmq_engine_t (); +        tcp_engine_t (fd_t fd_, const options_t &options_); +        ~tcp_engine_t ();          //  i_engine interface implementation.          void plug (class io_thread_t *io_thread_, class session_t *session_); @@ -58,7 +56,26 @@ namespace zmq          //  Function to handle network disconnections.          void error (); -        tcp_socket_t tcp_socket; +        //  Associates a socket with a native socket descriptor. +        int open (fd_t fd_, int sndbuf_, int rcvbuf_); +          +        //  Closes the underlying socket. +        int close (); + +        //  Writes data to the socket. Returns the number of bytes actually +        //  written (even zero is to be considered to be a success). In case +        //  of error or orderly shutdown by the other peer -1 is returned. +        int write (const void *data_, size_t size_); + +        //  Reads data from the socket (up to 'size' bytes). Returns the number +        //  of bytes actually read (even zero is to be considered to be +        //  a success). In case of error or orderly shutdown by the other +        //  peer -1 is returned. +        int read (void *data_, size_t size_); + +        //  Underlying socket. +        fd_t s; +          handle_t handle;          unsigned char *inpos; @@ -79,8 +96,8 @@ namespace zmq          bool plugged; -        zmq_engine_t (const zmq_engine_t&); -        const zmq_engine_t &operator = (const zmq_engine_t&); +        tcp_engine_t (const tcp_engine_t&); +        const tcp_engine_t &operator = (const tcp_engine_t&);      };  } diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index f40b0fe..b409891 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -18,19 +18,42 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <new> +  #include <string.h>  #include "tcp_listener.hpp"  #include "platform.hpp" -#include "ip.hpp" +#include "tcp_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp"  #include "config.hpp"  #include "err.hpp"  #ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <unistd.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <netinet/in.h> +#include <netdb.h> +#include <fcntl.h> +#ifndef ZMQ_HAVE_OPENVMS +#include <sys/un.h> +#else +#include <ioctl.h> +#endif +#endif -zmq::tcp_listener_t::tcp_listener_t () : +zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, +      socket_base_t *socket_, const options_t &options_) : +    own_t (io_thread_, options_), +    io_object_t (io_thread_),      has_file (false), -    s (retired_fd) +    s (retired_fd), +    socket (socket_)  {      memset (&addr, 0, sizeof (addr));      addr_len = 0; @@ -42,8 +65,48 @@ zmq::tcp_listener_t::~tcp_listener_t ()          close ();  } -int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, -    int backlog_) +void zmq::tcp_listener_t::process_plug () +{ +    //  Start polling for incoming connections. +    handle = add_fd (s); +    set_pollin (handle); +} + +void zmq::tcp_listener_t::process_term (int linger_) +{ +    rm_fd (handle); +    own_t::process_term (linger_); +} + +void zmq::tcp_listener_t::in_event () +{ +    fd_t fd = accept (); + +    //  If connection was reset by the peer in the meantime, just ignore it. +    //  TODO: Handle specific errors like ENFILE/EMFILE etc. +    if (fd == retired_fd) +        return; +    //  Create the engine object for this connection. +    tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); +    alloc_assert (engine); + +    //  Choose I/O thread to run connecter in. Given that we are already +    //  running in an I/O thread, there must be at least one available. +    io_thread_t *io_thread = choose_io_thread (options.affinity); +    zmq_assert (io_thread); + +    //  Create and launch a session object.  +    session_t *session = new (std::nothrow) +        session_t (io_thread, false, socket, options, NULL, NULL); +    alloc_assert (session); +    session->inc_seqnum (); +    launch_child (session); +    send_attach (session, engine, false); +} + +#ifdef ZMQ_HAVE_WINDOWS + +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)  {      //  IPC protocol is not supported on Windows platform.      if (strcmp (protocol_, "tcp") != 0 ) { @@ -57,7 +120,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,          return rc;      //  Create a listening socket. -    s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); +    s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);      if (s == INVALID_SOCKET) {          wsa_error_to_errno ();          return -1; @@ -82,7 +145,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,      }      //  Listen for incomming connections. -    rc = listen (s, backlog_); +    rc = listen (s, options.backlog);      if (rc == SOCKET_ERROR) {          wsa_error_to_errno ();          return -1; @@ -100,11 +163,6 @@ int zmq::tcp_listener_t::close ()      return 0;  } -zmq::fd_t zmq::tcp_listener_t::get_fd () -{ -    return s; | 
