From 05d908492dc382941fc633ad7082b5bd86e84e67 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 6 Aug 2010 17:49:37 +0200 Subject: WIP: Socket migration between threads, new zmq_close() semantics Sockets may now be migrated between OS threads; sockets may not be used by more than one thread at any time. To migrate a socket to another thread the caller must ensure that a full memory barrier is called before using the socket from the target thread. The new zmq_close() semantics implement the behaviour discussed at: http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html Specifically, zmq_close() is now deterministic and while it still returns immediately, it does not discard any data that may still be queued for sending. Further, zmq_term() will now block until all outstanding data has been sent. TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or an equivalent mechanism (possibly a configurable timeout to zmq_term()) needs to be implemented. --- src/pipe.cpp | 250 ++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 144 insertions(+), 106 deletions(-) (limited to 'src/pipe.cpp') diff --git a/src/pipe.cpp b/src/pipe.cpp index 200beb0..1903422 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -17,31 +17,54 @@ along with this program. If not, see . */ +#include + #include "../include/zmq.h" #include "pipe.hpp" +#include "likely.hpp" -zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, + uint64_t lwm_) : object_t (parent_), - pipe (NULL), - peer (NULL), + pipe (pipe_), + writer (NULL), lwm (lwm_), msgs_read (0), - endpoint (NULL) -{} + sink (NULL), + terminating (false) +{ + // Note that writer is not set here. Writer will inform reader about its + // address once it is created (via set_writer method). +} + +void zmq::reader_t::set_writer (writer_t *writer_) +{ + zmq_assert (!writer); + writer = writer_; +} zmq::reader_t::~reader_t () { - if (pipe) - unregister_pipe (pipe); + // Pipe as such is owned and deallocated by reader object. + // The point is that reader processes the last step of terminal + // handshaking (term_ack). + zmq_assert (pipe); + + // First delete all the unread messages in the pipe. We have to do it by + // hand because zmq_msg_t is a POD, not a class, so there's no associated + // destructor. + zmq_msg_t msg; + while (pipe->read (&msg)) + zmq_msg_close (&msg); + + delete pipe; } -void zmq::reader_t::set_pipe (pipe_t *pipe_) +void zmq::reader_t::set_event_sink (i_reader_events *sink_) { - zmq_assert (!pipe); - pipe = pipe_; - peer = &pipe->writer; - register_pipe (pipe); + zmq_assert (!sink); + sink = sink_; } bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) @@ -53,19 +76,20 @@ bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) bool zmq::reader_t::check_read () { + if (unlikely (terminating)) + return false; + // Check if there's an item in the pipe. // If not, deactivate the pipe. if (!pipe->check_read ()) { - endpoint->kill (this); + terminate (); return false; } // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { - if (endpoint) - endpoint->detach_inpipe (this); - term (); + terminate (); return false; } @@ -74,17 +98,16 @@ bool zmq::reader_t::check_read () bool zmq::reader_t::read (zmq_msg_t *msg_) { - if (!pipe->read (msg_)) { - endpoint->kill (this); + if (unlikely (terminating)) + return false; + + if (!pipe->read (msg_)) return false; - } // If delimiter was read, start termination process of the pipe. unsigned char *offset = 0; if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { - if (endpoint) - endpoint->detach_inpipe (this); - term (); + terminate (); return false; } @@ -92,51 +115,64 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) - send_reader_info (peer, msgs_read); + send_reader_info (writer, msgs_read); return true; } -void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) +void zmq::reader_t::terminate () { - endpoint = endpoint_; + // If termination was already started by the peer, do nothing. + if (terminating) + return; + + terminating = true; + send_pipe_term (writer); } -void zmq::reader_t::term () +bool zmq::reader_t::is_terminating () { - endpoint = NULL; - send_pipe_term (peer); + return terminating; } void zmq::reader_t::process_revive () { - // Beacuse of command throttling mechanism, incoming termination request - // may not have been processed before subsequent send. - // In that case endpoint is NULL. - if (endpoint) - endpoint->revive (this); + // Forward the event to the sink (either socket or session). + sink->activated (this); } void zmq::reader_t::process_pipe_term_ack () { - peer = NULL; - delete pipe; + // At this point writer may already be deallocated. + // For safety's sake drop the reference to it. + writer = NULL; + + // Notify owner about the termination. + zmq_assert (sink); + sink->terminated (this); + + // Deallocate resources. + delete this; } -zmq::writer_t::writer_t (object_t *parent_, +zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, uint64_t hwm_, int64_t swap_size_) : object_t (parent_), - pipe (NULL), - peer (NULL), + pipe (pipe_), + reader (reader_), hwm (hwm_), msgs_read (0), msgs_written (0), msg_store (NULL), extra_msg_flag (false), stalled (false), - pending_close (false), - endpoint (NULL) + sink (NULL), + terminating (false), + pending_close (false) { + // Inform reader about the writer. + reader->set_writer (this); + if (swap_size_ > 0) { msg_store = new (std::nothrow) msg_store_t (swap_size_); if (msg_store != NULL) { @@ -148,11 +184,6 @@ zmq::writer_t::writer_t (object_t *parent_, } } -void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) -{ - endpoint = endpoint_; -} - zmq::writer_t::~writer_t () { if (extra_msg_flag) @@ -161,15 +192,17 @@ zmq::writer_t::~writer_t () delete msg_store; } -void zmq::writer_t::set_pipe (pipe_t *pipe_) +void zmq::writer_t::set_event_sink (i_writer_events *sink_) { - zmq_assert (!pipe); - pipe = pipe_; - peer = &pipe->reader; + zmq_assert (!sink); + sink = sink_; } bool zmq::writer_t::check_write () { + if (terminating) + return false; + if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) { stalled = true; return false; @@ -180,6 +213,9 @@ bool zmq::writer_t::check_write () bool zmq::writer_t::write (zmq_msg_t *msg_) { + if (terminating) + return false; + if (!check_write ()) return false; @@ -216,23 +252,27 @@ void zmq::writer_t::rollback () while (pipe->unwrite (&msg)) { zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_msg_close (&msg); + msgs_written--; } - if (stalled && endpoint != NULL && check_write ()) { + if (stalled && check_write ()) { stalled = false; - endpoint->revive (this); + zmq_assert (sink); + sink->activated (this); } } void zmq::writer_t::flush () { if (!pipe->flush ()) - send_revive (peer); + send_revive (reader); } -void zmq::writer_t::term () +void zmq::writer_t::terminate () { - endpoint = NULL; + // Prevent double termination. + if (terminating) + return; // Rollback any unfinished messages. rollback (); @@ -293,71 +333,69 @@ void zmq::writer_t::process_reader_info (uint64_t msgs_read_) flush (); } - if (stalled && endpoint != NULL) { + if (stalled) { stalled = false; - endpoint->revive (this); + zmq_assert (sink); + sink->activated (this); } } void zmq::writer_t::process_pipe_term () { - if (endpoint) - endpoint->detach_outpipe (this); + send_pipe_term_ack (reader); - reader_t *p = peer; - peer = NULL; - send_pipe_term_ack (p); -} + // The above command allows reader to deallocate itself and the pipe. + // For safety's sake we'll drop the pointers here. + reader = NULL; + pipe = NULL; -bool zmq::writer_t::pipe_full () -{ - return hwm > 0 && msgs_written - msgs_read == hwm; -} + // Notify owner about the termination. + zmq_assert (sink); + sink->terminated (this); -zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, int64_t swap_size_) : - reader (reader_parent_, compute_lwm (hwm_)), - writer (writer_parent_, hwm_, swap_size_) -{ - reader.set_pipe (this); - writer.set_pipe (this); + // Deallocate the resources. + delete this; } -zmq::pipe_t::~pipe_t () +bool zmq::writer_t::pipe_full () { - // Deallocate all the unread messages in the pipe. We have to do it by - // hand because zmq_msg_t is a POD, not a class, so there's no associated - // destructor. - zmq_msg_t msg; - while (read (&msg)) - zmq_msg_close (&msg); + return hwm > 0 && msgs_written - msgs_read == hwm; } -uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_) +void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_) { - // Following point should be taken into consideration when computing - // low watermark: - // - // 1. LWM has to be less than HWM. - // 2. LWM cannot be set to very low value (such as zero) as after filling - // the queue it would start to refill only after all the messages are - // read from it and thus unnecessarily hold the progress back. - // 3. LWM cannot be set to very high value (such as HWM-1) as it would - // result in lock-step filling of the queue - if a single message is read - // from a full queue, writer thread is resumed to write exactly one - // message to the queue and go back to sleep immediately. This would - // result in low performance. - // - // Given the 3. it would be good to keep HWM and LWM as far apart as - // possible to reduce the thread switching overhead to almost zero, - // say HWM-LWM should be 500 (max_wm_delta). - // - // That done, we still we have to account for the cases where HWM<500 thus - // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases. - - if (hwm_ > max_wm_delta * 2) - return hwm_ - max_wm_delta; - else - return (hwm_ + 1) / 2; + // First compute the low water mark. Following point should be taken + // into consideration: + // + // 1. LWM has to be less than HWM. + // 2. LWM cannot be set to very low value (such as zero) as after filling + // the queue it would start to refill only after all the messages are + // read from it and thus unnecessarily hold the progress back. + // 3. LWM cannot be set to very high value (such as HWM-1) as it would + // result in lock-step filling of the queue - if a single message is + // read from a full queue, writer thread is resumed to write exactly one + // message to the queue and go back to sleep immediately. This would + // result in low performance. + // + // Given the 3. it would be good to keep HWM and LWM as far apart as + // possible to reduce the thread switching overhead to almost zero, + // say HWM-LWM should be max_wm_delta. + // + // That done, we still we have to account for the cases where + // HWM < max_wm_delta thus driving LWM to negative numbers. + // Let's make LWM 1/2 of HWM in such cases. + uint64_t lwm = (hwm_ > max_wm_delta * 2) ? + hwm_ - max_wm_delta : (hwm_ + 1) / 2; + + // Create all three objects pipe consists of: the pipe per se, reader and + // writer. The pipe will be handled by reader and writer, its never passed + // to the user. Reader and writer are returned to the user. + pipe_t *pipe = new (std::nothrow) pipe_t (); + zmq_assert (pipe); + *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); + zmq_assert (*reader_); + *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, + hwm_, swap_size_); + zmq_assert (*writer_); } - -- cgit v1.2.3