From 7c0c79812075459765440ca26bad56f4f7ddbe52 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 3 Jul 2011 13:33:45 +0200 Subject: Command are now stored in ypipes instead of in socketpairs Storing commands in OS socket buffers caused whole lot of problems when free space in the buffer ran out. This patch stores commands in ypipes instead and uses socketpair just to signal the other thread, ie. at most one byte is stored in the socketpair at any single instant. Signed-off-by: Martin Sustrik --- src/mailbox.hpp | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) (limited to 'src/mailbox.hpp') diff --git a/src/mailbox.hpp b/src/mailbox.hpp index eb02e39..0675b99 100644 --- a/src/mailbox.hpp +++ b/src/mailbox.hpp @@ -24,10 +24,12 @@ #include #include "platform.hpp" +#include "signaler.hpp" #include "fd.hpp" -#include "stdint.hpp" #include "config.hpp" #include "command.hpp" +#include "ypipe.hpp" +#include "mutex.hpp" namespace zmq { @@ -45,21 +47,22 @@ namespace zmq private: - // Platform-dependent function to create a socketpair. - static int make_socketpair (fd_t *r_, fd_t *w_); + // The pipe to store actual commands. + typedef ypipe_t cpipe_t; + cpipe_t cpipe; - // Receives a command with the specific timeout. - // This function is not to be used for non-blocking or inifinitely - // blocking recvs. - int recv_timeout (command_t *cmd_, int timeout_); + // Signaler to pass signals from writer thread to reader thread. + signaler_t signaler; - // Write & read end of the socketpair. - fd_t w; - fd_t r; + // There's only one thread receiving from the mailbox, but there + // is arbitrary number of threads sending. Given that ypipe requires + // synchronised access on both of its endpoints, we have to synchronise + // the sending side. + mutex_t sync; - // Used on platforms where there's no MSG_DONTWAIT functionality. - // True if the read socket is set to the blocking state. - bool blocking; + // True if the underlying pipe is active, ie. when we are allowed to + // read commands from it. + bool active; // Disable copying of mailbox_t object. mailbox_t (const mailbox_t&); -- cgit v1.2.3