diff options
-rw-r--r-- | src/ipc_listener.cpp | 208 | ||||
-rw-r--r-- | src/ipc_listener.hpp | 9 | ||||
-rw-r--r-- | src/session.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 6 | ||||
-rw-r--r-- | src/tcp_listener.cpp | 130 | ||||
-rw-r--r-- | src/tcp_listener.hpp | 2 |
6 files changed, 82 insertions, 275 deletions
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index e11d522..a61ceed 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -18,34 +18,24 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "ipc_listener.hpp" + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + #include <new> #include <string.h> -#include "ipc_listener.hpp" -#include "platform.hpp" #include "tcp_engine.hpp" #include "io_thread.hpp" #include "session.hpp" #include "config.hpp" #include "err.hpp" -#ifdef ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else #include <unistd.h> #include <sys/socket.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> -#include <netinet/in.h> -#include <netdb.h> #include <fcntl.h> -#ifndef ZMQ_HAVE_OPENVMS #include <sys/un.h> -#else -#include <ioctl.h> -#endif -#endif zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_, socket_base_t *socket_, const options_t &options_) : @@ -105,176 +95,51 @@ void zmq::ipc_listener_t::in_event () send_attach (session, engine, false); } -#ifdef ZMQ_HAVE_WINDOWS - -int zmq::ipc_listener_t::set_address (const char *protocol_, const char *addr_) +int zmq::ipc_listener_t::set_address (const char *addr_) { - // IPC protocol is not supported on Windows platform. - if (strcmp (protocol_, "tcp") != 0 ) { - errno = EPROTONOSUPPORT; - return -1; - } + // Get rid of the file associated with the UNIX domain socket that + // may have been left behind by the previous run of the application. + ::unlink (addr_); - // Convert the interface into sockaddr_in structure. - int rc = resolve_ip_interface (&addr, &addr_len, addr_); + // Convert the address into sockaddr_un structure. + int rc = resolve_local_path (&addr, &addr_len, addr_); if (rc != 0) - return rc; + return -1; // Create a listening socket. - s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - wsa_error_to_errno (); + s = ::socket (AF_UNIX, SOCK_STREAM, 0); + if (s == -1) return -1; - } - // Allow reusing of the address. - int flag = 1; - rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, - (const char*) &flag, sizeof (int)); - wsa_assert (rc != SOCKET_ERROR); + // Set the non-blocking flag. + int flag = fcntl (s, F_GETFL, 0); + if (flag == -1) + flag = 0; + rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); + errno_assert (rc != -1); - // Bind the socket to the network interface and port. + // Bind the socket to the file path. rc = bind (s, (struct sockaddr*) &addr, addr_len); - if (rc == SOCKET_ERROR) { - wsa_error_to_errno (); + if (rc != 0) { + int err = errno; + if (close () != 0) + return -1; + errno = err; return -1; } + has_file = true; // Listen for incomming connections. rc = listen (s, options.backlog); - if (rc == SOCKET_ERROR) { - wsa_error_to_errno (); - return -1; - } - - return 0; -} - -int zmq::ipc_listener_t::close () -{ - zmq_assert (s != retired_fd); - int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); - s = retired_fd; - return 0; -} - -zmq::fd_t zmq::ipc_listener_t::accept () -{ - zmq_assert (s != retired_fd); - - // Accept one incoming connection. - fd_t sock = ::accept (s, NULL, NULL); - if (sock == INVALID_SOCKET && - (WSAGetLastError () == WSAEWOULDBLOCK || - WSAGetLastError () == WSAECONNRESET)) - return retired_fd; - - zmq_assert (sock != INVALID_SOCKET); - - // Set to non-blocking mode. - unsigned long argp = 1; - int rc = ioctlsocket (sock, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - - return sock; -} - -#else - -int zmq::ipc_listener_t::set_address (const char *protocol_, const char *addr_) -{ - if (strcmp (protocol_, "tcp") == 0 ) { - - // Resolve the sockaddr to bind to. - int rc = resolve_ip_interface (&addr, &addr_len, addr_); - if (rc != 0) - return -1; - - // Create a listening socket. - s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == -1) - return -1; - - // Allow reusing of the address. - int flag = 1; - rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); - errno_assert (rc == 0); - - // Bind the socket to the network interface and port. - rc = bind (s, (struct sockaddr*) &addr, addr_len); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; - return -1; - } - - // Listen for incomming connections. - rc = listen (s, options.backlog); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; + if (rc != 0) { + int err = errno; + if (close () != 0) return -1; - } - - return 0; + errno = err; + return -1; } -#ifndef ZMQ_HAVE_OPENVMS - else if (strcmp (protocol_, "ipc") == 0) { - // Get rid of the file associated with the UNIX domain socket that - // may have been left behind by the previous run of the application. - ::unlink (addr_); - - // Convert the address into sockaddr_un structure. - int rc = resolve_local_path (&addr, &addr_len, addr_); - if (rc != 0) - return -1; - - // Create a listening socket. - s = ::socket (AF_UNIX, SOCK_STREAM, 0); - if (s == -1) - return -1; - - // Set the non-blocking flag. - int flag = fcntl (s, F_GETFL, 0); - if (flag == -1) - flag = 0; - rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); - errno_assert (rc != -1); - - // Bind the socket to the file path. - rc = bind (s, (struct sockaddr*) &addr, addr_len); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; - return -1; - } - has_file = true; - - // Listen for incomming connections. - rc = listen (s, options.backlog); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; - return -1; - } - - return 0; - } -#endif - else { - errno = EPROTONOSUPPORT; - return -1; - } + return 0; } int zmq::ipc_listener_t::close () @@ -285,7 +150,6 @@ int zmq::ipc_listener_t::close () return -1; s = retired_fd; -#ifndef ZMQ_HAVE_OPENVMS // If there's an underlying UNIX domain socket, get rid of the file it // is associated with. struct sockaddr_un *su = (struct sockaddr_un*) &addr; @@ -294,7 +158,6 @@ int zmq::ipc_listener_t::close () if (rc != 0) return -1; } -#endif return 0; } @@ -333,19 +196,14 @@ zmq::fd_t zmq::ipc_listener_t::accept () errno_assert (sock != -1); // Set to non-blocking mode. -#ifdef ZMQ_HAVE_OPENVMS - int flags = 1; - int rc = ioctl (sock, FIONBIO, &flags); - errno_assert (rc != -1); -#else int flags = fcntl (s, F_GETFL, 0); if (flags == -1) flags = 0; int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); -#endif return sock; } #endif + diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index 5d0e1d1..ce1e20d 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -21,6 +21,10 @@ #ifndef __ZMQ_IPC_LISTENER_HPP_INCLUDED__ #define __ZMQ_IPC_LISTENER_HPP_INCLUDED__ +#include "platform.hpp" + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + #include "fd.hpp" #include "ip.hpp" #include "own.hpp" @@ -39,7 +43,7 @@ namespace zmq ~ipc_listener_t (); // Set address to listen on. - int set_address (const char* protocol_, const char *addr_); + int set_address (const char *addr_); private: @@ -81,3 +85,6 @@ namespace zmq } #endif + +#endif + diff --git a/src/session.cpp b/src/session.cpp index 5db080f..8001ba8 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -314,6 +314,7 @@ void zmq::session_t::start_connecting (bool wait_) return; } +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS if (protocol == "ipc") { ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( io_thread, this, options, address.c_str (), wait_); @@ -321,6 +322,7 @@ void zmq::session_t::start_connecting (bool wait_) launch_child (connecter); return; } +#endif #if defined ZMQ_HAVE_VTCP if (protocol == "vtcp") { diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 975934f..593ff66 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -368,7 +368,7 @@ int zmq::socket_base_t::bind (const char *addr_) tcp_listener_t *listener = new (std::nothrow) tcp_listener_t ( io_thread, this, options); alloc_assert (listener); - int rc = listener->set_address (protocol.c_str(), address.c_str ()); + int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; return -1; @@ -377,11 +377,12 @@ int zmq::socket_base_t::bind (const char *addr_) return 0; } +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS if (protocol == "ipc") { ipc_listener_t *listener = new (std::nothrow) ipc_listener_t ( io_thread, this, options); alloc_assert (listener); - int rc = listener->set_address (protocol.c_str(), address.c_str ()); + int rc = listener->set_address (address.c_str ()); if (rc != 0) { delete listener; return -1; @@ -389,6 +390,7 @@ int zmq::socket_base_t::bind (const char *addr_) launch_child (listener); return 0; } +#endif #if defined ZMQ_HAVE_VTCP if (protocol == "vtcp") { diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 360be8c..07e2803 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -22,8 +22,8 @@ #include <string.h> -#include "tcp_listener.hpp" #include "platform.hpp" +#include "tcp_listener.hpp" #include "tcp_engine.hpp" #include "io_thread.hpp" #include "session.hpp" @@ -40,11 +40,10 @@ #include <netinet/in.h> #include <netdb.h> #include <fcntl.h> -#ifndef ZMQ_HAVE_OPENVMS -#include <sys/un.h> -#else -#include <ioctl.h> #endif + +#ifdef ZMQ_HAVE_OPENVMS +#include <ioctl.h> #endif zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, @@ -127,14 +126,8 @@ void zmq::tcp_listener_t::in_event () #ifdef ZMQ_HAVE_WINDOWS -int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) +int zmq::tcp_listener_t::set_address (const char *addr_) { - // IPC protocol is not supported on Windows platform. - if (strcmp (protocol_, "tcp") != 0 ) { - errno = EPROTONOSUPPORT; - return -1; - } - // Convert the interface into sockaddr_in structure. int rc = resolve_ip_interface (&addr, &addr_len, addr_); if (rc != 0) @@ -202,99 +195,44 @@ zmq::fd_t zmq::tcp_listener_t::accept () #else -int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) +int zmq::tcp_listener_t::set_address (const char *addr_) { - if (strcmp (protocol_, "tcp") == 0 ) { + // Resolve the sockaddr to bind to. + int rc = resolve_ip_interface (&addr, &addr_len, addr_); + if (rc != 0) + return -1; - // Resolve the sockaddr to bind to. - int rc = resolve_ip_interface (&addr, &addr_len, addr_); - if (rc != 0) - return -1; + // Create a listening socket. + s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); + if (s == -1) + return -1; - // Create a listening socket. - s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == -1) - return -1; + // Allow reusing of the address. + int flag = 1; + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + errno_assert (rc == 0); - // Allow reusing of the address. - int flag = 1; - rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); - errno_assert (rc == 0); - - // Bind the socket to the network interface and port. - rc = bind (s, (struct sockaddr*) &addr, addr_len); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; - return -1; - } - - // Listen for incomming connections. - rc = listen (s, options.backlog); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; + // Bind the socket to the network interface and port. + rc = bind (s, (struct sockaddr*) &addr, addr_len); + if (rc != 0) { + int err = errno; + if (close () != 0) return -1; - } - - return 0; + errno = err; + return -1; } -#ifndef ZMQ_HAVE_OPENVMS - else if (strcmp (protocol_, "ipc") == 0) { - - // Get rid of the file associated with the UNIX domain socket that - // may have been left behind by the previous run of the application. - ::unlink (addr_); - - // Convert the address into sockaddr_un structure. - int rc = resolve_local_path (&addr, &addr_len, addr_); - if (rc != 0) - return -1; - // Create a listening socket. - s = ::socket (AF_UNIX, SOCK_STREAM, 0); - if (s == -1) - return -1; - - // Set the non-blocking flag. - int flag = fcntl (s, F_GETFL, 0); - if (flag == -1) - flag = 0; - rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); - errno_assert (rc != -1); - - // Bind the socket to the file path. - rc = bind (s, (struct sockaddr*) &addr, addr_len); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; - return -1; - } - has_file = true; - - // Listen for incomming connections. - rc = listen (s, options.backlog); - if (rc != 0) { - int err = errno; - if (close () != 0) - return -1; - errno = err; + // Listen for incomming connections. + rc = listen (s, options.backlog); + if (rc != 0) { + int err = errno; + if (close () != 0) return -1; - } - - return 0; - } -#endif - else { - errno = EPROTONOSUPPORT; + errno = err; return -1; - } + } + + return 0; } int zmq::tcp_listener_t::close () diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 2dd43ce..857317a 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -39,7 +39,7 @@ namespace zmq ~tcp_listener_t (); // Set address to listen on. - int set_address (const char* protocol_, const char *addr_); + int set_address (const char *addr_); private: |