diff options
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/src/session.cpp b/src/session.cpp index f798877..86086fb 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -69,13 +69,22 @@ zmq::session_t::~session_t () zmq_assert (!out_pipe); } +bool zmq::session_t::is_terminable () +{ + return in_pipe->is_terminating (); +} + bool zmq::session_t::read (::zmq_msg_t *msg_) { if (!in_pipe || !active) return false; - if (!in_pipe->read (msg_)) + if (!in_pipe->read (msg_)) { + active = false; + if (in_pipe->is_terminating ()) + finalise (); return false; + } incomplete_in = msg_->flags & ZMQ_MSG_MORE; return true; @@ -156,33 +165,28 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, zmq_assert (!in_pipe); in_pipe = inpipe_; active = true; - in_pipe->set_endpoint (this); + in_pipe->set_event_sink (this); } if (outpipe_) { zmq_assert (!out_pipe); out_pipe = outpipe_; - out_pipe->set_endpoint (this); + out_pipe->set_event_sink (this); } } -void zmq::session_t::detach_inpipe (reader_t *pipe_) +void zmq::session_t::terminated (reader_t *pipe_) { active = false; in_pipe = NULL; } -void zmq::session_t::detach_outpipe (writer_t *pipe_) +void zmq::session_t::terminated (writer_t *pipe_) { out_pipe = NULL; } -void zmq::session_t::kill (reader_t *pipe_) -{ - active = false; -} - -void zmq::session_t::revive (reader_t *pipe_) +void zmq::session_t::activated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); active = true; @@ -190,7 +194,7 @@ void zmq::session_t::revive (reader_t *pipe_) engine->revive (); } -void zmq::session_t::revive (writer_t *pipe_) +void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) @@ -203,6 +207,11 @@ void zmq::session_t::process_plug () void zmq::session_t::process_unplug () { + // TODO: There may be a problem here. The called ensures that all the + // commands on the fly have been delivered. However, given that the + // session is unregistered from the global repository only at this point + // there may be some commands being sent to the session right now. + // Unregister the session from the socket. if (ordinal) owner->unregister_session (ordinal); @@ -210,14 +219,10 @@ void zmq::session_t::process_unplug () owner->unregister_session (peer_identity); // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; - } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; - } + if (in_pipe) + in_pipe->terminate (); + if (out_pipe) + out_pipe->terminate (); if (engine) { engine->unplug (); @@ -265,19 +270,15 @@ void zmq::session_t::process_attach (i_engine *engine_, writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap); - zmq_assert (pipe); - out_pipe = &pipe->writer; - out_pipe->set_endpoint (this); - socket_reader = &pipe->reader; + create_pipe (owner, this, options.hwm, options.swap, &socket_reader, + &out_pipe); + out_pipe->set_event_sink (this); } if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap); - zmq_assert (pipe); - in_pipe = &pipe->reader; - in_pipe->set_endpoint (this); - socket_writer = &pipe->writer; + create_pipe (this, owner, options.hwm, options.swap, &in_pipe, + &socket_writer); + in_pipe->set_event_sink (this); } if (socket_reader || socket_writer) @@ -289,3 +290,4 @@ void zmq::session_t::process_attach (i_engine *engine_, engine = engine_; engine->plug (this); } + |