diff options
-rw-r--r-- | MAINTAINERS | 2 | ||||
-rw-r--r-- | builds/msvc/libzmq/libzmq.vcproj | 4 | ||||
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/ctx.cpp | 14 | ||||
-rw-r--r-- | src/ctx.hpp | 6 | ||||
-rw-r--r-- | src/io_thread.cpp | 12 | ||||
-rw-r--r-- | src/io_thread.hpp | 15 | ||||
-rw-r--r-- | src/mailbox.cpp (renamed from src/signaler.cpp) | 36 | ||||
-rw-r--r-- | src/mailbox.hpp (renamed from src/signaler.hpp) | 16 | ||||
-rw-r--r-- | src/own.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 12 | ||||
-rw-r--r-- | src/socket_base.hpp | 10 |
12 files changed, 72 insertions, 61 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index 9d65ad4..8e572eb 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -38,7 +38,7 @@ Component: Devices Maintainer: Martin Sustrik Contact: sustrik@250bpm.com -Component: Generic Infrastructure (context, signaler, command, pipe) +Component: Generic Infrastructure (context, mailbox, command, pipe) Maintainer: Martin Sustrik Contact: sustrik@250bpm.com diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj index ffe8a17..456b5ed 100644 --- a/builds/msvc/libzmq/libzmq.vcproj +++ b/builds/msvc/libzmq/libzmq.vcproj @@ -310,7 +310,7 @@ > </File> <File - RelativePath="..\..\..\src\signaler.cpp" + RelativePath="..\..\..\src\mailbox.cpp" > </File> <File @@ -592,7 +592,7 @@ > </File> <File - RelativePath="..\..\..\src\signaler.hpp" + RelativePath="..\..\..\src\mailbox.hpp" > </File> <File diff --git a/src/Makefile.am b/src/Makefile.am index 5cd4f73..fbd36c5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -87,6 +87,7 @@ libzmq_la_SOURCES = \ kqueue.hpp \ lb.hpp \ likely.hpp \ + mailbox.hpp \ msg_content.hpp \ mutex.hpp \ named_session.hpp \ @@ -111,7 +112,6 @@ libzmq_la_SOURCES = \ select.hpp \ semaphore.hpp \ session.hpp \ - signaler.hpp \ socket_base.hpp \ stdint.hpp \ streamer.hpp \ @@ -150,6 +150,7 @@ libzmq_la_SOURCES = \ ip.cpp \ kqueue.cpp \ lb.cpp \ + mailbox.cpp \ named_session.cpp \ object.cpp \ options.cpp \ @@ -169,7 +170,6 @@ libzmq_la_SOURCES = \ req.cpp \ select.cpp \ session.cpp \ - signaler.cpp \ socket_base.cpp \ streamer.cpp \ sub.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index b4c27fd..59ba2db 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -49,9 +49,9 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : HIBYTE (wsa_data.wVersion) == 2); #endif - // Initialise the array of signalers. + // Initialise the array of mailboxes. slot_count = max_sockets + io_threads_; - slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count); + slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); zmq_assert (slots); // Create I/O thread objects and launch them. @@ -59,7 +59,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); - slots [i] = io_thread->get_signaler (); + slots [i] = io_thread->get_mailbox (); io_thread->start (); } @@ -92,8 +92,8 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; - // Deallocate the array of slots. No special work is - // needed as signalers themselves were deallocated with their + // Deallocate the array of mailboxes. No special work is + // needed as mailboxes themselves were deallocated with their // corresponding io_thread/socket objects. free (slots); @@ -178,7 +178,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) uint32_t slot = empty_slots.back (); empty_slots.pop_back (); - // Create the socket and register its signaler. + // Create the socket and register its mailbox. socket_base_t *s = socket_base_t::create (type_, this, slot); if (!s) { empty_slots.push_back (slot); @@ -186,7 +186,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) return NULL; } sockets.push_back (s); - slots [slot] = s->get_signaler (); + slots [slot] = s->get_mailbox (); slot_sync.unlock (); diff --git a/src/ctx.hpp b/src/ctx.hpp index 5a3a6aa..0f2dd52 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,7 +26,7 @@ #include "../include/zmq.h" -#include "signaler.hpp" +#include "mailbox.hpp" #include "semaphore.hpp" #include "ypipe.hpp" #include "array.hpp" @@ -117,9 +117,9 @@ namespace zmq typedef std::vector <class io_thread_t*> io_threads_t; io_threads_t io_threads; - // Array of pointers to signalers for both application and I/O threads. + // Array of pointers to mailboxes for both application and I/O threads. uint32_t slot_count; - signaler_t **slots; + mailbox_t **slots; // List of inproc endpoints within this context. typedef std::map <std::string, class socket_base_t*> endpoints_t; diff --git a/src/io_thread.cpp b/src/io_thread.cpp index aacf843..7ba8905 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -32,8 +32,8 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : poller = new (std::nothrow) poller_t; zmq_assert (poller); - signaler_handle = poller->add_fd (signaler.get_fd (), this); - poller->set_pollin (signaler_handle); + mailbox_handle = poller->add_fd (mailbox.get_fd (), this); + poller->set_pollin (mailbox_handle); } zmq::io_thread_t::~io_thread_t () @@ -52,9 +52,9 @@ void zmq::io_thread_t::stop () send_stop (); } -zmq::signaler_t *zmq::io_thread_t::get_signaler () +zmq::mailbox_t *zmq::io_thread_t::get_mailbox () { - return &signaler; + return &mailbox; } int zmq::io_thread_t::get_load () @@ -71,7 +71,7 @@ void zmq::io_thread_t::in_event () // Get the next command. If there is none, exit. command_t cmd; - int rc = signaler.recv (&cmd, false); + int rc = mailbox.recv (&cmd, false); if (rc != 0 && errno == EINTR) continue; if (rc != 0 && errno == EAGAIN) @@ -103,6 +103,6 @@ zmq::poller_t *zmq::io_thread_t::get_poller () void zmq::io_thread_t::process_stop () { - poller->rm_fd (signaler_handle); + poller->rm_fd (mailbox_handle); poller->stop (); } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index a0704fc..b01eecb 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -26,7 +26,7 @@ #include "object.hpp" #include "poller.hpp" #include "i_poll_events.hpp" -#include "signaler.hpp" +#include "mailbox.hpp" namespace zmq { @@ -50,8 +50,8 @@ namespace zmq // Ask underlying thread to stop. void stop (); - // Returns signaler associated with this I/O thread. - signaler_t *get_signaler (); + // Returns mailbox associated with this I/O thread. + mailbox_t *get_mailbox (); // i_poll_events implementation. void in_event (); @@ -69,12 +69,11 @@ namespace zmq private: - // Poll thread gets notifications about incoming commands using - // this signaler. - signaler_t signaler; + // I/O thread accesses incoming commands via this mailbox. + mailbox_t mailbox; - // Handle associated with signaler's file descriptor. - poller_t::handle_t signaler_handle; + // Handle associated with mailbox' file descriptor. + poller_t::handle_t mailbox_handle; // I/O multiplexing is performed using a poller object. poller_t *poller; diff --git a/src/signaler.cpp b/src/mailbox.cpp index a692078..927c230 100644 --- a/src/signaler.cpp +++ b/src/mailbox.cpp @@ -17,7 +17,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "signaler.hpp" +#include "mailbox.hpp" #include "platform.hpp" #include "err.hpp" #include "fd.hpp" @@ -35,14 +35,14 @@ #include <sys/socket.h> #endif -zmq::fd_t zmq::signaler_t::get_fd () +zmq::fd_t zmq::mailbox_t::get_fd () { return r; } #if defined ZMQ_HAVE_WINDOWS -zmq::signaler_t::signaler_t () +zmq::mailbox_t::mailbox_t () { // Create the socketpair for signalling. int rc = make_socketpair (&r, &w); @@ -59,7 +59,7 @@ zmq::signaler_t::signaler_t () wsa_assert (rc != SOCKET_ERROR); } -zmq::signaler_t::~signaler_t () +zmq::mailbox_t::~mailbox_t () { int rc = closesocket (w); wsa_assert (rc != SOCKET_ERROR); @@ -68,7 +68,7 @@ zmq::signaler_t::~signaler_t () wsa_assert (rc != SOCKET_ERROR); } -void zmq::signaler_t::send (const command_t &cmd_) +void zmq::mailbox_t::send (const command_t &cmd_) { // TODO: Implement SNDBUF auto-resizing as for POSIX platforms. // In the mean time, the following code with assert if the send() @@ -78,7 +78,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, bool block_) { if (block_) { // Set the reader to blocking mode. @@ -115,7 +115,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_) #else // !ZMQ_HAVE_WINDOWS -zmq::signaler_t::signaler_t () +zmq::mailbox_t::mailbox_t () { #ifdef PIPE_BUF // Make sure that command can be written to the socket in atomic fashion. @@ -143,35 +143,40 @@ zmq::signaler_t::signaler_t () #endif } -zmq::signaler_t::~signaler_t () +zmq::mailbox_t::~mailbox_t () { close (w); close (r); } -void zmq::signaler_t::send (const command_t &cmd_) +void zmq::mailbox_t::send (const command_t &cmd_) { // Attempt to write an entire command without blocking. ssize_t nbytes; do { nbytes = ::send (w, &cmd_, sizeof (command_t), 0); } while (nbytes == -1 && errno == EINTR); - // Attempt to increase signaler SNDBUF if the send failed. + + // Attempt to increase mailbox SNDBUF if the send failed. if (nbytes == -1 && errno == EAGAIN) { int old_sndbuf, new_sndbuf; socklen_t sndbuf_size = sizeof old_sndbuf; + // Retrieve current send buffer size. int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, &sndbuf_size); errno_assert (rc == 0); new_sndbuf = old_sndbuf * 2; + // Double the new send buffer size. rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); errno_assert (rc == 0); + // Verify that the OS actually honored the request. rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); errno_assert (rc == 0); zmq_assert (new_sndbuf > old_sndbuf); + // Retry the sending operation; at this point it must succeed. do { nbytes = ::send (w, &cmd_, sizeof (command_t), 0); @@ -184,7 +189,7 @@ void zmq::signaler_t::send (const command_t &cmd_) zmq_assert (nbytes == sizeof (command_t)); } -int zmq::signaler_t::recv (command_t *cmd_, bool block_) +int zmq::mailbox_t::recv (command_t *cmd_, bool block_) { #ifdef MSG_DONTWAIT // Attempt to read an entire command. Returns EAGAIN if non-blocking @@ -195,33 +200,40 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_) return -1; #else if (block_) { + // Set the reader to blocking mode. int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); errno_assert (rc == 0); } + // Attempt to read an entire command. Returns EAGAIN if non-blocking // and a command is not available. int err = 0; ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { + // Save value of errno if we wish to pass it to caller. err = errno; } + if (block_) { + // Re-set the reader to non-blocking mode. int flags = fcntl (r, F_GETFL, 0); errno_assert (flags >= 0); int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); errno_assert (rc == 0); } + // If the recv failed, return with the saved errno if set. if (err != 0) { errno = err; return -1; } #endif + // Sanity check for success. errno_assert (nbytes != -1); @@ -233,7 +245,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_) #endif -int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_) +int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_) { #if defined ZMQ_HAVE_WINDOWS diff --git a/src/signaler.hpp b/src/mailbox.hpp index faf3f1f..dc49aad 100644 --- a/src/signaler.hpp +++ b/src/mailbox.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__ -#define __ZMQ_SIGNALER_HPP_INCLUDED__ +#ifndef __ZMQ_MAILBOX_HPP_INCLUDED__ +#define __ZMQ_MAILBOX_HPP_INCLUDED__ #include <stddef.h> @@ -31,12 +31,12 @@ namespace zmq { - class signaler_t + class mailbox_t { public: - signaler_t (); - ~signaler_t (); + mailbox_t (); + ~mailbox_t (); fd_t get_fd (); void send (const command_t &cmd_); @@ -51,9 +51,9 @@ namespace zmq // Platform-dependent function to create a socketpair. static int make_socketpair (fd_t *r_, fd_t *w_); - // Disable copying of signaler_t object. - signaler_t (const signaler_t&); - void operator = (const signaler_t&); + // Disable copying of mailbox_t object. + mailbox_t (const mailbox_t&); + void operator = (const mailbox_t&); }; } diff --git a/src/own.cpp b/src/own.cpp index 955113a..15d2567 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -83,7 +83,7 @@ void zmq::own_t::launch_sibling (own_t *object_) { // At this point it is important that object is plugged in before its // owner has a chance to terminate it. Thus, 'plug' command is sent before - // the 'own' command. Given that the signaler preserves ordering of + // the 'own' command. Given that the mailbox preserves ordering of // commands, 'term' command from the owner cannot make it to the object // before the already written 'plug' command. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c6728fe..a10ed0e 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -118,9 +118,9 @@ zmq::socket_base_t::~socket_base_t () sessions_sync.unlock (); } -zmq::signaler_t *zmq::socket_base_t::get_signaler () +zmq::mailbox_t *zmq::socket_base_t::get_mailbox () { - return &signaler; + return &mailbox; } void zmq::socket_base_t::stop () @@ -227,7 +227,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno = EINVAL; return -1; } - *((fd_t*) optval_) = signaler.get_fd (); + *((fd_t*) optval_) = mailbox.get_fd (); *optvallen_ = sizeof (fd_t); return 0; } @@ -613,7 +613,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) int rc; command_t cmd; if (block_) { - rc = signaler.recv (&cmd, true); + rc = mailbox.recv (&cmd, true); if (rc == -1 && errno == EINTR) return -1; errno_assert (rc == 0); @@ -640,7 +640,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) } // Check whether there are any commands pending for this thread. - rc = signaler.recv (&cmd, false); + rc = mailbox.recv (&cmd, false); } // Process all the commands available at the moment. @@ -651,7 +651,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) return -1; errno_assert (rc == 0); cmd.destination->process_command (cmd); - rc = signaler.recv (&cmd, false); + rc = mailbox.recv (&cmd, false); } if (ctx_terminated) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 0fdfefd..69de24d 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -30,7 +30,7 @@ #include "mutex.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" -#include "signaler.hpp" +#include "mailbox.hpp" #include "stdint.hpp" #include "blob.hpp" #include "own.hpp" @@ -48,8 +48,8 @@ namespace zmq static socket_base_t *create (int type_, class ctx_t *parent_, uint32_t tid_); - // Returns the signaler associated with this socket. - signaler_t *get_signaler (); + // Returns the mailbox associated with this socket. + mailbox_t *get_mailbox (); // Interrupt blocking call if the socket is stuck in one. // This function can be called from a different thread! @@ -148,8 +148,8 @@ namespace zmq const blob_t &peer_identity_); void process_unplug (); - // App thread's signaler object. - signaler_t signaler; + // Socket's mailbox object. + mailbox_t mailbox; // Timestamp of when commands were processed the last time. uint64_t last_tsc; |