From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/session.cpp | 62 +++++++++++++++++++++++++++++---------------------------- 1 file changed, 32 insertions(+), 30 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index f798877..86086fb 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -69,13 +69,22 @@ zmq::session_t::~session_t () zmq_assert (!out_pipe); } +bool zmq::session_t::is_terminable () +{ + return in_pipe->is_terminating (); +} + bool zmq::session_t::read (::zmq_msg_t *msg_) { if (!in_pipe || !active) return false; - if (!in_pipe->read (msg_)) + if (!in_pipe->read (msg_)) { + active = false; + if (in_pipe->is_terminating ()) + finalise (); return false; + } incomplete_in = msg_->flags & ZMQ_MSG_MORE; return true; @@ -156,33 +165,28 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, zmq_assert (!in_pipe); in_pipe = inpipe_; active = true; - in_pipe->set_endpoint (this); + in_pipe->set_event_sink (this); } if (outpipe_) { zmq_assert (!out_pipe); out_pipe = outpipe_; - out_pipe->set_endpoint (this); + out_pipe->set_event_sink (this); } } -void zmq::session_t::detach_inpipe (reader_t *pipe_) +void zmq::session_t::terminated (reader_t *pipe_) { active = false; in_pipe = NULL; } -void zmq::session_t::detach_outpipe (writer_t *pipe_) +void zmq::session_t::terminated (writer_t *pipe_) { out_pipe = NULL; } -void zmq::session_t::kill (reader_t *pipe_) -{ - active = false; -} - -void zmq::session_t::revive (reader_t *pipe_) +void zmq::session_t::activated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); active = true; @@ -190,7 +194,7 @@ void zmq::session_t::revive (reader_t *pipe_) engine->revive (); } -void zmq::session_t::revive (writer_t *pipe_) +void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) @@ -203,6 +207,11 @@ void zmq::session_t::process_plug () void zmq::session_t::process_unplug () { + // TODO: There may be a problem here. The called ensures that all the + // commands on the fly have been delivered. However, given that the + // session is unregistered from the global repository only at this point + // there may be some commands being sent to the session right now. + // Unregister the session from the socket. if (ordinal) owner->unregister_session (ordinal); @@ -210,14 +219,10 @@ void zmq::session_t::process_unplug () owner->unregister_session (peer_identity); // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; - } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; - } + if (in_pipe) + in_pipe->terminate (); + if (out_pipe) + out_pipe->terminate (); if (engine) { engine->unplug (); @@ -265,19 +270,15 @@ void zmq::session_t::process_attach (i_engine *engine_, writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap); - zmq_assert (pipe); - out_pipe = &pipe->writer; - out_pipe->set_endpoint (this); - socket_reader = &pipe->reader; + create_pipe (owner, this, options.hwm, options.swap, &socket_reader, + &out_pipe); + out_pipe->set_event_sink (this); } if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap); - zmq_assert (pipe); - in_pipe = &pipe->reader; - in_pipe->set_endpoint (this); - socket_writer = &pipe->writer; + create_pipe (this, owner, options.hwm, options.swap, &in_pipe, + &socket_writer); + in_pipe->set_event_sink (this); } if (socket_reader || socket_writer) @@ -289,3 +290,4 @@ void zmq::session_t::process_attach (i_engine *engine_, engine = engine_; engine->plug (this); } + -- cgit v1.2.3