From 328c92a0a70b98b4a3bf09132bd8f8041e4c8628 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 20 Sep 2010 00:06:05 +0200 Subject: problem with engine being attached to session while it's being terminated fixed --- src/encoder.hpp | 2 +- src/session.cpp | 40 ++++++++++++++++++++++++++++++++++++++-- src/session.hpp | 2 ++ src/zmq_engine.hpp | 5 +---- 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/src/encoder.hpp b/src/encoder.hpp index 54cbb34..d5997db 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -47,7 +47,7 @@ namespace zmq zmq_assert (buf); } - // The destructor doesn't have to be virtual. It is mad virtual + // The destructor doesn't have to be virtual. It is made virtual // just to keep ICC and code checking tools from complaining. inline virtual ~encoder_base_t () { diff --git a/src/session.cpp b/src/session.cpp index d116518..4c448af 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -36,7 +36,8 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, socket (socket_), io_thread (io_thread_), attach_processed (false), - term_processed (false) + term_processed (false), + finalised (false) { } @@ -123,19 +124,46 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, out_pipe->set_event_sink (this); } + // If we are already terminating, terminate the pipes straight away. + if (finalised) { + if (in_pipe) { + register_term_acks (1); + in_pipe->terminate (); + } + if (out_pipe) { + register_term_acks (1); + out_pipe->terminate (); + } + return; + } + attach_processed = true; finalise (); } void zmq::session_t::terminated (reader_t *pipe_) { + zmq_assert (in_pipe == pipe_); in_pipe = NULL; + + if (finalised) { + unregister_term_ack (); + return; + } + finalise (); } void zmq::session_t::terminated (writer_t *pipe_) { + zmq_assert (out_pipe == pipe_); out_pipe = NULL; + + if (finalised) { + unregister_term_ack (); + return; + } + finalise (); } @@ -173,8 +201,10 @@ void zmq::session_t::finalise () // 3. Both pipes have already terminated. Note that inbound pipe // is terminated after delimiter is read, i.e. all messages // were already sent to the wire. - if (term_processed && attach_processed && !in_pipe && !out_pipe) + if (term_processed && attach_processed && !in_pipe && !out_pipe) { + finalised = true; own_t::process_term (); + } } void zmq::session_t::process_attach (i_engine *engine_, @@ -188,6 +218,12 @@ void zmq::session_t::process_attach (i_engine *engine_, return; } + // If we are already terminating, we destroy the engine straight away. + if (finalised) { + delete engine; + return; + } + // Check whether the required pipes already exist. If not so, we'll // create them and bind them to the socket object. reader_t *socket_reader = NULL; diff --git a/src/session.hpp b/src/session.hpp index ff1e87b..6e6d2e6 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -123,6 +123,8 @@ namespace zmq // True if term command was already processed. bool term_processed; + bool finalised; + session_t (const session_t&); void operator = (const session_t&); }; diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 363eaaf..806e710 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -39,6 +39,7 @@ namespace zmq public: zmq_engine_t (fd_t fd_, const options_t &options_); + ~zmq_engine_t (); // i_engine interface implementation. void plug (class io_thread_t *io_thread_, struct i_inout *inout_); @@ -53,10 +54,6 @@ namespace zmq private: - // Destructor is not to be used directly. - // Use 'terminate' function instead. - ~zmq_engine_t (); - // Function to handle network disconnections. void error (); -- cgit v1.2.3