summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-26 11:30:25 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-26 11:30:25 +0200
commit718885fdcd7af797f940078ca8c22aebab93c8bb (patch)
tree33d0d96a5d122338ee8eaddc44fd138b5f6d0651
parent87a6490b39c44e8f9c521f6ccea14f800a712d3f (diff)
Pending messages are delivered even if connection doesn't exist yet
Bug in previous refactoring fixed. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/pipe.cpp3
-rw-r--r--src/session.cpp110
-rw-r--r--src/session.hpp10
-rw-r--r--src/tcp_socket.cpp2
4 files changed, 57 insertions, 68 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 73e5aae..48fc3e5 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -216,11 +216,12 @@ void zmq::pipe_t::process_pipe_term ()
if (!delay) {
state = terminating;
send_pipe_term_ack (peer);
+ return;
}
else {
state = pending;
+ return;
}
- return;
}
// Delimiter happened to arrive before the term command. Now we have the
diff --git a/src/session.cpp b/src/session.cpp
index bff452e..5601402 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -31,7 +31,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
pipe (NULL),
incomplete_in (false),
- terminating (false),
+ pending (false),
engine (NULL),
socket (socket_),
io_thread (io_thread_),
@@ -121,8 +121,11 @@ void zmq::session_t::terminated (pipe_t *pipe_)
zmq_assert (pipe == pipe_);
pipe = NULL;
- if (terminating)
- unregister_term_ack ();
+ // If we are waiting for pending messages to be sent, at this point
+ // we are sure that there will be no more messages and we can proceed
+ // with termination safely.
+ if (pending)
+ proceed_with_term ();
}
void zmq::session_t::read_activated (pipe_t *pipe_)
@@ -150,15 +153,6 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_)
{
- // If we are already terminating, we destroy the engine straight away.
- // Note that we don't have to unplug it before deleting as it's not
- // yet plugged to the session.
- if (terminating) {
- if (engine_)
- delete engine_;
- return;
- }
-
// If some other object (e.g. init) notifies us that the connection failed
// without creating an engine we need to start the reconnection process.
if (!engine_) {
@@ -217,37 +211,52 @@ void zmq::session_t::detach ()
void zmq::session_t::process_term (int linger_)
{
- // If termination is already underway, do nothing.
- if (!terminating) {
-
- terminating = true;
-
- // If the termination of the pipe happens before the term command is
- // delivered there's nothing much to do. We can proceed with the
- // stadard termination immediately.
- if (pipe) {
-
- // We're going to wait till the pipe terminates.
- register_term_acks (1);
-
- // If linger is set to zero, we can ask pipe to terminate without
- // waiting for pending messages to be read.
- if (linger_ == 0)
- pipe->terminate ();
-
- // If there's finite linger value, set up a timer.
- if (linger_ > 0) {
- zmq_assert (!has_linger_timer);
- add_timer (linger_, linger_timer_id);
- has_linger_timer = true;
- }
-
- // In case there's no engine and there's only delimiter in the pipe it
- // wouldn't be ever read. Thus we check for it explicitly.
- pipe->check_read ();
- }
+ zmq_assert (!pending);
+
+ // If the termination of the pipe happens before the term command is
+ // delivered there's nothing much to do. We can proceed with the
+ // stadard termination immediately.
+ if (!pipe) {
+ proceed_with_term ();
+ return;
+ }
+
+ // If linger is set to zero, we can ask pipe to terminate without
+ // waiting for pending messages to be read.
+ if (linger_ == 0) {
+ proceed_with_term ();
+ return;
+ }
+
+ pending = true;
+
+ // If there's finite linger value, delay the termination.
+ // If linger is infinite (negative) we don't even have to set
+ // the timer.
+ if (linger_ > 0) {
+ zmq_assert (!has_linger_timer);
+ add_timer (linger_, linger_timer_id);
+ has_linger_timer = true;
}
+ // In case there's no engine and there's only delimiter in the
+ // pipe it wouldn't be ever read. Thus we check for it explicitly.
+ pipe->check_read ();
+}
+
+void zmq::session_t::proceed_with_term ()
+{
+ // The pending phase have just ended.
+ pending = false;
+
+ // If there's pipe attached to the session, we have to wait till it
+ // terminates.
+ if (pipe) {
+ register_term_acks (1);
+ pipe->terminate ();
+ }
+
+ // Continue with standard termination.
own_t::process_term (0);
}
@@ -260,7 +269,7 @@ void zmq::session_t::timer_event (int id_)
// Ask pipe to terminate even though there may be pending messages in it.
zmq_assert (pipe);
- pipe->terminate ();
+ proceed_with_term ();
}
bool zmq::session_t::has_engine ()
@@ -278,21 +287,4 @@ void zmq::session_t::unregister_session (const blob_t &name_)
socket->unregister_session (name_);
}
-void zmq::session_t::terminate ()
-{
- // If termination process is already underway, do nothing.
- if (!terminating) {
- terminating = true;
-
- // If the pipe was already terminated, there's nothing much to do.
- // If it wasn't, we'll ask it to terminate.
- if (pipe) {
-
- register_term_acks (1);
- pipe->terminate ();
- }
- }
-
- own_t::terminate ();
-}
diff --git a/src/session.hpp b/src/session.hpp
index f1564d8..8bca735 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -59,10 +59,6 @@ namespace zmq
protected:
- // This function allows to shut down the session even though
- // there are messages pending.
- void terminate ();
-
// Two events for the derived session type. Attached is triggered
// when session is attached to a peer. The function can reject the new
// peer by returning false. Detached is triggered at the beginning of
@@ -105,9 +101,9 @@ namespace zmq
// is still in the in pipe.
bool incomplete_in;
- // If true the termination process is already underway, ie. term ack
- // for the pipe was already registered etc.
- bool terminating;
+ // True if termination have been suspended to push the pending
+ // messages to the network.
+ bool pending;
// The protocol I/O engine connected to the session.
struct i_engine *engine;
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index 2257e4f..3c9b1fd 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -213,7 +213,7 @@ int zmq::tcp_socket_t::read (void *data_, size_t size_)
errno_assert (nbytes != -1);
- // Orderly shutdown by the other peer.
+ // Orderly shutdown by the peer.
if (nbytes == 0)
return -1;