diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/session_base.cpp | 13 | ||||
-rw-r--r-- | src/socket_base.cpp | 27 | ||||
-rw-r--r-- | src/vtcp_connecter.cpp | 252 | ||||
-rw-r--r-- | src/vtcp_connecter.hpp | 121 | ||||
-rw-r--r-- | src/vtcp_listener.cpp | 125 | ||||
-rw-r--r-- | src/vtcp_listener.hpp | 72 |
7 files changed, 1 insertions, 613 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 137ba73..2b6c8f0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -67,8 +67,6 @@ libzmq_la_SOURCES = \ tcp_listener.hpp \ thread.hpp \ trie.hpp \ - vtcp_connecter.hpp \ - vtcp_listener.hpp \ windows.hpp \ wire.hpp \ xpub.hpp \ @@ -125,8 +123,6 @@ libzmq_la_SOURCES = \ tcp_listener.cpp \ thread.cpp \ trie.cpp \ - vtcp_connecter.cpp \ - vtcp_listener.cpp \ xpub.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/session_base.cpp b/src/session_base.cpp index 5929640..d1d31c9 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -27,7 +27,6 @@ #include "likely.hpp" #include "tcp_connecter.hpp" #include "ipc_connecter.hpp" -#include "vtcp_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" @@ -394,18 +393,6 @@ void zmq::session_base_t::start_connecting (bool wait_) } #endif -#if defined ZMQ_HAVE_VTCP - if (protocol == "vtcp") { - - vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t ( - io_thread, this, options, address.c_str (), - wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - #if defined ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1137d22..e990ba1 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -37,7 +37,6 @@ #include "socket_base.hpp" #include "tcp_listener.hpp" #include "ipc_listener.hpp" -#include "vtcp_listener.hpp" #include "tcp_connecter.hpp" #include "io_thread.hpp" #include "session_base.hpp" @@ -173,8 +172,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys" && - protocol_ != "vtcp") { + protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") { errno = EPROTONOSUPPORT; return -1; } @@ -188,14 +186,6 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) } #endif - // If 0MQ is not compiled with VTCP, vtcp transport is not avaialble. -#if !defined ZMQ_HAVE_VTCP - if (protocol_ == "vtcp") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif - // IPC transport is not available on Windows and OpenVMS. #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS if (protocol_ == "ipc") { @@ -389,21 +379,6 @@ int zmq::socket_base_t::bind (const char *addr_) } #endif -#if defined ZMQ_HAVE_VTCP - if (protocol == "vtcp") { - vtcp_listener_t *listener = new (std::nothrow) vtcp_listener_t ( - io_thread, this, options); - alloc_assert (listener); - int rc = listener->set_address (address.c_str ()); - if (rc != 0) { - delete listener; - return -1; - } - launch_child (listener); - return 0; - } -#endif - zmq_assert (false); return -1; } diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp deleted file mode 100644 index 7a2722b..0000000 --- a/src/vtcp_connecter.cpp +++ /dev/null @@ -1,252 +0,0 @@ -/* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "vtcp_connecter.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <new> -#include <string> - -#include "stream_engine.hpp" -#include "io_thread.hpp" -#include "platform.hpp" -#include "random.hpp" -#include "likely.hpp" -#include "err.hpp" -#include "ip.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> -#include <netinet/in.h> -#include <netdb.h> -#include <fcntl.h> -#ifdef ZMQ_HAVE_OPENVMS -#include <ioctl.h> -#endif -#endif - -zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, const options_t &options_, - const char *address_, bool wait_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - s (retired_fd), - handle_valid (false), - wait (wait_), - session (session_), - current_reconnect_ivl(options.reconnect_ivl) -{ - subport = 0; - - int rc = set_address (address_); - zmq_assert (rc == 0); -} - -zmq::vtcp_connecter_t::~vtcp_connecter_t () -{ - if (wait) - cancel_timer (reconnect_timer_id); - if (handle_valid) - rm_fd (handle); - - if (s != retired_fd) - close (); -} - -int zmq::vtcp_connecter_t::set_address (const char *addr_) -{ - const char *delimiter = strrchr (addr_, '.'); - if (!delimiter) { - delimiter = strrchr (addr_, ':'); - if (!delimiter) { - errno = EINVAL; - return -1; - } - std::string addr_str (addr_, delimiter - addr_); - addr_str += ":9220"; - std::string subport_str (delimiter + 1); - subport = (vtcp_subport_t) atoi (subport_str.c_str ()); - int rc = address.resolve (addr_str.c_str (), false, true); - if (rc != 0) - return -1; - } - else { - std::string addr_str (addr_, delimiter - addr_); - std::string subport_str (delimiter + 1); - subport = (vtcp_subport_t) atoi (subport_str.c_str ()); - int rc = address.resolve (addr_str.c_str (), false, true); - if (rc != 0) - return -1; - } - - return 0; -} - -void zmq::vtcp_connecter_t::process_plug () -{ - if (wait) - add_reconnect_timer(); - else - start_connecting (); -} - -void zmq::vtcp_connecter_t::in_event () -{ - // We are not polling for incomming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - -void zmq::vtcp_connecter_t::out_event () -{ - fd_t fd = connect (); - rm_fd (handle); - handle_valid = false; - - // Handle the error condition by attempt to reconnect. - if (fd == retired_fd) { - close (); - wait = true; - add_reconnect_timer(); - return; - } - - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); - alloc_assert (engine); - - // Attach the engine to the corresponding session object. - send_attach (session, engine); - - // Shut the connecter down. - terminate (); -} - -void zmq::vtcp_connecter_t::timer_event (int id_) -{ - zmq_assert (id_ == reconnect_timer_id); - wait = false; - start_connecting (); -} - -void zmq::vtcp_connecter_t::start_connecting () -{ - // Open the connecting socket. - int rc = open (); - - // Handle error condition by eventual reconnect. - if (unlikely (rc != 0)) { - errno_assert (false); - wait = true; - add_reconnect_timer(); - return; - } - - // Connection establishment may be dealyed. Poll for its completion. - handle = add_fd (s); - handle_valid = true; - set_pollout (handle); -} - -void zmq::vtcp_connecter_t::add_reconnect_timer() -{ - add_timer (get_new_reconnect_ivl(), reconnect_timer_id); -} - -int zmq::vtcp_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - int this_interval = current_reconnect_ivl + - (generate_random () % options.reconnect_ivl); - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 && - options.reconnect_ivl_max > options.reconnect_ivl) { - - // Calculate the next interval - current_reconnect_ivl = current_reconnect_ivl * 2; - if(current_reconnect_ivl >= options.reconnect_ivl_max) { - current_reconnect_ivl = options.reconnect_ivl_max; - } - } - return this_interval; -} - -int zmq::vtcp_connecter_t::open () -{ - zmq_assert (s == retired_fd); - - // Start the connection procedure. - sockaddr_in *paddr = (sockaddr_in*) address.addr (); - s = vtcp_connect (paddr->sin_addr.s_addr, ntohs (paddr->sin_port)); - - // Connect was successfull immediately. - if (s != retired_fd) - return 0; - - // Asynchronous connect was launched. - if (errno == EINPROGRESS) { - errno = EAGAIN; - return -1; - } - - // Error occured. - return -1; -} - -zmq::fd_t zmq::vtcp_connecter_t::connect () -{ - int rc = vtcp_acceptc (s, subport); - if (rc != 0) { - int err = errno; - close (); - errno = err; - return retired_fd; - } - - tune_tcp_socket (s); - - fd_t result = s; - s = retired_fd; - return result; -} - -int zmq::vtcp_connecter_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - if (rc != 0) - return -1; - s = retired_fd; - return 0; -} - -#endif - diff --git a/src/vtcp_connecter.hpp b/src/vtcp_connecter.hpp deleted file mode 100644 index 6e0cd6c..0000000 --- a/src/vtcp_connecter.hpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __VTCP_CONNECTER_HPP_INCLUDED__ -#define __VTCP_CONNECTER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <vtcp.h> - -#include "fd.hpp" -#include "own.hpp" -#include "stdint.hpp" -#include "io_object.hpp" -#include "tcp_address.hpp" - -namespace zmq -{ - - class vtcp_connecter_t : public own_t, public io_object_t - { - public: - - // If 'delay' is true connecter first waits for a while, then starts - // connection process. - vtcp_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, const options_t &options_, - const char *address_, bool delay_); - ~vtcp_connecter_t (); - - private: - - // ID of the timer used to delay the reconnection. - enum {reconnect_timer_id = 1}; - - // Handlers for incoming commands. - void process_plug (); - - // Handlers for I/O events. - void in_event (); - void out_event (); - void timer_event (int id_); - - // Internal function to start the actual connection establishment. - void start_connecting (); - - // Internal function to add a reconnect timer - void add_reconnect_timer(); - - // Internal function to return a reconnect backoff delay. - // Will modify the current_reconnect_ivl used for next call - // Returns the currently used interval - int get_new_reconnect_ivl (); - - // Set address to connect to. - int set_address (const char *addr_); - - // Open TCP connecting socket. Returns -1 in case of error, - // 0 if connect was successfull immediately and 1 if async connect - // was launched. - int open (); - - // Close the connecting socket. - int close (); - - // Get the file descriptor of newly created connection. Returns - // retired_fd if the connection was unsuccessfull. - fd_t connect (); - - // Address to connect to. - tcp_address_t address; - vtcp_subport_t subport; - - // Underlying socket. - fd_t s; - - // Handle corresponding to the listening socket. - handle_t handle; - - // If true file descriptor is registered with the poller and 'handle' - // contains valid value. - bool handle_valid; - - // If true, connecter is waiting a while before trying to connect. - bool wait; - - // Reference to the session we belong to. - class session_base_t *session; - - // Current reconnect ivl, updated for backoff strategy - int current_reconnect_ivl; - - vtcp_connecter_t (const vtcp_connecter_t&); - const vtcp_connecter_t &operator = (const vtcp_connecter_t&); - }; - -} - -#endif - -#endif diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp deleted file mode 100644 index 826c619..0000000 --- a/src/vtcp_listener.cpp +++ /dev/null @@ -1,125 +0,0 @@ -/* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "vtcp_listener.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <string> -#include <string.h> -#include <vtcp.h> - -#include "stream_engine.hpp" -#include "session_base.hpp" -#include "stdint.hpp" -#include "err.hpp" -#include "ip.hpp" - -zmq::vtcp_listener_t::vtcp_listener_t (io_thread_t *io_thread_, - socket_base_t *socket_, options_t &options_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - s (retired_fd), - socket (socket_) -{ -} - -zmq::vtcp_listener_t::~vtcp_listener_t () -{ - if (s != retired_fd) { - int rc = ::close (s); - errno_assert (rc == 0); - s = retired_fd; - } -} - -int zmq::vtcp_listener_t::set_address (const char *addr_) -{ - // VTCP doesn't allow for binding to a specific interface. Connection - // string has to begin with *: (INADDR_ANY). - if (strlen (addr_) < 2 || addr_ [0] != '*' || addr_ [1] != ':') { - errno = EADDRNOTAVAIL; - return -1; - } - - // Parse port and subport. - uint16_t port; - uint32_t subport; - const char *delimiter = strrchr (addr_, '.'); - if (!delimiter) { - port = 9220; - subport = (uint32_t) atoi (addr_ + 2); - } - else { - std::string port_str (addr_ + 2, delimiter - addr_ - 2); - std::string subport_str (delimiter + 1); - port = (uint16_t) atoi (port_str.c_str ()); - subport = (uint32_t) atoi (subport_str.c_str ()); - } - - // Start listening. - s = vtcp_bind (port, subport); - if (s == retired_fd) - return -1; - - return 0; -} - -void zmq::vtcp_listener_t::process_plug () -{ - // Start polling for incoming connections. - handle = add_fd (s); - set_pollin (handle); -} - -void zmq::vtcp_listener_t::process_term (int linger_) -{ - rm_fd (handle); - own_t::process_term (linger_); -} - -void zmq::vtcp_listener_t::in_event () -{ - fd_t fd = vtcp_acceptb (s); - if (fd == retired_fd) - return; - - tune_tcp_socket (fd); - - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); - alloc_assert (engine); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create and launch a session object. - session_base_t *session = session_base_t::create (io_thread, false, socket, - options, NULL, NULL); - alloc_assert (session); - session->inc_seqnum (); - launch_child (session); - send_attach (session, engine, false); -} - -#endif diff --git a/src/vtcp_listener.hpp b/src/vtcp_listener.hpp deleted file mode 100644 index 705ce7d..0000000 --- a/src/vtcp_listener.hpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ -#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include "own.hpp" -#include "io_object.hpp" -#include "fd.hpp" - -namespace zmq -{ - - class vtcp_listener_t : public own_t, public io_object_t - { - public: - - vtcp_listener_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, class options_t &options_); - ~vtcp_listener_t (); - - int set_address (const char *addr_); - - private: - - // Handlers for incoming commands. - void process_plug (); - void process_term (int linger_); - - // Handlers for I/O events. - void in_event (); - - // VTCP listener socket. - fd_t s; - - // Handle corresponding to the listening socket. - handle_t handle; - - // Socket the listerner belongs to. - class socket_base_t *socket; - - vtcp_listener_t (const vtcp_listener_t&); - const vtcp_listener_t &operator = (const vtcp_listener_t&); - }; - -} - -#endif - -#endif |