summaryrefslogtreecommitdiff
path: root/src/tcp_listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/tcp_listener.cpp')
-rw-r--r--src/tcp_listener.cpp127
1 files changed, 75 insertions, 52 deletions
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index f40b0fe..b409891 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -18,19 +18,42 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <new>
+
#include <string.h>
#include "tcp_listener.hpp"
#include "platform.hpp"
-#include "ip.hpp"
+#include "tcp_engine.hpp"
+#include "io_thread.hpp"
+#include "session.hpp"
#include "config.hpp"
#include "err.hpp"
#ifdef ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <unistd.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <fcntl.h>
+#ifndef ZMQ_HAVE_OPENVMS
+#include <sys/un.h>
+#else
+#include <ioctl.h>
+#endif
+#endif
-zmq::tcp_listener_t::tcp_listener_t () :
+zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_,
+ socket_base_t *socket_, const options_t &options_) :
+ own_t (io_thread_, options_),
+ io_object_t (io_thread_),
has_file (false),
- s (retired_fd)
+ s (retired_fd),
+ socket (socket_)
{
memset (&addr, 0, sizeof (addr));
addr_len = 0;
@@ -42,8 +65,48 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
-int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
- int backlog_)
+void zmq::tcp_listener_t::process_plug ()
+{
+ // Start polling for incoming connections.
+ handle = add_fd (s);
+ set_pollin (handle);
+}
+
+void zmq::tcp_listener_t::process_term (int linger_)
+{
+ rm_fd (handle);
+ own_t::process_term (linger_);
+}
+
+void zmq::tcp_listener_t::in_event ()
+{
+ fd_t fd = accept ();
+
+ // If connection was reset by the peer in the meantime, just ignore it.
+ // TODO: Handle specific errors like ENFILE/EMFILE etc.
+ if (fd == retired_fd)
+ return;
+ // Create the engine object for this connection.
+ tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options);
+ alloc_assert (engine);
+
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
+ // Create and launch a session object.
+ session_t *session = new (std::nothrow)
+ session_t (io_thread, false, socket, options, NULL, NULL);
+ alloc_assert (session);
+ session->inc_seqnum ();
+ launch_child (session);
+ send_attach (session, engine, false);
+}
+
+#ifdef ZMQ_HAVE_WINDOWS
+
+int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
{
// IPC protocol is not supported on Windows platform.
if (strcmp (protocol_, "tcp") != 0 ) {
@@ -57,7 +120,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
return rc;
// Create a listening socket.
- s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
+ s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == INVALID_SOCKET) {
wsa_error_to_errno ();
return -1;
@@ -82,7 +145,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
}
// Listen for incomming connections.
- rc = listen (s, backlog_);
+ rc = listen (s, options.backlog);
if (rc == SOCKET_ERROR) {
wsa_error_to_errno ();
return -1;
@@ -100,11 +163,6 @@ int zmq::tcp_listener_t::close ()
return 0;
}
-zmq::fd_t zmq::tcp_listener_t::get_fd ()
-{
- return s;
-}
-
zmq::fd_t zmq::tcp_listener_t::accept ()
{
zmq_assert (s != retired_fd);
@@ -134,37 +192,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
#else
-#include <unistd.h>
-#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <fcntl.h>
-
-#ifndef ZMQ_HAVE_OPENVMS
-#include <sys/un.h>
-#endif
-
-#ifdef ZMQ_HAVE_OPENVMS
-#include <ioctl.h>
-#endif
-
-zmq::tcp_listener_t::tcp_listener_t () :
- has_file (false),
- s (retired_fd)
-{
- memset (&addr, 0, sizeof (addr));
-}
-
-zmq::tcp_listener_t::~tcp_listener_t ()
-{
- if (s != retired_fd)
- close ();
-}
-
-int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
- int backlog_)
+int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
{
if (strcmp (protocol_, "tcp") == 0 ) {
@@ -174,7 +202,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
return -1;
// Create a listening socket.
- s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
+ s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == -1)
return -1;
@@ -207,7 +235,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
}
// Listen for incomming connections.
- rc = listen (s, backlog_);
+ rc = listen (s, options.backlog);
if (rc != 0) {
int err = errno;
if (close () != 0)
@@ -231,7 +259,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
return -1;
// Create a listening socket.
- s = socket (AF_UNIX, SOCK_STREAM, 0);
+ s = ::socket (AF_UNIX, SOCK_STREAM, 0);
if (s == -1)
return -1;
@@ -254,7 +282,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_,
has_file = true;
// Listen for incomming connections.
- rc = listen (s, backlog_);
+ rc = listen (s, options.backlog);
if (rc != 0) {
int err = errno;
if (close () != 0)
@@ -294,11 +322,6 @@ int zmq::tcp_listener_t::close ()
return 0;
}
-zmq::fd_t zmq::tcp_listener_t::get_fd ()
-{
- return s;
-}
-
zmq::fd_t zmq::tcp_listener_t::accept ()
{
zmq_assert (s != retired_fd);