summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/ctx.cpp14
-rw-r--r--src/ctx.hpp6
-rw-r--r--src/io_thread.cpp12
-rw-r--r--src/io_thread.hpp15
-rw-r--r--src/mailbox.cpp (renamed from src/signaler.cpp)36
-rw-r--r--src/mailbox.hpp (renamed from src/signaler.hpp)16
-rw-r--r--src/own.cpp2
-rw-r--r--src/socket_base.cpp12
-rw-r--r--src/socket_base.hpp10
10 files changed, 69 insertions, 58 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 5cd4f73..fbd36c5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -87,6 +87,7 @@ libzmq_la_SOURCES = \
kqueue.hpp \
lb.hpp \
likely.hpp \
+ mailbox.hpp \
msg_content.hpp \
mutex.hpp \
named_session.hpp \
@@ -111,7 +112,6 @@ libzmq_la_SOURCES = \
select.hpp \
semaphore.hpp \
session.hpp \
- signaler.hpp \
socket_base.hpp \
stdint.hpp \
streamer.hpp \
@@ -150,6 +150,7 @@ libzmq_la_SOURCES = \
ip.cpp \
kqueue.cpp \
lb.cpp \
+ mailbox.cpp \
named_session.cpp \
object.cpp \
options.cpp \
@@ -169,7 +170,6 @@ libzmq_la_SOURCES = \
req.cpp \
select.cpp \
session.cpp \
- signaler.cpp \
socket_base.cpp \
streamer.cpp \
sub.cpp \
diff --git a/src/ctx.cpp b/src/ctx.cpp
index b4c27fd..59ba2db 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -49,9 +49,9 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
HIBYTE (wsa_data.wVersion) == 2);
#endif
- // Initialise the array of signalers.
+ // Initialise the array of mailboxes.
slot_count = max_sockets + io_threads_;
- slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count);
+ slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
zmq_assert (slots);
// Create I/O thread objects and launch them.
@@ -59,7 +59,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
- slots [i] = io_thread->get_signaler ();
+ slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
@@ -92,8 +92,8 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
- // Deallocate the array of slots. No special work is
- // needed as signalers themselves were deallocated with their
+ // Deallocate the array of mailboxes. No special work is
+ // needed as mailboxes themselves were deallocated with their
// corresponding io_thread/socket objects.
free (slots);
@@ -178,7 +178,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
uint32_t slot = empty_slots.back ();
empty_slots.pop_back ();
- // Create the socket and register its signaler.
+ // Create the socket and register its mailbox.
socket_base_t *s = socket_base_t::create (type_, this, slot);
if (!s) {
empty_slots.push_back (slot);
@@ -186,7 +186,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return NULL;
}
sockets.push_back (s);
- slots [slot] = s->get_signaler ();
+ slots [slot] = s->get_mailbox ();
slot_sync.unlock ();
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 5a3a6aa..0f2dd52 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -26,7 +26,7 @@
#include "../include/zmq.h"
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "semaphore.hpp"
#include "ypipe.hpp"
#include "array.hpp"
@@ -117,9 +117,9 @@ namespace zmq
typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads;
- // Array of pointers to signalers for both application and I/O threads.
+ // Array of pointers to mailboxes for both application and I/O threads.
uint32_t slot_count;
- signaler_t **slots;
+ mailbox_t **slots;
// List of inproc endpoints within this context.
typedef std::map <std::string, class socket_base_t*> endpoints_t;
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index aacf843..7ba8905 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -32,8 +32,8 @@ zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
- signaler_handle = poller->add_fd (signaler.get_fd (), this);
- poller->set_pollin (signaler_handle);
+ mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
+ poller->set_pollin (mailbox_handle);
}
zmq::io_thread_t::~io_thread_t ()
@@ -52,9 +52,9 @@ void zmq::io_thread_t::stop ()
send_stop ();
}
-zmq::signaler_t *zmq::io_thread_t::get_signaler ()
+zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
{
- return &signaler;
+ return &mailbox;
}
int zmq::io_thread_t::get_load ()
@@ -71,7 +71,7 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit.
command_t cmd;
- int rc = signaler.recv (&cmd, false);
+ int rc = mailbox.recv (&cmd, false);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
@@ -103,6 +103,6 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()
void zmq::io_thread_t::process_stop ()
{
- poller->rm_fd (signaler_handle);
+ poller->rm_fd (mailbox_handle);
poller->stop ();
}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index a0704fc..b01eecb 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -26,7 +26,7 @@
#include "object.hpp"
#include "poller.hpp"
#include "i_poll_events.hpp"
-#include "signaler.hpp"
+#include "mailbox.hpp"
namespace zmq
{
@@ -50,8 +50,8 @@ namespace zmq
// Ask underlying thread to stop.
void stop ();
- // Returns signaler associated with this I/O thread.
- signaler_t *get_signaler ();
+ // Returns mailbox associated with this I/O thread.
+ mailbox_t *get_mailbox ();
// i_poll_events implementation.
void in_event ();
@@ -69,12 +69,11 @@ namespace zmq
private:
- // Poll thread gets notifications about incoming commands using
- // this signaler.
- signaler_t signaler;
+ // I/O thread accesses incoming commands via this mailbox.
+ mailbox_t mailbox;
- // Handle associated with signaler's file descriptor.
- poller_t::handle_t signaler_handle;
+ // Handle associated with mailbox' file descriptor.
+ poller_t::handle_t mailbox_handle;
// I/O multiplexing is performed using a poller object.
poller_t *poller;
diff --git a/src/signaler.cpp b/src/mailbox.cpp
index a692078..927c230 100644
--- a/src/signaler.cpp
+++ b/src/mailbox.cpp
@@ -17,7 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "fd.hpp"
@@ -35,14 +35,14 @@
#include <sys/socket.h>
#endif
-zmq::fd_t zmq::signaler_t::get_fd ()
+zmq::fd_t zmq::mailbox_t::get_fd ()
{
return r;
}
#if defined ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::mailbox_t::mailbox_t ()
{
// Create the socketpair for signalling.
int rc = make_socketpair (&r, &w);
@@ -59,7 +59,7 @@ zmq::signaler_t::signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-zmq::signaler_t::~signaler_t ()
+zmq::mailbox_t::~mailbox_t ()
{
int rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR);
@@ -68,7 +68,7 @@ zmq::signaler_t::~signaler_t ()
wsa_assert (rc != SOCKET_ERROR);
}
-void zmq::signaler_t::send (const command_t &cmd_)
+void zmq::mailbox_t::send (const command_t &cmd_)
{
// TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
// In the mean time, the following code with assert if the send()
@@ -78,7 +78,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
// Set the reader to blocking mode.
@@ -115,7 +115,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#else // !ZMQ_HAVE_WINDOWS
-zmq::signaler_t::signaler_t ()
+zmq::mailbox_t::mailbox_t ()
{
#ifdef PIPE_BUF
// Make sure that command can be written to the socket in atomic fashion.
@@ -143,35 +143,40 @@ zmq::signaler_t::signaler_t ()
#endif
}
-zmq::signaler_t::~signaler_t ()
+zmq::mailbox_t::~mailbox_t ()
{
close (w);
close (r);
}
-void zmq::signaler_t::send (const command_t &cmd_)
+void zmq::mailbox_t::send (const command_t &cmd_)
{
// Attempt to write an entire command without blocking.
ssize_t nbytes;
do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
} while (nbytes == -1 && errno == EINTR);
- // Attempt to increase signaler SNDBUF if the send failed.
+
+ // Attempt to increase mailbox SNDBUF if the send failed.
if (nbytes == -1 && errno == EAGAIN) {
int old_sndbuf, new_sndbuf;
socklen_t sndbuf_size = sizeof old_sndbuf;
+
// Retrieve current send buffer size.
int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
&sndbuf_size);
errno_assert (rc == 0);
new_sndbuf = old_sndbuf * 2;
+
// Double the new send buffer size.
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
errno_assert (rc == 0);
+
// Verify that the OS actually honored the request.
rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
errno_assert (rc == 0);
zmq_assert (new_sndbuf > old_sndbuf);
+
// Retry the sending operation; at this point it must succeed.
do {
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
@@ -184,7 +189,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t));
}
-int zmq::signaler_t::recv (command_t *cmd_, bool block_)
+int zmq::mailbox_t::recv (command_t *cmd_, bool block_)
{
#ifdef MSG_DONTWAIT
// Attempt to read an entire command. Returns EAGAIN if non-blocking
@@ -195,33 +200,40 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
return -1;
#else
if (block_) {
+
// Set the reader to blocking mode.
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
errno_assert (rc == 0);
}
+
// Attempt to read an entire command. Returns EAGAIN if non-blocking
// and a command is not available.
int err = 0;
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
+
// Save value of errno if we wish to pass it to caller.
err = errno;
}
+
if (block_) {
+
// Re-set the reader to non-blocking mode.
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0);
}
+
// If the recv failed, return with the saved errno if set.
if (err != 0) {
errno = err;
return -1;
}
#endif
+
// Sanity check for success.
errno_assert (nbytes != -1);
@@ -233,7 +245,7 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
#endif
-int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_)
+int zmq::mailbox_t::make_socketpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_WINDOWS
diff --git a/src/signaler.hpp b/src/mailbox.hpp
index faf3f1f..dc49aad 100644
--- a/src/signaler.hpp
+++ b/src/mailbox.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
-#define __ZMQ_SIGNALER_HPP_INCLUDED__
+#ifndef __ZMQ_MAILBOX_HPP_INCLUDED__
+#define __ZMQ_MAILBOX_HPP_INCLUDED__
#include <stddef.h>
@@ -31,12 +31,12 @@
namespace zmq
{
- class signaler_t
+ class mailbox_t
{
public:
- signaler_t ();
- ~signaler_t ();
+ mailbox_t ();
+ ~mailbox_t ();
fd_t get_fd ();
void send (const command_t &cmd_);
@@ -51,9 +51,9 @@ namespace zmq
// Platform-dependent function to create a socketpair.
static int make_socketpair (fd_t *r_, fd_t *w_);
- // Disable copying of signaler_t object.
- signaler_t (const signaler_t&);
- void operator = (const signaler_t&);
+ // Disable copying of mailbox_t object.
+ mailbox_t (const mailbox_t&);
+ void operator = (const mailbox_t&);
};
}
diff --git a/src/own.cpp b/src/own.cpp
index 955113a..15d2567 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -83,7 +83,7 @@ void zmq::own_t::launch_sibling (own_t *object_)
{
// At this point it is important that object is plugged in before its
// owner has a chance to terminate it. Thus, 'plug' command is sent before
- // the 'own' command. Given that the signaler preserves ordering of
+ // the 'own' command. Given that the mailbox preserves ordering of
// commands, 'term' command from the owner cannot make it to the object
// before the already written 'plug' command.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c6728fe..a10ed0e 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -118,9 +118,9 @@ zmq::socket_base_t::~socket_base_t ()
sessions_sync.unlock ();
}
-zmq::signaler_t *zmq::socket_base_t::get_signaler ()
+zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
{
- return &signaler;
+ return &mailbox;
}
void zmq::socket_base_t::stop ()
@@ -227,7 +227,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL;
return -1;
}
- *((fd_t*) optval_) = signaler.get_fd ();
+ *((fd_t*) optval_) = mailbox.get_fd ();
*optvallen_ = sizeof (fd_t);
return 0;
}
@@ -613,7 +613,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
int rc;
command_t cmd;
if (block_) {
- rc = signaler.recv (&cmd, true);
+ rc = mailbox.recv (&cmd, true);
if (rc == -1 && errno == EINTR)
return -1;
errno_assert (rc == 0);
@@ -640,7 +640,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
}
// Check whether there are any commands pending for this thread.
- rc = signaler.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, false);
}
// Process all the commands available at the moment.
@@ -651,7 +651,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd);
- rc = signaler.recv (&cmd, false);
+ rc = mailbox.recv (&cmd, false);
}
if (ctx_terminated) {
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 0fdfefd..69de24d 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -30,7 +30,7 @@
#include "mutex.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
-#include "signaler.hpp"
+#include "mailbox.hpp"
#include "stdint.hpp"
#include "blob.hpp"
#include "own.hpp"
@@ -48,8 +48,8 @@ namespace zmq
static socket_base_t *create (int type_, class ctx_t *parent_,
uint32_t tid_);
- // Returns the signaler associated with this socket.
- signaler_t *get_signaler ();
+ // Returns the mailbox associated with this socket.
+ mailbox_t *get_mailbox ();
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
@@ -148,8 +148,8 @@ namespace zmq
const blob_t &peer_identity_);
void process_unplug ();
- // App thread's signaler object.
- signaler_t signaler;
+ // Socket's mailbox object.
+ mailbox_t mailbox;
// Timestamp of when commands were processed the last time.
uint64_t last_tsc;