summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-02 10:22:23 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-02 10:22:23 +0200
commit6a5120b1f1c48d19b777f76ac756b00fb624d110 (patch)
tree33853b4f9aaaf88bcb82a53fe3607d91d7a06ab1 /src/socket_base.cpp
parent72fdf47d16c8d3ecd9da657b4649978e414d775c (diff)
python extension & perf tests
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp30
1 files changed, 20 insertions, 10 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e14065b..4e14c68 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -173,7 +173,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
- session->set_outbound_pipe (&in_pipe->writer);
+ session->attach_outpipe (&in_pipe->writer);
in_pipes.push_back (&in_pipe->reader);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
out_pipe->writer.set_endpoint (this);
- session->set_inbound_pipe (&out_pipe->reader);
+ session->attach_inpipe (&out_pipe->reader);
out_pipes.push_back (&out_pipe->writer);
// Activate the session.
@@ -327,6 +327,22 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
+void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_)
+{
+ pipe_->set_endpoint (this);
+ in_pipes.push_back (pipe_);
+ in_pipes.back ()->set_index (active);
+ in_pipes [active]->set_index (in_pipes.size () - 1);
+ std::swap (in_pipes.back (), in_pipes [active]);
+ active++;
+}
+
+void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_)
+{
+ pipe_->set_endpoint (this);
+ out_pipes.push_back (pipe_);
+}
+
void zmq::socket_base_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
@@ -372,15 +388,9 @@ void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
zmq_assert (in_pipe_);
- in_pipe_->set_endpoint (this);
- in_pipes.push_back (in_pipe_);
- in_pipes.back ()->set_index (active);
- in_pipes [active]->set_index (in_pipes.size () - 1);
- std::swap (in_pipes.back (), in_pipes [active]);
- active++;
+ attach_inpipe (in_pipe_);
zmq_assert (out_pipe_);
- out_pipe_->set_endpoint (this);
- out_pipes.push_back (out_pipe_);
+ attach_outpipe (out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)