From e645fc2693acc796304498909786b7b47005b429 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:35 +0100 Subject: Imported Upstream version 2.1.3 --- src/ctx.cpp | 340 ++++++++++++++++++++++++++++++------------------------------ 1 file changed, 168 insertions(+), 172 deletions(-) (limited to 'src/ctx.cpp') diff --git a/src/ctx.cpp b/src/ctx.cpp index 397f692..9cbb9de 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -1,93 +1,88 @@ /* - Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by + the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. + GNU Lesser General Public License for more details. - You should have received a copy of the Lesser GNU General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include #include -#include "../include/zmq.h" - #include "ctx.hpp" #include "socket_base.hpp" -#include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" +#include "reaper.hpp" #include "err.hpp" #include "pipe.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.h" +#else +#include "unistd.h" #endif zmq::ctx_t::ctx_t (uint32_t io_threads_) : - sockets (0), - terminated (false) + terminating (false) { -#ifdef ZMQ_HAVE_WINDOWS - // Intialise Windows sockets. Note that WSAStartup can be called multiple - // times given that WSACleanup will be called for each WSAStartup. - WORD version_requested = MAKEWORD (2, 2); - WSADATA wsa_data; - int rc = WSAStartup (version_requested, &wsa_data); - zmq_assert (rc == 0); - zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && - HIBYTE (wsa_data.wVersion) == 2); -#endif + int rc; - // Initialise the array of signalers. - signalers_count = max_app_threads + io_threads_; - signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); - zmq_assert (signalers); - memset (signalers, 0, sizeof (signaler_t*) * signalers_count); + // Initialise the array of mailboxes. Additional three slots are for + // internal log socket and the zmq_term thread the reaper thread. + slot_count = max_sockets + io_threads_ + 3; + slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); + alloc_assert (slots); + + // Initialise the infrastructure for zmq_term thread. + slots [term_tid] = &term_mailbox; + + // Create the reaper thread. + reaper = new (std::nothrow) reaper_t (this, reaper_tid); + alloc_assert (reaper); + slots [reaper_tid] = reaper->get_mailbox (); + reaper->start (); // Create I/O thread objects and launch them. - for (uint32_t i = 0; i != io_threads_; i++) { + for (uint32_t i = 2; i != io_threads_ + 2; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); - zmq_assert (io_thread); + alloc_assert (io_thread); io_threads.push_back (io_thread); - signalers [i] = io_thread->get_signaler (); + slots [i] = io_thread->get_mailbox (); io_thread->start (); } -} -int zmq::ctx_t::term () -{ - // First send stop command to application threads so that any - // blocking calls are interrupted. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i].app_thread->stop (); - - // Then mark context as terminated. - term_sync.lock (); - zmq_assert (!terminated); - terminated = true; - bool destroy = (sockets == 0); - term_sync.unlock (); - - // If there are no sockets open, destroy the context immediately. - if (destroy) - delete this; + // In the unused part of the slot array, create a list of empty slots. + for (int32_t i = (int32_t) slot_count - 1; + i >= (int32_t) io_threads_ + 2; i--) { + empty_slots.push_back (i); + slots [i] = NULL; + } - return 0; + // Create the logging infrastructure. + log_socket = create_socket (ZMQ_PUB); + zmq_assert (log_socket); + rc = log_socket->bind ("sys://log"); + zmq_assert (rc == 0); } zmq::ctx_t::~ctx_t () { + // Check that there are no remaining sockets. + zmq_assert (sockets.empty ()); + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O // thread subsequent invocation of destructor would hang-up. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -97,136 +92,134 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; - // Close all application theads, sockets, io_objects etc. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i].app_thread; - - // Deallocate all the orphaned pipes. - while (!pipes.empty ()) - delete *pipes.begin (); - - // Deallocate the array of pointers to signalers. No special work is - // needed as signalers themselves were deallocated with their - // corresponding (app_/io_) thread objects. - free (signalers); - -#ifdef ZMQ_HAVE_WINDOWS - // On Windows, uninitialise socket layer. - int rc = WSACleanup (); - wsa_assert (rc != SOCKET_ERROR); -#endif + // Deallocate the reaper thread object. + delete reaper; + + // Deallocate the array of mailboxes. No special work is + // needed as mailboxes themselves were deallocated with their + // corresponding io_thread/socket objects. + free (slots); } -zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +int zmq::ctx_t::terminate () { - app_threads_sync.lock (); - - // Find whether the calling thread has app_thread_t object associated - // already. At the same time find an unused app_thread_t so that it can - // be used if there's no associated object for the calling thread. - // Check whether thread ID is already assigned. If so, return it. - app_threads_t::size_type unused = app_threads.size (); - app_threads_t::size_type current; - for (current = 0; current != app_threads.size (); current++) { - if (app_threads [current].associated && - thread_t::equal (thread_t::id (), app_threads [current].tid)) - break; - if (!app_threads [current].associated) - unused = current; + // Check whether termination was already underway, but interrupted and now + // restarted. + slot_sync.lock (); + bool restarted = terminating; + slot_sync.unlock (); + + // First attempt to terminate the context. + if (!restarted) { + + // Close the logging infrastructure. + log_sync.lock (); + int rc = log_socket->close (); + zmq_assert (rc == 0); + log_socket = NULL; + log_sync.unlock (); + + // First send stop command to sockets so that any blocking calls can be + // interrupted. If there are no sockets we can ask reaper thread to stop. + slot_sync.lock (); + terminating = true; + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + if (sockets.empty ()) + reaper->stop (); + slot_sync.unlock (); } - // If no app_thread_t is associated with the calling thread, - // associate it with one of the unused app_thread_t objects. - if (current == app_threads.size ()) { + // Wait till reaper thread closes all the sockets. + command_t cmd; + int rc = term_mailbox.recv (&cmd, true); + if (rc == -1 && errno == EINTR) + return -1; + zmq_assert (rc == 0); + zmq_assert (cmd.type == command_t::done); + slot_sync.lock (); + zmq_assert (sockets.empty ()); + slot_sync.unlock (); - // If all the existing app_threads are already used, create one more. - if (unused == app_threads.size ()) { + // Deallocate the resources. + delete this; - // If max_app_threads limit was reached, return error. - if (app_threads.size () == max_app_threads) { - app_threads_sync.unlock (); - errno = EMTHREAD; - return NULL; - } + return 0; +} - // Create the new application thread proxy object. - app_thread_info_t info; - memset (&info, 0, sizeof (info)); - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, - io_threads.size () + app_threads.size ()); - zmq_assert (info.app_thread); - signalers [io_threads.size () + app_threads.size ()] = - info.app_thread->get_signaler (); - app_threads.push_back (info); - } +zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +{ + slot_sync.lock (); - // Incidentally, this works both when there is an unused app_thread - // and when a new one is created. - current = unused; + // Once zmq_term() was called, we can't create new sockets. + if (terminating) { + slot_sync.unlock (); + errno = ETERM; + return NULL; + } - // Associate the selected app_thread with the OS thread. - app_threads [current].associated = true; - app_threads [current].tid = thread_t::id (); + // If max_sockets limit was reached, return error. + if (empty_slots.empty ()) { + slot_sync.unlock (); + errno = EMFILE; + return NULL; } - app_thread_t *thread = app_threads [current].app_thread; - app_threads_sync.unlock (); + // Choose a slot for the socket. + uint32_t slot = empty_slots.back (); + empty_slots.pop_back (); - socket_base_t *s = thread->create_socket (type_); - if (!s) + // Create the socket and register its mailbox. + socket_base_t *s = socket_base_t::create (type_, this, slot); + if (!s) { + empty_slots.push_back (slot); + slot_sync.unlock (); return NULL; + } + sockets.push_back (s); + slots [slot] = s->get_mailbox (); - term_sync.lock (); - sockets++; - term_sync.unlock (); + slot_sync.unlock (); return s; } -void zmq::ctx_t::destroy_socket () +void zmq::ctx_t::destroy_socket (class socket_base_t *socket_) { - // If zmq_term was already called and there are no more sockets, - // terminate the whole 0MQ infrastructure. - term_sync.lock (); - zmq_assert (sockets > 0); - sockets--; - bool destroy = (sockets == 0 && terminated); - term_sync.unlock (); - - if (destroy) - delete this; -} + slot_sync.lock (); -void zmq::ctx_t::no_sockets (app_thread_t *thread_) -{ - app_threads_sync.lock (); - app_threads_t::size_type i; - for (i = 0; i != app_threads.size (); i++) - if (app_threads [i].app_thread == thread_) { - app_threads [i].associated = false; - break; - } - zmq_assert (i != app_threads.size ()); - app_threads_sync.unlock (); + // Free the associared thread slot. + uint32_t tid = socket_->get_tid (); + empty_slots.push_back (tid); + slots [tid] = NULL; + + // Remove the socket from the list of sockets. + sockets.erase (socket_); + + // If zmq_term() was already called and there are no more socket + // we can ask reaper thread to terminate. + if (terminating && sockets.empty ()) + reaper->stop (); + + slot_sync.unlock (); } -void zmq::ctx_t::send_command (uint32_t destination_, - const command_t &command_) +zmq::object_t *zmq::ctx_t::get_reaper () { - signalers [destination_]->send (command_); + return reaper; } -bool zmq::ctx_t::recv_command (uint32_t thread_slot_, - command_t *command_, bool block_) +void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) { - return signalers [thread_slot_]->recv (command_, block_); + slots [tid_]->send (command_); } zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) { + if (io_threads.empty ()) + return NULL; + // Find the I/O thread with minimum load. - zmq_assert (io_threads.size () > 0); int min_load = -1; io_threads_t::size_type result = 0; for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { @@ -242,29 +235,12 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) return io_threads [result]; } -void zmq::ctx_t::register_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - bool inserted = pipes.insert (pipe_).second; - zmq_assert (inserted); - pipes_sync.unlock (); -} - -void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - pipes_t::size_type erased = pipes.erase (pipe_); - zmq_assert (erased == 1); - pipes_sync.unlock (); -} - -int zmq::ctx_t::register_endpoint (const char *addr_, - socket_base_t *socket_) +int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) { endpoints_sync.lock (); - bool inserted = endpoints.insert (std::make_pair (std::string (addr_), - socket_)).second; + bool inserted = endpoints.insert (endpoints_t::value_type ( + std::string (addr_), endpoint_)).second; if (!inserted) { errno = EADDRINUSE; endpoints_sync.unlock (); @@ -281,19 +257,19 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) endpoints_t::iterator it = endpoints.begin (); while (it != endpoints.end ()) { - if (it->second == socket_) { + if (it->second.socket == socket_) { endpoints_t::iterator to_erase = it; - it++; + ++it; endpoints.erase (to_erase); continue; } - it++; + ++it; } endpoints_sync.unlock (); } -zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) +zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) { endpoints_sync.lock (); @@ -301,17 +277,37 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) if (it == endpoints.end ()) { endpoints_sync.unlock (); errno = ECONNREFUSED; - return NULL; + endpoint_t empty = {NULL, options_t()}; + return empty; } - socket_base_t *endpoint = it->second; + endpoint_t *endpoint = &it->second; // Increment the command sequence number of the peer so that it won't // get deallocated until "bind" command is issued by the caller. // The subsequent 'bind' has to be called with inc_seqnum parameter // set to false, so that the seqnum isn't incremented twice. - endpoint->inc_seqnum (); + endpoint->socket->inc_seqnum (); endpoints_sync.unlock (); - return endpoint; + return *endpoint; } +void zmq::ctx_t::log (const char *format_, va_list args_) +{ + // Create the log message. + zmq_msg_t msg; + int rc = zmq_msg_init_size (&msg, strlen (format_) + 1); + zmq_assert (rc == 0); + memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg)); + + // At this point we migrate the log socket to the current thread. + // We rely on mutex for executing the memory barrier. + log_sync.lock (); + if (log_socket) + log_socket->send (&msg, 0); + log_sync.unlock (); + + zmq_msg_close (&msg); +} + + -- cgit v1.2.3