diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/session_base.cpp | 13 | ||||
| -rw-r--r-- | src/socket_base.cpp | 27 | ||||
| -rw-r--r-- | src/vtcp_connecter.cpp | 252 | ||||
| -rw-r--r-- | src/vtcp_connecter.hpp | 121 | ||||
| -rw-r--r-- | src/vtcp_listener.cpp | 125 | ||||
| -rw-r--r-- | src/vtcp_listener.hpp | 72 | 
7 files changed, 1 insertions, 613 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 137ba73..2b6c8f0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -67,8 +67,6 @@ libzmq_la_SOURCES = \      tcp_listener.hpp \      thread.hpp \      trie.hpp \ -    vtcp_connecter.hpp \ -    vtcp_listener.hpp \      windows.hpp \      wire.hpp \      xpub.hpp \ @@ -125,8 +123,6 @@ libzmq_la_SOURCES = \      tcp_listener.cpp \      thread.cpp \      trie.cpp \ -    vtcp_connecter.cpp \ -    vtcp_listener.cpp \      xpub.cpp \      xrep.cpp \      xreq.cpp \ diff --git a/src/session_base.cpp b/src/session_base.cpp index 5929640..d1d31c9 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -27,7 +27,6 @@  #include "likely.hpp"  #include "tcp_connecter.hpp"  #include "ipc_connecter.hpp" -#include "vtcp_connecter.hpp"  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp" @@ -394,18 +393,6 @@ void zmq::session_base_t::start_connecting (bool wait_)      }  #endif -#if defined ZMQ_HAVE_VTCP -    if (protocol == "vtcp") { - -        vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t ( -            io_thread, this, options, address.c_str (), -            wait_); -        alloc_assert (connecter); -        launch_child (connecter); -        return; -    } -#endif -  #if defined ZMQ_HAVE_OPENPGM      //  Both PGM and EPGM transports are using the same infrastructure. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1137d22..e990ba1 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -37,7 +37,6 @@  #include "socket_base.hpp"  #include "tcp_listener.hpp"  #include "ipc_listener.hpp" -#include "vtcp_listener.hpp"  #include "tcp_connecter.hpp"  #include "io_thread.hpp"  #include "session_base.hpp" @@ -173,8 +172,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)  {      //  First check out whether the protcol is something we are aware of.      if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && -          protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys" && -          protocol_ != "vtcp") { +          protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") {          errno = EPROTONOSUPPORT;          return -1;      } @@ -188,14 +186,6 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)      }  #endif -    //  If 0MQ is not compiled with VTCP, vtcp transport is not avaialble. -#if !defined ZMQ_HAVE_VTCP -    if (protocol_ == "vtcp") { -        errno = EPROTONOSUPPORT; -        return -1; -    } -#endif -      //  IPC transport is not available on Windows and OpenVMS.  #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS      if (protocol_ == "ipc") { @@ -389,21 +379,6 @@ int zmq::socket_base_t::bind (const char *addr_)      }  #endif -#if defined ZMQ_HAVE_VTCP -    if (protocol == "vtcp") { -        vtcp_listener_t *listener = new (std::nothrow) vtcp_listener_t ( -            io_thread, this, options); -        alloc_assert (listener); -        int rc = listener->set_address (address.c_str ()); -        if (rc != 0) { -            delete listener; -            return -1; -        } -        launch_child (listener); -        return 0; -    } -#endif -      zmq_assert (false);      return -1;  } diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp deleted file mode 100644 index 7a2722b..0000000 --- a/src/vtcp_connecter.cpp +++ /dev/null @@ -1,252 +0,0 @@ -/* -    Copyright (c) 2009-2011 250bpm s.r.o. -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "vtcp_connecter.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <new> -#include <string> - -#include "stream_engine.hpp" -#include "io_thread.hpp" -#include "platform.hpp" -#include "random.hpp" -#include "likely.hpp" -#include "err.hpp" -#include "ip.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> -#include <netinet/in.h> -#include <netdb.h> -#include <fcntl.h> -#ifdef ZMQ_HAVE_OPENVMS -#include <ioctl.h> -#endif -#endif - -zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_, -      class session_base_t *session_, const options_t &options_, -      const char *address_, bool wait_) : -    own_t (io_thread_, options_), -    io_object_t (io_thread_), -    s (retired_fd), -    handle_valid (false), -    wait (wait_), -    session (session_), -    current_reconnect_ivl(options.reconnect_ivl) -{ -    subport = 0; - -    int rc = set_address (address_); -    zmq_assert (rc == 0); -} - -zmq::vtcp_connecter_t::~vtcp_connecter_t () -{ -    if (wait) -        cancel_timer (reconnect_timer_id); -    if (handle_valid) -        rm_fd (handle); - -    if (s != retired_fd) -        close (); -} - -int zmq::vtcp_connecter_t::set_address (const char *addr_) -{ -    const char *delimiter = strrchr (addr_, '.'); -    if (!delimiter) { -        delimiter = strrchr (addr_, ':'); -        if (!delimiter) { -            errno = EINVAL; -            return -1; -        } -        std::string addr_str (addr_, delimiter - addr_); -        addr_str += ":9220"; -        std::string subport_str (delimiter + 1); -        subport = (vtcp_subport_t) atoi (subport_str.c_str ()); -        int rc = address.resolve (addr_str.c_str (), false, true); -        if (rc != 0) -            return -1; -    } -    else { -        std::string addr_str (addr_, delimiter - addr_); -        std::string subport_str (delimiter + 1); -        subport = (vtcp_subport_t) atoi (subport_str.c_str ()); -        int rc = address.resolve (addr_str.c_str (), false, true); -        if (rc != 0) -            return -1; -    } - -    return 0; -} - -void zmq::vtcp_connecter_t::process_plug () -{ -    if (wait) -        add_reconnect_timer(); -    else -        start_connecting (); -} - -void zmq::vtcp_connecter_t::in_event () -{ -    //  We are not polling for incomming data, so we are actually called -    //  because of error here. However, we can get error on out event as well -    //  on some platforms, so we'll simply handle both events in the same way. -    out_event (); -} - -void zmq::vtcp_connecter_t::out_event () -{ -    fd_t fd = connect (); -    rm_fd (handle); -    handle_valid = false; - -    //  Handle the error condition by attempt to reconnect. -    if (fd == retired_fd) { -        close (); -        wait = true; -        add_reconnect_timer(); -        return; -    } - -    //  Create the engine object for this connection. -    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); -    alloc_assert (engine); - -    //  Attach the engine to the corresponding session object. -    send_attach (session, engine); - -    //  Shut the connecter down. -    terminate (); -} - -void zmq::vtcp_connecter_t::timer_event (int id_) -{ -    zmq_assert (id_ == reconnect_timer_id); -    wait = false; -    start_connecting (); -} - -void zmq::vtcp_connecter_t::start_connecting () -{ -    //  Open the connecting socket. -    int rc = open (); - -    //  Handle error condition by eventual reconnect. -    if (unlikely (rc != 0)) { -        errno_assert (false); -        wait = true; -        add_reconnect_timer(); -        return; -    } - -    //  Connection establishment may be dealyed. Poll for its completion. -    handle = add_fd (s); -    handle_valid = true; -    set_pollout (handle); -} - -void zmq::vtcp_connecter_t::add_reconnect_timer() -{ -    add_timer (get_new_reconnect_ivl(), reconnect_timer_id); -} - -int zmq::vtcp_connecter_t::get_new_reconnect_ivl () -{ -    //  The new interval is the current interval + random value. -    int this_interval = current_reconnect_ivl + -        (generate_random () % options.reconnect_ivl); - -    //  Only change the current reconnect interval  if the maximum reconnect -    //  interval was set and if it's larger than the reconnect interval. -    if (options.reconnect_ivl_max > 0 &&  -        options.reconnect_ivl_max > options.reconnect_ivl) { - -        //  Calculate the next interval -        current_reconnect_ivl = current_reconnect_ivl * 2; -        if(current_reconnect_ivl >= options.reconnect_ivl_max) { -            current_reconnect_ivl = options.reconnect_ivl_max; -        }    -    } -    return this_interval; -} - -int zmq::vtcp_connecter_t::open () -{ -    zmq_assert (s == retired_fd); - -    //  Start the connection procedure. -    sockaddr_in *paddr = (sockaddr_in*) address.addr (); -    s = vtcp_connect (paddr->sin_addr.s_addr, ntohs (paddr->sin_port)); - -    //  Connect was successfull immediately. -    if (s != retired_fd) -        return 0; - -    //  Asynchronous connect was launched. -    if (errno == EINPROGRESS) { -        errno = EAGAIN; -        return -1; -    } - -    //  Error occured. -    return -1; -} - -zmq::fd_t zmq::vtcp_connecter_t::connect () -{ -    int rc = vtcp_acceptc (s, subport); -    if (rc != 0) { -        int err = errno; -        close (); -        errno = err; -        return retired_fd; -    } - -    tune_tcp_socket (s); - -    fd_t result = s; -    s = retired_fd; -    return result; -} - -int zmq::vtcp_connecter_t::close () -{ -    zmq_assert (s != retired_fd); -    int rc = ::close (s); -    if (rc != 0) -        return -1; -    s = retired_fd; -    return 0; -} - -#endif - diff --git a/src/vtcp_connecter.hpp b/src/vtcp_connecter.hpp deleted file mode 100644 index 6e0cd6c..0000000 --- a/src/vtcp_connecter.hpp +++ /dev/null @@ -1,121 +0,0 @@ -/* -    Copyright (c) 2009-2011 250bpm s.r.o. -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __VTCP_CONNECTER_HPP_INCLUDED__ -#define __VTCP_CONNECTER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <vtcp.h> - -#include "fd.hpp" -#include "own.hpp" -#include "stdint.hpp" -#include "io_object.hpp" -#include "tcp_address.hpp" - -namespace zmq -{ - -    class vtcp_connecter_t : public own_t, public io_object_t -    { -    public: - -        //  If 'delay' is true connecter first waits for a while, then starts -        //  connection process. -        vtcp_connecter_t (class io_thread_t *io_thread_, -            class session_base_t *session_, const options_t &options_, -            const char *address_, bool delay_); -        ~vtcp_connecter_t (); - -    private: - -        //  ID of the timer used to delay the reconnection. -        enum {reconnect_timer_id = 1}; - -        //  Handlers for incoming commands. -        void process_plug (); - -        //  Handlers for I/O events. -        void in_event (); -        void out_event (); -        void timer_event (int id_); - -        //  Internal function to start the actual connection establishment. -        void start_connecting (); - -        //  Internal function to add a reconnect timer -        void add_reconnect_timer(); - -        //  Internal function to return a reconnect backoff delay. -        //  Will modify the current_reconnect_ivl used for next call -        //  Returns the currently used interval -        int get_new_reconnect_ivl (); - -        //  Set address to connect to. -        int set_address (const char *addr_); - -        //  Open TCP connecting socket. Returns -1 in case of error, -        //  0 if connect was successfull immediately and 1 if async connect -        //  was launched. -        int open (); - -        //  Close the connecting socket. -        int close (); - -        //  Get the file descriptor of newly created connection. Returns -        //  retired_fd if the connection was unsuccessfull. -        fd_t connect (); - -        //  Address to connect to. -        tcp_address_t address; -        vtcp_subport_t subport; - -        //  Underlying socket. -        fd_t s; - -        //  Handle corresponding to the listening socket. -        handle_t handle; - -        //  If true file descriptor is registered with the poller and 'handle' -        //  contains valid value. -        bool handle_valid; - -        //  If true, connecter is waiting a while before trying to connect. -        bool wait; - -        //  Reference to the session we belong to. -        class session_base_t *session; - -        //  Current reconnect ivl, updated for backoff strategy -        int current_reconnect_ivl; - -        vtcp_connecter_t (const vtcp_connecter_t&); -        const vtcp_connecter_t &operator = (const vtcp_connecter_t&); -    }; - -} - -#endif - -#endif diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp deleted file mode 100644 index 826c619..0000000 --- a/src/vtcp_listener.cpp +++ /dev/null @@ -1,125 +0,0 @@ -/* -    Copyright (c) 2009-2011 250bpm s.r.o. -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "vtcp_listener.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <string> -#include <string.h> -#include <vtcp.h> - -#include "stream_engine.hpp" -#include "session_base.hpp" -#include "stdint.hpp" -#include "err.hpp" -#include "ip.hpp" - -zmq::vtcp_listener_t::vtcp_listener_t (io_thread_t *io_thread_, -        socket_base_t *socket_, options_t &options_) : -    own_t (io_thread_, options_), -    io_object_t (io_thread_), -    s (retired_fd), -    socket (socket_) -{ -} - -zmq::vtcp_listener_t::~vtcp_listener_t () -{ -    if (s != retired_fd) { -        int rc = ::close (s); -        errno_assert (rc == 0); -        s = retired_fd; -    } -} - -int zmq::vtcp_listener_t::set_address (const char *addr_) -{ -    //  VTCP doesn't allow for binding to a specific interface. Connection -    //  string has to begin with *: (INADDR_ANY). -    if (strlen (addr_) < 2 || addr_ [0] != '*' || addr_ [1] != ':') { -        errno = EADDRNOTAVAIL; -        return -1; -    } - -    //  Parse port and subport. -    uint16_t port; -    uint32_t subport; -    const char *delimiter = strrchr (addr_, '.'); -    if (!delimiter) { -        port = 9220; -        subport = (uint32_t) atoi (addr_ + 2); -    } -    else { -        std::string port_str (addr_ + 2, delimiter - addr_ - 2); -        std::string subport_str (delimiter + 1); -        port = (uint16_t) atoi (port_str.c_str ()); -        subport = (uint32_t) atoi (subport_str.c_str ()); -    } - -    //  Start listening. -    s = vtcp_bind (port, subport); -    if (s == retired_fd) -        return -1; - -    return 0; -} - -void zmq::vtcp_listener_t::process_plug () -{ -    //  Start polling for incoming connections. -    handle = add_fd (s); -    set_pollin (handle); -} - -void zmq::vtcp_listener_t::process_term (int linger_) -{ -    rm_fd (handle); -    own_t::process_term (linger_); -} - -void zmq::vtcp_listener_t::in_event () -{ -    fd_t fd = vtcp_acceptb (s); -    if (fd == retired_fd) -        return; - -    tune_tcp_socket (fd); - -    //  Create the engine object for this connection. -    stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); -    alloc_assert (engine); - -    //  Choose I/O thread to run connecter in. Given that we are already -    //  running in an I/O thread, there must be at least one available. -    io_thread_t *io_thread = choose_io_thread (options.affinity); -    zmq_assert (io_thread); - -    //  Create and launch a session object.  -    session_base_t *session = session_base_t::create (io_thread, false, socket, -        options, NULL, NULL); -    alloc_assert (session); -    session->inc_seqnum (); -    launch_child (session); -    send_attach (session, engine, false); -} - -#endif diff --git a/src/vtcp_listener.hpp b/src/vtcp_listener.hpp deleted file mode 100644 index 705ce7d..0000000 --- a/src/vtcp_listener.hpp +++ /dev/null @@ -1,72 +0,0 @@ -/* -    Copyright (c) 2009-2011 250bpm s.r.o. -    Copyright (c) 2007-2011 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ -#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include "own.hpp" -#include "io_object.hpp" -#include "fd.hpp" - -namespace zmq -{ - -    class vtcp_listener_t : public own_t, public io_object_t -    { -    public: - -        vtcp_listener_t (class io_thread_t *io_thread_, -            class socket_base_t *socket_, class options_t &options_); -        ~vtcp_listener_t (); - -        int set_address (const char *addr_); - -    private: - -        //  Handlers for incoming commands. -        void process_plug (); -        void process_term (int linger_); - -        //  Handlers for I/O events. -        void in_event (); - -        //  VTCP listener socket. -        fd_t s; - -        //  Handle corresponding to the listening socket. -        handle_t handle; - -        //  Socket the listerner belongs to. -        class socket_base_t *socket; - -        vtcp_listener_t (const vtcp_listener_t&); -        const vtcp_listener_t &operator = (const vtcp_listener_t&); -    }; - -} - -#endif - -#endif | 
