From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/pipe.hpp | 111 +++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 69 insertions(+), 42 deletions(-) (limited to 'src/pipe.hpp') diff --git a/src/pipe.hpp b/src/pipe.hpp index ece678a..34c5600 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -23,7 +23,6 @@ #include "../include/zmq.h" #include "stdint.hpp" -#include "i_endpoint.hpp" #include "yarray_item.hpp" #include "ypipe.hpp" #include "msg_store.hpp" @@ -33,15 +32,31 @@ namespace zmq { + // The shutdown mechanism for pipe works as follows: Either endpoint + // (or even both of them) can ask pipe to terminate by calling 'terminate' + // method. Pipe then terminates in asynchronous manner. When the part of + // the shutdown tied to the endpoint is done it triggers 'terminated' + // event. When endpoint processes the event and returns, associated + // reader/writer object is deallocated. + + typedef ypipe_t pipe_t; + + struct i_reader_events + { + virtual void terminated (class reader_t *pipe_) = 0; + virtual void activated (class reader_t *pipe_) = 0; + }; + class reader_t : public object_t, public yarray_item_t { - public: + friend void zmq::create_pipe (object_t*, object_t*, uint64_t, + int64_t, reader_t**, writer_t**); + friend class writer_t; - reader_t (class object_t *parent_, uint64_t lwm_); - ~reader_t (); + public: - void set_pipe (class pipe_t *pipe_); - void set_endpoint (i_endpoint *endpoint_); + // Specifies the object to get events from the reader. + void set_event_sink (i_reader_events *endpoint_); // Returns true if there is at least one message to read in the pipe. bool check_read (); @@ -50,10 +65,20 @@ namespace zmq bool read (zmq_msg_t *msg_); // Ask pipe to terminate. - void term (); + void terminate (); + + // Returns true if the pipe is already terminating + // (say if delimiter was already read). + bool is_terminating (); private: + reader_t (class object_t *parent_, pipe_t *pipe_, uint64_t lwm_); + ~reader_t (); + + // To be called only by writer itself! + void set_writer (class writer_t *writer_); + // Command handlers. void process_revive (); void process_pipe_term_ack (); @@ -62,10 +87,10 @@ namespace zmq static bool is_delimiter (zmq_msg_t &msg_); // The underlying pipe. - class pipe_t *pipe; + pipe_t *pipe; // Pipe writer associated with the other side of the pipe. - class writer_t *peer; + class writer_t *writer; // Low watermark for in-memory storage (in bytes). uint64_t lwm; @@ -73,22 +98,32 @@ namespace zmq // Number of messages read so far. uint64_t msgs_read; - // Endpoint (either session or socket) the pipe is attached to. - i_endpoint *endpoint; + // Sink for the events (either the socket of the session). + i_reader_events *sink; + + // True is 'terminate' method was called or delimiter + // was read from the pipe. + bool terminating; reader_t (const reader_t&); void operator = (const reader_t&); }; + struct i_writer_events + { + virtual void terminated (class writer_t *pipe_) = 0; + virtual void activated (class writer_t *pipe_) = 0; + }; + class writer_t : public object_t, public yarray_item_t { - public: + friend void zmq::create_pipe (object_t*, object_t*, uint64_t, + int64_t, reader_t**, writer_t**); - writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_); - ~writer_t (); + public: - void set_pipe (class pipe_t *pipe_); - void set_endpoint (i_endpoint *endpoint_); + // Specifies the object to get events from the writer. + void set_event_sink (i_writer_events *endpoint_); // Checks whether a message can be written to the pipe. // If writing the message would cause high watermark to be @@ -106,10 +141,14 @@ namespace zmq void flush (); // Ask pipe to terminate. - void term (); + void terminate (); private: + writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, + uint64_t hwm_, int64_t swap_size_); + ~writer_t (); + void process_reader_info (uint64_t msgs_read_); // Command handlers. @@ -123,10 +162,10 @@ namespace zmq void write_delimiter (); // The underlying pipe. - class pipe_t *pipe; + pipe_t *pipe; // Pipe reader associated with the other side of the pipe. - class reader_t *peer; + reader_t *reader; // High watermark for in-memory storage (in bytes). uint64_t hwm; @@ -149,35 +188,23 @@ namespace zmq // True iff the last attempt to write a message has failed. bool stalled; - bool pending_close; + // Sink for the events (either the socket or the session). + i_writer_events *sink; - // Endpoint (either session or socket) the pipe is attached to. - i_endpoint *endpoint; + // True is 'terminate' method was called of 'pipe_term' command + // arrived from the reader. + bool terminating; + + bool pending_close; writer_t (const writer_t&); void operator = (const writer_t&); }; - // Message pipe. - class pipe_t : public ypipe_t - { - public: - - pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, int64_t swap_size_); - ~pipe_t (); - - reader_t reader; - writer_t writer; - - private: - - uint64_t compute_lwm (uint64_t hwm_); - - pipe_t (const pipe_t&); - void operator = (const pipe_t&); - }; - + // Creates a pipe. Returns pointer to reader and writer objects. + void create_pipe (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, int64_t swap_size_, reader_t **reader_, + writer_t **writer_); } #endif -- cgit v1.2.3