diff options
| -rw-r--r-- | src/ctx.cpp | 25 | ||||
| -rw-r--r-- | src/ctx.hpp | 10 | ||||
| -rw-r--r-- | src/object.cpp | 5 | ||||
| -rw-r--r-- | src/object.hpp | 5 | 
4 files changed, 43 insertions, 2 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index 79145eb..f66e1fe 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -20,8 +20,6 @@  #include <new>  #include <string.h> -#include "../include/zmq.h" -  #include "ctx.hpp"  #include "socket_base.hpp"  #include "io_thread.hpp" @@ -68,6 +66,12 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :          empty_slots.push_back (i);          slots [i] = NULL;      } + +    //  Create the logging infrastructure. +    log_socket = create_socket (ZMQ_PUB); +    zmq_assert (log_socket); +    int rc = log_socket->bind ("inproc://log"); +    zmq_assert (rc == 0);  }  zmq::ctx_t::~ctx_t () @@ -104,6 +108,13 @@ int zmq::ctx_t::terminate ()      for (sockets_t::size_type i = 0; i != sockets.size (); i++)          sockets [i]->stop (); +    //  Close the logging infrastructure. +    log_sync.lock (); +    int rc = log_socket->close (); +    zmq_assert (rc == 0); +    log_socket = NULL; +    log_sync.unlock (); +      //  Find out whether there are any open sockets to care about.      //  If so, sleep till they are closed. Note that we can use      //  no_sockets_notify safely out of the critical section as once set @@ -287,6 +298,16 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)       return endpoint;  } +void zmq::ctx_t::log (zmq_msg_t *msg_) +{ +    //  At this  point we migrate the log socket to the current thread. +    //  We rely on mutex for executing the memory barrier. +    log_sync.lock (); +    if (log_socket) +        log_socket->send (msg_, 0); +    log_sync.unlock (); +} +  void zmq::ctx_t::dezombify ()  {      //  Try to dezombify each zombie in the list. Note that caller is diff --git a/src/ctx.hpp b/src/ctx.hpp index a31e1d8..5f6cc83 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -24,6 +24,8 @@  #include <vector>  #include <string> +#include "../include/zmq.h" +  #include "signaler.hpp"  #include "semaphore.hpp"  #include "ypipe.hpp" @@ -74,6 +76,9 @@ namespace zmq          void unregister_endpoints (class socket_base_t *socket_);          class socket_base_t *find_endpoint (const char *addr_); +        //  Logging. +        void log (zmq_msg_t *msg_); +      private:          ~ctx_t (); @@ -125,6 +130,11 @@ namespace zmq          //  Synchronisation of access to the list of inproc endpoints.          mutex_t endpoints_sync; +        //  PUB socket for logging. The socket is shared among all the threads, +        //  thus it is synchronised by a mutex. +        class socket_base_t *log_socket; +        mutex_t log_sync; +          ctx_t (const ctx_t&);          void operator = (const ctx_t&);      }; diff --git a/src/object.cpp b/src/object.cpp index 5f4a94e..7b5532b 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -137,6 +137,11 @@ zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)      return ctx->find_endpoint (addr_);  } +void zmq::object_t::log (zmq_msg_t *msg_) +{ +    ctx->log (msg_); +} +  zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)  {      return ctx->choose_io_thread (taskset_); diff --git a/src/object.hpp b/src/object.hpp index 8652a86..6b52f4b 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -20,6 +20,8 @@  #ifndef __ZMQ_OBJECT_HPP_INCLUDED__  #define __ZMQ_OBJECT_HPP_INCLUDED__ +#include "../include/zmq.h" +  #include "stdint.hpp"  #include "blob.hpp" @@ -48,6 +50,9 @@ namespace zmq          void unregister_endpoints (class socket_base_t *socket_);          class socket_base_t *find_endpoint (const char *addr_); +        //  Logs an message. +        void log (zmq_msg_t *msg_); +          //  Chooses least loaded I/O thread.          class io_thread_t *choose_io_thread (uint64_t taskset_);  | 
