From d5f3628ad08849a0c978f7d23dc678133ed33c42 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 29 Jul 2011 09:37:43 +0200 Subject: Different connecters simplified Signed-off-by: Martin Sustrik --- src/ip.cpp | 31 +++++++++ src/ip.hpp | 8 +-- src/ipc_connecter.cpp | 174 +++++------------------------------------------- src/ipc_connecter.hpp | 7 ++ src/signaler.cpp | 20 +----- src/tcp_connecter.cpp | 179 +++++++++++--------------------------------------- src/tcp_connecter.hpp | 2 +- src/tcp_engine.cpp | 27 ++------ 8 files changed, 108 insertions(+), 340 deletions(-) (limited to 'src') diff --git a/src/ip.cpp b/src/ip.cpp index e643663..7c08373 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -28,6 +28,18 @@ #include "platform.hpp" #include "stdint.hpp" +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#include +#endif + +#if !defined ZMQ_HAVE_WINDOWS +#include +#endif + +#if defined ZMQ_HAVE_OPENVMS +#include +#endif + #if defined ZMQ_HAVE_SOLARIS #include @@ -367,4 +379,23 @@ void zmq::tune_tcp_socket (fd_t s_) #endif } +void zmq::unblock_socket (fd_t s_) +{ +#ifdef ZMQ_HAVE_WINDOWS + u_long nonblock = 1; + int rc = ioctlsocket (s_, FIONBIO, &nonblock); + wsa_assert (rc != SOCKET_ERROR); +#elif ZMQ_HAVE_OPENVMS + int nonblock = 1; + int rc = ioctl (s_, FIONBIO, &nonblock); + errno_assert (rc != -1); +#else + int flags = fcntl (s_, F_GETFL, 0); + if (flags == -1) + flags = 0; + int rc = fcntl (s_, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); +#endif +} + diff --git a/src/ip.hpp b/src/ip.hpp index 8a690b0..75193a6 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -35,10 +35,6 @@ #include #endif -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS -#include -#endif - // Some platforms (notably Darwin/OSX and NetBSD) do not define all AI_ // flags for getaddrinfo(). This can be worked around safely by defining // these to 0. @@ -68,6 +64,10 @@ namespace zmq // Tunes the supplied TCP socket for the best latency. void tune_tcp_socket (fd_t s_); + + // Sets the socket into non-blocking mode. + void unblock_socket (fd_t s_); + } #endif diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 942cd49..4b8f3bf 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -18,32 +18,24 @@ along with this program. If not, see . */ +#include "ipc_connecter.hpp" + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + #include #include -#include "ipc_connecter.hpp" #include "tcp_engine.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "random.hpp" -#include "ip.hpp" #include "err.hpp" +#include "ip.hpp" -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else #include #include #include -#include -#include -#include -#include -#include -#ifdef ZMQ_HAVE_OPENVMS -#include -#endif -#endif +#include zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, class session_t *session_, const options_t &options_, @@ -175,166 +167,33 @@ int zmq::ipc_connecter_t::get_new_reconnect_ivl () return this_interval; } -#ifdef ZMQ_HAVE_WINDOWS - -int zmq::ipc_connecter_t::set_address (const char *protocol_, const char *addr_) +int zmq::ipc_connecter_t::set_address (const char *addr_) { - errno = EPROTONOSUPPORT; - return -1; + return resolve_local_path (&addr, &addr_len, addr_); } int zmq::ipc_connecter_t::open () { zmq_assert (s == retired_fd); + struct sockaddr *sa = (struct sockaddr*) &addr; // Create the socket. - s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); - if (s == INVALID_SOCKET) { - wsa_error_to_errno (); + zmq_assert (AF_UNIX == sa->sa_family); + s = socket (AF_UNIX, SOCK_STREAM, 0); + if (s == -1) return -1; - } - // Set to non-blocking mode. - unsigned long argp = 1; - int rc = ioctlsocket (s, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); + // Set the non-blocking flag. + unblock_socket (s); // Connect to the remote peer. - rc = ::connect (s, (sockaddr*) &addr, addr_len); + int rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); // Connect was successfull immediately. if (rc == 0) return 0; - // Asynchronous connect was launched. - if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS || - WSAGetLastError () == WSAEWOULDBLOCK)) { - errno = EAGAIN; - return -1; - } - - wsa_error_to_errno (); - return -1; -} - -int zmq::ipc_connecter_t::close () -{ - zmq_assert (s != retired_fd); - int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); - s = retired_fd; - return 0; -} - -zmq::fd_t zmq::ipc_connecter_t::connect () -{ - // Nonblocking connect have finished. Check whether an error occured. - int err = 0; - socklen_t len = sizeof err; - int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); - zmq_assert (rc == 0); - if (err != 0) { - - // Assert that the error was caused by the networking problems - // rather than 0MQ bug. - if (err == WSAECONNREFUSED || err == WSAETIMEDOUT || - err == WSAECONNABORTED || err == WSAEHOSTUNREACH || - err == WSAENETUNREACH || err == WSAENETDOWN) - return retired_fd; - - wsa_assert_no (err); - } - - // Return the newly connected socket. - fd_t result = s; - s = retired_fd; - return result; -} - -#else - -int zmq::ipc_connecter_t::set_address (const char *addr_) -{ - return resolve_local_path (&addr, &addr_len, addr_); -} - -int zmq::ipc_connecter_t::open () -{ - zmq_assert (s == retired_fd); - struct sockaddr *sa = (struct sockaddr*) &addr; - - if (AF_UNIX != sa->sa_family) { - - // Create the socket. - s = socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP); - if (s == -1) - return -1; - - // Set to non-blocking mode. -#ifdef ZMQ_HAVE_OPENVMS - int flags = 1; - int rc = ioctl (s, FIONBIO, &flags); - errno_assert (rc != -1); -#else - int flags = fcntl (s, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); -#endif - - // Connect to the remote peer. - rc = ::connect (s, (struct sockaddr*) &addr, addr_len); - - // Connect was successfull immediately. - if (rc == 0) - return 0; - - // Asynchronous connect was launched. - if (rc == -1 && errno == EINPROGRESS) { - errno = EAGAIN; - return -1; - } - - // Error occured. - int err = errno; - close (); - errno = err; - return -1; - } - -#ifndef ZMQ_HAVE_OPENVMS - else { - - // Create the socket. - zmq_assert (AF_UNIX == sa->sa_family); - s = socket (AF_UNIX, SOCK_STREAM, 0); - if (s == -1) - return -1; - - // Set the non-blocking flag. - int flag = fcntl (s, F_GETFL, 0); - if (flag == -1) - flag = 0; - int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); - errno_assert (rc != -1); - - // Connect to the remote peer. - rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); - - // Connect was successfull immediately. - if (rc == 0) - return 0; - - // Error occured. - int err = errno; - close (); - errno = err; - return -1; - } -#endif - - zmq_assert (false); + // Forward the error. return -1; } @@ -379,3 +238,4 @@ zmq::fd_t zmq::ipc_connecter_t::connect () } #endif + diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 272e09c..ee57c12 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -21,6 +21,10 @@ #ifndef __IPC_CONNECTER_HPP_INCLUDED__ #define __IPC_CONNECTER_HPP_INCLUDED__ +#include "platform.hpp" + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + #include "fd.hpp" #include "ip.hpp" #include "own.hpp" @@ -110,3 +114,6 @@ namespace zmq } #endif + +#endif + diff --git a/src/signaler.cpp b/src/signaler.cpp index 57c7f55..1c1c5b6 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -71,8 +71,6 @@ #include "windows.hpp" #else #include -#include -#include #include #include #include @@ -86,22 +84,8 @@ zmq::signaler_t::signaler_t () errno_assert (rc == 0); // Set both fds to non-blocking mode. -#if defined ZMQ_HAVE_WINDOWS - unsigned long argp = 1; - rc = ioctlsocket (w, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); -#else - int flags = fcntl (w, F_GETFL, 0); - errno_assert (flags >= 0); - rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc == 0); - flags = fcntl (r, F_GETFL, 0); - errno_assert (flags >= 0); - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc == 0); -#endif + unblock_socket (w); + unblock_socket (r); } zmq::signaler_t::~signaler_t () diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 27e56a1..bca7085 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -177,8 +177,6 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl () return this_interval; } -#ifdef ZMQ_HAVE_WINDOWS - int zmq::tcp_connecter_t::set_address (const char *addr_) { return resolve_ip_hostname (&addr, &addr_len, addr_); @@ -190,193 +188,96 @@ int zmq::tcp_connecter_t::open () // Create the socket. s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); +#ifdef ZMQ_HAVE_WINDOWS if (s == INVALID_SOCKET) { wsa_error_to_errno (); return -1; } +#else + if (s == -1) + return -1; +#endif - // Set to non-blocking mode. - unsigned long argp = 1; - int rc = ioctlsocket (s, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); + // Set the socket to non-blocking mode so that we get async connect(). + unblock_socket (s); // Connect to the remote peer. - rc = ::connect (s, (sockaddr*) &addr, addr_len); + int rc = ::connect (s, (struct sockaddr*) &addr, addr_len); // Connect was successfull immediately. if (rc == 0) return 0; // Asynchronous connect was launched. +#ifdef ZMQ_HAVE_WINDOWS if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS || WSAGetLastError () == WSAEWOULDBLOCK)) { errno = EAGAIN; return -1; - } - + } wsa_error_to_errno (); +#else + if (rc == -1 && errno == EINPROGRESS) { + errno = EAGAIN; + return -1; + } +#endif return -1; } -int zmq::tcp_connecter_t::close () -{ - zmq_assert (s != retired_fd); - int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); - s = retired_fd; - return 0; -} - zmq::fd_t zmq::tcp_connecter_t::connect () { - // Nonblocking connect have finished. Check whether an error occured. + // Async connect have finished. Check whether an error occured. int err = 0; - socklen_t len = sizeof err; +#if defined ZMQ_HAVE_HPUX + int len = sizeof (err); +#else + socklen_t len = sizeof (err); +#endif + int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); + + // Assert if the error was caused by 0MQ bug. + // Networking problems are OK. No need to assert. +#ifdef ZMQ_HAVE_WINDOWS zmq_assert (rc == 0); if (err != 0) { - - // Assert that the error was caused by the networking problems - // rather than 0MQ bug. if (err == WSAECONNREFUSED || err == WSAETIMEDOUT || err == WSAECONNABORTED || err == WSAEHOSTUNREACH || err == WSAENETUNREACH || err == WSAENETDOWN) return retired_fd; - wsa_assert_no (err); } - - // Return the newly connected socket. - fd_t result = s; - s = retired_fd; - return result; -} - -#else - -int zmq::tcp_connecter_t::set_address (const char *addr_) -{ - return resolve_ip_hostname (&addr, &addr_len, addr_); -} - -int zmq::tcp_connecter_t::open () -{ - zmq_assert (s == retired_fd); - struct sockaddr *sa = (struct sockaddr*) &addr; - - if (AF_UNIX != sa->sa_family) { - - // Create the socket. - s = socket (sa->sa_family, SOCK_STREAM, IPPROTO_TCP); - if (s == -1) - return -1; - - // Set to non-blocking mode. -#ifdef ZMQ_HAVE_OPENVMS - int flags = 1; - int rc = ioctl (s, FIONBIO, &flags); - errno_assert (rc != -1); #else - int flags = fcntl (s, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); -#endif - - // Connect to the remote peer. - rc = ::connect (s, (struct sockaddr*) &addr, addr_len); - - // Connect was successfull immediately. - if (rc == 0) - return 0; - - // Asynchronous connect was launched. - if (rc == -1 && errno == EINPROGRESS) { - errno = EAGAIN; - return -1; - } - - // Error occured. - int err = errno; - close (); - errno = err; - return -1; - } - -#ifndef ZMQ_HAVE_OPENVMS - else { - - // Create the socket. - zmq_assert (AF_UNIX == sa->sa_family); - s = socket (AF_UNIX, SOCK_STREAM, 0); - if (s == -1) - return -1; - - // Set the non-blocking flag. - int flag = fcntl (s, F_GETFL, 0); - if (flag == -1) - flag = 0; - int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); - errno_assert (rc != -1); - - // Connect to the remote peer. - rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); - - // Connect was successfull immediately. - if (rc == 0) - return 0; - - // Error occured. - int err = errno; - close (); - errno = err; - return -1; - } -#endif - - zmq_assert (false); - return -1; -} - -int zmq::tcp_connecter_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - if (rc != 0) - return -1; - s = retired_fd; - return 0; -} -zmq::fd_t zmq::tcp_connecter_t::connect () -{ // Following code should handle both Berkeley-derived socket // implementations and Solaris. - int err = 0; -#if defined ZMQ_HAVE_HPUX - int len = sizeof (err); -#else - socklen_t len = sizeof (err); -#endif - int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); if (rc == -1) err = errno; if (err != 0) { - - // Assert if the error was caused by 0MQ bug. - // Networking problems are OK. No need to assert. errno = err; errno_assert (errno == ECONNREFUSED || errno == ECONNRESET || errno == ETIMEDOUT || errno == EHOSTUNREACH || errno == ENETUNREACH || errno == ENETDOWN); - return retired_fd; } +#endif + // Return the newly connected socket. fd_t result = s; s = retired_fd; return result; } +void zmq::tcp_connecter_t::close () +{ + zmq_assert (s != retired_fd); +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (s); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = ::close (s); + errno_assert (rc == 0); #endif + s = retired_fd; +} diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 6c7aa1b..b5b39e8 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -74,7 +74,7 @@ namespace zmq int open (); // Close the connecting socket. - int close (); + void close (); // Get the file descriptor of newly created connection. Returns // retired_fd if the connection was unsuccessfull. diff --git a/src/tcp_engine.cpp b/src/tcp_engine.cpp index f940b84..f938d71 100644 --- a/src/tcp_engine.cpp +++ b/src/tcp_engine.cpp @@ -39,6 +39,7 @@ #include "session.hpp" #include "config.hpp" #include "err.hpp" +#include "ip.hpp" zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : s (fd_), @@ -53,28 +54,12 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : options (options_), plugged (false) { - int rc; - - // Set the socket to the non-blocking mode. -#ifdef ZMQ_HAVE_WINDOWS - u_long nonblock = 1; - rc = ioctlsocket (s, FIONBIO, &nonblock); - wsa_assert (rc != SOCKET_ERROR); -#elif ZMQ_HAVE_OPENVMS - int nonblock = 1; - rc = ioctl (s, FIONBIO, &nonblock); - errno_assert (rc != -1); -#else - int flags = fcntl (s, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); -#endif + // Get the socket into non-blocking mode. + unblock_socket (s); // Set the socket buffer limits for the underlying socket. if (options.sndbuf) { - rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, + int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, (char*) &options.sndbuf, sizeof (int)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -83,7 +68,7 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : #endif } if (options.rcvbuf) { - rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, + int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, (char*) &options.rcvbuf, sizeof (int)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); @@ -96,7 +81,7 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : // Make sure that SIGPIPE signal is not generated when writing to a // connection that was already closed by the peer. int set = 1; - rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); + int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); errno_assert (rc == 0); #endif } -- cgit v1.2.3