From 2bb57ac57ace37203c505ff17147210feca34d73 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 15 Jan 2010 14:11:39 +0100 Subject: ZMQII-39: Implement IPC transport --- src/tcp_listener.cpp | 166 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 117 insertions(+), 49 deletions(-) (limited to 'src/tcp_listener.cpp') diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 9087405..2b30a8e 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -39,10 +39,16 @@ zmq::tcp_listener_t::~tcp_listener_t () close (); } -int zmq::tcp_listener_t::set_address (const char *addr_) +int zmq::tcp_listener_t::set_address (cosnt char *protocol_, const char *addr_) { + // IPC protocol is not supported on Windows platform. + if (strcmp (protocol_, "tcp") != 0 ) { + errno = EPROTONOSUPPORT; + return -1; + } + // Convert the interface into sockaddr_in structure. - int rc = resolve_ip_interface (&addr, addr_); + int rc = resolve_ip_interface ((sockaddr_in*) &addr, addr_); if (rc != 0) return rc; @@ -65,7 +71,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) wsa_assert (rc != SOCKET_ERROR); // Bind the socket to the network interface and port. - rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); + rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in)); if (rc == SOCKET_ERROR) { wsa_error_to_errno (); return -1; @@ -131,6 +137,7 @@ zmq::fd_t zmq::tcp_listener_t::accept () #include #include #include +#include zmq::tcp_listener_t::tcp_listener_t () : s (retired_fd) @@ -144,45 +151,91 @@ zmq::tcp_listener_t::~tcp_listener_t () close (); } -int zmq::tcp_listener_t::set_address (const char *addr_) +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) { - // Convert the interface into sockaddr_in structure. - int rc = resolve_ip_interface (&addr, addr_); - if (rc != 0) - return rc; - - // Create a listening socket. - s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (s == -1) - return -1; - - // Allow reusing of the address. - int flag = 1; - rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); - errno_assert (rc == 0); - - // Set the non-blocking flag. - flag = fcntl (s, F_GETFL, 0); - if (flag == -1) - flag = 0; - rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); - errno_assert (rc != -1); - - // Bind the socket to the network interface and port. - rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); - if (rc != 0) { - close (); - return -1; + if (strcmp (protocol_, "tcp") == 0 ) { + + // Convert the interface into sockaddr_in structure. + int rc = resolve_ip_interface ((struct sockaddr_in*) &addr, addr_); + if (rc != 0) + return -1; + + // Create a listening socket. + s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (s == -1) + return -1; + + // Allow reusing of the address. + int flag = 1; + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + errno_assert (rc == 0); + + // Set the non-blocking flag. + flag = fcntl (s, F_GETFL, 0); + if (flag == -1) + flag = 0; + rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); + errno_assert (rc != -1); + + // Bind the socket to the network interface and port. + rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in)); + if (rc != 0) { + close (); + return -1; + } + + // Listen for incomming connections. + rc = listen (s, tcp_connection_backlog); + if (rc != 0) { + close (); + return -1; + } + + return 0; } - - // Listen for incomming connections. - rc = listen (s, tcp_connection_backlog); - if (rc != 0) { - close (); - return -1; + else if (strcmp (protocol_, "ipc") == 0) { + + // Get rid of the file associated with the UNIX domain socket that + // may have been left behind by the previous run of the application. + ::unlink (addr_); + + // Convert the address into sockaddr_un structure. + int rc = resolve_local_path ((struct sockaddr_un*) &addr, addr_); + if (rc != 0) + return -1; + + // Create a listening socket. + s = socket (AF_LOCAL, SOCK_STREAM, 0); + if (s == -1) + return -1; + + // Set the non-blocking flag. + int flag = fcntl (s, F_GETFL, 0); + if (flag == -1) + flag = 0; + rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); + errno_assert (rc != -1); + + // Bind the socket to the file path. + rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); + if (rc != 0) { + close (); + return -1; + } + + // Listen for incomming connections. + rc = listen (s, tcp_connection_backlog); + if (rc != 0) { + close (); + return -1; + } + + return 0; } - - return 0; + else { + errno = EPROTONOSUPPORT; + return -1; + } } int zmq::tcp_listener_t::close () @@ -192,6 +245,17 @@ int zmq::tcp_listener_t::close () if (rc != 0) return -1; s = retired_fd; + + // If there's an underlying UNIX domain socket, get rid of the file it + // is associated with. + struct sockaddr *sa = (struct sockaddr*) &addr; + if (AF_LOCAL == sa->sa_family) { + struct sockaddr_un *sun = (struct sockaddr_un*) &addr; + rc = ::unlink(sun->sun_path); + if (rc != 0) + return -1; + } + return 0; } @@ -239,19 +303,23 @@ zmq::fd_t zmq::tcp_listener_t::accept () int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, - sizeof (int)); - errno_assert (rc == 0); + struct sockaddr *sa = (struct sockaddr*) &addr; + if (AF_INET == sa->sa_family) { + + // Disable Nagle's algorithm. + int flag = 1; + rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, + sizeof (int)); + errno_assert (rc == 0); #ifdef ZMQ_HAVE_OPENVMS - // Disable delayed acknowledgements. - flag = 1; - rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, - sizeof (int)); - errno_assert (rc != SOCKET_ERROR); + // Disable delayed acknowledgements. + flag = 1; + rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, + sizeof (int)); + errno_assert (rc != SOCKET_ERROR); #endif + } return sock; } -- cgit v1.2.3