diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/socket_base.cpp | 53 | ||||
| -rw-r--r-- | src/tcp_connecter.cpp | 46 | ||||
| -rw-r--r-- | src/tcp_connecter.hpp | 12 | ||||
| -rw-r--r-- | src/tcp_listener.cpp | 1 | ||||
| -rw-r--r-- | src/vtcp_listener.cpp | 111 | ||||
| -rw-r--r-- | src/vtcp_listener.hpp | 71 | 
7 files changed, 278 insertions, 20 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 1354174..1da5be9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -64,6 +64,8 @@ libzmq_la_SOURCES = \      tcp_listener.hpp \      thread.hpp \      trie.hpp \ +    vtcp_connecter.hpp \ +    vtcp_listener.hpp \      windows.hpp \      wire.hpp \      xpub.hpp \ @@ -117,6 +119,8 @@ libzmq_la_SOURCES = \      tcp_listener.cpp \      thread.cpp \      trie.cpp \ +    vtcp_connecter.cpp \ +    vtcp_listener.cpp \      xpub.cpp \      xrep.cpp \      xreq.cpp \ diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b97e3a7..fb0f5fd 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -35,6 +35,7 @@  #include "socket_base.hpp"  #include "tcp_listener.hpp" +#include "vtcp_listener.hpp"  #include "tcp_connecter.hpp"  #include "io_thread.hpp"  #include "session.hpp" @@ -174,7 +175,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)  {      //  First check out whether the protcol is something we are aware of.      if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && -          protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") { +          protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys" && +          protocol_ != "vtcp") {          errno = EPROTONOSUPPORT;          return -1;      } @@ -188,6 +190,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)      }  #endif +    //  If 0MQ is not compiled with VTCP, vtcp transport is not avaialble. +#if !defined ZMQ_HAVE_VTCP +    if (protocol_ == "vtcp") { +        errno = EPROTONOSUPPORT; +        return -1; +    } +#endif +      //  IPC transport is not available on Windows and OpenVMS.  #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS      if (protocol_ == "ipc") { @@ -338,16 +348,22 @@ int zmq::socket_base_t::bind (const char *addr_)          return register_endpoint (addr_, endpoint);      } -    if (protocol == "tcp" || protocol == "ipc") { +    if (protocol == "pgm" || protocol == "epgm") { -        //  Choose I/O thread to run the listerner in. -        io_thread_t *io_thread = choose_io_thread (options.affinity); -        if (!io_thread) { -            errno = EMTHREAD; -            return -1; -        } +        //  For convenience's sake, bind can be used interchageable with +        //  connect for PGM and EPGM transports. +        return connect (addr_);  +    } -        //  Create and run the listener. +    //  Remaining trasnports require to be run in an I/O thread, so at this +    //  point we'll choose one. +    io_thread_t *io_thread = choose_io_thread (options.affinity); +    if (!io_thread) { +        errno = EMTHREAD; +        return -1; +    } + +    if (protocol == "tcp" || protocol == "ipc") {          tcp_listener_t *listener = new (std::nothrow) tcp_listener_t (              io_thread, this, options);          alloc_assert (listener); @@ -357,16 +373,23 @@ int zmq::socket_base_t::bind (const char *addr_)              return -1;          }          launch_child (listener); -          return 0;      } -    if (protocol == "pgm" || protocol == "epgm") { - -        //  For convenience's sake, bind can be used interchageable with -        //  connect for PGM and EPGM transports. -        return connect (addr_);  +#if defined ZMQ_HAVE_VTCP +    if (protocol == "vtcp") { +        vtcp_listener_t *listener = new (std::nothrow) vtcp_listener_t ( +            io_thread, this, options); +        alloc_assert (listener); +        int rc = listener->set_address (address.c_str ()); +        if (rc != 0) { +            delete listener; +            return -1; +        } +        launch_child (listener); +        return 0;      } +#endif      zmq_assert (false);      return -1; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 678d488..b530e8e 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -23,6 +23,52 @@  #include "tcp_connecter.hpp"  #include "tcp_engine.hpp" +#include "platform.hpp" +#include "ip.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <netinet/in.h> +#include <netdb.h> +#include <fcntl.h> +#ifdef ZMQ_HAVE_OPENVMS +#include <ioctl.h> +#endif +#endif + + +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <new> +#include <string> + +#include "tcp_connecter.hpp" +#include "tcp_engine.hpp"  #include "io_thread.hpp"  #include "platform.hpp"  #include "ip.hpp" diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 2168ca7..524e9cc 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -18,8 +18,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#ifndef __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ -#define __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ +#ifndef __TCP_CONNECTER_HPP_INCLUDED__ +#define __TCP_CONNECTER_HPP_INCLUDED__  #include "fd.hpp"  #include "ip.hpp" @@ -30,11 +30,14 @@  namespace zmq  { +    //  A base class for different connecters. It handles auto-reconnection +    //  on behalf of the derived class. +      class tcp_connecter_t : public own_t, public io_object_t      {      public: -        //  If 'wait' is true connecter first waits for a while, then starts +        //  If 'delay' is true connecter first waits for a while, then starts          //  connection process.          tcp_connecter_t (class io_thread_t *io_thread_,              class session_t *session_, const options_t &options_, @@ -68,8 +71,7 @@ namespace zmq          //  Set address to connect to.          int set_address (const char *protocol, const char *addr_); -        //  Open TCP connecting socket. Address is in -        //  <hostname>:<port-number> format. Returns -1 in case of error, +        //  Open TCP connecting socket. Returns -1 in case of error,          //  0 if connect was successfull immediately and 1 if async connect          //  was launched.          int open (); diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index b409891..fbb1b85 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -86,6 +86,7 @@ void zmq::tcp_listener_t::in_event ()      //  TODO: Handle specific errors like ENFILE/EMFILE etc.      if (fd == retired_fd)          return; +      //  Create the engine object for this connection.      tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options);      alloc_assert (engine); diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp new file mode 100644 index 0000000..31fb9ac --- /dev/null +++ b/src/vtcp_listener.cpp @@ -0,0 +1,111 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "vtcp_listener.hpp" + +#if defined ZMQ_HAVE_VTCP + +#include <string> +#include <string.h> +#include <vtcp.h> + +#include "tcp_engine.hpp" +#include "session.hpp" +#include "stdint.hpp" +#include "err.hpp" + +zmq::vtcp_listener_t::vtcp_listener_t (io_thread_t *io_thread_, +        socket_base_t *socket_, options_t &options_) : +    own_t (io_thread_, options_), +    io_object_t (io_thread_), +    s (retired_fd), +    socket (socket_) +{ +} + +zmq::vtcp_listener_t::~vtcp_listener_t () +{ +    zmq_assert (s != retired_fd); +    int rc = ::close (s); +    errno_assert (rc == 0); +    s = retired_fd; +} + +int zmq::vtcp_listener_t::set_address (const char *addr_) +{ +    //  Find the '.' at end that separates NIC name from service. +    const char *delimiter = strrchr (addr_, '.'); +    if (!delimiter) { +        errno = EINVAL; +        return -1; +    } + +    //  Parse port and subport. +    std::string port_str (addr_, delimiter - addr_); +    std::string subport_str (delimiter + 1); +    uint16_t port = (uint16_t) atoi (port_str.c_str ()); +    uint32_t subport = (uint32_t) atoi (subport_str.c_str ()); + +    //  Srart listening. +    s = vtcp_bind (port, subport); +    if (s == retired_fd) +        return -1; + +    return 0; +} + +void zmq::vtcp_listener_t::process_plug () +{ +    //  Start polling for incoming connections. +    handle = add_fd (s); +    set_pollin (handle); +} + +void zmq::vtcp_listener_t::process_term (int linger_) +{ +    rm_fd (handle); +    own_t::process_term (linger_); +} + +void zmq::vtcp_listener_t::in_event () +{ +    fd_t fd = vtcp_acceptb (s); +    if (fd == retired_fd) +        return; + +    //  Create the engine object for this connection. +    tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); +    alloc_assert (engine); + +    //  Choose I/O thread to run connecter in. Given that we are already +    //  running in an I/O thread, there must be at least one available. +    io_thread_t *io_thread = choose_io_thread (options.affinity); +    zmq_assert (io_thread); + +    //  Create and launch a session object.  +    session_t *session = new (std::nothrow) +        session_t (io_thread, false, socket, options, NULL, NULL); +    alloc_assert (session); +    session->inc_seqnum (); +    launch_child (session); +    send_attach (session, engine, false); +} + +#endif diff --git a/src/vtcp_listener.hpp b/src/vtcp_listener.hpp new file mode 100644 index 0000000..78f3b51 --- /dev/null +++ b/src/vtcp_listener.hpp @@ -0,0 +1,71 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ +#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_VTCP + +#include "own.hpp" +#include "io_object.hpp" +#include "fd.hpp" + +namespace zmq +{ + +    class vtcp_listener_t : public own_t, public io_object_t +    { +    public: + +        vtcp_listener_t (class io_thread_t *io_thread_, +            class socket_base_t *socket_, class options_t &options_); +        ~vtcp_listener_t (); + +        int set_address (const char *addr_); + +    private: + +        //  Handlers for incoming commands. +        void process_plug (); +        void process_term (int linger_); + +        //  Handlers for I/O events. +        void in_event (); + +        //  VTCP listener socket. +        fd_t s; + +        //  Handle corresponding to the listening socket. +        handle_t handle; + +        //  Socket the listerner belongs to. +        class socket_base_t *socket; + +        vtcp_listener_t (const vtcp_listener_t&); +        const vtcp_listener_t &operator = (const vtcp_listener_t&); +    }; + +} + +#endif + +#endif | 
