summaryrefslogtreecommitdiff
path: root/src/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/session.cpp')
-rw-r--r--src/session.cpp36
1 files changed, 24 insertions, 12 deletions
diff --git a/src/session.cpp b/src/session.cpp
index b829ae9..eb0a963 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -43,7 +43,7 @@ zmq::session_t::~session_t ()
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
- if (!active)
+ if (!in_pipe || !active)
return false;
return in_pipe->read (msg_);
@@ -51,8 +51,9 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- if (!out_pipe)
- return true;
+ // The communication is unidirectional.
+ // We don't expect any message to arrive.
+ zmq_assert (out_pipe);
if (out_pipe->write (msg_)) {
zmq_msg_init (msg_);
@@ -136,15 +137,26 @@ void zmq::session_t::process_plug ()
// already. Otherwise, it's being created by the listener and the pipes
// are yet to be created.
if (!in_pipe && !out_pipe) {
- pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- send_bind (owner, this, &outbound->reader, &inbound->writer);
+
+ pipe_t *inbound = NULL;
+ pipe_t *outbound = NULL;
+
+ if (options.requires_out) {
+ inbound = new pipe_t (this, owner, options.hwm, options.lwm);
+ zmq_assert (inbound);
+ in_pipe = &inbound->reader;
+ in_pipe->set_endpoint (this);
+ }
+
+ if (options.requires_in) {
+ outbound = new pipe_t (owner, this, options.hwm, options.lwm);
+ zmq_assert (outbound);
+ out_pipe = &outbound->writer;
+ out_pipe->set_endpoint (this);
+ }
+
+ send_bind (owner, this, outbound ? &outbound->reader : NULL,
+ inbound ? &inbound->writer : NULL);
}
owned_t::process_plug ();