From 9da84a5239e5356e34d872c2b5af1d19b9c7eb4f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 5 Nov 2010 17:39:51 +0100 Subject: signaler renamed to mailbox For historical reasons queue to transfer commands between threads was called 'signaler'. Given that it was used to pass commands rather than signals it was renamed to 'mailbox', see Erlang mailboxes. Signed-off-by: Martin Sustrik --- src/Makefile.am | 4 +- src/ctx.cpp | 14 +- src/ctx.hpp | 6 +- src/io_thread.cpp | 12 +- src/io_thread.hpp | 15 +-- src/mailbox.cpp | 378 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/mailbox.hpp | 61 +++++++++ src/own.cpp | 2 +- src/signaler.cpp | 366 -------------------------------------------------- src/signaler.hpp | 61 --------- src/socket_base.cpp | 12 +- src/socket_base.hpp | 10 +- 12 files changed, 476 insertions(+), 465 deletions(-) create mode 100644 src/mailbox.cpp create mode 100644 src/mailbox.hpp delete mode 100644 src/signaler.cpp delete mode 100644 src/signaler.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 5cd4f73..fbd36c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -87,6 +87,7 @@ libzmq_la_SOURCES = \ kqueue.hpp \ lb.hpp \ likely.hpp \ + mailbox.hpp \ msg_content.hpp \ mutex.hpp \ named_session.hpp \ @@ -111,7 +112,6 @@ libzmq_la_SOURCES = \ select.hpp \ semaphore.hpp \ session.hpp \ - signaler.hpp \ socket_base.hpp \ stdint.hpp \ streamer.hpp \ @@ -150,6 +150,7 @@ libzmq_la_SOURCES = \ ip.cpp \ kqueue.cpp \ lb.cpp \ + mailbox.cpp \ named_session.cpp \ object.cpp \ options.cpp \ @@ -169,7 +170,6 @@ libzmq_la_SOURCES = \ req.cpp \ select.cpp \ session.cpp \ - signaler.cpp \ socket_base.cpp \ streamer.cpp \ sub.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index b4c27fd..59ba2db 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -49,9 +49,9 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : HIBYTE (wsa_data.wVersion) == 2); #endif - // Initialise the array of signalers. + // Initialise the array of mailboxes. slot_count = max_sockets + io_threads_; - slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count); + slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); zmq_assert (slots); // Create I/O thread objects and launch them. @@ -59,7 +59,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); - slots [i] = io_thread->get_signaler (); + slots [i] = io_thread->get_mailbox (); io_thread->start (); } @@ -92,8 +92,8 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; - // Deallocate the array of slots. No special work is - // needed as signalers themselves were deallocated with their + // Deallocate the array of mailboxes. No special work is + // needed as mailboxes themselves were deallocated with their // corresponding io_thread/socket objects. free (slots); @@ -178,7 +178,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) uint32_t slot = empty_slots.back (); empty_slots.pop_back (); - // Create the socket and register its signaler. + // Create the socket and register its mailbox. socket_base_t *s = socket_base_t::create (type_, this, slot); if (!s) { empty_slots.push_back (slot); @@ -186,7 +186,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) return NULL; } sockets.push_back (s); - slots [slot] = s->get_signaler (); + slots [slot] = s->get_mailbox (); slot_sync.unlock (); diff --git a/src/ctx.hpp b/src/ctx.hpp index 5a3a6aa..0f2dd52 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,7 +26,7 @@ #include "../include/zmq.h" -#include "signaler.hpp" +#include "mailbox.hpp" #include "semaphore.hpp" #include "ypipe.hpp" #include "array.hpp" @@ -117,9 +117,9 @@ namespace zmq typedef std::vector io_threads_t; io_threads_t io_threads; - // Array of pointers to signalers for both application and I/O threads. + // Array of pointers to mailboxes for both application and I/O threads. uint32_t slot_count; - signaler_t **slots; + mailbox_t **slots; // List of inproc endpoints within this context. typedef std::map endpoints_t; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index aacf843..7ba8905 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -32,8 +32,8 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : poller = new (std::nothrow) poller_t; zmq_assert (poller); - signaler_handle = poller->add_fd (signaler.get_fd (), this); - poller->set_pollin (signaler_handle); + mailbox_handle = poller->add_fd (mailbox.get_fd (), this); + poller->set_pollin (mailbox_handle); } zmq::io_thread_t::~io_thread_t () @@ -52,9 +52,9 @@ void zmq::io_thread_t::stop () send_stop (); } -zmq::signaler_t *zmq::io_thread_t::get_signaler () +zmq::mailbox_t *zmq::io_thread_t::get_mailbox () { - return &signaler; + return &mailbox; } int zmq::io_thread_t::get_load () @@ -71,7 +71,7 @@ void zmq::io_thread_t::in_event () // Get the next command. If there is none, exit. command_t cmd; - int rc = signaler.recv (&cmd, false); + int rc = mailbox.recv (&cmd, false); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) @@ -103,6 +103,6 @@ zmq::poller_t *zmq::io_thread_t::get_poller () void zmq::io_thread_t::process_stop () { - poller->rm_fd (signaler_handle); + poller->rm_fd (mailbox_handle); poller->stop (); } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index a0704fc..b01eecb 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -26,7 +26,7 @@ #include "object.hpp" #include "poller.hpp" #include "i_poll_events.hpp" -#include "signaler.hpp" +#include "mailbox.hpp" namespace zmq { @@ -50,8 +50,8 @@ namespace zmq // Ask underlying thread to stop. void stop (); - // Returns signaler associated with this I/O thread. - signaler_t *get_signaler (); + // Returns mailbox associated with this I/O thread. + mailbox_t *get_mailbox (); // i_poll_events implementation. void in_event (); @@ -69,12 +69,11 @@ namespace zmq private: - // Poll thread gets notifications about incoming commands using - // this signaler. - signaler_t signaler; + // I/O thread accesses incoming commands via this mailbox. + mailbox_t mailbox; - // Handle associated with signaler's file descriptor. - poller_t::handle_t signaler_handle; + // Handle associated with mailbox' file descriptor. + poller_t::handle_t mailbox_handle; // I/O multiplexing is performed using a poller object. poller_t *poller; diff --git a/src/mailbox.cpp b/src/mailbox.cpp new file mode 100644 index 0000000..927c230 --- /dev/null +++ b/src/mailbox.cpp @@ -0,0 +1,378 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "mailbox.hpp" +#include "platform.hpp" +#include "err.hpp" +#include "fd.hpp" +#include "ip.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#include +#endif + +zmq::fd_t zmq::mailbox_t::get_fd () +{ + return r; +} + +#if defined ZMQ_HAVE_WINDOWS + +zmq::mailbox_t::mailbox_t () +{ + // Create the socketpair for signalling. + int rc = make_socketpair (&r, &w); + errno_assert (rc == 0); + + // Set the writer to non-blocking mode. + unsigned long argp = 1; + rc = ioctlsocket (w, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + + // Set the reader to non-blocking mode. + argp = 1; + rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); +} + +zmq::mailbox_t::~mailbox_t () +{ + int rc = closesocket (w); + wsa_assert (rc != SOCKET_ERROR); + + rc = closesocket (r); + wsa_assert (rc != SOCKET_ERROR); +} + +void zmq::mailbox_t::send (const command_t &cmd_) +{ + // TODO: Implement SNDBUF auto-resizing as for POSIX platforms. + // In the mean time, the following code with assert if the send() + // call would block. + int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0); + wsa_assert (nbytes != SOCKET_ERROR); + zmq_assert (nbytes == sizeof (command_t)); +} + +int zmq::mailbox_t::recv (command_t *cmd_, bool block_) +{ + if (block_) { + // Set the reader to blocking mode. + unsigned long argp = 0; + int rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + } + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // and a command is not available. + int err = 0; + int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0); + if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { + // Save value of errno if we wish to pass it to caller. + err = EAGAIN; + } + if (block_) { + // Re-set the reader to non-blocking mode. + unsigned long argp = 1; + int rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); + } + // If the recv failed, return with the saved errno if set. + if (err != 0) { + errno = err; + return -1; + } + // Sanity check for success. + wsa_assert (nbytes != SOCKET_ERROR); + + // Check whether we haven't got half of command. + zmq_assert (nbytes == sizeof (command_t)); + return 0; +} + +#else // !ZMQ_HAVE_WINDOWS + +zmq::mailbox_t::mailbox_t () +{ +#ifdef PIPE_BUF + // Make sure that command can be written to the socket in atomic fashion. + // If this wasn't guaranteed, commands from different threads would be + // interleaved. + zmq_assert (sizeof (command_t) <= PIPE_BUF); +#endif + + // Create the socketpair for signalling. + int rc = make_socketpair (&r, &w); + errno_assert (rc == 0); + + // Set the writer to non-blocking mode. + int flags = fcntl (w, F_GETFL, 0); + errno_assert (flags >= 0); + rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc == 0); + +#ifndef MSG_DONTWAIT + // Set the reader to non-blocking mode. + flags = fcntl (r, F_GETFL, 0); + errno_assert (flags >= 0); + rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc == 0); +#endif +} + +zmq::mailbox_t::~mailbox_t () +{ + close (w); + close (r); +} + +void zmq::mailbox_t::send (const command_t &cmd_) +{ + // Attempt to write an entire command without blocking. + ssize_t nbytes; + do { + nbytes = ::send (w, &cmd_, sizeof (command_t), 0); + } while (nbytes == -1 && errno == EINTR); + + // Attempt to increase mailbox SNDBUF if the send failed. + if (nbytes == -1 && errno == EAGAIN) { + int old_sndbuf, new_sndbuf; + socklen_t sndbuf_size = sizeof old_sndbuf; + + // Retrieve current send buffer size. + int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, + &sndbuf_size); + errno_assert (rc == 0); + new_sndbuf = old_sndbuf * 2; + + // Double the new send buffer size. + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); + errno_assert (rc == 0); + + // Verify that the OS actually honored the request. + rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); + errno_assert (rc == 0); + zmq_assert (new_sndbuf > old_sndbuf); + + // Retry the sending operation; at this point it must succeed. + do { + nbytes = ::send (w, &cmd_, sizeof (command_t), 0); + } while (nbytes == -1 && errno == EINTR); + } + errno_assert (nbytes != -1); + + // This should never happen as we've already checked that command size is + // less than PIPE_BUF. + zmq_assert (nbytes == sizeof (command_t)); +} + +int zmq::mailbox_t::recv (command_t *cmd_, bool block_) +{ +#ifdef MSG_DONTWAIT + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // mode is requested and a command is not available. + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), + block_ ? 0 : MSG_DONTWAIT); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) + return -1; +#else + if (block_) { + + // Set the reader to blocking mode. + int flags = fcntl (r, F_GETFL, 0); + errno_assert (flags >= 0); + int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); + errno_assert (rc == 0); + } + + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // and a command is not available. + int err = 0; + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { + + // Save value of errno if we wish to pass it to caller. + err = errno; + } + + if (block_) { + + // Re-set the reader to non-blocking mode. + int flags = fcntl (r, F_GETFL, 0); + errno_assert (flags >= 0); + int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc == 0); + } + + // If the recv failed, return with the saved errno if set. + if (err != 0) { + errno = err; + return -1; + } +#endif + + // Sanity check for success. + errno_assert (nbytes != -1); + + // Check whether we haven't got half of command. + zmq_assert (nbytes == sizeof (command_t)); + + return 0; +} + +#endif + +int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_) +{ +#if defined ZMQ_HAVE_WINDOWS + + // Windows has no 'socketpair' function. CreatePipe is no good as pipe + // handles cannot be polled on. Here we create the socketpair by hand. + *w_ = INVALID_SOCKET; + *r_ = INVALID_SOCKET; + + // Create listening socket. + SOCKET listener; + listener = socket (AF_INET, SOCK_STREAM, 0); + wsa_assert (listener != INVALID_SOCKET); + + // Set SO_REUSEADDR and TCP_NODELAY on listening socket. + BOOL so_reuseaddr = 1; + int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, + (char *)&so_reuseaddr, sizeof (so_reuseaddr)); + wsa_assert (rc != SOCKET_ERROR); + BOOL tcp_nodelay = 1; + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); + wsa_assert (rc != SOCKET_ERROR); + + // Bind listening socket to any free local port. + struct sockaddr_in addr; + memset (&addr, 0, sizeof (addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + addr.sin_port = 0; + rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); + + // Retrieve local port listener is bound to (into addr). + int addrlen = sizeof (addr); + rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); + wsa_assert (rc != SOCKET_ERROR); + + // Listen for incomming connections. + rc = listen (listener, 1); + wsa_assert (rc != SOCKET_ERROR); + + // Create the writer socket. + *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); + wsa_assert (*w_ != INVALID_SOCKET); + + // Set TCP_NODELAY on writer socket. + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); + wsa_assert (rc != SOCKET_ERROR); + + // Connect writer to the listener. + rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); + + // Accept connection from writer. + *r_ = accept (listener, NULL, NULL); + wsa_assert (*r_ != INVALID_SOCKET); + + // We don't need the listening socket anymore. Close it. + rc = closesocket (listener); + wsa_assert (rc != SOCKET_ERROR); + + return 0; + +#elif defined ZMQ_HAVE_OPENVMS + + // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, + // it does not set the socket options TCP_NODELAY and TCP_NODELACK which + // can lead to performance problems. + // + // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll + // create the socket pair manually. + sockaddr_in lcladdr; + memset (&lcladdr, 0, sizeof (lcladdr)); + lcladdr.sin_family = AF_INET; + lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + lcladdr.sin_port = 0; + + int listener = socket (AF_INET, SOCK_STREAM, 0); + errno_assert (listener != -1); + + int on = 1; + int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + errno_assert (rc != -1); + + socklen_t lcladdr_len = sizeof (lcladdr); + + rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); + errno_assert (rc != -1); + + rc = listen (listener, 1); + errno_assert (rc != -1); + + *w_ = socket (AF_INET, SOCK_STREAM, 0); + errno_assert (*w_ != -1); + + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); + errno_assert (rc != -1); + + rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); + errno_assert (rc != -1); + + *r_ = accept (listener, NULL, NULL); + errno_assert (*r_ != -1); + + close (listener); + + return 0; + +#else // All other implementations support socketpair() + + int sv [2]; + int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); + errno_assert (rc == 0); + *w_ = sv [0]; + *r_ = sv [1]; + return 0; + +#endif +} + diff --git a/src/mailbox.hpp b/src/mailbox.hpp new file mode 100644 index 0000000..dc49aad --- /dev/null +++ b/src/mailbox.hpp @@ -0,0 +1,61 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_MAILBOX_HPP_INCLUDED__ +#define __ZMQ_MAILBOX_HPP_INCLUDED__ + +#include + +#include "platform.hpp" +#include "fd.hpp" +#include "stdint.hpp" +#include "config.hpp" +#include "command.hpp" + +namespace zmq +{ + + class mailbox_t + { + public: + + mailbox_t (); + ~mailbox_t (); + + fd_t get_fd (); + void send (const command_t &cmd_); + int recv (command_t *cmd_, bool block_); + + private: + + // Write & read end of the socketpair. + fd_t w; + fd_t r; + + // Platform-dependent function to create a socketpair. + static int make_socketpair (fd_t *r_, fd_t *w_); + + // Disable copying of mailbox_t object. + mailbox_t (const mailbox_t&); + void operator = (const mailbox_t&); + }; + +} + +#endif diff --git a/src/own.cpp b/src/own.cpp index 955113a..15d2567 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -83,7 +83,7 @@ void zmq::own_t::launch_sibling (own_t *object_) { // At this point it is important that object is plugged in before its // owner has a chance to terminate it. Thus, 'plug' command is sent before - // the 'own' command. Given that the signaler preserves ordering of + // the 'own' command. Given that the mailbox preserves ordering of // commands, 'term' command from the owner cannot make it to the object // before the already written 'plug' command. diff --git a/src/signaler.cpp b/src/signaler.cpp deleted file mode 100644 index a692078..0000000 --- a/src/signaler.cpp +++ /dev/null @@ -1,366 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "signaler.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "fd.hpp" -#include "ip.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include -#include -#include -#include -#include -#include -#include -#endif - -zmq::fd_t zmq::signaler_t::get_fd () -{ - return r; -} - -#if defined ZMQ_HAVE_WINDOWS - -zmq::signaler_t::signaler_t () -{ - // Create the socketpair for signalling. - int rc = make_socketpair (&r, &w); - errno_assert (rc == 0); - - // Set the writer to non-blocking mode. - unsigned long argp = 1; - rc = ioctlsocket (w, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - - // Set the reader to non-blocking mode. - argp = 1; - rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); -} - -zmq::signaler_t::~signaler_t () -{ - int rc = closesocket (w); - wsa_assert (rc != SOCKET_ERROR); - - rc = closesocket (r); - wsa_assert (rc != SOCKET_ERROR); -} - -void zmq::signaler_t::send (const command_t &cmd_) -{ - // TODO: Implement SNDBUF auto-resizing as for POSIX platforms. - // In the mean time, the following code with assert if the send() - // call would block. - int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0); - wsa_assert (nbytes != SOCKET_ERROR); - zmq_assert (nbytes == sizeof (command_t)); -} - -int zmq::signaler_t::recv (command_t *cmd_, bool block_) -{ - if (block_) { - // Set the reader to blocking mode. - unsigned long argp = 0; - int rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - } - // Attempt to read an entire command. Returns EAGAIN if non-blocking - // and a command is not available. - int err = 0; - int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0); - if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { - // Save value of errno if we wish to pass it to caller. - err = EAGAIN; - } - if (block_) { - // Re-set the reader to non-blocking mode. - unsigned long argp = 1; - int rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - } - // If the recv failed, return with the saved errno if set. - if (err != 0) { - errno = err; - return -1; - } - // Sanity check for success. - wsa_assert (nbytes != SOCKET_ERROR); - - // Check whether we haven't got half of command. - zmq_assert (nbytes == sizeof (command_t)); - return 0; -} - -#else // !ZMQ_HAVE_WINDOWS - -zmq::signaler_t::signaler_t () -{ -#ifdef PIPE_BUF - // Make sure that command can be written to the socket in atomic fashion. - // If this wasn't guaranteed, commands from different threads would be - // interleaved. - zmq_assert (sizeof (command_t) <= PIPE_BUF); -#endif - - // Create the socketpair for signalling. - int rc = make_socketpair (&r, &w); - errno_assert (rc == 0); - - // Set the writer to non-blocking mode. - int flags = fcntl (w, F_GETFL, 0); - errno_assert (flags >= 0); - rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc == 0); - -#ifndef MSG_DONTWAIT - // Set the reader to non-blocking mode. - flags = fcntl (r, F_GETFL, 0); - errno_assert (flags >= 0); - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc == 0); -#endif -} - -zmq::signaler_t::~signaler_t () -{ - close (w); - close (r); -} - -void zmq::signaler_t::send (const command_t &cmd_) -{ - // Attempt to write an entire command without blocking. - ssize_t nbytes; - do { - nbytes = ::send (w, &cmd_, sizeof (command_t), 0); - } while (nbytes == -1 && errno == EINTR); - // Attempt to increase signaler SNDBUF if the send failed. - if (nbytes == -1 && errno == EAGAIN) { - int old_sndbuf, new_sndbuf; - socklen_t sndbuf_size = sizeof old_sndbuf; - // Retrieve current send buffer size. - int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, - &sndbuf_size); - errno_assert (rc == 0); - new_sndbuf = old_sndbuf * 2; - // Double the new send buffer size. - rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); - errno_assert (rc == 0); - // Verify that the OS actually honored the request. - rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); - errno_assert (rc == 0); - zmq_assert (new_sndbuf > old_sndbuf); - // Retry the sending operation; at this point it must succeed. - do { - nbytes = ::send (w, &cmd_, sizeof (command_t), 0); - } while (nbytes == -1 && errno == EINTR); - } - errno_assert (nbytes != -1); - - // This should never happen as we've already checked that command size is - // less than PIPE_BUF. - zmq_assert (nbytes == sizeof (command_t)); -} - -int zmq::signaler_t::recv (command_t *cmd_, bool block_) -{ -#ifdef MSG_DONTWAIT - // Attempt to read an entire command. Returns EAGAIN if non-blocking - // mode is requested and a command is not available. - ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), - block_ ? 0 : MSG_DONTWAIT); - if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) - return -1; -#else - if (block_) { - // Set the reader to blocking mode. - int flags = fcntl (r, F_GETFL, 0); - errno_assert (flags >= 0); - int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); - errno_assert (rc == 0); - } - // Attempt to read an entire command. Returns EAGAIN if non-blocking - // and a command is not available. - int err = 0; - ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); - if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { - // Save value of errno if we wish to pass it to caller. - err = errno; - } - if (block_) { - // Re-set the reader to non-blocking mode. - int flags = fcntl (r, F_GETFL, 0); - errno_assert (flags >= 0); - int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc == 0); - } - // If the recv failed, return with the saved errno if set. - if (err != 0) { - errno = err; - return -1; - } -#endif - // Sanity check for success. - errno_assert (nbytes != -1); - - // Check whether we haven't got half of command. - zmq_assert (nbytes == sizeof (command_t)); - - return 0; -} - -#endif - -int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_) -{ -#if defined ZMQ_HAVE_WINDOWS - - // Windows has no 'socketpair' function. CreatePipe is no good as pipe - // handles cannot be polled on. Here we create the socketpair by hand. - *w_ = INVALID_SOCKET; - *r_ = INVALID_SOCKET; - - // Create listening socket. - SOCKET listener; - listener = socket (AF_INET, SOCK_STREAM, 0); - wsa_assert (listener != INVALID_SOCKET); - - // Set SO_REUSEADDR and TCP_NODELAY on listening socket. - BOOL so_reuseaddr = 1; - int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, - (char *)&so_reuseaddr, sizeof (so_reuseaddr)); - wsa_assert (rc != SOCKET_ERROR); - BOOL tcp_nodelay = 1; - rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, - (char *)&tcp_nodelay, sizeof (tcp_nodelay)); - wsa_assert (rc != SOCKET_ERROR); - - // Bind listening socket to any free local port. - struct sockaddr_in addr; - memset (&addr, 0, sizeof (addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - addr.sin_port = 0; - rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - // Retrieve local port listener is bound to (into addr). - int addrlen = sizeof (addr); - rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); - wsa_assert (rc != SOCKET_ERROR); - - // Listen for incomming connections. - rc = listen (listener, 1); - wsa_assert (rc != SOCKET_ERROR); - - // Create the writer socket. - *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); - wsa_assert (*w_ != INVALID_SOCKET); - - // Set TCP_NODELAY on writer socket. - rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, - (char *)&tcp_nodelay, sizeof (tcp_nodelay)); - wsa_assert (rc != SOCKET_ERROR); - - // Connect writer to the listener. - rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - // Accept connection from writer. - *r_ = accept (listener, NULL, NULL); - wsa_assert (*r_ != INVALID_SOCKET); - - // We don't need the listening socket anymore. Close it. - rc = closesocket (listener); - wsa_assert (rc != SOCKET_ERROR); - - return 0; - -#elif defined ZMQ_HAVE_OPENVMS - - // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, - // it does not set the socket options TCP_NODELAY and TCP_NODELACK which - // can lead to performance problems. - // - // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll - // create the socket pair manually. - sockaddr_in lcladdr; - memset (&lcladdr, 0, sizeof (lcladdr)); - lcladdr.sin_family = AF_INET; - lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - lcladdr.sin_port = 0; - - int listener = socket (AF_INET, SOCK_STREAM, 0); - errno_assert (listener != -1); - - int on = 1; - int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); - errno_assert (rc != -1); - - socklen_t lcladdr_len = sizeof (lcladdr); - - rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); - errno_assert (rc != -1); - - rc = listen (listener, 1); - errno_assert (rc != -1); - - *w_ = socket (AF_INET, SOCK_STREAM, 0); - errno_assert (*w_ != -1); - - rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); - errno_assert (rc != -1); - - rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); - errno_assert (rc != -1); - - *r_ = accept (listener, NULL, NULL); - errno_assert (*r_ != -1); - - close (listener); - - return 0; - -#else // All other implementations support socketpair() - - int sv [2]; - int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); - errno_assert (rc == 0); - *w_ = sv [0]; - *r_ = sv [1]; - return 0; - -#endif -} - diff --git a/src/signaler.hpp b/src/signaler.hpp deleted file mode 100644 index faf3f1f..0000000 --- a/src/signaler.hpp +++ /dev/null @@ -1,61 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__ -#define __ZMQ_SIGNALER_HPP_INCLUDED__ - -#include - -#include "platform.hpp" -#include "fd.hpp" -#include "stdint.hpp" -#include "config.hpp" -#include "command.hpp" - -namespace zmq -{ - - class signaler_t - { - public: - - signaler_t (); - ~signaler_t (); - - fd_t get_fd (); - void send (const command_t &cmd_); - int recv (command_t *cmd_, bool block_); - - private: - - // Write & read end of the socketpair. - fd_t w; - fd_t r; - - // Platform-dependent function to create a socketpair. - static int make_socketpair (fd_t *r_, fd_t *w_); - - // Disable copying of signaler_t object. - signaler_t (const signaler_t&); - void operator = (const signaler_t&); - }; - -} - -#endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c6728fe..a10ed0e 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -118,9 +118,9 @@ zmq::socket_base_t::~socket_base_t () sessions_sync.unlock (); } -zmq::signaler_t *zmq::socket_base_t::get_signaler () +zmq::mailbox_t *zmq::socket_base_t::get_mailbox () { - return &signaler; + return &mailbox; } void zmq::socket_base_t::stop () @@ -227,7 +227,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - *((fd_t*) optval_) = signaler.get_fd (); + *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); return 0; } @@ -613,7 +613,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) int rc; command_t cmd; if (block_) { - rc = signaler.recv (&cmd, true); + rc = mailbox.recv (&cmd, true); if (rc == -1 && errno == EINTR) return -1; errno_assert (rc == 0); @@ -640,7 +640,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = signaler.recv (&cmd, false); + rc = mailbox.recv (&cmd, false); } // Process all the commands available at the moment. @@ -651,7 +651,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = signaler.recv (&cmd, false); + rc = mailbox.recv (&cmd, false); } if (ctx_terminated) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 0fdfefd..69de24d 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -30,7 +30,7 @@ #include "mutex.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" -#include "signaler.hpp" +#include "mailbox.hpp" #include "stdint.hpp" #include "blob.hpp" #include "own.hpp" @@ -48,8 +48,8 @@ namespace zmq static socket_base_t *create (int type_, class ctx_t *parent_, uint32_t tid_); - // Returns the signaler associated with this socket. - signaler_t *get_signaler (); + // Returns the mailbox associated with this socket. + mailbox_t *get_mailbox (); // Interrupt blocking call if the socket is stuck in one. // This function can be called from a different thread! @@ -148,8 +148,8 @@ namespace zmq const blob_t &peer_identity_); void process_unplug (); - // App thread's signaler object. - signaler_t signaler; + // Socket's mailbox object. + mailbox_t mailbox; // Timestamp of when commands were processed the last time. uint64_t last_tsc; -- cgit v1.2.3