diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-05-26 11:30:25 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-05-26 11:30:25 +0200 |
commit | 718885fdcd7af797f940078ca8c22aebab93c8bb (patch) | |
tree | 33d0d96a5d122338ee8eaddc44fd138b5f6d0651 | |
parent | 87a6490b39c44e8f9c521f6ccea14f800a712d3f (diff) |
Pending messages are delivered even if connection doesn't exist yet
Bug in previous refactoring fixed.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | src/pipe.cpp | 3 | ||||
-rw-r--r-- | src/session.cpp | 110 | ||||
-rw-r--r-- | src/session.hpp | 10 | ||||
-rw-r--r-- | src/tcp_socket.cpp | 2 |
4 files changed, 57 insertions, 68 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 73e5aae..48fc3e5 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -216,11 +216,12 @@ void zmq::pipe_t::process_pipe_term () if (!delay) { state = terminating; send_pipe_term_ack (peer); + return; } else { state = pending; + return; } - return; } // Delimiter happened to arrive before the term command. Now we have the diff --git a/src/session.cpp b/src/session.cpp index bff452e..5601402 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -31,7 +31,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, io_object_t (io_thread_), pipe (NULL), incomplete_in (false), - terminating (false), + pending (false), engine (NULL), socket (socket_), io_thread (io_thread_), @@ -121,8 +121,11 @@ void zmq::session_t::terminated (pipe_t *pipe_) zmq_assert (pipe == pipe_); pipe = NULL; - if (terminating) - unregister_term_ack (); + // If we are waiting for pending messages to be sent, at this point + // we are sure that there will be no more messages and we can proceed + // with termination safely. + if (pending) + proceed_with_term (); } void zmq::session_t::read_activated (pipe_t *pipe_) @@ -150,15 +153,6 @@ void zmq::session_t::process_plug () void zmq::session_t::process_attach (i_engine *engine_, const blob_t &peer_identity_) { - // If we are already terminating, we destroy the engine straight away. - // Note that we don't have to unplug it before deleting as it's not - // yet plugged to the session. - if (terminating) { - if (engine_) - delete engine_; - return; - } - // If some other object (e.g. init) notifies us that the connection failed // without creating an engine we need to start the reconnection process. if (!engine_) { @@ -217,37 +211,52 @@ void zmq::session_t::detach () void zmq::session_t::process_term (int linger_) { - // If termination is already underway, do nothing. - if (!terminating) { - - terminating = true; - - // If the termination of the pipe happens before the term command is - // delivered there's nothing much to do. We can proceed with the - // stadard termination immediately. - if (pipe) { - - // We're going to wait till the pipe terminates. - register_term_acks (1); - - // If linger is set to zero, we can ask pipe to terminate without - // waiting for pending messages to be read. - if (linger_ == 0) - pipe->terminate (); - - // If there's finite linger value, set up a timer. - if (linger_ > 0) { - zmq_assert (!has_linger_timer); - add_timer (linger_, linger_timer_id); - has_linger_timer = true; - } - - // In case there's no engine and there's only delimiter in the pipe it - // wouldn't be ever read. Thus we check for it explicitly. - pipe->check_read (); - } + zmq_assert (!pending); + + // If the termination of the pipe happens before the term command is + // delivered there's nothing much to do. We can proceed with the + // stadard termination immediately. + if (!pipe) { + proceed_with_term (); + return; + } + + // If linger is set to zero, we can ask pipe to terminate without + // waiting for pending messages to be read. + if (linger_ == 0) { + proceed_with_term (); + return; + } + + pending = true; + + // If there's finite linger value, delay the termination. + // If linger is infinite (negative) we don't even have to set + // the timer. + if (linger_ > 0) { + zmq_assert (!has_linger_timer); + add_timer (linger_, linger_timer_id); + has_linger_timer = true; } + // In case there's no engine and there's only delimiter in the + // pipe it wouldn't be ever read. Thus we check for it explicitly. + pipe->check_read (); +} + +void zmq::session_t::proceed_with_term () +{ + // The pending phase have just ended. + pending = false; + + // If there's pipe attached to the session, we have to wait till it + // terminates. + if (pipe) { + register_term_acks (1); + pipe->terminate (); + } + + // Continue with standard termination. own_t::process_term (0); } @@ -260,7 +269,7 @@ void zmq::session_t::timer_event (int id_) // Ask pipe to terminate even though there may be pending messages in it. zmq_assert (pipe); - pipe->terminate (); + proceed_with_term (); } bool zmq::session_t::has_engine () @@ -278,21 +287,4 @@ void zmq::session_t::unregister_session (const blob_t &name_) socket->unregister_session (name_); } -void zmq::session_t::terminate () -{ - // If termination process is already underway, do nothing. - if (!terminating) { - terminating = true; - - // If the pipe was already terminated, there's nothing much to do. - // If it wasn't, we'll ask it to terminate. - if (pipe) { - - register_term_acks (1); - pipe->terminate (); - } - } - - own_t::terminate (); -} diff --git a/src/session.hpp b/src/session.hpp index f1564d8..8bca735 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -59,10 +59,6 @@ namespace zmq protected: - // This function allows to shut down the session even though - // there are messages pending. - void terminate (); - // Two events for the derived session type. Attached is triggered // when session is attached to a peer. The function can reject the new // peer by returning false. Detached is triggered at the beginning of @@ -105,9 +101,9 @@ namespace zmq // is still in the in pipe. bool incomplete_in; - // If true the termination process is already underway, ie. term ack - // for the pipe was already registered etc. - bool terminating; + // True if termination have been suspended to push the pending + // messages to the network. + bool pending; // The protocol I/O engine connected to the session. struct i_engine *engine; diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index 2257e4f..3c9b1fd 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -213,7 +213,7 @@ int zmq::tcp_socket_t::read (void *data_, size_t size_) errno_assert (nbytes != -1); - // Orderly shutdown by the other peer. + // Orderly shutdown by the peer. if (nbytes == 0) return -1; |