diff options
Diffstat (limited to 'src/ctx.cpp')
-rw-r--r-- | src/ctx.cpp | 306 |
1 files changed, 161 insertions, 145 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index 397f692..ff7c68c 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -4,27 +4,24 @@ This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. */ #include <new> #include <string.h> -#include "../include/zmq.h" - #include "ctx.hpp" #include "socket_base.hpp" -#include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" @@ -32,62 +29,60 @@ #if defined ZMQ_HAVE_WINDOWS #include "windows.h" +#else +#include "unistd.h" #endif zmq::ctx_t::ctx_t (uint32_t io_threads_) : - sockets (0), - terminated (false) + no_sockets_notify (false) { + int rc; + #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple // times given that WSACleanup will be called for each WSAStartup. WORD version_requested = MAKEWORD (2, 2); WSADATA wsa_data; - int rc = WSAStartup (version_requested, &wsa_data); + rc = WSAStartup (version_requested, &wsa_data); zmq_assert (rc == 0); zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && HIBYTE (wsa_data.wVersion) == 2); #endif // Initialise the array of signalers. - signalers_count = max_app_threads + io_threads_; - signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); - zmq_assert (signalers); - memset (signalers, 0, sizeof (signaler_t*) * signalers_count); + slot_count = max_sockets + io_threads_; + slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count); + zmq_assert (slots); // Create I/O thread objects and launch them. for (uint32_t i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); - signalers [i] = io_thread->get_signaler (); + slots [i] = io_thread->get_signaler (); io_thread->start (); } -} -int zmq::ctx_t::term () -{ - // First send stop command to application threads so that any - // blocking calls are interrupted. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i].app_thread->stop (); - - // Then mark context as terminated. - term_sync.lock (); - zmq_assert (!terminated); - terminated = true; - bool destroy = (sockets == 0); - term_sync.unlock (); - - // If there are no sockets open, destroy the context immediately. - if (destroy) - delete this; + // In the unused part of the slot array, create a list of empty slots. + for (int32_t i = (int32_t) slot_count - 1; + i >= (int32_t) io_threads_; i--) { + empty_slots.push_back (i); + slots [i] = NULL; + } - return 0; + // Create the logging infrastructure. + log_socket = create_socket (ZMQ_PUB); + zmq_assert (log_socket); + rc = log_socket->bind ("sys://log"); + zmq_assert (rc == 0); } zmq::ctx_t::~ctx_t () { + // Check that there are no remaining open or zombie sockets. + zmq_assert (sockets.empty ()); + zmq_assert (zombies.empty ()); + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O // thread subsequent invocation of destructor would hang-up. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -97,18 +92,10 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; - // Close all application theads, sockets, io_objects etc. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i].app_thread; - - // Deallocate all the orphaned pipes. - while (!pipes.empty ()) - delete *pipes.begin (); - - // Deallocate the array of pointers to signalers. No special work is + // Deallocate the array of slot. No special work is // needed as signalers themselves were deallocated with their - // corresponding (app_/io_) thread objects. - free (signalers); + // corresponding io_thread/socket objects. + free (slots); #ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. @@ -117,116 +104,129 @@ zmq::ctx_t::~ctx_t () #endif } -zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +int zmq::ctx_t::terminate () { - app_threads_sync.lock (); - - // Find whether the calling thread has app_thread_t object associated - // already. At the same time find an unused app_thread_t so that it can - // be used if there's no associated object for the calling thread. - // Check whether thread ID is already assigned. If so, return it. - app_threads_t::size_type unused = app_threads.size (); - app_threads_t::size_type current; - for (current = 0; current != app_threads.size (); current++) { - if (app_threads [current].associated && - thread_t::equal (thread_t::id (), app_threads [current].tid)) - break; - if (!app_threads [current].associated) - unused = current; - } + // Close the logging infrastructure. + log_sync.lock (); + int rc = log_socket->close (); + zmq_assert (rc == 0); + log_socket = NULL; + log_sync.unlock (); - // If no app_thread_t is associated with the calling thread, - // associate it with one of the unused app_thread_t objects. - if (current == app_threads.size ()) { + // First send stop command to sockets so that any + // blocking calls are interrupted. + slot_sync.lock (); + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + if (!sockets.empty ()) + no_sockets_notify = true; + slot_sync.unlock (); + + // Find out whether there are any open sockets to care about. + // If there are open sockets, sleep till they are closed. Note that we can + // use no_sockets_notify safely out of the critical section as once set + // its value is never changed again. + if (no_sockets_notify) + no_sockets_sync.wait (); + + // Note that the lock won't block anyone here. There's noone else having + // open sockets anyway. The only purpose of the lock is to double-check all + // the CPU caches have been synchronised. + slot_sync.lock (); + + // At this point there should be no active sockets. What we have is a set + // of zombies waiting to be dezombified. + zmq_assert (sockets.empty ()); + + // Get rid of remaining zombie sockets. + while (!zombies.empty ()) { + dezombify (); + + // Sleep for 1ms not to end up busy-looping in the case the I/O threads + // are still busy sending data. We can possibly add a grand poll here + // (polling for fds associated with all the zombie sockets), but it's + // probably not worth of implementing it. +#if defined ZMQ_HAVE_WINDOWS + Sleep (1); +#else + usleep (1000); +#endif + } + slot_sync.unlock (); - // If all the existing app_threads are already used, create one more. - if (unused == app_threads.size ()) { + // Deallocate the resources. + delete this; - // If max_app_threads limit was reached, return error. - if (app_threads.size () == max_app_threads) { - app_threads_sync.unlock (); - errno = EMTHREAD; - return NULL; - } + return 0; +} - // Create the new application thread proxy object. - app_thread_info_t info; - memset (&info, 0, sizeof (info)); - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, - io_threads.size () + app_threads.size ()); - zmq_assert (info.app_thread); - signalers [io_threads.size () + app_threads.size ()] = - info.app_thread->get_signaler (); - app_threads.push_back (info); - } +zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +{ + slot_sync.lock (); - // Incidentally, this works both when there is an unused app_thread - // and when a new one is created. - current = unused; + // Free the slots, if possible. + dezombify (); - // Associate the selected app_thread with the OS thread. - app_threads [current].associated = true; - app_threads [current].tid = thread_t::id (); + // If max_sockets limit was reached, return error. + if (empty_slots.empty ()) { + slot_sync.unlock (); + errno = EMFILE; + return NULL; } - app_thread_t *thread = app_threads [current].app_thread; - app_threads_sync.unlock (); + // Choose a slot for the socket. + uint32_t slot = empty_slots.back (); + empty_slots.pop_back (); - socket_base_t *s = thread->create_socket (type_); - if (!s) + // Create the socket and register its signaler. + socket_base_t *s = socket_base_t::create (type_, this, slot); + if (!s) { + empty_slots.push_back (slot); + slot_sync.unlock (); return NULL; + } + sockets.push_back (s); + slots [slot] = s->get_signaler (); - term_sync.lock (); - sockets++; - term_sync.unlock (); + slot_sync.unlock (); return s; } -void zmq::ctx_t::destroy_socket () +void zmq::ctx_t::zombify_socket (socket_base_t *socket_) { - // If zmq_term was already called and there are no more sockets, - // terminate the whole 0MQ infrastructure. - term_sync.lock (); - zmq_assert (sockets > 0); - sockets--; - bool destroy = (sockets == 0 && terminated); - term_sync.unlock (); - - if (destroy) - delete this; -} + // Zombification of socket basically means that its ownership is tranferred + // from the application that created it to the context. -void zmq::ctx_t::no_sockets (app_thread_t *thread_) -{ - app_threads_sync.lock (); - app_threads_t::size_type i; - for (i = 0; i != app_threads.size (); i++) - if (app_threads [i].app_thread == thread_) { - app_threads [i].associated = false; - break; - } - zmq_assert (i != app_threads.size ()); - app_threads_sync.unlock (); -} + // Note that the lock provides the memory barrier needed to migrate + // zombie-to-be socket from it's native thread to shared data area + // synchronised by slot_sync. + slot_sync.lock (); + sockets.erase (socket_); + zombies.push_back (socket_); -void zmq::ctx_t::send_command (uint32_t destination_, - const command_t &command_) -{ - signalers [destination_]->send (command_); + // Try to get rid of at least some zombie sockets at this point. + dezombify (); + + // If shutdown thread is interested in notification about no more + // open sockets, notify it now. + if (sockets.empty () && no_sockets_notify) + no_sockets_sync.post (); + + slot_sync.unlock (); } -bool zmq::ctx_t::recv_command (uint32_t thread_slot_, - command_t *command_, bool block_) +void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) { - return signalers [thread_slot_]->recv (command_, block_); + slots [slot_]->send (command_); } zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) { + if (io_threads.empty ()) + return NULL; + // Find the I/O thread with minimum load. - zmq_assert (io_threads.size () > 0); int min_load = -1; io_threads_t::size_type result = 0; for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { @@ -242,29 +242,13 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) return io_threads [result]; } -void zmq::ctx_t::register_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - bool inserted = pipes.insert (pipe_).second; - zmq_assert (inserted); - pipes_sync.unlock (); -} - -void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - pipes_t::size_type erased = pipes.erase (pipe_); - zmq_assert (erased == 1); - pipes_sync.unlock (); -} - int zmq::ctx_t::register_endpoint (const char *addr_, socket_base_t *socket_) { endpoints_sync.lock (); - bool inserted = endpoints.insert (std::make_pair (std::string (addr_), - socket_)).second; + bool inserted = endpoints.insert (endpoints_t::value_type ( + std::string (addr_), socket_)).second; if (!inserted) { errno = EADDRINUSE; endpoints_sync.unlock (); @@ -315,3 +299,35 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } +void zmq::ctx_t::log (zmq_msg_t *msg_) +{ + // At this point we migrate the log socket to the current thread. + // We rely on mutex for executing the memory barrier. + log_sync.lock (); + if (log_socket) + log_socket->send (msg_, 0); + log_sync.unlock (); +} + +void zmq::ctx_t::dezombify () +{ + // Try to dezombify each zombie in the list. Note that caller is + // responsible for calling this method in the slot_sync critical section. + for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) { + uint32_t slot = (*it)->get_slot (); + if ((*it)->dezombify ()) { +#if defined _MSC_VER + + // HP implementation of STL requires doing it this way... + it = zombies.erase (it); +#else + zombies.erase (it); +#endif + empty_slots.push_back (slot); + slots [slot] = NULL; + } + else + it++; + } +} + |