From acf0b0e515515e51ad32ba7a2d147ce703579478 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 22 May 2011 17:26:53 +0200 Subject: Introduces bi-directional pipes So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik --- src/session.cpp | 167 +++++++++++++++++++++++--------------------------------- 1 file changed, 67 insertions(+), 100 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index 499fe40..5ef21c7 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -29,13 +29,12 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, class socket_base_t *socket_, const options_t &options_) : own_t (io_thread_, options_), io_object_t (io_thread_), - in_pipe (NULL), + pipe (NULL), incomplete_in (false), - out_pipe (NULL), engine (NULL), socket (socket_), io_thread (io_thread_), - pipes_attached (false), + pipe_attached (false), delimiter_processed (false), force_terminate (false), has_linger_timer (false), @@ -45,8 +44,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, zmq::session_t::~session_t () { - zmq_assert (!in_pipe); - zmq_assert (!out_pipe); + zmq_assert (!pipe); if (engine) engine->terminate (); @@ -66,13 +64,9 @@ void zmq::session_t::proceed_with_term () has_linger_timer = false; } - if (in_pipe) { + if (pipe) { register_term_acks (1); - in_pipe->terminate (); - } - if (out_pipe) { - register_term_acks (1); - out_pipe->terminate (); + pipe->terminate (); } // The session has already waited for the linger period. We don't want @@ -82,10 +76,10 @@ void zmq::session_t::proceed_with_term () bool zmq::session_t::read (msg_t *msg_) { - if (!in_pipe) + if (!pipe) return false; - if (!in_pipe->read (msg_)) + if (!pipe->read (msg_)) return false; incomplete_in = msg_->flags () & msg_t::more; @@ -94,7 +88,7 @@ bool zmq::session_t::read (msg_t *msg_) bool zmq::session_t::write (msg_t *msg_) { - if (out_pipe && out_pipe->write (msg_)) { + if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); return true; @@ -105,21 +99,20 @@ bool zmq::session_t::write (msg_t *msg_) void zmq::session_t::flush () { - if (out_pipe) - out_pipe->flush (); + if (pipe) + pipe->flush (); } void zmq::session_t::clean_pipes () { - // Get rid of half-processed messages in the out pipe. Flush any - // unflushed messages upstream. - if (out_pipe) { - out_pipe->rollback (); - out_pipe->flush (); - } + if (pipe) { - // Remove any half-read message from the in pipe. - if (in_pipe) { + // Get rid of half-processed messages in the out pipe. Flush any + // unflushed messages upstream. + pipe->rollback (); + pipe->flush (); + + // Remove any half-read message from the in pipe. while (incomplete_in) { msg_t msg; int rc = msg.init (); @@ -134,78 +127,54 @@ void zmq::session_t::clean_pipes () } } -void zmq::session_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) { - zmq_assert (!pipes_attached); - pipes_attached = true; + zmq_assert (!pipe_attached); + pipe_attached = true; - if (inpipe_) { - zmq_assert (!in_pipe); - in_pipe = inpipe_; - in_pipe->set_event_sink (this); - } - - if (outpipe_) { - zmq_assert (!out_pipe); - out_pipe = outpipe_; - out_pipe->set_event_sink (this); + if (pipe_) { + zmq_assert (!pipe); + pipe = pipe_; + pipe->set_event_sink (this); } // If we are already terminating, terminate the pipes straight away. if (state == terminating) { - if (in_pipe) { - in_pipe->terminate (); - register_term_acks (1); - } - if (out_pipe) { - out_pipe->terminate (); + if (pipe) { + pipe->terminate (); register_term_acks (1); } } } -void zmq::session_t::delimited (reader_t *pipe_) +void zmq::session_t::terminated (pipe_t *pipe_) { - zmq_assert (in_pipe == pipe_); - zmq_assert (!delimiter_processed); - delimiter_processed = true; + zmq_assert (pipe == pipe_); - // If we are in process of being closed, but still waiting for all - // pending messeges being sent, we can terminate here. + // If we are in process of being closed, but still waiting for all + // pending messeges being sent, we can terminate here. if (state == pending) proceed_with_term (); -} -void zmq::session_t::terminated (reader_t *pipe_) -{ - zmq_assert (in_pipe == pipe_); - in_pipe = NULL; + pipe = NULL; if (state == terminating) unregister_term_ack (); } -void zmq::session_t::terminated (writer_t *pipe_) +void zmq::session_t::read_activated (pipe_t *pipe_) { - zmq_assert (out_pipe == pipe_); - out_pipe = NULL; - if (state == terminating) - unregister_term_ack (); -} - -void zmq::session_t::activated (reader_t *pipe_) -{ - zmq_assert (in_pipe == pipe_); + zmq_assert (pipe == pipe_); if (likely (engine != NULL)) engine->activate_out (); else - in_pipe->check_read (); + pipe->check_read (); } -void zmq::session_t::activated (writer_t *pipe_) +void zmq::session_t::write_activated (pipe_t *pipe_) { - zmq_assert (out_pipe == pipe_); + zmq_assert (pipe == pipe_); + if (engine) engine->activate_in (); } @@ -240,29 +209,27 @@ void zmq::session_t::process_attach (i_engine *engine_, return; } - // Check whether the required pipes already exist. If not so, we'll - // create them and bind them to the socket object. - if (!pipes_attached) { - zmq_assert (!in_pipe && !out_pipe); - pipes_attached = true; - reader_t *socket_reader = NULL; - writer_t *socket_writer = NULL; - - // Create the pipes, as required. - if (options.requires_in) { - create_pipe (socket, this, options.rcvhwm, &socket_reader, - &out_pipe); - out_pipe->set_event_sink (this); - } - if (options.requires_out) { - create_pipe (this, socket, options.sndhwm, &in_pipe, - &socket_writer); - in_pipe->set_event_sink (this); - } + // Check whether the required pipe already exists and create it + // if it does not. + if (!pipe_attached) { + zmq_assert (!pipe); + pipe_attached = true; + + object_t *parents [2] = {this, socket}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {options.rcvhwm, options.sndhwm}; + bool delays [2] = {true, true}; + int rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); + + // Plug the local end of the pipe. + pipes [0]->set_event_sink (this); + + // Remember the local end of the pipe. + pipe = pipes [0]; - // Bind the pipes to the socket object. - if (socket_reader || socket_writer) - send_bind (socket, socket_reader, socket_writer, peer_identity_); + // Ask socket to plug into the remote end of the pipe. + send_bind (socket, pipes [1], peer_identity_); } // Plug in the engine. @@ -282,9 +249,9 @@ void zmq::session_t::detach () // Send the event to the derived class. detached (); - // Just in case, there's only a delimiter in the inbound pipe. - if (in_pipe) - in_pipe->check_read (); + // Just in case there's only a delimiter in the inbound pipe. + if (pipe) + pipe->check_read (); } void zmq::session_t::process_term (int linger_) @@ -308,16 +275,16 @@ void zmq::session_t::process_term (int linger_) // If there's no engine and there's only delimiter in the pipe it wouldn't // be ever read. Thus we check for it explicitly. - if (in_pipe) - in_pipe->check_read (); + if (pipe) + pipe->check_read (); - // If there's no in pipe there are no pending messages to send. + // If there's no in pipe, there are no pending messages to send. // We can proceed with the shutdown straight away. Also, if there is - // inbound pipe, but the delimiter was already processed, we can - // terminate immediately. Alternatively, if the derived session type have + // pipe, but the delimiter was already processed, we can terminate + // immediately. Alternatively, if the derived session type have // called 'terminate' we'll finish straight away. - if (!options.requires_out || delimiter_processed || force_terminate || - (!options.immediate_connect && !in_pipe)) + if (delimiter_processed || force_terminate || + (!options.immediate_connect && !pipe)) proceed_with_term (); } -- cgit v1.2.3