From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/fq.hpp | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) (limited to 'src/fq.hpp') diff --git a/src/fq.hpp b/src/fq.hpp index 5c699ee..2e09809 100644 --- a/src/fq.hpp +++ b/src/fq.hpp @@ -21,6 +21,7 @@ #define __ZMQ_FQ_HPP_INCLUDED__ #include "yarray.hpp" +#include "pipe.hpp" namespace zmq { @@ -28,24 +29,28 @@ namespace zmq // Class manages a set of inbound pipes. On receive it performs fair // queueing (RFC970) so that senders gone berserk won't cause denial of // service for decent senders. - class fq_t + class fq_t : public i_reader_events { public: fq_t (); ~fq_t (); - void attach (class reader_t *pipe_); - void detach (class reader_t *pipe_); - void kill (class reader_t *pipe_); - void revive (class reader_t *pipe_); + void attach (reader_t *pipe_); + bool has_pipes (); + void term_pipes (); + int recv (zmq_msg_t *msg_, int flags_); bool has_in (); + // i_reader_events implementation. + void activated (reader_t *pipe_); + void terminated (reader_t *pipe_); + private: // Inbound pipes. - typedef yarray_t pipes_t; + typedef yarray_t pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the -- cgit v1.2.3