diff options
| author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:35 +0100 | 
|---|---|---|
| committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:35 +0100 | 
| commit | e645fc2693acc796304498909786b7b47005b429 (patch) | |
| tree | 4118cd4c7b9eba3ba1d6892800c79669ea94c4e9 /src/ctx.cpp | |
| parent | 2c416a793ea781273a5da6742211f5f01af13a2b (diff) | |
Imported Upstream version 2.1.3upstream/2.1.3
Diffstat (limited to 'src/ctx.cpp')
| -rw-r--r-- | src/ctx.cpp | 340 | 
1 files changed, 168 insertions, 172 deletions
| diff --git a/src/ctx.cpp b/src/ctx.cpp index 397f692..9cbb9de 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -1,93 +1,88 @@  /* -    Copyright (c) 2007-2010 iMatix Corporation +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ.      0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by +    the terms of the GNU Lesser General Public License as published by      the Free Software Foundation; either version 3 of the License, or      (at your option) any later version.      0MQ is distributed in the hope that it will be useful,      but WITHOUT ANY WARRANTY; without even the implied warranty of      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. +    GNU Lesser General Public License for more details. -    You should have received a copy of the Lesser GNU General Public License +    You should have received a copy of the GNU Lesser General Public License      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */  #include <new>  #include <string.h> -#include "../include/zmq.h" -  #include "ctx.hpp"  #include "socket_base.hpp" -#include "app_thread.hpp"  #include "io_thread.hpp"  #include "platform.hpp" +#include "reaper.hpp"  #include "err.hpp"  #include "pipe.hpp"  #if defined ZMQ_HAVE_WINDOWS  #include "windows.h" +#else +#include "unistd.h"  #endif  zmq::ctx_t::ctx_t (uint32_t io_threads_) : -    sockets (0), -    terminated (false) +    terminating (false)  { -#ifdef ZMQ_HAVE_WINDOWS -    //  Intialise Windows sockets. Note that WSAStartup can be called multiple -    //  times given that WSACleanup will be called for each WSAStartup. -    WORD version_requested = MAKEWORD (2, 2); -    WSADATA wsa_data; -    int rc = WSAStartup (version_requested, &wsa_data); -    zmq_assert (rc == 0); -    zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && -        HIBYTE (wsa_data.wVersion) == 2); -#endif +    int rc; -    //  Initialise the array of signalers. -    signalers_count = max_app_threads + io_threads_; -    signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); -    zmq_assert (signalers); -    memset (signalers, 0, sizeof (signaler_t*) * signalers_count); +    //  Initialise the array of mailboxes. Additional three slots are for +    //  internal log socket and the zmq_term thread the reaper thread. +    slot_count = max_sockets + io_threads_ + 3; +    slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); +    alloc_assert (slots); + +    //  Initialise the infrastructure for zmq_term thread. +    slots [term_tid] = &term_mailbox; + +    //  Create the reaper thread. +    reaper = new (std::nothrow) reaper_t (this, reaper_tid); +    alloc_assert (reaper); +    slots [reaper_tid] = reaper->get_mailbox (); +    reaper->start ();      //  Create I/O thread objects and launch them. -    for (uint32_t i = 0; i != io_threads_; i++) { +    for (uint32_t i = 2; i != io_threads_ + 2; i++) {          io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); -        zmq_assert (io_thread); +        alloc_assert (io_thread);          io_threads.push_back (io_thread); -        signalers [i] = io_thread->get_signaler (); +        slots [i] = io_thread->get_mailbox ();          io_thread->start ();      } -} -int zmq::ctx_t::term () -{ -    //  First send stop command to application threads so that any -    //  blocking calls are interrupted. -    for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) -        app_threads [i].app_thread->stop (); - -    //  Then mark context as terminated. -    term_sync.lock (); -    zmq_assert (!terminated); -    terminated = true; -    bool destroy = (sockets == 0); -    term_sync.unlock (); -     -    //  If there are no sockets open, destroy the context immediately. -    if (destroy) -        delete this; +    //  In the unused part of the slot array, create a list of empty slots. +    for (int32_t i = (int32_t) slot_count - 1; +          i >= (int32_t) io_threads_ + 2; i--) { +        empty_slots.push_back (i); +        slots [i] = NULL; +    } -    return 0; +    //  Create the logging infrastructure. +    log_socket = create_socket (ZMQ_PUB); +    zmq_assert (log_socket); +    rc = log_socket->bind ("sys://log"); +    zmq_assert (rc == 0);  }  zmq::ctx_t::~ctx_t ()  { +    //  Check that there are no remaining sockets. +    zmq_assert (sockets.empty ()); +      //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O      //  thread subsequent invocation of destructor would hang-up.      for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -97,136 +92,134 @@ zmq::ctx_t::~ctx_t ()      for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)          delete io_threads [i]; -    //  Close all application theads, sockets, io_objects etc. -    for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) -        delete app_threads [i].app_thread; - -    //  Deallocate all the orphaned pipes. -    while (!pipes.empty ()) -        delete *pipes.begin (); - -    //  Deallocate the array of pointers to signalers. No special work is -    //  needed as signalers themselves were deallocated with their -    //  corresponding (app_/io_) thread objects. -    free (signalers); -     -#ifdef ZMQ_HAVE_WINDOWS -    //  On Windows, uninitialise socket layer. -    int rc = WSACleanup (); -    wsa_assert (rc != SOCKET_ERROR); -#endif +    //  Deallocate the reaper thread object. +    delete reaper; + +    //  Deallocate the array of mailboxes. No special work is +    //  needed as mailboxes themselves were deallocated with their +    //  corresponding io_thread/socket objects. +    free (slots);  } -zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +int zmq::ctx_t::terminate ()  { -    app_threads_sync.lock (); - -    //  Find whether the calling thread has app_thread_t object associated -    //  already. At the same time find an unused app_thread_t so that it can -    //  be used if there's no associated object for the calling thread. -    //  Check whether thread ID is already assigned. If so, return it. -    app_threads_t::size_type unused = app_threads.size (); -    app_threads_t::size_type current; -    for (current = 0; current != app_threads.size (); current++) { -        if (app_threads [current].associated && -              thread_t::equal (thread_t::id (), app_threads [current].tid)) -            break; -        if (!app_threads [current].associated) -            unused = current; +    //  Check whether termination was already underway, but interrupted and now +    //  restarted. +    slot_sync.lock (); +    bool restarted = terminating; +    slot_sync.unlock (); + +    //  First attempt to terminate the context. +    if (!restarted) { + +        //  Close the logging infrastructure. +        log_sync.lock (); +        int rc = log_socket->close (); +        zmq_assert (rc == 0); +        log_socket = NULL; +        log_sync.unlock (); + +        //  First send stop command to sockets so that any blocking calls can be +        //  interrupted. If there are no sockets we can ask reaper thread to stop. +        slot_sync.lock (); +        terminating = true; +        for (sockets_t::size_type i = 0; i != sockets.size (); i++) +            sockets [i]->stop (); +        if (sockets.empty ()) +            reaper->stop (); +        slot_sync.unlock ();      } -    //  If no app_thread_t is associated with the calling thread, -    //  associate it with one of the unused app_thread_t objects. -    if (current == app_threads.size ()) { +    //  Wait till reaper thread closes all the sockets. +    command_t cmd; +    int rc = term_mailbox.recv (&cmd, true); +    if (rc == -1 && errno == EINTR) +        return -1; +    zmq_assert (rc == 0); +    zmq_assert (cmd.type == command_t::done); +    slot_sync.lock (); +    zmq_assert (sockets.empty ()); +    slot_sync.unlock (); -        //  If all the existing app_threads are already used, create one more. -        if (unused == app_threads.size ()) { +    //  Deallocate the resources. +    delete this; -            //  If max_app_threads limit was reached, return error. -            if (app_threads.size () == max_app_threads) { -                app_threads_sync.unlock (); -                errno = EMTHREAD; -                return NULL; -            } +    return 0; +} -            //  Create the new application thread proxy object. -            app_thread_info_t info; -            memset (&info, 0, sizeof (info)); -            info.associated = false; -            info.app_thread = new (std::nothrow) app_thread_t (this, -                io_threads.size () + app_threads.size ()); -            zmq_assert (info.app_thread); -            signalers [io_threads.size () + app_threads.size ()] = -                info.app_thread->get_signaler (); -            app_threads.push_back (info); -        } +zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +{ +    slot_sync.lock (); -        //  Incidentally, this works both when there is an unused app_thread -        //  and when a new one is created. -        current = unused; +    //  Once zmq_term() was called, we can't create new sockets. +    if (terminating) { +        slot_sync.unlock (); +        errno = ETERM; +        return NULL; +    } -        //  Associate the selected app_thread with the OS thread. -        app_threads [current].associated = true; -        app_threads [current].tid = thread_t::id (); +    //  If max_sockets limit was reached, return error. +    if (empty_slots.empty ()) { +        slot_sync.unlock (); +        errno = EMFILE; +        return NULL;      } -    app_thread_t *thread = app_threads [current].app_thread; -    app_threads_sync.unlock (); +    //  Choose a slot for the socket. +    uint32_t slot = empty_slots.back (); +    empty_slots.pop_back (); -    socket_base_t *s = thread->create_socket (type_); -    if (!s) +    //  Create the socket and register its mailbox. +    socket_base_t *s = socket_base_t::create (type_, this, slot); +    if (!s) { +        empty_slots.push_back (slot); +        slot_sync.unlock ();          return NULL; +    } +    sockets.push_back (s); +    slots [slot] = s->get_mailbox (); -    term_sync.lock (); -    sockets++; -    term_sync.unlock (); +    slot_sync.unlock ();      return s;  } -void zmq::ctx_t::destroy_socket () +void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)  { -    //  If zmq_term was already called and there are no more sockets, -    //  terminate the whole 0MQ infrastructure. -    term_sync.lock (); -    zmq_assert (sockets > 0); -    sockets--; -    bool destroy = (sockets == 0 && terminated); -    term_sync.unlock (); - -    if (destroy) -       delete this; -} +    slot_sync.lock (); -void zmq::ctx_t::no_sockets (app_thread_t *thread_) -{ -    app_threads_sync.lock (); -    app_threads_t::size_type i; -    for (i = 0; i != app_threads.size (); i++) -        if (app_threads [i].app_thread == thread_) { -            app_threads [i].associated = false; -            break; -        } -    zmq_assert (i != app_threads.size ()); -    app_threads_sync.unlock (); +    //  Free the associared thread slot. +    uint32_t tid = socket_->get_tid (); +    empty_slots.push_back (tid); +    slots [tid] = NULL;     + +    //  Remove the socket from the list of sockets. +    sockets.erase (socket_); + +    //  If zmq_term() was already called and there are no more socket +    //  we can ask reaper thread to terminate. +    if (terminating && sockets.empty ()) +        reaper->stop (); + +    slot_sync.unlock ();  } -void zmq::ctx_t::send_command (uint32_t destination_, -    const command_t &command_) +zmq::object_t *zmq::ctx_t::get_reaper ()  { -    signalers [destination_]->send (command_); +    return reaper;  } -bool zmq::ctx_t::recv_command (uint32_t thread_slot_, -    command_t *command_, bool block_) +void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)  { -    return signalers [thread_slot_]->recv (command_, block_); +    slots [tid_]->send (command_);  }  zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)  { +    if (io_threads.empty ()) +        return NULL; +      //  Find the I/O thread with minimum load. -    zmq_assert (io_threads.size () > 0);      int min_load = -1;      io_threads_t::size_type result = 0;      for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { @@ -242,29 +235,12 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)      return io_threads [result];  } -void zmq::ctx_t::register_pipe (class pipe_t *pipe_) -{ -    pipes_sync.lock (); -    bool inserted = pipes.insert (pipe_).second; -    zmq_assert (inserted); -    pipes_sync.unlock (); -} - -void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_) -{ -    pipes_sync.lock (); -    pipes_t::size_type erased = pipes.erase (pipe_); -    zmq_assert (erased == 1); -    pipes_sync.unlock (); -} - -int zmq::ctx_t::register_endpoint (const char *addr_, -    socket_base_t *socket_) +int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)  {      endpoints_sync.lock (); -    bool inserted = endpoints.insert (std::make_pair (std::string (addr_), -        socket_)).second; +    bool inserted = endpoints.insert (endpoints_t::value_type ( +        std::string (addr_), endpoint_)).second;      if (!inserted) {          errno = EADDRINUSE;          endpoints_sync.unlock (); @@ -281,19 +257,19 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)      endpoints_t::iterator it = endpoints.begin ();      while (it != endpoints.end ()) { -        if (it->second == socket_) { +        if (it->second.socket == socket_) {              endpoints_t::iterator to_erase = it; -            it++; +            ++it;              endpoints.erase (to_erase);              continue;          } -        it++; +        ++it;      }      endpoints_sync.unlock ();  } -zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) +zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)  {       endpoints_sync.lock (); @@ -301,17 +277,37 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)       if (it == endpoints.end ()) {           endpoints_sync.unlock ();           errno = ECONNREFUSED; -         return NULL; +         endpoint_t empty = {NULL, options_t()}; +         return empty;       } -     socket_base_t *endpoint = it->second; +     endpoint_t *endpoint = &it->second;       //  Increment the command sequence number of the peer so that it won't       //  get deallocated until "bind" command is issued by the caller.       //  The subsequent 'bind' has to be called with inc_seqnum parameter       //  set to false, so that the seqnum isn't incremented twice. -     endpoint->inc_seqnum (); +     endpoint->socket->inc_seqnum ();       endpoints_sync.unlock (); -     return endpoint; +     return *endpoint;  } +void zmq::ctx_t::log (const char *format_, va_list args_) +{ +    //  Create the log message. +    zmq_msg_t msg; +    int rc = zmq_msg_init_size (&msg, strlen (format_) + 1); +    zmq_assert (rc == 0); +    memcpy (zmq_msg_data (&msg), format_, zmq_msg_size (&msg)); + +    //  At this  point we migrate the log socket to the current thread. +    //  We rely on mutex for executing the memory barrier. +    log_sync.lock (); +    if (log_socket) +        log_socket->send (&msg, 0); +    log_sync.unlock (); + +    zmq_msg_close (&msg); +} + + | 
