summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/i_endpoint.hpp2
-rw-r--r--src/session.cpp29
-rw-r--r--src/session.hpp13
-rw-r--r--src/socket_base.cpp30
-rw-r--r--src/socket_base.hpp2
5 files changed, 44 insertions, 32 deletions
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index 14a479e..8ee2984 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -25,6 +25,8 @@ namespace zmq
struct i_endpoint
{
+ virtual void attach_inpipe (class reader_t *pipe_) = 0;
+ virtual void attach_outpipe (class writer_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
diff --git a/src/session.cpp b/src/session.cpp
index f562bd5..ef17d6d 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -43,21 +43,6 @@ zmq::session_t::~session_t ()
out_pipe->term ();
}
-void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
-{
- zmq_assert (!in_pipe);
- in_pipe = pipe_;
- active = true;
- in_pipe->set_endpoint (this);
-}
-void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
-{
- zmq_assert (!out_pipe);
- out_pipe = pipe_;
- out_pipe->set_endpoint (this);
-}
-
-
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
if (!active)
@@ -90,6 +75,20 @@ void zmq::session_t::detach ()
// term ();
}
+void zmq::session_t::attach_inpipe (reader_t *pipe_)
+{
+ zmq_assert (!in_pipe);
+ in_pipe = pipe_;
+ active = true;
+ in_pipe->set_endpoint (this);
+}
+void zmq::session_t::attach_outpipe (writer_t *pipe_)
+{
+ zmq_assert (!out_pipe);
+ out_pipe = pipe_;
+ out_pipe->set_endpoint (this);
+}
+
void zmq::session_t::revive (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
diff --git a/src/session.hpp b/src/session.hpp
index 46699cf..195bdca 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -37,13 +37,6 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
const options_t &options_);
- void set_inbound_pipe (class reader_t *pipe_);
- void set_outbound_pipe (class writer_t *pipe_);
-
- private:
-
- ~session_t ();
-
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
@@ -51,10 +44,16 @@ namespace zmq
void detach ();
// i_endpoint interface implementation.
+ void attach_inpipe (class reader_t *pipe_);
+ void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
+ private:
+
+ ~session_t ();
+
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e14065b..4e14c68 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -173,7 +173,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
- session->set_outbound_pipe (&in_pipe->writer);
+ session->attach_outpipe (&in_pipe->writer);
in_pipes.push_back (&in_pipe->reader);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
out_pipe->writer.set_endpoint (this);
- session->set_inbound_pipe (&out_pipe->reader);
+ session->attach_inpipe (&out_pipe->reader);
out_pipes.push_back (&out_pipe->writer);
// Activate the session.
@@ -327,6 +327,22 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
+void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_)
+{
+ pipe_->set_endpoint (this);
+ in_pipes.push_back (pipe_);
+ in_pipes.back ()->set_index (active);
+ in_pipes [active]->set_index (in_pipes.size () - 1);
+ std::swap (in_pipes.back (), in_pipes [active]);
+ active++;
+}
+
+void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_)
+{
+ pipe_->set_endpoint (this);
+ out_pipes.push_back (pipe_);
+}
+
void zmq::socket_base_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
@@ -372,15 +388,9 @@ void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
zmq_assert (in_pipe_);
- in_pipe_->set_endpoint (this);
- in_pipes.push_back (in_pipe_);
- in_pipes.back ()->set_index (active);
- in_pipes [active]->set_index (in_pipes.size () - 1);
- std::swap (in_pipes.back (), in_pipes [active]);
- active++;
+ attach_inpipe (in_pipe_);
zmq_assert (out_pipe_);
- out_pipe_->set_endpoint (this);
- out_pipes.push_back (out_pipe_);
+ attach_outpipe (out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 490c09a..284d2c4 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -60,6 +60,8 @@ namespace zmq
class session_t *find_session (const char *name_);
// i_endpoint interface implementation.
+ void attach_inpipe (class reader_t *pipe_);
+ void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);