summaryrefslogtreecommitdiff
path: root/src/mailbox.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-07-03 13:33:45 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-07-03 13:33:45 +0200
commit7c0c79812075459765440ca26bad56f4f7ddbe52 (patch)
tree2d1aa3dd7dbb1dcff8664708bc535d3dae4f2585 /src/mailbox.hpp
parentde3838403b9a35e7131aae23519ced1f11a3e03c (diff)
Command are now stored in ypipes instead of in socketpairs
Storing commands in OS socket buffers caused whole lot of problems when free space in the buffer ran out. This patch stores commands in ypipes instead and uses socketpair just to signal the other thread, ie. at most one byte is stored in the socketpair at any single instant. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/mailbox.hpp')
-rw-r--r--src/mailbox.hpp29
1 files changed, 16 insertions, 13 deletions
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index eb02e39..0675b99 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -24,10 +24,12 @@
#include <stddef.h>
#include "platform.hpp"
+#include "signaler.hpp"
#include "fd.hpp"
-#include "stdint.hpp"
#include "config.hpp"
#include "command.hpp"
+#include "ypipe.hpp"
+#include "mutex.hpp"
namespace zmq
{
@@ -45,21 +47,22 @@ namespace zmq
private:
- // Platform-dependent function to create a socketpair.
- static int make_socketpair (fd_t *r_, fd_t *w_);
+ // The pipe to store actual commands.
+ typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t;
+ cpipe_t cpipe;
- // Receives a command with the specific timeout.
- // This function is not to be used for non-blocking or inifinitely
- // blocking recvs.
- int recv_timeout (command_t *cmd_, int timeout_);
+ // Signaler to pass signals from writer thread to reader thread.
+ signaler_t signaler;
- // Write & read end of the socketpair.
- fd_t w;
- fd_t r;
+ // There's only one thread receiving from the mailbox, but there
+ // is arbitrary number of threads sending. Given that ypipe requires
+ // synchronised access on both of its endpoints, we have to synchronise
+ // the sending side.
+ mutex_t sync;
- // Used on platforms where there's no MSG_DONTWAIT functionality.
- // True if the read socket is set to the blocking state.
- bool blocking;
+ // True if the underlying pipe is active, ie. when we are allowed to
+ // read commands from it.
+ bool active;
// Disable copying of mailbox_t object.
mailbox_t (const mailbox_t&);