diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 80 |
1 files changed, 59 insertions, 21 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 68fc82b..e14065b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -50,6 +50,16 @@ zmq::socket_base_t::~socket_base_t () { shutting_down = true; + // Ask all pipes to terminate. + for (in_pipes_t::iterator it = in_pipes.begin (); + it != in_pipes.end (); it++) + (*it)->term (); + in_pipes.clear (); + for (out_pipes_t::iterator it = out_pipes.begin (); + it != out_pipes.end (); it++) + (*it)->term (); + out_pipes.clear (); + while (true) { // On third pass of the loop there should be no more I/O objects @@ -164,17 +174,18 @@ int zmq::socket_base_t::connect (const char *addr_) zmq_assert (in_pipe); in_pipe->reader.set_endpoint (this); session->set_outbound_pipe (&in_pipe->writer); - in_pipes.push_back (std::make_pair (&in_pipe->reader, session)); - in_pipes.back ().first->set_index (active); - in_pipes [active].first->set_index (in_pipes.size () - 1); + in_pipes.push_back (&in_pipe->reader); + in_pipes.back ()->set_index (active); + in_pipes [active]->set_index (in_pipes.size () - 1); std::swap (in_pipes.back (), in_pipes [active]); active++; // Create outbound pipe. pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); zmq_assert (out_pipe); + out_pipe->writer.set_endpoint (this); session->set_inbound_pipe (&out_pipe->reader); - out_pipes.push_back (std::make_pair (&out_pipe->writer, session)); + out_pipes.push_back (&out_pipe->writer); // Activate the session. send_plug (session); @@ -225,7 +236,7 @@ int zmq::socket_base_t::flush () { for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) - it->first->flush (); + (*it)->flush (); return 0; } @@ -320,12 +331,38 @@ void zmq::socket_base_t::revive (reader_t *pipe_) { // Move the pipe to the list of active pipes. in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); - in_pipes [index].first->set_index (active); - in_pipes [active].first->set_index (index); + in_pipes [index]->set_index (active); + in_pipes [active]->set_index (index); std::swap (in_pipes [index], in_pipes [active]); active++; } +void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) +{ + // Remove the pipe from the list of inbound pipes. + in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); + if (index < active) { + in_pipes [index]->set_index (active - 1); + in_pipes [active - 1]->set_index (index); + std::swap (in_pipes [index], in_pipes [active - 1]); + active--; + index = active; + } + in_pipes [index]->set_index (in_pipes.size () - 1); + in_pipes [in_pipes.size () - 1]->set_index (index); + std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]); + in_pipes.pop_back (); +} + +void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) +{ + out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index (); + out_pipes [index]->set_index (out_pipes.size () - 1); + out_pipes [out_pipes.size () - 1]->set_index (index); + std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]); + out_pipes.pop_back (); +} + void zmq::socket_base_t::process_own (owned_t *object_) { io_objects.insert (object_); @@ -336,13 +373,14 @@ void zmq::socket_base_t::process_bind (owned_t *session_, { zmq_assert (in_pipe_); in_pipe_->set_endpoint (this); - in_pipes.push_back (std::make_pair (in_pipe_, session_)); - in_pipes.back ().first->set_index (active); - in_pipes [active].first->set_index (in_pipes.size () - 1); + in_pipes.push_back (in_pipe_); + in_pipes.back ()->set_index (active); + in_pipes [active]->set_index (in_pipes.size () - 1); std::swap (in_pipes.back (), in_pipes [active]); active++; zmq_assert (out_pipe_); - out_pipes.push_back (std::make_pair (out_pipe_, session_)); + out_pipe_->set_endpoint (this); + out_pipes.push_back (out_pipe_); } void zmq::socket_base_t::process_term_req (owned_t *object_) @@ -388,7 +426,7 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) // First check whether all pipes are available for writing. for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) - if (!it->first->check_write (zmq_msg_size (msg_))) + if (!(*it)->check_write (zmq_msg_size (msg_))) return false; msg_content_t *content = (msg_content_t*) msg_->content; @@ -397,9 +435,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) if (content == (msg_content_t*) ZMQ_VSM) { for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) { - it->first->write (msg_); + (*it)->write (msg_); if (flush_) - it->first->flush (); + (*it)->flush (); } int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); @@ -410,9 +448,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) // to send the message to - no refcount adjustment i.e. no atomic // operations are needed. if (pipes_count == 1) { - out_pipes.begin ()->first->write (msg_); + (*out_pipes.begin ())->write (msg_); if (flush_) - out_pipes.begin ()->first->flush (); + (*out_pipes.begin ())->flush (); int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); return true; @@ -431,9 +469,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) // Push the message to all destinations. for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) { - it->first->write (msg_); + (*it)->write (msg_); if (flush_) - it->first->flush (); + (*it)->flush (); } // Detach the original message from the data buffer. @@ -451,13 +489,13 @@ bool zmq::socket_base_t::fetch (zmq_msg_t *msg_) // Round-robin over the pipes to get next message. for (int count = active; count != 0; count--) { - bool fetched = in_pipes [current].first->read (msg_); + bool fetched = in_pipes [current]->read (msg_); // If there's no message in the pipe, move it to the list of // non-active pipes. if (!fetched) { - in_pipes [current].first->set_index (active - 1); - in_pipes [active - 1].first->set_index (current); + in_pipes [current]->set_index (active - 1); + in_pipes [active - 1]->set_index (current); std::swap (in_pipes [current], in_pipes [active - 1]); active--; } |