summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp56
1 files changed, 52 insertions, 4 deletions
diff --git a/src/session.cpp b/src/session.cpp
index fc1f858..115fb85 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -20,12 +20,17 @@
#include "session.hpp"
#include "zmq_engine.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const char *name_) :
+ const char *name_, const options_t &options_) :
owned_t (parent_, owner_),
+ in_pipe (NULL),
+ active (false),
+ out_pipe (NULL),
engine (NULL),
- name (name_)
+ name (name_),
+ options (options_)
{
}
@@ -33,18 +38,48 @@ zmq::session_t::~session_t ()
{
}
+void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
+{
+ zmq_assert (!in_pipe);
+ in_pipe = pipe_;
+ active = true;
+ in_pipe->set_endpoint (this);
+}
+void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
+{
+ zmq_assert (!out_pipe);
+ out_pipe = pipe_;
+}
+
+
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
- return false;
+ if (!active)
+ return false;
+
+ bool fetched = in_pipe->read (msg_);
+ if (!fetched)
+ active = false;
+
+ return fetched;
}
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- return false;
+ return out_pipe->write (msg_);
}
void zmq::session_t::flush ()
{
+ out_pipe->flush ();
+}
+
+void zmq::session_t::revive (reader_t *pipe_)
+{
+ zmq_assert (in_pipe == pipe_);
+ active = true;
+ if (engine)
+ engine->revive ();
}
void zmq::session_t::process_plug ()
@@ -56,6 +91,19 @@ void zmq::session_t::process_plug ()
// We should syslog it and drop the session. TODO
zmq_assert (ok);
+ // If session is created by 'connect' function, it has the pipes set
+ // already. Otherwise, it's being created by the listener and the pipes
+ // are yet to be created.
+ if (!in_pipe && !out_pipe) {
+ pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
+ zmq_assert (inbound);
+ in_pipe = &inbound->reader;
+ pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
+ zmq_assert (outbound);
+ out_pipe = &outbound->writer;
+ send_bind (owner, this, &outbound->reader, &inbound->writer);
+ }
+
owned_t::process_plug ();
}