From 1fc63e4dbcf1438eb571d720f57be68852f820f7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:01:40 +0900 Subject: More logging infrastructure Signed-off-by: Martin Sustrik --- src/ctx.cpp | 33 ++++++++++++++++++++++++++++++++- src/ctx.hpp | 8 +++++++- src/monitor.cpp | 8 +++++++- src/monitor.hpp | 7 +++++++ src/object.cpp | 5 +++++ src/object.hpp | 3 ++- 6 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/ctx.cpp b/src/ctx.cpp index 97398ae..d771f6f 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -44,7 +44,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : { // Initialise the array of mailboxes. Additional three slots are for // zmq_term thread and reaper thread. - slot_count = max_sockets + io_threads_ + 2; + slot_count = max_sockets + io_threads_ + 3; slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); alloc_assert (slots); @@ -73,6 +73,18 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : slots [i] = NULL; } + // Create the socket to send logs to. + log_socket = create_socket (ZMQ_PUB); + zmq_assert (log_socket); + int linger = 0; + int rc = log_socket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger)); + errno_assert (rc == 0); + int hwm = 1; + rc = log_socket->setsockopt (ZMQ_SNDHWM, &hwm, sizeof (hwm)); + errno_assert (rc == 0); + rc = log_socket->connect ("ipc:///tmp/zmqlogs.ipc"); + errno_assert (rc == 0); + // Create the monitor object. io_thread_t *io_thread = choose_io_thread (0); zmq_assert (io_thread); @@ -130,6 +142,13 @@ int zmq::ctx_t::terminate () zmq_assert (rc == 0); zmq_assert (cmd.type == command_t::done); + // Close the logging socket. + log_sync.lock (); + rc = log_socket->close (); + zmq_assert (rc == 0); + log_socket = NULL; + log_sync.unlock (); + // First send stop command to sockets so that any blocking calls // can be interrupted. If there are no sockets we can ask reaper // thread to stop. @@ -312,6 +331,18 @@ void zmq::ctx_t::log (int sid_, const char *text_) monitor->log (sid_, text_); } +void zmq::ctx_t::publish_logs (const char *text_) +{ + log_sync.lock (); + msg_t msg; + msg.init_size (strlen (text_) + 1); + memcpy (msg.data (), text_, strlen (text_) + 1); + int rc = log_socket->send (&msg, ZMQ_DONTWAIT); + errno_assert (rc == 0); + msg.close (); + log_sync.unlock (); +} + // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index 876b53c..2859282 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -93,8 +93,9 @@ namespace zmq void unregister_endpoints (zmq::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); - // Logging. + // Logging related functions. void log (int sid_, const char *text_); + void publish_logs (const char *text_); enum { term_tid = 0, @@ -154,6 +155,11 @@ namespace zmq // Maximum socket ID. static atomic_counter_t max_socket_id; + // PUB socket for logging. The socket is shared among all the threads, + // thus it is synchronised by a mutex. + zmq::socket_base_t *log_socket; + mutex_t log_sync; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/monitor.cpp b/src/monitor.cpp index 95212fd..1cccfd0 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -46,6 +46,9 @@ void zmq::monitor_t::stop () void zmq::monitor_t::log (int sid_, const char *text_) { + sync.lock (); + text = text_; + sync.unlock (); } void zmq::monitor_t::process_plug () @@ -65,7 +68,10 @@ void zmq::monitor_t::timer_event (int id_) { zmq_assert (id_ == timer_id); - // TODO: Send the snapshot here! + // Send the snapshot here! + sync.lock (); + publish_logs (text.c_str ()); + sync.unlock (); // Wait before sending next snapshot. add_timer (500 + (generate_random () % 1000), timer_id); diff --git a/src/monitor.hpp b/src/monitor.hpp index 83ffb26..c8fa823 100644 --- a/src/monitor.hpp +++ b/src/monitor.hpp @@ -21,7 +21,10 @@ #ifndef __ZMQ_MONITOR_HPP_INCLUDED__ #define __ZMQ_MONITOR_HPP_INCLUDED__ +#include + #include "own.hpp" +#include "mutex.hpp" #include "io_object.hpp" namespace zmq @@ -53,6 +56,10 @@ namespace zmq // Events from the poller. void timer_event (int id_); + // Actual monitoring data to send and the related critical section. + std::string text; + mutex_t sync; + monitor_t (const monitor_t&); const monitor_t &operator = (const monitor_t&); }; diff --git a/src/object.cpp b/src/object.cpp index c9eb3dd..e4e94d9 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -159,6 +159,11 @@ void zmq::object_t::log (int sid_, const char *text_) ctx->log (sid_, text_); } +void zmq::object_t::publish_logs (const char *text_) +{ + ctx->publish_logs (text_); +} + void zmq::object_t::send_stop () { // 'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index d6d55ae..6ea3bb2 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -64,8 +64,9 @@ namespace zmq // Chooses least loaded I/O thread. zmq::io_thread_t *choose_io_thread (uint64_t affinity_); - // Log a message. + // Logging related functions. void log (int sid_, const char *text_); + void publish_logs (const char *textr_); // Derived object can use these functions to send commands // to other objects. -- cgit v1.2.3