summaryrefslogtreecommitdiff
path: root/src/socket_base.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-06 17:49:37 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commit05d908492dc382941fc633ad7082b5bd86e84e67 (patch)
treeae10e49766152e42521a6c100e622dc616998143 /src/socket_base.hpp
parentb7e0fa972f45d21e45cacb93a1a92d38fdc11f40 (diff)
WIP: Socket migration between threads, new zmq_close() semantics
Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented.
Diffstat (limited to 'src/socket_base.hpp')
-rw-r--r--src/socket_base.hpp100
1 files changed, 64 insertions, 36 deletions
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 3d95cec..386fdbb 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -26,13 +26,13 @@
#include "../include/zmq.h"
-#include "i_endpoint.hpp"
#include "object.hpp"
#include "yarray_item.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
+#include "signaler.hpp"
#include "stdint.hpp"
#include "blob.hpp"
@@ -40,11 +40,21 @@ namespace zmq
{
class socket_base_t :
- public object_t, public i_endpoint, public yarray_item_t
+ public object_t,
+ public yarray_item_t
{
public:
- socket_base_t (class app_thread_t *parent_);
+ // Create a socket of a specified type.
+ static socket_base_t *create (int type_, class ctx_t *parent_,
+ uint32_t slot_);
+
+ // Returns the signaler associated with this socket.
+ signaler_t *get_signaler ();
+
+ // Interrupt blocking call if the socket is stuck in one.
+ // This function can be called from a different thread!
+ void stop ();
// Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
@@ -60,11 +70,6 @@ namespace zmq
// before the command is delivered.
void inc_seqnum ();
- // This function is used by the polling mechanism to determine
- // whether the socket belongs to the application thread the poll
- // is called from.
- class app_thread_t *get_thread ();
-
// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
@@ -85,43 +90,67 @@ namespace zmq
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
- // i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
- const blob_t &peer_identity_);
- void detach_inpipe (class reader_t *pipe_);
- void detach_outpipe (class writer_t *pipe_);
- void kill (class reader_t *pipe_);
- void revive (class reader_t *pipe_);
- void revive (class writer_t *pipe_);
+ // i_reader_events interface implementation.
+ void activated (class reader_t *pipe_);
+ void terminated (class reader_t *pipe_);
+
+ // i_writer_events interface implementation.
+ void activated (class writer_t *pipe_);
+ void terminated (class writer_t *pipe_);
+
+ // This function should be called only on zombie sockets. It tries
+ // to deallocate the zombie and returns true is successful.
+ bool dezombify ();
protected:
- // Destructor is protected. Socket is closed using 'close' function.
+ socket_base_t (class ctx_t *parent_, uint32_t slot_);
virtual ~socket_base_t ();
- // Pipe management is done by individual socket types.
+ // Concrete algorithms for the x- methods are to be defined by
+ // individual socket types.
+
virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
- virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
- virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
- virtual void xkill (class reader_t *pipe_) = 0;
- virtual void xrevive (class reader_t *pipe_) = 0;
- virtual void xrevive (class writer_t *pipe_) = 0;
+ virtual void xterm_pipes () = 0;
+ virtual bool xhas_pipes () = 0;
- // Actual algorithms are to be defined by individual socket types.
+ // The default implementation assumes there are no specific socket
+ // options for the particular socket type. If not so, overload this
+ // method.
virtual int xsetsockopt (int option_, const void *optval_,
- size_t optvallen_) = 0;
- virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
- virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
- virtual bool xhas_in () = 0;
- virtual bool xhas_out () = 0;
+ size_t optvallen_);
+
+ // The default implementation assumes that send is not supported.
+ virtual bool xhas_out ();
+ virtual int xsend (zmq_msg_t *msg_, int options_);
+
+ // The default implementation assumes that recv in not supported.
+ virtual bool xhas_in ();
+ virtual int xrecv (zmq_msg_t *msg_, int options_);
// Socket options.
options_t options;
+ // If true, socket was already closed but not yet deallocated
+ // because either shutdown is in process or there are still pipes
+ // attached to the socket.
+ bool zombie;
+
private:
+ // If no identity set generate one and call xattach_pipes ().
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
+
+ // Processes commands sent to this socket (if any). If 'block' is
+ // set to true, returns only after at least one command was processed.
+ // If throttle argument is true, commands are processed at most once
+ // in a predefined time period.
+ void process_commands (bool block_, bool throttle_);
+
// Handlers for incoming commands.
+ void process_stop ();
void process_own (class owned_t *object_);
void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_);
@@ -129,6 +158,12 @@ namespace zmq
void process_term_ack ();
void process_seqnum ();
+ // App thread's signaler object.
+ signaler_t signaler;
+
+ // Timestamp of when commands were processed the last time.
+ uint64_t last_processing_time;
+
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
@@ -144,13 +179,6 @@ namespace zmq
// If true there's a half-read message in the socket.
bool rcvmore;
- // Application thread the socket lives in.
- class app_thread_t *app_thread;
-
- // If true, socket is already shutting down. No new work should be
- // started.
- bool shutting_down;
-
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;