From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/req.hpp | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) (limited to 'src/req.hpp') diff --git a/src/req.hpp b/src/req.hpp index 5ab7bca..5fd5642 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -22,31 +22,39 @@ #include "socket_base.hpp" #include "yarray.hpp" +#include "pipe.hpp" namespace zmq { - class req_t : public socket_base_t + class req_t : + public socket_base_t, + public i_reader_events, + public i_writer_events { public: - req_t (class app_thread_t *parent_); + req_t (class ctx_t *parent_, uint32_t slot_); ~req_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + // i_reader_events interface implementation. + void activated (reader_t *pipe_); + void terminated (reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (writer_t *pipe_); + void terminated (writer_t *pipe_); + private: // List in outbound and inbound pipes. Note that the two lists are @@ -58,9 +66,9 @@ namespace zmq // the beginning of the array). We don't have to do the same thing for // inpipes, because we know which pipe we want to read the // reply from. - typedef yarray_t out_pipes_t; + typedef yarray_t out_pipes_t; out_pipes_t out_pipes; - typedef yarray_t in_pipes_t; + typedef yarray_t in_pipes_t; in_pipes_t in_pipes; // Number of active pipes. @@ -82,7 +90,7 @@ namespace zmq bool more; // Pipe we are awaiting the reply from. - class reader_t *reply_pipe; + reader_t *reply_pipe; req_t (const req_t&); void operator = (const req_t&); -- cgit v1.2.3