summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-28 16:51:46 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-28 16:51:46 +0200
commitcb09c6951e2c4405318b422a1f9213af3e4b6b8a (patch)
treefb5d4dfd6a71745e885b2501f19cfbbb38c6f441 /src/socket_base.cpp
parent2dd501651592baa7f9e49f52e1321ae2b9b4e126 (diff)
pipe deallocation added
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp80
1 files changed, 59 insertions, 21 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 68fc82b..e14065b 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -50,6 +50,16 @@ zmq::socket_base_t::~socket_base_t ()
{
shutting_down = true;
+ // Ask all pipes to terminate.
+ for (in_pipes_t::iterator it = in_pipes.begin ();
+ it != in_pipes.end (); it++)
+ (*it)->term ();
+ in_pipes.clear ();
+ for (out_pipes_t::iterator it = out_pipes.begin ();
+ it != out_pipes.end (); it++)
+ (*it)->term ();
+ out_pipes.clear ();
+
while (true) {
// On third pass of the loop there should be no more I/O objects
@@ -164,17 +174,18 @@ int zmq::socket_base_t::connect (const char *addr_)
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
session->set_outbound_pipe (&in_pipe->writer);
- in_pipes.push_back (std::make_pair (&in_pipe->reader, session));
- in_pipes.back ().first->set_index (active);
- in_pipes [active].first->set_index (in_pipes.size () - 1);
+ in_pipes.push_back (&in_pipe->reader);
+ in_pipes.back ()->set_index (active);
+ in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
// Create outbound pipe.
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
+ out_pipe->writer.set_endpoint (this);
session->set_inbound_pipe (&out_pipe->reader);
- out_pipes.push_back (std::make_pair (&out_pipe->writer, session));
+ out_pipes.push_back (&out_pipe->writer);
// Activate the session.
send_plug (session);
@@ -225,7 +236,7 @@ int zmq::socket_base_t::flush ()
{
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
- it->first->flush ();
+ (*it)->flush ();
return 0;
}
@@ -320,12 +331,38 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
- in_pipes [index].first->set_index (active);
- in_pipes [active].first->set_index (index);
+ in_pipes [index]->set_index (active);
+ in_pipes [active]->set_index (index);
std::swap (in_pipes [index], in_pipes [active]);
active++;
}
+void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
+{
+ // Remove the pipe from the list of inbound pipes.
+ in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
+ if (index < active) {
+ in_pipes [index]->set_index (active - 1);
+ in_pipes [active - 1]->set_index (index);
+ std::swap (in_pipes [index], in_pipes [active - 1]);
+ active--;
+ index = active;
+ }
+ in_pipes [index]->set_index (in_pipes.size () - 1);
+ in_pipes [in_pipes.size () - 1]->set_index (index);
+ std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]);
+ in_pipes.pop_back ();
+}
+
+void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
+{
+ out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index ();
+ out_pipes [index]->set_index (out_pipes.size () - 1);
+ out_pipes [out_pipes.size () - 1]->set_index (index);
+ std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]);
+ out_pipes.pop_back ();
+}
+
void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
@@ -336,13 +373,14 @@ void zmq::socket_base_t::process_bind (owned_t *session_,
{
zmq_assert (in_pipe_);
in_pipe_->set_endpoint (this);
- in_pipes.push_back (std::make_pair (in_pipe_, session_));
- in_pipes.back ().first->set_index (active);
- in_pipes [active].first->set_index (in_pipes.size () - 1);
+ in_pipes.push_back (in_pipe_);
+ in_pipes.back ()->set_index (active);
+ in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
zmq_assert (out_pipe_);
- out_pipes.push_back (std::make_pair (out_pipe_, session_));
+ out_pipe_->set_endpoint (this);
+ out_pipes.push_back (out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
@@ -388,7 +426,7 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// First check whether all pipes are available for writing.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
- if (!it->first->check_write (zmq_msg_size (msg_)))
+ if (!(*it)->check_write (zmq_msg_size (msg_)))
return false;
msg_content_t *content = (msg_content_t*) msg_->content;
@@ -397,9 +435,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::iterator it = out_pipes.begin ();
it != out_pipes.end (); it++) {
- it->first->write (msg_);
+ (*it)->write (msg_);
if (flush_)
- it->first->flush ();
+ (*it)->flush ();
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
@@ -410,9 +448,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
- out_pipes.begin ()->first->write (msg_);
+ (*out_pipes.begin ())->write (msg_);
if (flush_)
- out_pipes.begin ()->first->flush ();
+ (*out_pipes.begin ())->flush ();
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
@@ -431,9 +469,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// Push the message to all destinations.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++) {
- it->first->write (msg_);
+ (*it)->write (msg_);
if (flush_)
- it->first->flush ();
+ (*it)->flush ();
}
// Detach the original message from the data buffer.
@@ -451,13 +489,13 @@ bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
- bool fetched = in_pipes [current].first->read (msg_);
+ bool fetched = in_pipes [current]->read (msg_);
// If there's no message in the pipe, move it to the list of
// non-active pipes.
if (!fetched) {
- in_pipes [current].first->set_index (active - 1);
- in_pipes [active - 1].first->set_index (current);
+ in_pipes [current]->set_index (active - 1);
+ in_pipes [active - 1]->set_index (current);
std::swap (in_pipes [current], in_pipes [active - 1]);
active--;
}