From 279302c5f54ddf8a23b1eaacee63c3158850d9ff Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 26 Jul 2011 18:35:40 +0200 Subject: Experimental VTCP listener added Signed-off-by: Martin Sustrik --- src/Makefile.am | 4 ++ src/socket_base.cpp | 53 +++++++++++++++++------- src/tcp_connecter.cpp | 46 +++++++++++++++++++++ src/tcp_connecter.hpp | 12 +++--- src/tcp_listener.cpp | 1 + src/vtcp_listener.cpp | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/vtcp_listener.hpp | 71 ++++++++++++++++++++++++++++++++ 7 files changed, 278 insertions(+), 20 deletions(-) create mode 100644 src/vtcp_listener.cpp create mode 100644 src/vtcp_listener.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 1354174..1da5be9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -64,6 +64,8 @@ libzmq_la_SOURCES = \ tcp_listener.hpp \ thread.hpp \ trie.hpp \ + vtcp_connecter.hpp \ + vtcp_listener.hpp \ windows.hpp \ wire.hpp \ xpub.hpp \ @@ -117,6 +119,8 @@ libzmq_la_SOURCES = \ tcp_listener.cpp \ thread.cpp \ trie.cpp \ + vtcp_connecter.cpp \ + vtcp_listener.cpp \ xpub.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b97e3a7..fb0f5fd 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -35,6 +35,7 @@ #include "socket_base.hpp" #include "tcp_listener.hpp" +#include "vtcp_listener.hpp" #include "tcp_connecter.hpp" #include "io_thread.hpp" #include "session.hpp" @@ -174,7 +175,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") { + protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys" && + protocol_ != "vtcp") { errno = EPROTONOSUPPORT; return -1; } @@ -188,6 +190,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) } #endif + // If 0MQ is not compiled with VTCP, vtcp transport is not avaialble. +#if !defined ZMQ_HAVE_VTCP + if (protocol_ == "vtcp") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif + // IPC transport is not available on Windows and OpenVMS. #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS if (protocol_ == "ipc") { @@ -338,16 +348,22 @@ int zmq::socket_base_t::bind (const char *addr_) return register_endpoint (addr_, endpoint); } - if (protocol == "tcp" || protocol == "ipc") { + if (protocol == "pgm" || protocol == "epgm") { - // Choose I/O thread to run the listerner in. - io_thread_t *io_thread = choose_io_thread (options.affinity); - if (!io_thread) { - errno = EMTHREAD; - return -1; - } + // For convenience's sake, bind can be used interchageable with + // connect for PGM and EPGM transports. + return connect (addr_); + } - // Create and run the listener. + // Remaining trasnports require to be run in an I/O thread, so at this + // point we'll choose one. + io_thread_t *io_thread = choose_io_thread (options.affinity); + if (!io_thread) { + errno = EMTHREAD; + return -1; + } + + if (protocol == "tcp" || protocol == "ipc") { tcp_listener_t *listener = new (std::nothrow) tcp_listener_t ( io_thread, this, options); alloc_assert (listener); @@ -357,16 +373,23 @@ int zmq::socket_base_t::bind (const char *addr_) return -1; } launch_child (listener); - return 0; } - if (protocol == "pgm" || protocol == "epgm") { - - // For convenience's sake, bind can be used interchageable with - // connect for PGM and EPGM transports. - return connect (addr_); +#if defined ZMQ_HAVE_VTCP + if (protocol == "vtcp") { + vtcp_listener_t *listener = new (std::nothrow) vtcp_listener_t ( + io_thread, this, options); + alloc_assert (listener); + int rc = listener->set_address (address.c_str ()); + if (rc != 0) { + delete listener; + return -1; + } + launch_child (listener); + return 0; } +#endif zmq_assert (false); return -1; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 678d488..b530e8e 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -21,6 +21,52 @@ #include #include +#include "tcp_connecter.hpp" +#include "tcp_engine.hpp" +#include "platform.hpp" +#include "ip.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef ZMQ_HAVE_OPENVMS +#include +#endif +#endif + + +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + #include "tcp_connecter.hpp" #include "tcp_engine.hpp" #include "io_thread.hpp" diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 2168ca7..524e9cc 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -18,8 +18,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ -#define __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ +#ifndef __TCP_CONNECTER_HPP_INCLUDED__ +#define __TCP_CONNECTER_HPP_INCLUDED__ #include "fd.hpp" #include "ip.hpp" @@ -30,11 +30,14 @@ namespace zmq { + // A base class for different connecters. It handles auto-reconnection + // on behalf of the derived class. + class tcp_connecter_t : public own_t, public io_object_t { public: - // If 'wait' is true connecter first waits for a while, then starts + // If 'delay' is true connecter first waits for a while, then starts // connection process. tcp_connecter_t (class io_thread_t *io_thread_, class session_t *session_, const options_t &options_, @@ -68,8 +71,7 @@ namespace zmq // Set address to connect to. int set_address (const char *protocol, const char *addr_); - // Open TCP connecting socket. Address is in - // : format. Returns -1 in case of error, + // Open TCP connecting socket. Returns -1 in case of error, // 0 if connect was successfull immediately and 1 if async connect // was launched. int open (); diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index b409891..fbb1b85 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -86,6 +86,7 @@ void zmq::tcp_listener_t::in_event () // TODO: Handle specific errors like ENFILE/EMFILE etc. if (fd == retired_fd) return; + // Create the engine object for this connection. tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); alloc_assert (engine); diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp new file mode 100644 index 0000000..31fb9ac --- /dev/null +++ b/src/vtcp_listener.cpp @@ -0,0 +1,111 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "vtcp_listener.hpp" + +#if defined ZMQ_HAVE_VTCP + +#include +#include +#include + +#include "tcp_engine.hpp" +#include "session.hpp" +#include "stdint.hpp" +#include "err.hpp" + +zmq::vtcp_listener_t::vtcp_listener_t (io_thread_t *io_thread_, + socket_base_t *socket_, options_t &options_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), + s (retired_fd), + socket (socket_) +{ +} + +zmq::vtcp_listener_t::~vtcp_listener_t () +{ + zmq_assert (s != retired_fd); + int rc = ::close (s); + errno_assert (rc == 0); + s = retired_fd; +} + +int zmq::vtcp_listener_t::set_address (const char *addr_) +{ + // Find the '.' at end that separates NIC name from service. + const char *delimiter = strrchr (addr_, '.'); + if (!delimiter) { + errno = EINVAL; + return -1; + } + + // Parse port and subport. + std::string port_str (addr_, delimiter - addr_); + std::string subport_str (delimiter + 1); + uint16_t port = (uint16_t) atoi (port_str.c_str ()); + uint32_t subport = (uint32_t) atoi (subport_str.c_str ()); + + // Srart listening. + s = vtcp_bind (port, subport); + if (s == retired_fd) + return -1; + + return 0; +} + +void zmq::vtcp_listener_t::process_plug () +{ + // Start polling for incoming connections. + handle = add_fd (s); + set_pollin (handle); +} + +void zmq::vtcp_listener_t::process_term (int linger_) +{ + rm_fd (handle); + own_t::process_term (linger_); +} + +void zmq::vtcp_listener_t::in_event () +{ + fd_t fd = vtcp_acceptb (s); + if (fd == retired_fd) + return; + + // Create the engine object for this connection. + tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); + alloc_assert (engine); + + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + + // Create and launch a session object. + session_t *session = new (std::nothrow) + session_t (io_thread, false, socket, options, NULL, NULL); + alloc_assert (session); + session->inc_seqnum (); + launch_child (session); + send_attach (session, engine, false); +} + +#endif diff --git a/src/vtcp_listener.hpp b/src/vtcp_listener.hpp new file mode 100644 index 0000000..78f3b51 --- /dev/null +++ b/src/vtcp_listener.hpp @@ -0,0 +1,71 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ +#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_VTCP + +#include "own.hpp" +#include "io_object.hpp" +#include "fd.hpp" + +namespace zmq +{ + + class vtcp_listener_t : public own_t, public io_object_t + { + public: + + vtcp_listener_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, class options_t &options_); + ~vtcp_listener_t (); + + int set_address (const char *addr_); + + private: + + // Handlers for incoming commands. + void process_plug (); + void process_term (int linger_); + + // Handlers for I/O events. + void in_event (); + + // VTCP listener socket. + fd_t s; + + // Handle corresponding to the listening socket. + handle_t handle; + + // Socket the listerner belongs to. + class socket_base_t *socket; + + vtcp_listener_t (const vtcp_listener_t&); + const vtcp_listener_t &operator = (const vtcp_listener_t&); + }; + +} + +#endif + +#endif -- cgit v1.2.3