From 99c5d9283622a0b37ee80f83ff4875c059fc5990 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 27 Aug 2009 10:54:28 +0200 Subject: pipes added --- src/session.cpp | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 4 deletions(-) (limited to 'src/session.cpp') diff --git a/src/session.cpp b/src/session.cpp index fc1f858..115fb85 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -20,12 +20,17 @@ #include "session.hpp" #include "zmq_engine.hpp" #include "err.hpp" +#include "pipe.hpp" zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const char *name_) : + const char *name_, const options_t &options_) : owned_t (parent_, owner_), + in_pipe (NULL), + active (false), + out_pipe (NULL), engine (NULL), - name (name_) + name (name_), + options (options_) { } @@ -33,18 +38,48 @@ zmq::session_t::~session_t () { } +void zmq::session_t::set_inbound_pipe (reader_t *pipe_) +{ + zmq_assert (!in_pipe); + in_pipe = pipe_; + active = true; + in_pipe->set_endpoint (this); +} +void zmq::session_t::set_outbound_pipe (writer_t *pipe_) +{ + zmq_assert (!out_pipe); + out_pipe = pipe_; +} + + bool zmq::session_t::read (::zmq_msg_t *msg_) { - return false; + if (!active) + return false; + + bool fetched = in_pipe->read (msg_); + if (!fetched) + active = false; + + return fetched; } bool zmq::session_t::write (::zmq_msg_t *msg_) { - return false; + return out_pipe->write (msg_); } void zmq::session_t::flush () { + out_pipe->flush (); +} + +void zmq::session_t::revive (reader_t *pipe_) +{ + zmq_assert (in_pipe == pipe_); + active = true; + if (engine) + engine->revive (); } void zmq::session_t::process_plug () @@ -56,6 +91,19 @@ void zmq::session_t::process_plug () // We should syslog it and drop the session. TODO zmq_assert (ok); + // If session is created by 'connect' function, it has the pipes set + // already. Otherwise, it's being created by the listener and the pipes + // are yet to be created. + if (!in_pipe && !out_pipe) { + pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm); + zmq_assert (inbound); + in_pipe = &inbound->reader; + pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); + zmq_assert (outbound); + out_pipe = &outbound->writer; + send_bind (owner, this, &outbound->reader, &inbound->writer); + } + owned_t::process_plug (); } -- cgit v1.2.3