From 37128b7b1aeed9ad2bf6816560b85b5f94dd5bec Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 29 Apr 2010 17:31:57 +0200 Subject: fd_signaler_t renamed to signaler_t --- src/Makefile.am | 4 +- src/app_thread.cpp | 3 +- src/app_thread.hpp | 6 +- src/dispatcher.hpp | 4 +- src/fd_signaler.cpp | 432 ---------------------------------------------------- src/fd_signaler.hpp | 80 ---------- src/io_thread.cpp | 2 +- src/io_thread.hpp | 6 +- src/signaler.cpp | 432 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/signaler.hpp | 80 ++++++++++ 10 files changed, 524 insertions(+), 525 deletions(-) delete mode 100644 src/fd_signaler.cpp delete mode 100644 src/fd_signaler.hpp create mode 100644 src/signaler.cpp create mode 100644 src/signaler.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 837cd5f..2cd5ace 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -63,7 +63,6 @@ libzmq_la_SOURCES = app_thread.hpp \ epoll.hpp \ err.hpp \ fd.hpp \ - fd_signaler.hpp \ forwarder.hpp \ fq.hpp \ i_inout.hpp \ @@ -96,6 +95,7 @@ libzmq_la_SOURCES = app_thread.hpp \ req.hpp \ select.hpp \ session.hpp \ + signaler.hpp \ socket_base.hpp \ stdint.hpp \ streamer.hpp \ @@ -127,7 +127,6 @@ libzmq_la_SOURCES = app_thread.hpp \ downstream.cpp \ epoll.cpp \ err.cpp \ - fd_signaler.cpp \ forwarder.cpp \ fq.cpp \ io_object.cpp \ @@ -151,6 +150,7 @@ libzmq_la_SOURCES = app_thread.hpp \ req.cpp \ select.cpp \ session.cpp \ + signaler.cpp \ socket_base.cpp \ streamer.cpp \ sub.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 0dad660..10068c0 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -35,7 +35,6 @@ #include "app_thread.hpp" #include "dispatcher.hpp" -#include "fd_signaler.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" @@ -75,7 +74,7 @@ void zmq::app_thread_t::stop () send_stop (); } -zmq::fd_signaler_t *zmq::app_thread_t::get_signaler () +zmq::signaler_t *zmq::app_thread_t::get_signaler () { return &signaler; } diff --git a/src/app_thread.hpp b/src/app_thread.hpp index b7572da..2bca757 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -25,7 +25,7 @@ #include "stdint.hpp" #include "object.hpp" #include "yarray.hpp" -#include "fd_signaler.hpp" +#include "signaler.hpp" namespace zmq { @@ -43,7 +43,7 @@ namespace zmq void stop (); // Returns signaler associated with this application thread. - fd_signaler_t *get_signaler (); + signaler_t *get_signaler (); // Processes commands sent to this thread (if any). If 'block' is // set to true, returns only after at least one command was processed. @@ -71,7 +71,7 @@ namespace zmq sockets_t sockets; // App thread's signaler object. - fd_signaler_t signaler; + signaler_t signaler; // Timestamp of when commands were processed the last time. uint64_t last_processing_time; diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 6648f5d..ff08abc 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -25,7 +25,7 @@ #include #include -#include "fd_signaler.hpp" +#include "signaler.hpp" #include "ypipe.hpp" #include "command.hpp" #include "config.hpp" @@ -125,7 +125,7 @@ namespace zmq io_threads_t io_threads; // Signalers for both application and I/O threads. - std::vector signalers; + std::vector signalers; // Pipe to hold the commands. typedef ypipe_t . -*/ - -#include "fd_signaler.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "fd.hpp" -#include "ip.hpp" - -#if defined ZMQ_HAVE_OPENVMS -#include -#elif defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include -#include -#endif - -#if defined ZMQ_HAVE_EVENTFD - -#include - -zmq::fd_signaler_t::fd_signaler_t () -{ - // Create eventfd object. - fd = eventfd (0, 0); - errno_assert (fd != -1); - - // Set to non-blocking mode. - int flags = fcntl (fd, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (fd, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); -} - -zmq::fd_signaler_t::~fd_signaler_t () -{ - int rc = close (fd); - errno_assert (rc != -1); -} - -void zmq::fd_signaler_t::signal (int signal_) -{ - zmq_assert (signal_ >= 0 && signal_ < 64); - uint64_t inc = 1; - inc <<= signal_; - ssize_t sz = write (fd, &inc, sizeof (uint64_t)); - errno_assert (sz == sizeof (uint64_t)); -} - -uint64_t zmq::fd_signaler_t::poll () -{ - // Set to blocking mode. - int flags = fcntl (fd, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (fd, F_SETFL, flags & ~O_NONBLOCK); - errno_assert (rc != -1); - - uint64_t signals; - ssize_t sz; - while (true) { - sz = read (fd, &signals, sizeof (uint64_t)); - if (sz == -1) { - if (errno == EAGAIN || errno == EINTR) - continue; - errno_assert (false); - } - break; - } - - // Set to non-blocking mode. - rc = fcntl (fd, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); - - return signals; -} - -uint64_t zmq::fd_signaler_t::check () -{ - uint64_t signals; - ssize_t sz = read (fd, &signals, sizeof (uint64_t)); - if (sz == -1 && (errno == EAGAIN || errno == EINTR)) - return 0; - errno_assert (sz != -1); - return signals; -} - -zmq::fd_t zmq::fd_signaler_t::get_fd () -{ - return fd; -} - -#elif defined ZMQ_HAVE_WINDOWS - -zmq::fd_signaler_t::fd_signaler_t () -{ - // Windows have no 'socketpair' function. CreatePipe is no good as pipe - // handles cannot be polled on. Here we create the socketpair by hand. - - struct sockaddr_in addr; - SOCKET listener; - int addrlen = sizeof (addr); - - w = INVALID_SOCKET; - r = INVALID_SOCKET; - - fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0)); - wsa_assert (rcs != INVALID_SOCKET); - - memset (&addr, 0, sizeof (addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - addr.sin_port = 0; - - int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); - wsa_assert (rc != SOCKET_ERROR); - - // Listen for incomming connections. - rc = listen (listener, 1); - wsa_assert (rc != SOCKET_ERROR); - - // Create the socket. - w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); - wsa_assert (w != INVALID_SOCKET); - - // Connect to the remote peer. - rc = connect (w, (sockaddr *) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - // Accept connection from w. - r = accept (listener, NULL, NULL); - wsa_assert (r != INVALID_SOCKET); - - // Set the read site of the pair to non-blocking mode. - unsigned long argp = 1; - rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - - // We don't need the listening socket anymore. Close it. - rc = closesocket (listener); - wsa_assert (rc != SOCKET_ERROR); -} - -zmq::fd_signaler_t::~fd_signaler_t () -{ - int rc = closesocket (w); - wsa_assert (rc != SOCKET_ERROR); - - rc = closesocket (r); - wsa_assert (rc != SOCKET_ERROR); -} - -void zmq::fd_signaler_t::signal (int signal_) -{ - // TODO: Note that send is a blocking operation. - // How should we behave if the signal cannot be written to the signaler? - - zmq_assert (signal_ >= 0 && signal_ < 64); - char c = (char) signal_; - int rc = send (w, &c, 1, 0); - win_assert (rc != SOCKET_ERROR); -} - -uint64_t zmq::fd_signaler_t::poll () -{ - // Switch to blocking mode. - unsigned long argp = 0; - int rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - - // Get the signals. Given that we are in the blocking mode now, - // there should be at least a single signal returned. - uint64_t signals = check (); - zmq_assert (signals); - - // Switch back to non-blocking mode. - argp = 1; - rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - - return signals; -} - -uint64_t zmq::fd_signaler_t::check () -{ - unsigned char buffer [32]; - int nbytes = recv (r, (char*) buffer, 32, 0); - if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; - wsa_assert (nbytes != -1); - - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; -} - -zmq::fd_t zmq::fd_signaler_t::get_fd () -{ - return r; -} - -#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX - -#include -#include - -zmq::fd_signaler_t::fd_signaler_t () -{ - int sv [2]; - int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); - errno_assert (rc == 0); - w = sv [0]; - r = sv [1]; - - // Set the reader to non-blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); -} - -zmq::fd_signaler_t::~fd_signaler_t () -{ - close (w); - close (r); -} - -void zmq::fd_signaler_t::signal (int signal_) -{ - zmq_assert (signal_ >= 0 && signal_ < 64); - unsigned char c = (unsigned char) signal_; - ssize_t nbytes = send (w, &c, 1, 0); - errno_assert (nbytes == 1); -} - -uint64_t zmq::fd_signaler_t::poll () -{ - // Set the reader to blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); - errno_assert (rc != -1); - - // Poll for events. - uint64_t signals = check (); - - // Set the reader to non-blocking mode. - flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); - - return signals; -} - -uint64_t zmq::fd_signaler_t::check () -{ - unsigned char buffer [64]; - ssize_t nbytes = recv (r, buffer, 64, 0); - if (nbytes == -1 && errno == EAGAIN) - return 0; - zmq_assert (nbytes != -1); - - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; -} - -zmq::fd_t zmq::fd_signaler_t::get_fd () -{ - return r; -} - -#else - -#include -#include - -zmq::fd_signaler_t::fd_signaler_t () -{ - int sv [2]; - int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); - errno_assert (rc == 0); - w = sv [0]; - r = sv [1]; -} - -zmq::fd_signaler_t::~fd_signaler_t () -{ - close (w); - close (r); -} - -void zmq::fd_signaler_t::signal (int signal_) -{ - // TODO: Note that send is a blocking operation. - // How should we behave if the signal cannot be written to the signaler? - - zmq_assert (signal_ >= 0 && signal_ < 64); - unsigned char c = (unsigned char) signal_; - ssize_t nbytes = send (w, &c, 1, 0); - errno_assert (nbytes == 1); -} - -uint64_t zmq::fd_signaler_t::poll () -{ - unsigned char buffer [64]; - ssize_t nbytes = recv (r, buffer, 64, 0); - zmq_assert (nbytes != -1); - - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; -} - -uint64_t zmq::fd_signaler_t::check () -{ - unsigned char buffer [64]; - ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT); - if (nbytes == -1 && errno == EAGAIN) - return 0; - zmq_assert (nbytes != -1); - - uint64_t signals = 0; - for (int pos = 0; pos != nbytes; pos ++) { - zmq_assert (buffer [pos] < 64); - signals |= (uint64_t (1) << (buffer [pos])); - } - return signals; -} - -zmq::fd_t zmq::fd_signaler_t::get_fd () -{ - return r; -} - -#endif - -#if defined ZMQ_HAVE_OPENVMS - -int zmq::fd_signaler_t::socketpair (int domain_, int type_, int protocol_, - int sv_ [2]) -{ - int listener; - sockaddr_in lcladdr; - socklen_t lcladdr_len; - int rc; - int on = 1; - - zmq_assert (type_ == SOCK_STREAM); - - // Fill in the localhost address (127.0.0.1). - memset (&lcladdr, 0, sizeof (lcladdr)); - lcladdr.sin_family = AF_INET; - lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - lcladdr.sin_port = 0; - - listener = socket (AF_INET, SOCK_STREAM, 0); - errno_assert (listener != -1); - - rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); - errno_assert (rc != -1); - - lcladdr_len = sizeof (lcladdr); - - rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); - errno_assert (rc != -1); - - rc = listen (listener, 1); - errno_assert (rc != -1); - - sv_ [0] = socket (AF_INET, SOCK_STREAM, 0); - errno_assert (rc != -1); - - rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr)); - errno_assert (rc != -1); - - sv_ [1] = accept (listener, NULL, NULL); - errno_assert (sv_ [1] != -1); - - close (listener); - - return 0; -} - -#endif - diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp deleted file mode 100644 index a6ccbba..0000000 --- a/src/fd_signaler.hpp +++ /dev/null @@ -1,80 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_FD_SIGNALER_HPP_INCLUDED__ -#define __ZMQ_FD_SIGNALER_HPP_INCLUDED__ - -#include "platform.hpp" -#include "fd.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - // This object can be used to send individual signals from one thread to - // another. The specific of this pipe is that it has associated file - // descriptor and so it can be polled on. Same signal cannot be sent twice - // unless signals are retrieved by the reader side in the meantime. - - class fd_signaler_t - { - public: - - fd_signaler_t (); - ~fd_signaler_t (); - - // i_signaler interface implementation. - void signal (int signal_); - uint64_t poll (); - uint64_t check (); - fd_t get_fd (); - - private: - -#if defined ZMQ_HAVE_OPENVMS - - // Whilst OpenVMS supports socketpair - it maps to AF_INET only. - // Further, it does not set the socket options TCP_NODELAY and - // TCP_NODELACK which can lead to performance problems. We'll - // overload the socketpair function for this class. - // - // The bug will be fixed in V5.6 ECO4 and beyond. In the - // meantime, we'll create the socket pair manually. - static int socketpair (int domain_, int type_, int protocol_, - int sv_ [2]); - -#endif - -#if defined ZMQ_HAVE_EVENTFD - // Eventfd descriptor. - fd_t fd; -#else - // Write & read end of the socketpair. - fd_t w; - fd_t r; -#endif - - // Disable copying of fd_signeler object. - fd_signaler_t (const fd_signaler_t&); - void operator = (const fd_signaler_t&); - }; - -} - -#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 41f7f7d..7d997ad 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -54,7 +54,7 @@ void zmq::io_thread_t::stop () send_stop (); } -zmq::fd_signaler_t *zmq::io_thread_t::get_signaler () +zmq::signaler_t *zmq::io_thread_t::get_signaler () { return &signaler; } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 84b9319..deb03a1 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -26,7 +26,7 @@ #include "object.hpp" #include "poller.hpp" #include "i_poll_events.hpp" -#include "fd_signaler.hpp" +#include "signaler.hpp" namespace zmq { @@ -51,7 +51,7 @@ namespace zmq void stop (); // Returns signaler associated with this I/O thread. - fd_signaler_t *get_signaler (); + signaler_t *get_signaler (); // i_poll_events implementation. void in_event (); @@ -71,7 +71,7 @@ namespace zmq // Poll thread gets notifications about incoming commands using // this signaler. - fd_signaler_t signaler; + signaler_t signaler; // Handle associated with signaler's file descriptor. poller_t::handle_t signaler_handle; diff --git a/src/signaler.cpp b/src/signaler.cpp new file mode 100644 index 0000000..a98f1fe --- /dev/null +++ b/src/signaler.cpp @@ -0,0 +1,432 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "signaler.hpp" +#include "platform.hpp" +#include "err.hpp" +#include "fd.hpp" +#include "ip.hpp" + +#if defined ZMQ_HAVE_OPENVMS +#include +#elif defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#endif + +#if defined ZMQ_HAVE_EVENTFD + +#include + +zmq::signaler_t::signaler_t () +{ + // Create eventfd object. + fd = eventfd (0, 0); + errno_assert (fd != -1); + + // Set to non-blocking mode. + int flags = fcntl (fd, F_GETFL, 0); + if (flags == -1) + flags = 0; + int rc = fcntl (fd, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); +} + +zmq::signaler_t::~signaler_t () +{ + int rc = close (fd); + errno_assert (rc != -1); +} + +void zmq::signaler_t::signal (int signal_) +{ + zmq_assert (signal_ >= 0 && signal_ < 64); + uint64_t inc = 1; + inc <<= signal_; + ssize_t sz = write (fd, &inc, sizeof (uint64_t)); + errno_assert (sz == sizeof (uint64_t)); +} + +uint64_t zmq::signaler_t::poll () +{ + // Set to blocking mode. + int flags = fcntl (fd, F_GETFL, 0); + if (flags == -1) + flags = 0; + int rc = fcntl (fd, F_SETFL, flags & ~O_NONBLOCK); + errno_assert (rc != -1); + + uint64_t signals; + ssize_t sz; + while (true) { + sz = read (fd, &signals, sizeof (uint64_t)); + if (sz == -1) { + if (errno == EAGAIN || errno == EINTR) + continue; + errno_assert (false); + } + break; + } + + // Set to non-blocking mode. + rc = fcntl (fd, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); + + return signals; +} + +uint64_t zmq::signaler_t::check () +{ + uint64_t signals; + ssize_t sz = read (fd, &signals, sizeof (uint64_t)); + if (sz == -1 && (errno == EAGAIN || errno == EINTR)) + return 0; + errno_assert (sz != -1); + return signals; +} + +zmq::fd_t zmq::signaler_t::get_fd () +{ + return fd; +} + +#elif defined ZMQ_HAVE_WINDOWS + +zmq::signaler_t::signaler_t () +{ + // Windows have no 'socketpair' function. CreatePipe is no good as pipe + // handles cannot be polled on. Here we create the socketpair by hand. + + struct sockaddr_in addr; + SOCKET listener; + int addrlen = sizeof (addr); + + w = INVALID_SOCKET; + r = INVALID_SOCKET; + + fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0)); + wsa_assert (rcs != INVALID_SOCKET); + + memset (&addr, 0, sizeof (addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + addr.sin_port = 0; + + int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); + + rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); + wsa_assert (rc != SOCKET_ERROR); + + // Listen for incomming connections. + rc = listen (listener, 1); + wsa_assert (rc != SOCKET_ERROR); + + // Create the socket. + w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); + wsa_assert (w != INVALID_SOCKET); + + // Connect to the remote peer. + rc = connect (w, (sockaddr *) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); + + // Accept connection from w. + r = accept (listener, NULL, NULL); + wsa_assert (r != INVALID_SOCKET); + + // Set the read site of the pair to non-blocking mode. + unsigned long argp = 1; + rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + + // We don't need the listening socket anymore. Close it. + rc = closesocket (listener); + wsa_assert (rc != SOCKET_ERROR); +} + +zmq::signaler_t::~signaler_t () +{ + int rc = closesocket (w); + wsa_assert (rc != SOCKET_ERROR); + + rc = closesocket (r); + wsa_assert (rc != SOCKET_ERROR); +} + +void zmq::signaler_t::signal (int signal_) +{ + // TODO: Note that send is a blocking operation. + // How should we behave if the signal cannot be written to the signaler? + + zmq_assert (signal_ >= 0 && signal_ < 64); + char c = (char) signal_; + int rc = send (w, &c, 1, 0); + win_assert (rc != SOCKET_ERROR); +} + +uint64_t zmq::signaler_t::poll () +{ + // Switch to blocking mode. + unsigned long argp = 0; + int rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + + // Get the signals. Given that we are in the blocking mode now, + // there should be at least a single signal returned. + uint64_t signals = check (); + zmq_assert (signals); + + // Switch back to non-blocking mode. + argp = 1; + rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + + return signals; +} + +uint64_t zmq::signaler_t::check () +{ + unsigned char buffer [32]; + int nbytes = recv (r, (char*) buffer, 32, 0); + if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) + return 0; + wsa_assert (nbytes != -1); + + uint64_t signals = 0; + for (int pos = 0; pos != nbytes; pos++) { + zmq_assert (buffer [pos] < 64); + signals |= (uint64_t (1) << (buffer [pos])); + } + return signals; +} + +zmq::fd_t zmq::signaler_t::get_fd () +{ + return r; +} + +#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX + +#include +#include + +zmq::signaler_t::signaler_t () +{ + int sv [2]; + int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); + errno_assert (rc == 0); + w = sv [0]; + r = sv [1]; + + // Set the reader to non-blocking mode. + int flags = fcntl (r, F_GETFL, 0); + if (flags == -1) + flags = 0; + rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); +} + +zmq::signaler_t::~signaler_t () +{ + close (w); + close (r); +} + +void zmq::signaler_t::signal (int signal_) +{ + zmq_assert (signal_ >= 0 && signal_ < 64); + unsigned char c = (unsigned char) signal_; + ssize_t nbytes = send (w, &c, 1, 0); + errno_assert (nbytes == 1); +} + +uint64_t zmq::signaler_t::poll () +{ + // Set the reader to blocking mode. + int flags = fcntl (r, F_GETFL, 0); + if (flags == -1) + flags = 0; + int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); + errno_assert (rc != -1); + + // Poll for events. + uint64_t signals = check (); + + // Set the reader to non-blocking mode. + flags = fcntl (r, F_GETFL, 0); + if (flags == -1) + flags = 0; + rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); + + return signals; +} + +uint64_t zmq::signaler_t::check () +{ + unsigned char buffer [64]; + ssize_t nbytes = recv (r, buffer, 64, 0); + if (nbytes == -1 && errno == EAGAIN) + return 0; + zmq_assert (nbytes != -1); + + uint64_t signals = 0; + for (int pos = 0; pos != nbytes; pos ++) { + zmq_assert (buffer [pos] < 64); + signals |= (uint64_t (1) << (buffer [pos])); + } + return signals; +} + +zmq::fd_t zmq::signaler_t::get_fd () +{ + return r; +} + +#else + +#include +#include + +zmq::signaler_t::signaler_t () +{ + int sv [2]; + int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); + errno_assert (rc == 0); + w = sv [0]; + r = sv [1]; +} + +zmq::signaler_t::~signaler_t () +{ + close (w); + close (r); +} + +void zmq::signaler_t::signal (int signal_) +{ + // TODO: Note that send is a blocking operation. + // How should we behave if the signal cannot be written to the signaler? + + zmq_assert (signal_ >= 0 && signal_ < 64); + unsigned char c = (unsigned char) signal_; + ssize_t nbytes = send (w, &c, 1, 0); + errno_assert (nbytes == 1); +} + +uint64_t zmq::signaler_t::poll () +{ + unsigned char buffer [64]; + ssize_t nbytes = recv (r, buffer, 64, 0); + zmq_assert (nbytes != -1); + + uint64_t signals = 0; + for (int pos = 0; pos != nbytes; pos ++) { + zmq_assert (buffer [pos] < 64); + signals |= (uint64_t (1) << (buffer [pos])); + } + return signals; +} + +uint64_t zmq::signaler_t::check () +{ + unsigned char buffer [64]; + ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT); + if (nbytes == -1 && errno == EAGAIN) + return 0; + zmq_assert (nbytes != -1); + + uint64_t signals = 0; + for (int pos = 0; pos != nbytes; pos ++) { + zmq_assert (buffer [pos] < 64); + signals |= (uint64_t (1) << (buffer [pos])); + } + return signals; +} + +zmq::fd_t zmq::signaler_t::get_fd () +{ + return r; +} + +#endif + +#if defined ZMQ_HAVE_OPENVMS + +int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_, + int sv_ [2]) +{ + int listener; + sockaddr_in lcladdr; + socklen_t lcladdr_len; + int rc; + int on = 1; + + zmq_assert (type_ == SOCK_STREAM); + + // Fill in the localhost address (127.0.0.1). + memset (&lcladdr, 0, sizeof (lcladdr)); + lcladdr.sin_family = AF_INET; + lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + lcladdr.sin_port = 0; + + listener = socket (AF_INET, SOCK_STREAM, 0); + errno_assert (listener != -1); + + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + errno_assert (rc != -1); + + lcladdr_len = sizeof (lcladdr); + + rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); + errno_assert (rc != -1); + + rc = listen (listener, 1); + errno_assert (rc != -1); + + sv_ [0] = socket (AF_INET, SOCK_STREAM, 0); + errno_assert (rc != -1); + + rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + errno_assert (rc != -1); + + sv_ [1] = accept (listener, NULL, NULL); + errno_assert (sv_ [1] != -1); + + close (listener); + + return 0; +} + +#endif + diff --git a/src/signaler.hpp b/src/signaler.hpp new file mode 100644 index 0000000..29f2ff6 --- /dev/null +++ b/src/signaler.hpp @@ -0,0 +1,80 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__ +#define __ZMQ_SIGNALER_HPP_INCLUDED__ + +#include "platform.hpp" +#include "fd.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // This object can be used to send individual signals from one thread to + // another. The specific of this pipe is that it has associated file + // descriptor and so it can be polled on. Same signal cannot be sent twice + // unless signals are retrieved by the reader side in the meantime. + + class signaler_t + { + public: + + signaler_t (); + ~signaler_t (); + + // i_signaler interface implementation. + void signal (int signal_); + uint64_t poll (); + uint64_t check (); + fd_t get_fd (); + + private: + +#if defined ZMQ_HAVE_OPENVMS + + // Whilst OpenVMS supports socketpair - it maps to AF_INET only. + // Further, it does not set the socket options TCP_NODELAY and + // TCP_NODELACK which can lead to performance problems. We'll + // overload the socketpair function for this class. + // + // The bug will be fixed in V5.6 ECO4 and beyond. In the + // meantime, we'll create the socket pair manually. + static int socketpair (int domain_, int type_, int protocol_, + int sv_ [2]); + +#endif + +#if defined ZMQ_HAVE_EVENTFD + // Eventfd descriptor. + fd_t fd; +#else + // Write & read end of the socketpair. + fd_t w; + fd_t r; +#endif + + // Disable copying of fd_signeler object. + signaler_t (const signaler_t&); + void operator = (const signaler_t&); + }; + +} + +#endif -- cgit v1.2.3