diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-28 16:51:46 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-28 16:51:46 +0200 |
commit | cb09c6951e2c4405318b422a1f9213af3e4b6b8a (patch) | |
tree | fb5d4dfd6a71745e885b2501f19cfbbb38c6f441 /src | |
parent | 2dd501651592baa7f9e49f52e1321ae2b9b4e126 (diff) |
pipe deallocation added
Diffstat (limited to 'src')
-rw-r--r-- | src/command.hpp | 11 | ||||
-rw-r--r-- | src/dispatcher.cpp | 20 | ||||
-rw-r--r-- | src/dispatcher.hpp | 18 | ||||
-rw-r--r-- | src/i_endpoint.hpp | 2 | ||||
-rw-r--r-- | src/object.cpp | 44 | ||||
-rw-r--r-- | src/object.hpp | 8 | ||||
-rw-r--r-- | src/pipe.cpp | 81 | ||||
-rw-r--r-- | src/pipe.hpp | 30 | ||||
-rw-r--r-- | src/session.cpp | 18 | ||||
-rw-r--r-- | src/session.hpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 80 | ||||
-rw-r--r-- | src/socket_base.hpp | 13 |
12 files changed, 290 insertions, 37 deletions
diff --git a/src/command.hpp b/src/command.hpp index d3bad79..d16d4fa 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -40,6 +40,8 @@ namespace zmq attach, bind, revive, + pipe_term, + pipe_term_ack, term_req, term, term_ack @@ -78,6 +80,15 @@ namespace zmq struct { } revive; + // Sent by pipe reader to pipe writer to ask it to terminate + // its end of the pipe. + struct { + } pipe_term; + + // Pipe writer acknowledges pipe_term command. + struct { + } pipe_term_ack; + // Sent by I/O object ot the socket to request the shutdown of // the I/O object. struct { diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index c0f4541..71e20df 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -83,6 +83,10 @@ zmq::dispatcher_t::~dispatcher_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; + // Deallocate all the orphaned pipes. + for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) + delete *it; + delete [] command_pipes; #ifdef ZMQ_HAVE_WINDOWS @@ -146,3 +150,19 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) return io_threads [result]; } + +void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_) +{ + pipes_sync.lock (); + bool inserted = pipes.insert (pipe_).second; + zmq_assert (inserted); + pipes_sync.unlock (); +} + +void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) +{ + pipes_sync.lock (); + pipes_t::size_type erased = pipes.erase (pipe_); + zmq_assert (erased == 1); + pipes_sync.unlock (); +} diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 08596cb..cb445ef 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -21,6 +21,7 @@ #define __ZMQ_DISPATCHER_HPP_INCLUDED__ #include <vector> +#include <set> #include <map> #include <string> @@ -85,6 +86,11 @@ namespace zmq // Taskset specifies which I/O threads are eligible (0 = all). class io_thread_t *choose_io_thread (uint64_t taskset_); + // All pipes are registered with the dispatcher so that even the + // orphaned pipes can be deallocated on the terminal shutdown. + void register_pipe (class pipe_t *pipe_); + void unregister_pipe (class pipe_t *pipe_); + private: // Returns the app thread associated with the current thread. @@ -112,6 +118,18 @@ namespace zmq // Synchronisation of accesses to shared thread data. mutex_t threads_sync; + // As pipes may reside in orphaned state in particular moments + // of the pipe shutdown process, i.e. neither pipe reader nor + // pipe writer hold reference to the pipe, we have to hold references + // to all pipes in dispatcher so that we can deallocate them + // during terminal shutdown even though it conincides with the + // pipe being in the orphaned state. + typedef std::set <class pipe_t*> pipes_t; + pipes_t pipes; + + // Synchronisation of access to the pipes repository. + mutex_t pipes_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index bb7409e..14a479e 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -26,6 +26,8 @@ namespace zmq struct i_endpoint { virtual void revive (class reader_t *pipe_) = 0; + virtual void detach_inpipe (class reader_t *pipe_) = 0; + virtual void detach_outpipe (class writer_t *pipe_) = 0; }; } diff --git a/src/object.cpp b/src/object.cpp index 4d54ebf..b3cf898 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,6 +83,14 @@ void zmq::object_t::process_command (command_t &cmd_) cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); return; + case command_t::pipe_term: + process_pipe_term (); + return; + + case command_t::pipe_term_ack: + process_pipe_term_ack (); + return; + case command_t::term_req: process_term_req (cmd_.args.term_req.object); return; @@ -100,6 +108,16 @@ void zmq::object_t::process_command (command_t &cmd_) } } +void zmq::object_t::register_pipe (class pipe_t *pipe_) +{ + dispatcher->register_pipe (pipe_); +} + +void zmq::object_t::unregister_pipe (class pipe_t *pipe_) +{ + dispatcher->unregister_pipe (pipe_); +} + zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return dispatcher->choose_io_thread (taskset_); @@ -166,6 +184,22 @@ void zmq::object_t::send_revive (object_t *destination_) send_command (cmd); } +void zmq::object_t::send_pipe_term (writer_t *destination_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::pipe_term; + send_command (cmd); +} + +void zmq::object_t::send_pipe_term_ack (reader_t *destination_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::pipe_term_ack; + send_command (cmd); +} + void zmq::object_t::send_term_req (socket_base_t *destination_, owned_t *object_) { @@ -223,6 +257,16 @@ void zmq::object_t::process_revive () zmq_assert (false); } +void zmq::object_t::process_pipe_term () +{ + zmq_assert (false); +} + +void zmq::object_t::process_pipe_term_ack () +{ + zmq_assert (false); +} + void zmq::object_t::process_term_req (owned_t *object_) { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 0dbac24..8ce569e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -42,6 +42,10 @@ namespace zmq int get_thread_slot (); void process_command (struct command_t &cmd_); + // Allow pipe to access corresponding dispatcher functions. + void register_pipe (class pipe_t *pipe_); + void unregister_pipe (class pipe_t *pipe_); + protected: // Derived object can use following functions to interact with @@ -60,6 +64,8 @@ namespace zmq void send_bind (object_t *destination_, class owned_t *session_, class reader_t *in_pipe_, class writer_t *out_pipe_); void send_revive (class object_t *destination_); + void send_pipe_term (class writer_t *destination_); + void send_pipe_term_ack (class reader_t *destination_); void send_term_req (class socket_base_t *destination_, class owned_t *object_); void send_term (class owned_t *destination_); @@ -74,6 +80,8 @@ namespace zmq virtual void process_bind (class owned_t *session_, class reader_t *in_pipe_, class writer_t *out_pipe_); virtual void process_revive (); + virtual void process_pipe_term (); + virtual void process_pipe_term_ack (); virtual void process_term_req (class owned_t *object_); virtual void process_term (); virtual void process_term_ack (); diff --git a/src/pipe.cpp b/src/pipe.cpp index 5016631..3748ae9 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -19,6 +19,8 @@ #include <pthread.h> +#include <../include/zmq.h> + #include "pipe.hpp" zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, @@ -39,9 +41,21 @@ zmq::reader_t::~reader_t () bool zmq::reader_t::read (zmq_msg_t *msg_) { - return pipe->read (msg_); + if (!pipe->read (msg_)) + return false; + + // If delimiter was read, start termination process of the pipe. + unsigned char *offset = 0; + if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { + if (endpoint) + endpoint->detach_inpipe (this); + term (); + return false; + } // TODO: Adjust the size of the pipe. + + return true; } void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) @@ -59,19 +73,48 @@ int zmq::reader_t::get_index () return index; } +void zmq::reader_t::term () +{ + endpoint = NULL; + send_pipe_term (peer); +} + void zmq::reader_t::process_revive () { endpoint->revive (this); } +void zmq::reader_t::process_pipe_term_ack () +{ + peer = NULL; + delete pipe; +} + zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, uint64_t hwm_, uint64_t lwm_) : object_t (parent_), pipe (pipe_), peer (&pipe_->reader), hwm (hwm_), - lwm (lwm_) + lwm (lwm_), + index (-1), + endpoint (NULL) +{ +} + +void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) +{ + endpoint = endpoint_; +} + +void zmq::writer_t::set_index (int index_) +{ + index = index_; +} + +int zmq::writer_t::get_index () { + return index; } zmq::writer_t::~writer_t () @@ -99,14 +142,46 @@ void zmq::writer_t::flush () send_revive (peer); } +void zmq::writer_t::term () +{ + endpoint = NULL; + + // Push delimiter into the pipe. + // Trick the compiler to belive that the tag is a valid pointer. + zmq_msg_t msg; + const unsigned char *offset = 0; + msg.content = (void*) (offset + ZMQ_DELIMITER); + msg.shared = false; + pipe->write (msg); + pipe->flush (); +} + +void zmq::writer_t::process_pipe_term () +{ + if (endpoint) + endpoint->detach_outpipe (this); + + reader_t *p = peer; + peer = NULL; + send_pipe_term_ack (p); +} + zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_) : reader (reader_parent_, this, hwm_, lwm_), writer (writer_parent_, this, hwm_, lwm_) { + reader.register_pipe (this); } zmq::pipe_t::~pipe_t () { + // Deallocate all the unread messages in the pipe. We have to do it by + // hand because zmq_msg_t is a POD, not a class, so there's no associated + // destructor. + zmq_msg_t msg; + while (read (&msg)) + zmq_msg_close (&msg); + + reader.unregister_pipe (this); } - diff --git a/src/pipe.hpp b/src/pipe.hpp index b7593c7..b4e592a 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -39,25 +39,29 @@ namespace zmq uint64_t hwm_, uint64_t lwm_); ~reader_t (); + void set_endpoint (i_endpoint *endpoint_); + // Reads a message to the underlying pipe. bool read (struct zmq_msg_t *msg_); - void set_endpoint (i_endpoint *endpoint_); - // Mnaipulation of index of the pipe. void set_index (int index_); int get_index (); + // Ask pipe to terminate. + void term (); + private: // Command handlers. void process_revive (); + void process_pipe_term_ack (); // The underlying pipe. class pipe_t *pipe; // Pipe writer associated with the other side of the pipe. - class object_t *peer; + class writer_t *peer; // High and low watermarks for in-memory storage (in bytes). uint64_t hwm; @@ -86,6 +90,8 @@ namespace zmq uint64_t hwm_, uint64_t lwm_); ~writer_t (); + void set_endpoint (i_endpoint *endpoint_); + // Checks whether message with specified size can be written to the // pipe. If writing the message would cause high watermark to be // exceeded, the function returns false. @@ -98,13 +104,23 @@ namespace zmq // Flush the messages downsteam. void flush (); + // Mnaipulation of index of the pipe. + void set_index (int index_); + int get_index (); + + // Ask pipe to terminate. + void term (); + private: + // Command handlers. + void process_pipe_term (); + // The underlying pipe. class pipe_t *pipe; // Pipe reader associated with the other side of the pipe. - class object_t *peer; + class reader_t *peer; // High and low watermarks for in-memory storage (in bytes). uint64_t hwm; @@ -114,6 +130,12 @@ namespace zmq uint64_t head; uint64_t tail; + // Index of the pipe in the socket's list of outbound pipes. + int index; + + // Endpoint (either session or socket) the pipe is attached to. + i_endpoint *endpoint; + writer_t (const writer_t&); void operator = (const writer_t&); }; diff --git a/src/session.cpp b/src/session.cpp index 0b1b947..d667851 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -36,6 +36,11 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, zmq::session_t::~session_t () { + // Ask associated pipes to terminate. + if (in_pipe) + in_pipe->term (); + if (out_pipe) + out_pipe->term (); } void zmq::session_t::set_inbound_pipe (reader_t *pipe_) @@ -49,6 +54,7 @@ void zmq::session_t::set_outbound_pipe (writer_t *pipe_) { zmq_assert (!out_pipe); out_pipe = pipe_; + out_pipe->set_endpoint (this); } @@ -92,6 +98,17 @@ void zmq::session_t::revive (reader_t *pipe_) engine->revive (); } +void zmq::session_t::detach_inpipe (reader_t *pipe_) +{ + active = false; + in_pipe = NULL; +} + +void zmq::session_t::detach_outpipe (writer_t *pipe_) +{ + out_pipe = NULL; +} + void zmq::session_t::process_plug () { // Register the session with the socket. @@ -112,6 +129,7 @@ void zmq::session_t::process_plug () pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); zmq_assert (outbound); out_pipe = &outbound->writer; + out_pipe->set_endpoint (this); send_bind (owner, this, &outbound->reader, &inbound->writer); } diff --git a/src/session.hpp b/src/session.hpp index 4a0882b..ba5bcdd 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -52,6 +52,8 @@ namespace zmq // i_endpoint interface implementation. void revive (class reader_t *pipe_); + void detach_inpipe (class reader_t *pipe_); + void detach_outpipe (class writer_t *pipe_); // Handlers for incoming commands. void process_plug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 68fc82b..e14065b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -50,6 +50,16 @@ zmq::socket_base_t::~socket_base_t () { shutting_down = true; + // Ask all pipes to terminate. + for (in_pipes_t::iterator it = in_pipes.begin (); + it != in_pipes.end (); it++) + (*it)->term (); + in_pipes.clear (); + for (out_pipes_t::iterator it = out_pipes.begin (); + it != out_pipes.end (); it++) + (*it)->term (); + out_pipes.clear (); + while (true) { // On third pass of the loop there should be no more I/O objects @@ -164,17 +174,18 @@ int zmq::socket_base_t::connect (const char *addr_) zmq_assert (in_pipe); in_pipe->reader.set_endpoint (this); session->set_outbound_pipe (&in_pipe->writer); - in_pipes.push_back (std::make_pair (&in_pipe->reader, session)); - in_pipes.back ().first->set_index (active); - in_pipes [active].first->set_index (in_pipes.size () - 1); + in_pipes.push_back (&in_pipe->reader); + in_pipes.back ()->set_index (active); + in_pipes [active]->set_index (in_pipes.size () - 1); std::swap (in_pipes.back (), in_pipes [active]); active++; // Create outbound pipe. pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); zmq_assert (out_pipe); + out_pipe->writer.set_endpoint (this); session->set_inbound_pipe (&out_pipe->reader); - out_pipes.push_back (std::make_pair (&out_pipe->writer, session)); + out_pipes.push_back (&out_pipe->writer); // Activate the session. send_plug (session); @@ -225,7 +236,7 @@ int zmq::socket_base_t::flush () { for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) - it->first->flush (); + (*it)->flush (); return 0; } @@ -320,12 +331,38 @@ void zmq::socket_base_t::revive (reader_t *pipe_) { // Move the pipe to the list of active pipes. in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); - in_pipes [index].first->set_index (active); - in_pipes [active].first->set_index (index); + in_pipes [index]->set_index (active); + in_pipes [active]->set_index (index); std::swap (in_pipes [index], in_pipes [active]); active++; } +void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) +{ + // Remove the pipe from the list of inbound pipes. + in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); + if (index < active) { + in_pipes [index]->set_index (active - 1); + in_pipes [active - 1]->set_index (index); + std::swap (in_pipes [index], in_pipes [active - 1]); + active--; + index = active; + } + in_pipes [index]->set_index (in_pipes.size () - 1); + in_pipes [in_pipes.size () - 1]->set_index (index); + std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]); + in_pipes.pop_back (); +} + +void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) +{ + out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index (); + out_pipes [index]->set_index (out_pipes.size () - 1); + out_pipes [out_pipes.size () - 1]->set_index (index); + std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]); + out_pipes.pop_back (); +} + void zmq::socket_base_t::process_own (owned_t *object_) { io_objects.insert (object_); @@ -336,13 +373,14 @@ void zmq::socket_base_t::process_bind (owned_t *session_, { zmq_assert (in_pipe_); in_pipe_->set_endpoint (this); - in_pipes.push_back (std::make_pair (in_pipe_, session_)); - in_pipes.back ().first->set_index (active); - in_pipes [active].first->set_index (in_pipes.size () - 1); + in_pipes.push_back (in_pipe_); + in_pipes.back ()->set_index (active); + in_pipes [active]->set_index (in_pipes.size () - 1); std::swap (in_pipes.back (), in_pipes [active]); active++; zmq_assert (out_pipe_); - out_pipes.push_back (std::make_pair (out_pipe_, session_)); + out_pipe_->set_endpoint (this); + out_pipes.push_back (out_pipe_); } void zmq::socket_base_t::process_term_req (owned_t *object_) @@ -388,7 +426,7 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) // First check whether all pipes are available for writing. for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) - if (!it->first->check_write (zmq_msg_size (msg_))) + if (!(*it)->check_write (zmq_msg_size (msg_))) return false; msg_content_t *content = (msg_content_t*) msg_->content; @@ -397,9 +435,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) if (content == (msg_content_t*) ZMQ_VSM) { for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) { - it->first->write (msg_); + (*it)->write (msg_); if (flush_) - it->first->flush (); + (*it)->flush (); } int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); @@ -410,9 +448,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) // to send the message to - no refcount adjustment i.e. no atomic // operations are needed. if (pipes_count == 1) { - out_pipes.begin ()->first->write (msg_); + (*out_pipes.begin ())->write (msg_); if (flush_) - out_pipes.begin ()->first->flush (); + (*out_pipes.begin ())->flush (); int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); return true; @@ -431,9 +469,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_) // Push the message to all destinations. for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); it++) { - it->first->write (msg_); + (*it)->write (msg_); if (flush_) - it->first->flush (); + (*it)->flush (); } // Detach the original message from the data buffer. @@ -451,13 +489,13 @@ bool zmq::socket_base_t::fetch (zmq_msg_t *msg_) // Round-robin over the pipes to get next message. for (int count = active; count != 0; count--) { - bool fetched = in_pipes [current].first->read (msg_); + bool fetched = in_pipes [current]->read (msg_); // If there's no message in the pipe, move it to the list of // non-active pipes. if (!fetched) { - in_pipes [current].first->set_index (active - 1); - in_pipes [active - 1].first->set_index (current); + in_pipes [current]->set_index (active - 1); + in_pipes [active - 1]->set_index (current); std::swap (in_pipes [current], in_pipes [active - 1]); active--; } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1f04bda..490c09a 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -24,7 +24,6 @@ #include <map> #include <vector> #include <string> -#include <utility> #include "i_endpoint.hpp" #include "object.hpp" @@ -62,6 +61,8 @@ namespace zmq // i_endpoint interface implementation. void revive (class reader_t *pipe_); + void detach_inpipe (class reader_t *pipe_); + void detach_outpipe (class writer_t *pipe_); private: @@ -86,10 +87,7 @@ namespace zmq io_objects_t io_objects; // Inbound pipes, i.e. those the socket is getting messages from. - // The second member in the pair indicates the object on the other - // side of the pipe. - typedef std::vector <std::pair <class reader_t*, owned_t*> > - in_pipes_t; + typedef std::vector <class reader_t*> in_pipes_t; in_pipes_t in_pipes; // Index of the next inbound pipe to read messages from. @@ -100,10 +98,7 @@ namespace zmq in_pipes_t::size_type active; // Outbound pipes, i.e. those the socket is sending messages to. - // The second member in the pair indicates the object on the other - // side of the pipe. - typedef std::vector <std::pair <class writer_t*, owned_t*> > - out_pipes_t; + typedef std::vector <class writer_t*> out_pipes_t; out_pipes_t out_pipes; // Number of I/O objects that were already asked to terminate |