diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-09-21 17:20:13 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-09-21 17:20:13 +0200 |
commit | b15f695976d21300beabc3e0ecef87c1a0b4dc4c (patch) | |
tree | 09513a17251be5bc8d132b8d00cbf2b893bcf57a /src/session.cpp | |
parent | cb1b6fe32cbf3c7cf5961bb4156f2de743693a3a (diff) |
different fixes to req/rep
Diffstat (limited to 'src/session.cpp')
-rw-r--r-- | src/session.cpp | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/src/session.cpp b/src/session.cpp index b829ae9..eb0a963 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -43,7 +43,7 @@ zmq::session_t::~session_t () bool zmq::session_t::read (::zmq_msg_t *msg_) { - if (!active) + if (!in_pipe || !active) return false; return in_pipe->read (msg_); @@ -51,8 +51,9 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - if (!out_pipe) - return true; + // The communication is unidirectional. + // We don't expect any message to arrive. + zmq_assert (out_pipe); if (out_pipe->write (msg_)) { zmq_msg_init (msg_); @@ -136,15 +137,26 @@ void zmq::session_t::process_plug () // already. Otherwise, it's being created by the listener and the pipes // are yet to be created. if (!in_pipe && !out_pipe) { - pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm); - zmq_assert (inbound); - in_pipe = &inbound->reader; - in_pipe->set_endpoint (this); - pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); - zmq_assert (outbound); - out_pipe = &outbound->writer; - out_pipe->set_endpoint (this); - send_bind (owner, this, &outbound->reader, &inbound->writer); + + pipe_t *inbound = NULL; + pipe_t *outbound = NULL; + + if (options.requires_out) { + inbound = new pipe_t (this, owner, options.hwm, options.lwm); + zmq_assert (inbound); + in_pipe = &inbound->reader; + in_pipe->set_endpoint (this); + } + + if (options.requires_in) { + outbound = new pipe_t (owner, this, options.hwm, options.lwm); + zmq_assert (outbound); + out_pipe = &outbound->writer; + out_pipe->set_endpoint (this); + } + + send_bind (owner, this, outbound ? &outbound->reader : NULL, + inbound ? &inbound->writer : NULL); } owned_t::process_plug (); |