diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/i_endpoint.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 29 | ||||
-rw-r--r-- | src/session.hpp | 13 | ||||
-rw-r--r-- | src/socket_base.cpp | 30 | ||||
-rw-r--r-- | src/socket_base.hpp | 2 |
5 files changed, 44 insertions, 32 deletions
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index 14a479e..8ee2984 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -25,6 +25,8 @@ namespace zmq struct i_endpoint { + virtual void attach_inpipe (class reader_t *pipe_) = 0; + virtual void attach_outpipe (class writer_t *pipe_) = 0; virtual void revive (class reader_t *pipe_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0; diff --git a/src/session.cpp b/src/session.cpp index f562bd5..ef17d6d 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -43,21 +43,6 @@ zmq::session_t::~session_t () out_pipe->term (); } -void zmq::session_t::set_inbound_pipe (reader_t *pipe_) -{ - zmq_assert (!in_pipe); - in_pipe = pipe_; - active = true; - in_pipe->set_endpoint (this); -} -void zmq::session_t::set_outbound_pipe (writer_t *pipe_) -{ - zmq_assert (!out_pipe); - out_pipe = pipe_; - out_pipe->set_endpoint (this); -} - - bool zmq::session_t::read (::zmq_msg_t *msg_) { if (!active) @@ -90,6 +75,20 @@ void zmq::session_t::detach () // term (); } +void zmq::session_t::attach_inpipe (reader_t *pipe_) +{ + zmq_assert (!in_pipe); + in_pipe = pipe_; + active = true; + in_pipe->set_endpoint (this); +} +void zmq::session_t::attach_outpipe (writer_t *pipe_) +{ + zmq_assert (!out_pipe); + out_pipe = pipe_; + out_pipe->set_endpoint (this); +} + void zmq::session_t::revive (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); diff --git a/src/session.hpp b/src/session.hpp index 46699cf..195bdca 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -37,13 +37,6 @@ namespace zmq session_t (object_t *parent_, socket_base_t *owner_, const char *name_, const options_t &options_); - void set_inbound_pipe (class reader_t *pipe_); - void set_outbound_pipe (class writer_t *pipe_); - - private: - - ~session_t (); - // i_inout interface implementation. bool read (::zmq_msg_t *msg_); bool write (::zmq_msg_t *msg_); @@ -51,10 +44,16 @@ namespace zmq void detach (); // i_endpoint interface implementation. + void attach_inpipe (class reader_t *pipe_); + void attach_outpipe (class writer_t *pipe_); void revive (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); + private: + + ~session_t (); + // Handlers for incoming commands. void process_plug (); void process_unplug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e14065b..4e14c68 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -173,7 +173,7 @@ int zmq::socket_base_t::connect (const char *addr_) pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); zmq_assert (in_pipe); in_pipe->reader.set_endpoint (this); - session->set_outbound_pipe (&in_pipe->writer); + session->attach_outpipe (&in_pipe->writer); in_pipes.push_back (&in_pipe->reader); in_pipes.back ()->set_index (active); in_pipes [active]->set_index (in_pipes.size () - 1); @@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_) pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); zmq_assert (out_pipe); out_pipe->writer.set_endpoint (this); - session->set_inbound_pipe (&out_pipe->reader); + session->attach_inpipe (&out_pipe->reader); out_pipes.push_back (&out_pipe->writer); // Activate the session. @@ -327,6 +327,22 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_) return it->second; } +void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_) +{ + pipe_->set_endpoint (this); + in_pipes.push_back (pipe_); + in_pipes.back ()->set_index (active); + in_pipes [active]->set_index (in_pipes.size () - 1); + std::swap (in_pipes.back (), in_pipes [active]); + active++; +} + +void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_) +{ + pipe_->set_endpoint (this); + out_pipes.push_back (pipe_); +} + void zmq::socket_base_t::revive (reader_t *pipe_) { // Move the pipe to the list of active pipes. @@ -372,15 +388,9 @@ void zmq::socket_base_t::process_bind (owned_t *session_, reader_t *in_pipe_, writer_t *out_pipe_) { zmq_assert (in_pipe_); - in_pipe_->set_endpoint (this); - in_pipes.push_back (in_pipe_); - in_pipes.back ()->set_index (active); - in_pipes [active]->set_index (in_pipes.size () - 1); - std::swap (in_pipes.back (), in_pipes [active]); - active++; + attach_inpipe (in_pipe_); zmq_assert (out_pipe_); - out_pipe_->set_endpoint (this); - out_pipes.push_back (out_pipe_); + attach_outpipe (out_pipe_); } void zmq::socket_base_t::process_term_req (owned_t *object_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 490c09a..284d2c4 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -60,6 +60,8 @@ namespace zmq class session_t *find_session (const char *name_); // i_endpoint interface implementation. + void attach_inpipe (class reader_t *pipe_); + void attach_outpipe (class writer_t *pipe_); void revive (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); |