summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-11-05 17:39:51 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-11-05 17:39:51 +0100
commit9da84a5239e5356e34d872c2b5af1d19b9c7eb4f (patch)
tree62267a898b0890dc21425cf4120f690a8083877d /src
parent9cfdb441f45057c7106a101835d65164fce9470a (diff)
signaler renamed to mailbox
For historical reasons queue to transfer commands between threads was called 'signaler'. Given that it was used to pass commands rather than signals it was renamed to 'mailbox', see Erlang mailboxes. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/ctx.cpp14
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/io_thread.cpp12
-rw-r--r--src/io_thread.hpp15
-rw-r--r--src/mailbox.cpp (renamed from src/signaler.cpp)36
-rw-r--r--src/mailbox.hpp (renamed from src/signaler.hpp)16
-rw-r--r--src/own.cpp2
-rw-r--r--src/socket_base.cpp12
-rw-r--r--src/socket_base.hpp10
10 files changed, 69 insertions, 58 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 5cd4f73..fbd36c5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -87,6 +87,7 @@ libzmq_la_SOURCES = \
kqueue.hpp \
lb.hpp \
likely.hpp \
+ mailbox.hpp \
msg_content.hpp \
mutex.hpp \
named_session.hpp \
@@ -111,7 +112,6 @@ libzmq_la_SOURCES = \
select.hpp \
semaphore.hpp \
session.hpp \
- signaler.hpp \
socket_base.hpp \
stdint.hpp \
streamer.hpp \
@@ -150,6 +150,7 @@ libzmq_la_SOURCES = \
ip.cpp \
kqueue.cpp \
lb.cpp \
+ mailbox.cpp \
named_session.cpp \
object.cpp \
options.cpp \
@@ -169,7 +170,6 @@ libzmq_la_SOURCES = \
req.cpp \
select.cpp \
session.cpp \
- signaler.cpp \
socket_base.cpp \
streamer.cpp \
sub.cpp \
diff --git a/src/ctx.cpp b/src/ctx.cpp
index b4c27fd..59ba2db 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -49,9 +49,9 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
HIBYTE (wsa_data.wVersion) == 2);
#endif
- // Initialise the array of signalers.
+ // Initialise the array of mailboxes.
slot_count = max_sockets + io_threads_;
- slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count);
+ slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
zmq_assert (slots);
// Create I/O thread objects and launch them.
@@ -59,7 +59,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
- slots [i] = io_thread->get_signaler ();
+ slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
@@ -92,8 +92,8 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
- // Deallocate the array of slots. No special work is
- // needed as signalers themselves were deallocated with their
+ // Deallocate the array of mailboxes. No special work is
+ // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects.
free (slots);
@@ -178,7 +178,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
uint32_t slot = empty_slots.back ();
empty_slots.pop_back ();
- // Create the socket and register its signaler.
+ // Create the socket and register its mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot);
if (!s) {
empty_slots.push_back (slot);
@@ -186,7 +186,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return NULL;
}
sockets.push_back (s);
- slots [slot] = s->get_signaler ();
+ slots [slot] = s->get_mailbox ();
slot_sync.unlock ();
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 5a3a6aa..0f2dd52 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -26,7 +26,7 @@
#include "../include/zmq.h"
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "semaphore.hpp"
#include "ypipe.hpp"
#include "array.hpp"
@@ -117,9 +117,9 @@ namespace zmq
typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads;
- // Array of pointers to signalers for both application and I/O threads.
+ // Array of pointers to mailboxes for both application and I/O threads.
uint32_t slot_count;
- signaler_t **slots;
+ mailbox_t **slots;
// List of inproc endpoints within this context.
typedef std::map <std::string, class socket_base_t*> endpoints_t;
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index aacf843..7ba8905 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -32,8 +32,8 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
- signaler_handle = poller->add_fd (signaler.get_fd (), this);
- poller->set_pollin (signaler_handle);
+ mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
+ poller->set_pollin (mailbox_handle);
}
zmq::io_thread_t::~io_thread_t ()
@@ -52,9 +52,9 @@ void zmq::io_thread_t::stop ()
send_stop ();
}
-zmq::signaler_t *zmq::io_thread_t::get_signaler ()
+zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
{
- return &signaler;
+ return &mailbox;
}
int zmq::io_thread_t::get_load ()
@@ -71,7 +71,7 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = signaler.recv (&cmd, false);
+ int rc = mailbox.recv (&cmd, false);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
@@ -103,6 +103,6 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()
void zmq::io_thread_t::process_stop ()
{
- poller->rm_fd (signaler_handle);
+ poller->rm_fd (mailbox_handle);
poller->stop ();
}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index a0704fc..b01eecb 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -26,7 +26,7 @@
#include "object.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
-#include "signaler.hpp"
+#include "mailbox.hpp"
namespace zmq
{
@@ -50,8 +50,8 @@ namespace zmq
// Ask underlying thread to stop.
void stop ();
- // Returns signaler associated with this I/O thread.
- signaler_t *get_signaler ();
+ // Returns mailbox associated with this I/O thread.
+ mailbox_t *get_mailbox ();
// i_poll_events implementation.
void in_event ();
@@ -69,12 +69,11 @@ namespace zmq
private:
- // Poll thread gets notifications about incoming commands using
- // this signaler.
- signaler_t signaler;
+ // I/O thread accesses incoming commands via this mailbox.
+ mailbox_t mailbox;
- // Handle associated with signaler's file descriptor.
- poller_t::handle_t signaler_handle;
+ // Handle associated with mailbox' file descriptor.
+ poller_t::handle_t mailbox_handle;
// I/O multiplexing is performed using a poller object.
poller_t *poller;
diff --git a/src/signaler.cpp b/src/mailbox.cpp
index a692078..927c230 100644
--- a/src/signaler.cpp
+++ b/src/mailbox.cpp
@@ -17,7 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "fd.hpp"
@@ -35,14 +35,14 @@
#include <sys/socket.h>
#endif
-zmq::fd_t zmq::signaler_t::get_fd ()
+zmq::fd_t zmq::mailbox_t::get_fd ()
{
return r;
}
#if defined ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::mailbox_t::mailbox_t ()
{
// Create the socketpair for signalling.
int rc = make_socketpair (&r, &w);
@@ -59,7 +59,7 @@ zmq::signaler_t::signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-zmq::signaler_t::~signaler_t ()
+zmq::mailbox_t::~mailbox_t ()
{
int rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR);
@@ -68,7 +68,7 @@ zmq::signaler_t::~signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-void zmq::signaler_t::send (const command_t &cmd_)
+void zmq::mailbox_t::send (const command_t &cmd_)
{
// TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
// In the mean time, the following code with assert if the send()
@@ -78,7 +78,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
// Set the reader to blocking mode.
@@ -115,7 +115,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#else // !ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::mailbox_t::mailbox_t ()
{
#ifdef PIPE_BUF
// Make sure that command can be written to the socket in atomic fashion.
@@ -143,35 +143,40 @@ zmq::signaler_t::signaler_t ()
#endif
}
-zmq::signaler_t::~signaler_t ()
+zmq::mailbox_t::~mailbox_t ()
{
close (w);
close (r);
}
-void zmq::signaler_t::send (const command_t &cmd_)
+void zmq::mailbox_t::send (const command_t &cmd_)
{
// Attempt to write an entire command without blocking.
ssize_t nbytes;
do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
} while (nbytes == -1 && errno == EINTR);
- // Attempt to increase signaler SNDBUF if the send failed.
+
+ // Attempt to increase mailbox SNDBUF if the send failed.
if (nbytes == -1 && errno == EAGAIN) {
int old_sndbuf, new_sndbuf;
socklen_t sndbuf_size = sizeof old_sndbuf;
+
// Retrieve current send buffer size.
int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
&sndbuf_size);
errno_assert (rc == 0);
new_sndbuf = old_sndbuf * 2;
+
// Double the new send buffer size.
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
errno_assert (rc == 0);
+
// Verify that the OS actually honored the request.
rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
errno_assert (rc == 0);
zmq_assert (new_sndbuf > old_sndbuf);
+
// Retry the sending operation; at this point it must succeed.
do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
@@ -184,7 +189,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
#ifdef MSG_DONTWAIT
// Attempt to read an entire command. Returns EAGAIN if non-blocking
@@ -195,33 +200,40 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
return -1;
#else
if (block_) {
+
// Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
errno_assert (rc == 0);
}
+
// Attempt to read an entire command. Returns EAGAIN if non-blocking
// and a command is not available.
int err = 0;
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
+
// Save value of errno if we wish to pass it to caller.
err = errno;
}
+
if (block_) {
+
// Re-set the reader to non-blocking mode.
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0);
}
+
// If the recv failed, return with the saved errno if set.
if (err != 0) {
errno = err;
return -1;
}
#endif
+
// Sanity check for success.
errno_assert (nbytes != -1);
@@ -233,7 +245,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#endif
-int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_)
+int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_WINDOWS
diff --git a/src/signaler.hpp b/src/mailbox.hpp
index faf3f1f..dc49aad 100644
--- a/src/signaler.hpp
+++ b/src/mailbox.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
-#define __ZMQ_SIGNALER_HPP_INCLUDED__
+#ifndef __ZMQ_MAILBOX_HPP_INCLUDED__
+#define __ZMQ_MAILBOX_HPP_INCLUDED__
#include <stddef.h>
@@ -31,12 +31,12 @@
namespace zmq
{
- class signaler_t
+ class mailbox_t
{
public:
- signaler_t ();
- ~signaler_t ();
+ mailbox_t ();
+ ~mailbox_t ();
fd_t get_fd ();
void send (const command_t &cmd_);
@@ -51,9 +51,9 @@ namespace zmq
// Platform-dependent function to create a socketpair.
static int make_socketpair (fd_t *r_, fd_t *w_);
- // Disable copying of signaler_t object.
- signaler_t (const signaler_t&);
- void operator = (const signaler_t&);
+ // Disable copying of mailbox_t object.
+ mailbox_t (const mailbox_t&);
+ void operator = (const mailbox_t&);
};
}
diff --git a/src/own.cpp b/src/own.cpp
index 955113a..15d2567 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -83,7 +83,7 @@ void zmq::own_t::launch_sibling (own_t *object_)
{
// At this point it is important that object is plugged in before its
// owner has a chance to terminate it. Thus, 'plug' command is sent before
- // the 'own' command. Given that the signaler preserves ordering of
+ // the 'own' command. Given that the mailbox preserves ordering of
// commands, 'term' command from the owner cannot make it to the object
// before the already written 'plug' command.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c6728fe..a10ed0e 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -118,9 +118,9 @@ zmq::socket_base_t::~socket_base_t ()
sessions_sync.unlock ();
}
-zmq::signaler_t *zmq::socket_base_t::get_signaler ()
+zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
{
- return &signaler;
+ return &mailbox;
}
void zmq::socket_base_t::stop ()
@@ -227,7 +227,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- *((fd_t*) optval_) = signaler.get_fd ();
+ *((fd_t*) optval_) = mailbox.get_fd ();
*optvallen_ = sizeof (fd_t);
return 0;
}
@@ -613,7 +613,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
int rc;
command_t cmd;
if (block_) {
- rc = signaler.recv (&cmd, true);
+ rc = mailbox.recv (&cmd, true);
if (rc == -1 && errno == EINTR)
return -1;
errno_assert (rc == 0);
@@ -640,7 +640,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = signaler.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, false);
}
// Process all the commands available at the moment.
@@ -651,7 +651,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = signaler.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, false);
}
if (ctx_terminated) {
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0fdfefd..69de24d 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -30,7 +30,7 @@
#include "mutex.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "stdint.hpp"
#include "blob.hpp"
#include "own.hpp"
@@ -48,8 +48,8 @@ namespace zmq
static socket_base_t *create (int type_, class ctx_t *parent_,
uint32_t tid_);
- // Returns the signaler associated with this socket.
- signaler_t *get_signaler ();
+ // Returns the mailbox associated with this socket.
+ mailbox_t *get_mailbox ();
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
@@ -148,8 +148,8 @@ namespace zmq
const blob_t &peer_identity_);
void process_unplug ();
- // App thread's signaler object.
- signaler_t signaler;
+ // Socket's mailbox object.
+ mailbox_t mailbox;
// Timestamp of when commands were processed the last time.
uint64_t last_tsc;