summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-25 10:25:51 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-25 10:25:51 +0200
commit87a6490b39c44e8f9c521f6ccea14f800a712d3f (patch)
tree3a4f5dceb1b737675e57a500e14a74d870d2bc63 /src
parent3d4203decf87a5d5fb1718c2163f6d9c6c24328d (diff)
All pipe termination code moved to pipe_t
Till now the code was spread over mutliple locations. Additionally, the code was made more formally correct, with explicit pipe state machine etc. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/pipe.cpp147
-rw-r--r--src/pipe.hpp29
-rw-r--r--src/session.cpp161
-rw-r--r--src/session.hpp27
-rw-r--r--src/socket_base.cpp4
5 files changed, 208 insertions, 160 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 3d0c0a6..73e5aae 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -62,9 +62,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peers_msgs_read (0),
peer (NULL),
sink (NULL),
- terminating (false),
- term_recvd (false),
- delimited (false),
+ state (active),
delay (delay_)
{
}
@@ -89,7 +87,7 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
bool zmq::pipe_t::check_read ()
{
- if (unlikely (!in_active))
+ if (unlikely (!in_active || (state != active && state != pending)))
return false;
// Check if there's an item in the pipe.
@@ -104,13 +102,7 @@ bool zmq::pipe_t::check_read ()
msg_t msg;
bool ok = inpipe->read (&msg);
zmq_assert (ok);
- delimited = true;
-
- // If pipe_term was already received but wasn't processed because
- // of pending messages, we can ack it now.
- if (term_recvd)
- send_pipe_term_ack (peer);
-
+ delimit ();
return false;
}
@@ -119,7 +111,7 @@ bool zmq::pipe_t::check_read ()
bool zmq::pipe_t::read (msg_t *msg_)
{
- if (unlikely (!in_active))
+ if (unlikely (!in_active || (state != active && state != pending)))
return false;
if (!inpipe->read (msg_)) {
@@ -129,13 +121,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
// If delimiter was read, start termination process of the pipe.
if (msg_->is_delimiter ()) {
- delimited = true;
-
- // If pipe_term was already received but wasn't processed because
- // of pending messages, we can ack it now.
- if (term_recvd)
- send_pipe_term_ack (peer);
-
+ delimit ();
return false;
}
@@ -150,7 +136,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
bool zmq::pipe_t::check_write (msg_t *msg_)
{
- if (unlikely (!out_active))
+ if (unlikely (!out_active || state != active))
return false;
bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
@@ -188,13 +174,21 @@ void zmq::pipe_t::rollback ()
void zmq::pipe_t::flush ()
{
+ // If terminate() was already called do nothing.
+ if (state == terminated && state == double_terminated)
+ return;
+
+ // The peer does not exist anymore at this point.
+ if (state == terminating)
+ return;
+
if (!outpipe->flush ())
send_activate_read (peer);
}
void zmq::pipe_t::process_activate_read ()
{
- if (!in_active && !terminating) {
+ if (!in_active && (state == active || state == pending)) {
in_active = true;
sink->read_activated (this);
}
@@ -205,7 +199,7 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
// Remember the peers's message sequence number.
peers_msgs_read = msgs_read_;
- if (!out_active && !terminating) {
+ if (!out_active && state == active) {
out_active = true;
sink->write_activated (this);
}
@@ -213,16 +207,41 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
void zmq::pipe_t::process_pipe_term ()
{
- term_recvd = true;
-
- // We can proceed with the termination if one of the following is true:
- // 1. User asked this side of pipe to terminate already.
- // 2. Waiting for pending messages in not required.
- // 3. Delimiter was already received.
- if (terminating || !delay || delimited) {
- terminating = true;
+ // This is the simple case of peer-induced termination. If there are no
+ // more pending messages to read, or if the pipe was configured to drop
+ // pending messages, we can move directly to the terminating state.
+ // Otherwise we'll hang up in pending state till all the pending messages
+ // are sent.
+ if (state == active) {
+ if (!delay) {
+ state = terminating;
+ send_pipe_term_ack (peer);
+ }
+ else {
+ state = pending;
+ }
+ return;
+ }
+
+ // Delimiter happened to arrive before the term command. Now we have the
+ // term command as well, so we can move straight to terminating state.
+ if (state == delimited) {
+ state = terminating;
+ send_pipe_term_ack (peer);
+ return;
+ }
+
+ // This is the case where both ends of the pipe are closed in parallel.
+ // We simply reply to the request by ack and continue waiting for our
+ // own ack.
+ if (state == terminated) {
+ state = double_terminated;
send_pipe_term_ack (peer);
+ return;
}
+
+ // pipe_term is invalid in other states.
+ zmq_assert (false);
}
void zmq::pipe_t::process_pipe_term_ack ()
@@ -231,10 +250,16 @@ void zmq::pipe_t::process_pipe_term_ack ()
zmq_assert (sink);
sink->terminated (this);
- // If the peer haven't asked for the termination itself, we have to
- // ack the ack, so that it can deallocate properly.
- if (!term_recvd)
+ // In terminating and double_terminated states there's nothing to do.
+ // Simply deallocate the pipe. In terminated state we have to ack the
+ // peer before deallocating this side of the pipe. All the other states
+ // are invalid.
+ if (state == terminating) ;
+ else if (state == double_terminated);
+ else if (state == terminated)
send_pipe_term_ack (peer);
+ else
+ zmq_assert (false);
// We'll deallocate the inbound pipe, the peer will deallocate the outbound
// pipe (which is an inbound pipe from its point of view).
@@ -254,10 +279,40 @@ void zmq::pipe_t::process_pipe_term_ack ()
void zmq::pipe_t::terminate ()
{
- // Prevent double termination.
- if (terminating)
+ // If terminate was already called, we can ignore the duplicit invocation.
+ if (state == terminated || state == double_terminated)
return;
- terminating = true;
+
+ // If the pipe is in the final phase of async termination, it's going to
+ // closed anyway. No need to do anything special here.
+ else if (state == terminating)
+ return;
+
+ // The simple sync termination case. Ask the peer to terminate and wait
+ // for the ack.
+ else if (state == active) {
+ send_pipe_term (peer);
+ state = terminated;
+ }
+
+ // There are still pending messages available, but the user calls
+ // 'terminate'. We can act as if all the pending messages were read.
+ else if (state == pending) {
+ send_pipe_term_ack (peer);
+ state = terminating;
+ }
+
+ // We've already got delimiter, but not term command yet. We can ignore
+ // the delimiter and ack synchronously terminate as if we were in
+ // active state.
+ else if (state == delimited) {
+ send_pipe_term (peer);
+ state = terminated;
+ }
+
+ // There are no other states.
+ else
+ zmq_assert (false);
// Stop inbound and outbound flow of messages.
in_active = false;
@@ -272,9 +327,6 @@ void zmq::pipe_t::terminate ()
msg.init_delimiter ();
outpipe->write (msg, false);
flush ();
-
- // Start the termination handshaking.
- send_pipe_term (peer);
}
bool zmq::pipe_t::is_delimiter (msg_t &msg_)
@@ -309,3 +361,20 @@ int zmq::pipe_t::compute_lwm (int hwm_)
return result;
}
+
+void zmq::pipe_t::delimit ()
+{
+ if (state == active) {
+ state = delimited;
+ return;
+ }
+
+ if (state == pending) {
+ send_pipe_term_ack (peer);
+ state = terminating;
+ return;
+ }
+
+ // Delimiter in any other state is invalid.
+ zmq_assert (false);
+}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index f821bdd..3087ab8 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -99,6 +99,9 @@ namespace zmq
void process_pipe_term ();
void process_pipe_term_ack ();
+ // Handler for delimiter read from the pipe.
+ void delimit ();
+
// Type of the underlying lock-free pipe.
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
@@ -142,15 +145,23 @@ namespace zmq
// Sink to send events to.
i_pipe_events *sink;
- // True is 'terminate' method was called or termination request
- // was received from the peer.
- bool terminating;
-
- // True is we've already got pipe_term command from the peer.
- bool term_recvd;
-
- // True if delimiter was already received from the peer.
- bool delimited;
+ // State of the pipe endpoint. Active is common state before any
+ // termination begins. Delimited means that delimiter was read from
+ // pipe before term command was received. Pending means that term
+ // command was already received from the peer but there are still
+ // pending messages to read. Terminating means that all pending
+ // messages were already read and all we are waiting for is ack from
+ // the peer. Terminated means that 'terminate' was explicitly called
+ // by the user. Double_terminated means that user called 'terminate'
+ // and then we've got term command from the peer as well.
+ enum {
+ active,
+ delimited,
+ pending,
+ terminating,
+ terminated,
+ double_terminated
+ } state;
// If true, we receive all the pending inbound messages before
// terminating. If false, we terminate immediately when the peer
diff --git a/src/session.cpp b/src/session.cpp
index 5ef21c7..bff452e 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -31,47 +31,35 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
pipe (NULL),
incomplete_in (false),
+ terminating (false),
engine (NULL),
socket (socket_),
io_thread (io_thread_),
- pipe_attached (false),
- delimiter_processed (false),
- force_terminate (false),
- has_linger_timer (false),
- state (active)
-{
+ has_linger_timer (false)
+{
}
zmq::session_t::~session_t ()
{
zmq_assert (!pipe);
- if (engine)
- engine->terminate ();
-}
-
-void zmq::session_t::proceed_with_term ()
-{
- if (state == terminating)
- return;
-
- zmq_assert (state == pending);
- state = terminating;
-
// If there's still a pending linger timer, remove it.
if (has_linger_timer) {
cancel_timer (linger_timer_id);
has_linger_timer = false;
}
- if (pipe) {
- register_term_acks (1);
- pipe->terminate ();
- }
+ // Close the engine.
+ if (engine)
+ engine->terminate ();
+}
- // The session has already waited for the linger period. We don't want
- // the child objects to linger any more thus linger is set to zero.
- own_t::process_term (0);
+void zmq::session_t::attach_pipe (pipe_t *pipe_)
+{
+ zmq_assert (!pipe);
+ zmq_assert (pipe_);
+ pipe = pipe_;
+ pipe->set_event_sink (this);
}
bool zmq::session_t::read (msg_t *msg_)
@@ -127,37 +115,13 @@ void zmq::session_t::clean_pipes ()
}
}
-void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
-{
- zmq_assert (!pipe_attached);
- pipe_attached = true;
-
- if (pipe_) {
- zmq_assert (!pipe);
- pipe = pipe_;
- pipe->set_event_sink (this);
- }
-
- // If we are already terminating, terminate the pipes straight away.
- if (state == terminating) {
- if (pipe) {
- pipe->terminate ();
- register_term_acks (1);
- }
- }
-}
-
void zmq::session_t::terminated (pipe_t *pipe_)
{
+ // Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_);
-
- // If we are in process of being closed, but still waiting for all
- // pending messeges being sent, we can terminate here.
- if (state == pending)
- proceed_with_term ();
-
pipe = NULL;
- if (state == terminating)
+
+ if (terminating)
unregister_term_ack ();
}
@@ -189,7 +153,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
// If we are already terminating, we destroy the engine straight away.
// Note that we don't have to unplug it before deleting as it's not
// yet plugged to the session.
- if (state == terminating) {
+ if (terminating) {
if (engine_)
delete engine_;
return;
@@ -209,12 +173,8 @@ void zmq::session_t::process_attach (i_engine *engine_,
return;
}
- // Check whether the required pipe already exists and create it
- // if it does not.
- if (!pipe_attached) {
- zmq_assert (!pipe);
- pipe_attached = true;
-
+ // Create the pipe if it does not exist yet.
+ if (!pipe) {
object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
@@ -226,6 +186,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
pipes [0]->set_event_sink (this);
// Remember the local end of the pipe.
+ zmq_assert (!pipe);
pipe = pipes [0];
// Ask socket to plug into the remote end of the pipe.
@@ -249,43 +210,45 @@ void zmq::session_t::detach ()
// Send the event to the derived class.
detached ();
- // Just in case there's only a delimiter in the inbound pipe.
+ // Just in case there's only a delimiter in the pipe.
if (pipe)
pipe->check_read ();
}
void zmq::session_t::process_term (int linger_)
{
- zmq_assert (state == active);
- state = pending;
-
- // If linger is set to zero, we can terminate the session straight away
- // not waiting for the pending messages to be sent.
- if (linger_ == 0) {
- proceed_with_term ();
- return;
+ // If termination is already underway, do nothing.
+ if (!terminating) {
+
+ terminating = true;
+
+ // If the termination of the pipe happens before the term command is
+ // delivered there's nothing much to do. We can proceed with the
+ // stadard termination immediately.
+ if (pipe) {
+
+ // We're going to wait till the pipe terminates.
+ register_term_acks (1);
+
+ // If linger is set to zero, we can ask pipe to terminate without
+ // waiting for pending messages to be read.
+ if (linger_ == 0)
+ pipe->terminate ();
+
+ // If there's finite linger value, set up a timer.
+ if (linger_ > 0) {
+ zmq_assert (!has_linger_timer);
+ add_timer (linger_, linger_timer_id);
+ has_linger_timer = true;
+ }
+
+ // In case there's no engine and there's only delimiter in the pipe it
+ // wouldn't be ever read. Thus we check for it explicitly.
+ pipe->check_read ();
+ }
}
- // If there's finite linger value, set up a timer.
- if (linger_ > 0) {
- zmq_assert (!has_linger_timer);
- add_timer (linger_, linger_timer_id);
- has_linger_timer = true;
- }
-
- // If there's no engine and there's only delimiter in the pipe it wouldn't
- // be ever read. Thus we check for it explicitly.
- if (pipe)
- pipe->check_read ();
-
- // If there's no in pipe, there are no pending messages to send.
- // We can proceed with the shutdown straight away. Also, if there is
- // pipe, but the delimiter was already processed, we can terminate
- // immediately. Alternatively, if the derived session type have
- // called 'terminate' we'll finish straight away.
- if (delimiter_processed || force_terminate ||
- (!options.immediate_connect && !pipe))
- proceed_with_term ();
+ own_t::process_term (0);
}
void zmq::session_t::timer_event (int id_)
@@ -294,7 +257,10 @@ void zmq::session_t::timer_event (int id_)
// there are still pending messages to be sent.
zmq_assert (id_ == linger_timer_id);
has_linger_timer = false;
- proceed_with_term ();
+
+ // Ask pipe to terminate even though there may be pending messages in it.
+ zmq_assert (pipe);
+ pipe->terminate ();
}
bool zmq::session_t::has_engine ()
@@ -314,6 +280,19 @@ void zmq::session_t::unregister_session (const blob_t &name_)
void zmq::session_t::terminate ()
{
- force_terminate = true;
- own_t::terminate ();
+ // If termination process is already underway, do nothing.
+ if (!terminating) {
+ terminating = true;
+
+ // If the pipe was already terminated, there's nothing much to do.
+ // If it wasn't, we'll ask it to terminate.
+ if (pipe) {
+
+ register_term_acks (1);
+ pipe->terminate ();
+ }
+ }
+
+ own_t::terminate ();
}
+
diff --git a/src/session.hpp b/src/session.hpp
index 4a12d68..f1564d8 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -41,6 +41,9 @@ namespace zmq
session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_);
+ // To be used once only, when creating the session.
+ void attach_pipe (class pipe_t *pipe_);
+
// i_inout interface implementation. Note that detach method is not
// implemented by generic session. Different session types may handle
// engine disconnection in different ways.
@@ -49,8 +52,6 @@ namespace zmq
void flush ();
void detach ();
- void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
-
// i_pipe_events interface implementation.
void read_activated (class pipe_t *pipe_);
void write_activated (class pipe_t *pipe_);
@@ -59,7 +60,7 @@ namespace zmq
protected:
// This function allows to shut down the session even though
- // there are pending messages in the inbound pipe.
+ // there are messages pending.
void terminate ();
// Two events for the derived session type. Attached is triggered
@@ -104,6 +105,10 @@ namespace zmq
// is still in the in pipe.
bool incomplete_in;
+ // If true the termination process is already underway, ie. term ack
+ // for the pipe was already registered etc.
+ bool terminating;
+
// The protocol I/O engine connected to the session.
struct i_engine *engine;
@@ -114,28 +119,12 @@ namespace zmq
// the engines into the same thread.
class io_thread_t *io_thread;
- // If true, pipe was already attached to this session.
- bool pipe_attached;
-
- // If true, delimiter was already read from the inbound pipe.
- bool delimiter_processed;
-
- // If true, we should terminate the session even though there are
- // pending messages in the inbound pipe.
- bool force_terminate;
-
// ID of the linger timer
enum {linger_timer_id = 0x20};
// True is linger timer is running.
bool has_linger_timer;
- enum {
- active,
- pending,
- terminating
- } state;
-
session_t (const session_t&);
const session_t &operator = (const session_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fae55f2..1682c05 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -450,8 +450,8 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach local end of the pipe to the socket object.
attach_pipe (pipes [0], blob_t ());
- // Attach remote end of the pipe to the session object.
- session->attach_pipe (pipes [1], blob_t ());
+ // Attach remote end of the pipe to the session object later on.
+ session->attach_pipe (pipes [1]);
}
// Activate the session. Make it a child of this socket.