From 9119b4fd7b292b1a14db916040f8e7cc4731d4b6 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 26 Jul 2011 00:43:57 +0200 Subject: TCP transport classes simplified zmq_engine and tcp_socket merged into tcp_engine zmq_connecter and tcp_connecter merged into tcp_connecter zmq_listener and tcp_listener merged into tcp_listener Signed-off-by: Martin Sustrik --- src/tcp_listener.cpp | 127 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 75 insertions(+), 52 deletions(-) (limited to 'src/tcp_listener.cpp') diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index f40b0fe..b409891 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -18,19 +18,42 @@ along with this program. If not, see . */ +#include + #include #include "tcp_listener.hpp" #include "platform.hpp" -#include "ip.hpp" +#include "tcp_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp" #include "config.hpp" #include "err.hpp" #ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#include +#ifndef ZMQ_HAVE_OPENVMS +#include +#else +#include +#endif +#endif -zmq::tcp_listener_t::tcp_listener_t () : +zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, + socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), has_file (false), - s (retired_fd) + s (retired_fd), + socket (socket_) { memset (&addr, 0, sizeof (addr)); addr_len = 0; @@ -42,8 +65,48 @@ zmq::tcp_listener_t::~tcp_listener_t () close (); } -int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, - int backlog_) +void zmq::tcp_listener_t::process_plug () +{ + // Start polling for incoming connections. + handle = add_fd (s); + set_pollin (handle); +} + +void zmq::tcp_listener_t::process_term (int linger_) +{ + rm_fd (handle); + own_t::process_term (linger_); +} + +void zmq::tcp_listener_t::in_event () +{ + fd_t fd = accept (); + + // If connection was reset by the peer in the meantime, just ignore it. + // TODO: Handle specific errors like ENFILE/EMFILE etc. + if (fd == retired_fd) + return; + // Create the engine object for this connection. + tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); + alloc_assert (engine); + + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + + // Create and launch a session object. + session_t *session = new (std::nothrow) + session_t (io_thread, false, socket, options, NULL, NULL); + alloc_assert (session); + session->inc_seqnum (); + launch_child (session); + send_attach (session, engine, false); +} + +#ifdef ZMQ_HAVE_WINDOWS + +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) { // IPC protocol is not supported on Windows platform. if (strcmp (protocol_, "tcp") != 0 ) { @@ -57,7 +120,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, return rc; // Create a listening socket. - s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); + s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); if (s == INVALID_SOCKET) { wsa_error_to_errno (); return -1; @@ -82,7 +145,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, } // Listen for incomming connections. - rc = listen (s, backlog_); + rc = listen (s, options.backlog); if (rc == SOCKET_ERROR) { wsa_error_to_errno (); return -1; @@ -100,11 +163,6 @@ int zmq::tcp_listener_t::close () return 0; } -zmq::fd_t zmq::tcp_listener_t::get_fd () -{ - return s; -} - zmq::fd_t zmq::tcp_listener_t::accept () { zmq_assert (s != retired_fd); @@ -134,37 +192,7 @@ zmq::fd_t zmq::tcp_listener_t::accept () #else -#include -#include -#include -#include -#include -#include -#include - -#ifndef ZMQ_HAVE_OPENVMS -#include -#endif - -#ifdef ZMQ_HAVE_OPENVMS -#include -#endif - -zmq::tcp_listener_t::tcp_listener_t () : - has_file (false), - s (retired_fd) -{ - memset (&addr, 0, sizeof (addr)); -} - -zmq::tcp_listener_t::~tcp_listener_t () -{ - if (s != retired_fd) - close (); -} - -int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, - int backlog_) +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) { if (strcmp (protocol_, "tcp") == 0 ) { @@ -174,7 +202,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, return -1; // Create a listening socket. - s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); + s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); if (s == -1) return -1; @@ -207,7 +235,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, } // Listen for incomming connections. - rc = listen (s, backlog_); + rc = listen (s, options.backlog); if (rc != 0) { int err = errno; if (close () != 0) @@ -231,7 +259,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, return -1; // Create a listening socket. - s = socket (AF_UNIX, SOCK_STREAM, 0); + s = ::socket (AF_UNIX, SOCK_STREAM, 0); if (s == -1) return -1; @@ -254,7 +282,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, has_file = true; // Listen for incomming connections. - rc = listen (s, backlog_); + rc = listen (s, options.backlog); if (rc != 0) { int err = errno; if (close () != 0) @@ -294,11 +322,6 @@ int zmq::tcp_listener_t::close () return 0; } -zmq::fd_t zmq::tcp_listener_t::get_fd () -{ - return s; -} - zmq::fd_t zmq::tcp_listener_t::accept () { zmq_assert (s != retired_fd); -- cgit v1.2.3