From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/socket_base.hpp | 100 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 64 insertions(+), 36 deletions(-) (limited to 'src/socket_base.hpp') diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 3d95cec..386fdbb 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -26,13 +26,13 @@ #include "../include/zmq.h" -#include "i_endpoint.hpp" #include "object.hpp" #include "yarray_item.hpp" #include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" +#include "signaler.hpp" #include "stdint.hpp" #include "blob.hpp" @@ -40,11 +40,21 @@ namespace zmq { class socket_base_t : - public object_t, public i_endpoint, public yarray_item_t + public object_t, + public yarray_item_t { public: - socket_base_t (class app_thread_t *parent_); + // Create a socket of a specified type. + static socket_base_t *create (int type_, class ctx_t *parent_, + uint32_t slot_); + + // Returns the signaler associated with this socket. + signaler_t *get_signaler (); + + // Interrupt blocking call if the socket is stuck in one. + // This function can be called from a different thread! + void stop (); // Interface for communication with the API layer. int setsockopt (int option_, const void *optval_, size_t optvallen_); @@ -60,11 +70,6 @@ namespace zmq // before the command is delivered. void inc_seqnum (); - // This function is used by the polling mechanism to determine - // whether the socket belongs to the application thread the poll - // is called from. - class app_thread_t *get_thread (); - // These functions are used by the polling mechanism to determine // which events are to be reported from this socket. bool has_in (); @@ -85,43 +90,67 @@ namespace zmq void unregister_session (uint64_t ordinal_); class session_t *find_session (uint64_t ordinal_); - // i_endpoint interface implementation. - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - void detach_inpipe (class reader_t *pipe_); - void detach_outpipe (class writer_t *pipe_); - void kill (class reader_t *pipe_); - void revive (class reader_t *pipe_); - void revive (class writer_t *pipe_); + // i_reader_events interface implementation. + void activated (class reader_t *pipe_); + void terminated (class reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (class writer_t *pipe_); + void terminated (class writer_t *pipe_); + + // This function should be called only on zombie sockets. It tries + // to deallocate the zombie and returns true is successful. + bool dezombify (); protected: - // Destructor is protected. Socket is closed using 'close' function. + socket_base_t (class ctx_t *parent_, uint32_t slot_); virtual ~socket_base_t (); - // Pipe management is done by individual socket types. + // Concrete algorithms for the x- methods are to be defined by + // individual socket types. + virtual void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) = 0; - virtual void xdetach_inpipe (class reader_t *pipe_) = 0; - virtual void xdetach_outpipe (class writer_t *pipe_) = 0; - virtual void xkill (class reader_t *pipe_) = 0; - virtual void xrevive (class reader_t *pipe_) = 0; - virtual void xrevive (class writer_t *pipe_) = 0; + virtual void xterm_pipes () = 0; + virtual bool xhas_pipes () = 0; - // Actual algorithms are to be defined by individual socket types. + // The default implementation assumes there are no specific socket + // options for the particular socket type. If not so, overload this + // method. virtual int xsetsockopt (int option_, const void *optval_, - size_t optvallen_) = 0; - virtual int xsend (zmq_msg_t *msg_, int options_) = 0; - virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; - virtual bool xhas_in () = 0; - virtual bool xhas_out () = 0; + size_t optvallen_); + + // The default implementation assumes that send is not supported. + virtual bool xhas_out (); + virtual int xsend (zmq_msg_t *msg_, int options_); + + // The default implementation assumes that recv in not supported. + virtual bool xhas_in (); + virtual int xrecv (zmq_msg_t *msg_, int options_); // Socket options. options_t options; + // If true, socket was already closed but not yet deallocated + // because either shutdown is in process or there are still pipes + // attached to the socket. + bool zombie; + private: + // If no identity set generate one and call xattach_pipes (). + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + + // Processes commands sent to this socket (if any). If 'block' is + // set to true, returns only after at least one command was processed. + // If throttle argument is true, commands are processed at most once + // in a predefined time period. + void process_commands (bool block_, bool throttle_); + // Handlers for incoming commands. + void process_stop (); void process_own (class owned_t *object_); void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_); @@ -129,6 +158,12 @@ namespace zmq void process_term_ack (); void process_seqnum (); + // App thread's signaler object. + signaler_t signaler; + + // Timestamp of when commands were processed the last time. + uint64_t last_processing_time; + // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. typedef std::set io_objects_t; @@ -144,13 +179,6 @@ namespace zmq // If true there's a half-read message in the socket. bool rcvmore; - // Application thread the socket lives in. - class app_thread_t *app_thread; - - // If true, socket is already shutting down. No new work should be - // started. - bool shutting_down; - // Sequence number of the last command sent to this object. atomic_counter_t sent_seqnum; -- cgit v1.2.3