From ce0972dca3982538fd123b61fbae3928fad6d1e7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 1 Sep 2010 07:57:38 +0200 Subject: context creates an inproc endpoint ('inproc://log') to distribute 0MQ's log messages --- src/ctx.cpp | 25 +++++++++++++++++++++++-- src/ctx.hpp | 10 ++++++++++ src/object.cpp | 5 +++++ src/object.hpp | 5 +++++ 4 files changed, 43 insertions(+), 2 deletions(-) diff --git a/src/ctx.cpp b/src/ctx.cpp index 79145eb..f66e1fe 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -20,8 +20,6 @@ #include #include -#include "../include/zmq.h" - #include "ctx.hpp" #include "socket_base.hpp" #include "io_thread.hpp" @@ -68,6 +66,12 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : empty_slots.push_back (i); slots [i] = NULL; } + + // Create the logging infrastructure. + log_socket = create_socket (ZMQ_PUB); + zmq_assert (log_socket); + int rc = log_socket->bind ("inproc://log"); + zmq_assert (rc == 0); } zmq::ctx_t::~ctx_t () @@ -104,6 +108,13 @@ int zmq::ctx_t::terminate () for (sockets_t::size_type i = 0; i != sockets.size (); i++) sockets [i]->stop (); + // Close the logging infrastructure. + log_sync.lock (); + int rc = log_socket->close (); + zmq_assert (rc == 0); + log_socket = NULL; + log_sync.unlock (); + // Find out whether there are any open sockets to care about. // If so, sleep till they are closed. Note that we can use // no_sockets_notify safely out of the critical section as once set @@ -287,6 +298,16 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } +void zmq::ctx_t::log (zmq_msg_t *msg_) +{ + // At this point we migrate the log socket to the current thread. + // We rely on mutex for executing the memory barrier. + log_sync.lock (); + if (log_socket) + log_socket->send (msg_, 0); + log_sync.unlock (); +} + void zmq::ctx_t::dezombify () { // Try to dezombify each zombie in the list. Note that caller is diff --git a/src/ctx.hpp b/src/ctx.hpp index a31e1d8..5f6cc83 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -24,6 +24,8 @@ #include #include +#include "../include/zmq.h" + #include "signaler.hpp" #include "semaphore.hpp" #include "ypipe.hpp" @@ -74,6 +76,9 @@ namespace zmq void unregister_endpoints (class socket_base_t *socket_); class socket_base_t *find_endpoint (const char *addr_); + // Logging. + void log (zmq_msg_t *msg_); + private: ~ctx_t (); @@ -125,6 +130,11 @@ namespace zmq // Synchronisation of access to the list of inproc endpoints. mutex_t endpoints_sync; + // PUB socket for logging. The socket is shared among all the threads, + // thus it is synchronised by a mutex. + class socket_base_t *log_socket; + mutex_t log_sync; + ctx_t (const ctx_t&); void operator = (const ctx_t&); }; diff --git a/src/object.cpp b/src/object.cpp index 5f4a94e..7b5532b 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -137,6 +137,11 @@ zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) return ctx->find_endpoint (addr_); } +void zmq::object_t::log (zmq_msg_t *msg_) +{ + ctx->log (msg_); +} + zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return ctx->choose_io_thread (taskset_); diff --git a/src/object.hpp b/src/object.hpp index 8652a86..6b52f4b 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_OBJECT_HPP_INCLUDED__ #define __ZMQ_OBJECT_HPP_INCLUDED__ +#include "../include/zmq.h" + #include "stdint.hpp" #include "blob.hpp" @@ -48,6 +50,9 @@ namespace zmq void unregister_endpoints (class socket_base_t *socket_); class socket_base_t *find_endpoint (const char *addr_); + // Logs an message. + void log (zmq_msg_t *msg_); + // Chooses least loaded I/O thread. class io_thread_t *choose_io_thread (uint64_t taskset_); -- cgit v1.2.3