summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-09-20 00:06:05 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-20 00:06:05 +0200
commit328c92a0a70b98b4a3bf09132bd8f8041e4c8628 (patch)
tree76dddb89390fd6a362d10c6d11caaccf52e398ad /src/session.cpp
parent1d2399720b3fd06da5e7f9f4a211f30c57a9ce2d (diff)
problem with engine being attached to session while it's being terminated fixed
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp40
1 files changed, 38 insertions, 2 deletions
diff --git a/src/session.cpp b/src/session.cpp
index d116518..4c448af 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -36,7 +36,8 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
socket (socket_),
io_thread (io_thread_),
attach_processed (false),
- term_processed (false)
+ term_processed (false),
+ finalised (false)
{
}
@@ -123,19 +124,46 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
out_pipe->set_event_sink (this);
}
+ // If we are already terminating, terminate the pipes straight away.
+ if (finalised) {
+ if (in_pipe) {
+ register_term_acks (1);
+ in_pipe->terminate ();
+ }
+ if (out_pipe) {
+ register_term_acks (1);
+ out_pipe->terminate ();
+ }
+ return;
+ }
+
attach_processed = true;
finalise ();
}
void zmq::session_t::terminated (reader_t *pipe_)
{
+ zmq_assert (in_pipe == pipe_);
in_pipe = NULL;
+
+ if (finalised) {
+ unregister_term_ack ();
+ return;
+ }
+
finalise ();
}
void zmq::session_t::terminated (writer_t *pipe_)
{
+ zmq_assert (out_pipe == pipe_);
out_pipe = NULL;
+
+ if (finalised) {
+ unregister_term_ack ();
+ return;
+ }
+
finalise ();
}
@@ -173,8 +201,10 @@ void zmq::session_t::finalise ()
// 3. Both pipes have already terminated. Note that inbound pipe
// is terminated after delimiter is read, i.e. all messages
// were already sent to the wire.
- if (term_processed && attach_processed && !in_pipe && !out_pipe)
+ if (term_processed && attach_processed && !in_pipe && !out_pipe) {
+ finalised = true;
own_t::process_term ();
+ }
}
void zmq::session_t::process_attach (i_engine *engine_,
@@ -188,6 +218,12 @@ void zmq::session_t::process_attach (i_engine *engine_,
return;
}
+ // If we are already terminating, we destroy the engine straight away.
+ if (finalised) {
+ delete engine;
+ return;
+ }
+
// Check whether the required pipes already exist. If not so, we'll
// create them and bind them to the socket object.
reader_t *socket_reader = NULL;