summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp62
1 files changed, 32 insertions, 30 deletions
diff --git a/src/session.cpp b/src/session.cpp
index f798877..86086fb 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -69,13 +69,22 @@ zmq::session_t::~session_t ()
zmq_assert (!out_pipe);
}
+bool zmq::session_t::is_terminable ()
+{
+ return in_pipe->is_terminating ();
+}
+
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
if (!in_pipe || !active)
return false;
- if (!in_pipe->read (msg_))
+ if (!in_pipe->read (msg_)) {
+ active = false;
+ if (in_pipe->is_terminating ())
+ finalise ();
return false;
+ }
incomplete_in = msg_->flags & ZMQ_MSG_MORE;
return true;
@@ -156,33 +165,28 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
zmq_assert (!in_pipe);
in_pipe = inpipe_;
active = true;
- in_pipe->set_endpoint (this);
+ in_pipe->set_event_sink (this);
}
if (outpipe_) {
zmq_assert (!out_pipe);
out_pipe = outpipe_;
- out_pipe->set_endpoint (this);
+ out_pipe->set_event_sink (this);
}
}
-void zmq::session_t::detach_inpipe (reader_t *pipe_)
+void zmq::session_t::terminated (reader_t *pipe_)
{
active = false;
in_pipe = NULL;
}
-void zmq::session_t::detach_outpipe (writer_t *pipe_)
+void zmq::session_t::terminated (writer_t *pipe_)
{
out_pipe = NULL;
}
-void zmq::session_t::kill (reader_t *pipe_)
-{
- active = false;
-}
-
-void zmq::session_t::revive (reader_t *pipe_)
+void zmq::session_t::activated (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
active = true;
@@ -190,7 +194,7 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive ();
}
-void zmq::session_t::revive (writer_t *pipe_)
+void zmq::session_t::activated (writer_t *pipe_)
{
zmq_assert (out_pipe == pipe_);
if (engine)
@@ -203,6 +207,11 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_unplug ()
{
+ // TODO: There may be a problem here. The called ensures that all the
+ // commands on the fly have been delivered. However, given that the
+ // session is unregistered from the global repository only at this point
+ // there may be some commands being sent to the session right now.
+
// Unregister the session from the socket.
if (ordinal)
owner->unregister_session (ordinal);
@@ -210,14 +219,10 @@ void zmq::session_t::process_unplug ()
owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
- if (in_pipe) {
- in_pipe->term ();
- in_pipe = NULL;
- }
- if (out_pipe) {
- out_pipe->term ();
- out_pipe = NULL;
- }
+ if (in_pipe)
+ in_pipe->terminate ();
+ if (out_pipe)
+ out_pipe->terminate ();
if (engine) {
engine->unplug ();
@@ -265,19 +270,15 @@ void zmq::session_t::process_attach (i_engine *engine_,
writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap);
- zmq_assert (pipe);
- out_pipe = &pipe->writer;
- out_pipe->set_endpoint (this);
- socket_reader = &pipe->reader;
+ create_pipe (owner, this, options.hwm, options.swap, &socket_reader,
+ &out_pipe);
+ out_pipe->set_event_sink (this);
}
if (options.requires_out && !in_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap);
- zmq_assert (pipe);
- in_pipe = &pipe->reader;
- in_pipe->set_endpoint (this);
- socket_writer = &pipe->writer;
+ create_pipe (this, owner, options.hwm, options.swap, &in_pipe,
+ &socket_writer);
+ in_pipe->set_event_sink (this);
}
if (socket_reader || socket_writer)
@@ -289,3 +290,4 @@ void zmq::session_t::process_attach (i_engine *engine_,
engine = engine_;
engine->plug (this);
}
+