summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-22 17:26:53 +0200
commitacf0b0e515515e51ad32ba7a2d147ce703579478 (patch)
treed2032009cf46c23aa0f677c2216914f718ab968a /src/session.cpp
parent9e6b39925603f9e64db08c469bd628d7ef9465de (diff)
Introduces bi-directional pipes
So far, there was a pair of unidirectional pipes between a socket and a session (or an inproc peer). This resulted in complex problems with half-closed states and tracking which inpipe corresponds to which outpipe. This patch doesn't add any functionality in itself, but is essential for further work on features like subscription forwarding. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp167
1 files changed, 67 insertions, 100 deletions
diff --git a/src/session.cpp b/src/session.cpp
index 499fe40..5ef21c7 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -29,13 +29,12 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
- in_pipe (NULL),
+ pipe (NULL),
incomplete_in (false),
- out_pipe (NULL),
engine (NULL),
socket (socket_),
io_thread (io_thread_),
- pipes_attached (false),
+ pipe_attached (false),
delimiter_processed (false),
force_terminate (false),
has_linger_timer (false),
@@ -45,8 +44,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
zmq::session_t::~session_t ()
{
- zmq_assert (!in_pipe);
- zmq_assert (!out_pipe);
+ zmq_assert (!pipe);
if (engine)
engine->terminate ();
@@ -66,13 +64,9 @@ void zmq::session_t::proceed_with_term ()
has_linger_timer = false;
}
- if (in_pipe) {
+ if (pipe) {
register_term_acks (1);
- in_pipe->terminate ();
- }
- if (out_pipe) {
- register_term_acks (1);
- out_pipe->terminate ();
+ pipe->terminate ();
}
// The session has already waited for the linger period. We don't want
@@ -82,10 +76,10 @@ void zmq::session_t::proceed_with_term ()
bool zmq::session_t::read (msg_t *msg_)
{
- if (!in_pipe)
+ if (!pipe)
return false;
- if (!in_pipe->read (msg_))
+ if (!pipe->read (msg_))
return false;
incomplete_in = msg_->flags () & msg_t::more;
@@ -94,7 +88,7 @@ bool zmq::session_t::read (msg_t *msg_)
bool zmq::session_t::write (msg_t *msg_)
{
- if (out_pipe && out_pipe->write (msg_)) {
+ if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
return true;
@@ -105,21 +99,20 @@ bool zmq::session_t::write (msg_t *msg_)
void zmq::session_t::flush ()
{
- if (out_pipe)
- out_pipe->flush ();
+ if (pipe)
+ pipe->flush ();
}
void zmq::session_t::clean_pipes ()
{
- // Get rid of half-processed messages in the out pipe. Flush any
- // unflushed messages upstream.
- if (out_pipe) {
- out_pipe->rollback ();
- out_pipe->flush ();
- }
+ if (pipe) {
- // Remove any half-read message from the in pipe.
- if (in_pipe) {
+ // Get rid of half-processed messages in the out pipe. Flush any
+ // unflushed messages upstream.
+ pipe->rollback ();
+ pipe->flush ();
+
+ // Remove any half-read message from the in pipe.
while (incomplete_in) {
msg_t msg;
int rc = msg.init ();
@@ -134,78 +127,54 @@ void zmq::session_t::clean_pipes ()
}
}
-void zmq::session_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
+void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
- zmq_assert (!pipes_attached);
- pipes_attached = true;
+ zmq_assert (!pipe_attached);
+ pipe_attached = true;
- if (inpipe_) {
- zmq_assert (!in_pipe);
- in_pipe = inpipe_;
- in_pipe->set_event_sink (this);
- }
-
- if (outpipe_) {
- zmq_assert (!out_pipe);
- out_pipe = outpipe_;
- out_pipe->set_event_sink (this);
+ if (pipe_) {
+ zmq_assert (!pipe);
+ pipe = pipe_;
+ pipe->set_event_sink (this);
}
// If we are already terminating, terminate the pipes straight away.
if (state == terminating) {
- if (in_pipe) {
- in_pipe->terminate ();
- register_term_acks (1);
- }
- if (out_pipe) {
- out_pipe->terminate ();
+ if (pipe) {
+ pipe->terminate ();
register_term_acks (1);
}
}
}
-void zmq::session_t::delimited (reader_t *pipe_)
+void zmq::session_t::terminated (pipe_t *pipe_)
{
- zmq_assert (in_pipe == pipe_);
- zmq_assert (!delimiter_processed);
- delimiter_processed = true;
+ zmq_assert (pipe == pipe_);
- // If we are in process of being closed, but still waiting for all
- // pending messeges being sent, we can terminate here.
+ // If we are in process of being closed, but still waiting for all
+ // pending messeges being sent, we can terminate here.
if (state == pending)
proceed_with_term ();
-}
-void zmq::session_t::terminated (reader_t *pipe_)
-{
- zmq_assert (in_pipe == pipe_);
- in_pipe = NULL;
+ pipe = NULL;
if (state == terminating)
unregister_term_ack ();
}
-void zmq::session_t::terminated (writer_t *pipe_)
+void zmq::session_t::read_activated (pipe_t *pipe_)
{
- zmq_assert (out_pipe == pipe_);
- out_pipe = NULL;
- if (state == terminating)
- unregister_term_ack ();
-}
-
-void zmq::session_t::activated (reader_t *pipe_)
-{
- zmq_assert (in_pipe == pipe_);
+ zmq_assert (pipe == pipe_);
if (likely (engine != NULL))
engine->activate_out ();
else
- in_pipe->check_read ();
+ pipe->check_read ();
}
-void zmq::session_t::activated (writer_t *pipe_)
+void zmq::session_t::write_activated (pipe_t *pipe_)
{
- zmq_assert (out_pipe == pipe_);
+ zmq_assert (pipe == pipe_);
+
if (engine)
engine->activate_in ();
}
@@ -240,29 +209,27 @@ void zmq::session_t::process_attach (i_engine *engine_,
return;
}
- // Check whether the required pipes already exist. If not so, we'll
- // create them and bind them to the socket object.
- if (!pipes_attached) {
- zmq_assert (!in_pipe && !out_pipe);
- pipes_attached = true;
- reader_t *socket_reader = NULL;
- writer_t *socket_writer = NULL;
-
- // Create the pipes, as required.
- if (options.requires_in) {
- create_pipe (socket, this, options.rcvhwm, &socket_reader,
- &out_pipe);
- out_pipe->set_event_sink (this);
- }
- if (options.requires_out) {
- create_pipe (this, socket, options.sndhwm, &in_pipe,
- &socket_writer);
- in_pipe->set_event_sink (this);
- }
+ // Check whether the required pipe already exists and create it
+ // if it does not.
+ if (!pipe_attached) {
+ zmq_assert (!pipe);
+ pipe_attached = true;
+
+ object_t *parents [2] = {this, socket};
+ pipe_t *pipes [2] = {NULL, NULL};
+ int hwms [2] = {options.rcvhwm, options.sndhwm};
+ bool delays [2] = {true, true};
+ int rc = pipepair (parents, pipes, hwms, delays);
+ errno_assert (rc == 0);
+
+ // Plug the local end of the pipe.
+ pipes [0]->set_event_sink (this);
+
+ // Remember the local end of the pipe.
+ pipe = pipes [0];
- // Bind the pipes to the socket object.
- if (socket_reader || socket_writer)
- send_bind (socket, socket_reader, socket_writer, peer_identity_);
+ // Ask socket to plug into the remote end of the pipe.
+ send_bind (socket, pipes [1], peer_identity_);
}
// Plug in the engine.
@@ -282,9 +249,9 @@ void zmq::session_t::detach ()
// Send the event to the derived class.
detached ();
- // Just in case, there's only a delimiter in the inbound pipe.
- if (in_pipe)
- in_pipe->check_read ();
+ // Just in case there's only a delimiter in the inbound pipe.
+ if (pipe)
+ pipe->check_read ();
}
void zmq::session_t::process_term (int linger_)
@@ -308,16 +275,16 @@ void zmq::session_t::process_term (int linger_)
// If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly.
- if (in_pipe)
- in_pipe->check_read ();
+ if (pipe)
+ pipe->check_read ();
- // If there's no in pipe there are no pending messages to send.
+ // If there's no in pipe, there are no pending messages to send.
// We can proceed with the shutdown straight away. Also, if there is
- // inbound pipe, but the delimiter was already processed, we can
- // terminate immediately. Alternatively, if the derived session type have
+ // pipe, but the delimiter was already processed, we can terminate
+ // immediately. Alternatively, if the derived session type have
// called 'terminate' we'll finish straight away.
- if (!options.requires_out || delimiter_processed || force_terminate ||
- (!options.immediate_connect && !in_pipe))
+ if (delimiter_processed || force_terminate ||
+ (!options.immediate_connect && !pipe))
proceed_with_term ();
}