summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:01:40 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:01:40 +0900
commit1fc63e4dbcf1438eb571d720f57be68852f820f7 (patch)
tree8ef72f17deccfb7266bcffe7e8e8b1478116ca7c /src
parente45c2b847c7c0420309731d3705688b0daff9370 (diff)
More logging infrastructure
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/ctx.cpp33
-rw-r--r--src/ctx.hpp8
-rw-r--r--src/monitor.cpp8
-rw-r--r--src/monitor.hpp7
-rw-r--r--src/object.cpp5
-rw-r--r--src/object.hpp3
6 files changed, 60 insertions, 4 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 97398ae..d771f6f 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -44,7 +44,7 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
{
// Initialise the array of mailboxes. Additional three slots are for
// zmq_term thread and reaper thread.
- slot_count = max_sockets + io_threads_ + 2;
+ slot_count = max_sockets + io_threads_ + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);
@@ -73,6 +73,18 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
slots [i] = NULL;
}
+ // Create the socket to send logs to.
+ log_socket = create_socket (ZMQ_PUB);
+ zmq_assert (log_socket);
+ int linger = 0;
+ int rc = log_socket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
+ errno_assert (rc == 0);
+ int hwm = 1;
+ rc = log_socket->setsockopt (ZMQ_SNDHWM, &hwm, sizeof (hwm));
+ errno_assert (rc == 0);
+ rc = log_socket->connect ("ipc:///tmp/zmqlogs.ipc");
+ errno_assert (rc == 0);
+
// Create the monitor object.
io_thread_t *io_thread = choose_io_thread (0);
zmq_assert (io_thread);
@@ -130,6 +142,13 @@ int zmq::ctx_t::terminate ()
zmq_assert (rc == 0);
zmq_assert (cmd.type == command_t::done);
+ // Close the logging socket.
+ log_sync.lock ();
+ rc = log_socket->close ();
+ zmq_assert (rc == 0);
+ log_socket = NULL;
+ log_sync.unlock ();
+
// First send stop command to sockets so that any blocking calls
// can be interrupted. If there are no sockets we can ask reaper
// thread to stop.
@@ -312,6 +331,18 @@ void zmq::ctx_t::log (int sid_, const char *text_)
monitor->log (sid_, text_);
}
+void zmq::ctx_t::publish_logs (const char *text_)
+{
+ log_sync.lock ();
+ msg_t msg;
+ msg.init_size (strlen (text_) + 1);
+ memcpy (msg.data (), text_, strlen (text_) + 1);
+ int rc = log_socket->send (&msg, ZMQ_DONTWAIT);
+ errno_assert (rc == 0);
+ msg.close ();
+ log_sync.unlock ();
+}
+
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 876b53c..2859282 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -93,8 +93,9 @@ namespace zmq
void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_);
- // Logging.
+ // Logging related functions.
void log (int sid_, const char *text_);
+ void publish_logs (const char *text_);
enum {
term_tid = 0,
@@ -154,6 +155,11 @@ namespace zmq
// Maximum socket ID.
static atomic_counter_t max_socket_id;
+ // PUB socket for logging. The socket is shared among all the threads,
+ // thus it is synchronised by a mutex.
+ zmq::socket_base_t *log_socket;
+ mutex_t log_sync;
+
ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&);
};
diff --git a/src/monitor.cpp b/src/monitor.cpp
index 95212fd..1cccfd0 100644
--- a/src/monitor.cpp
+++ b/src/monitor.cpp
@@ -46,6 +46,9 @@ void zmq::monitor_t::stop ()
void zmq::monitor_t::log (int sid_, const char *text_)
{
+ sync.lock ();
+ text = text_;
+ sync.unlock ();
}
void zmq::monitor_t::process_plug ()
@@ -65,7 +68,10 @@ void zmq::monitor_t::timer_event (int id_)
{
zmq_assert (id_ == timer_id);
- // TODO: Send the snapshot here!
+ // Send the snapshot here!
+ sync.lock ();
+ publish_logs (text.c_str ());
+ sync.unlock ();
// Wait before sending next snapshot.
add_timer (500 + (generate_random () % 1000), timer_id);
diff --git a/src/monitor.hpp b/src/monitor.hpp
index 83ffb26..c8fa823 100644
--- a/src/monitor.hpp
+++ b/src/monitor.hpp
@@ -21,7 +21,10 @@
#ifndef __ZMQ_MONITOR_HPP_INCLUDED__
#define __ZMQ_MONITOR_HPP_INCLUDED__
+#include <string>
+
#include "own.hpp"
+#include "mutex.hpp"
#include "io_object.hpp"
namespace zmq
@@ -53,6 +56,10 @@ namespace zmq
// Events from the poller.
void timer_event (int id_);
+ // Actual monitoring data to send and the related critical section.
+ std::string text;
+ mutex_t sync;
+
monitor_t (const monitor_t&);
const monitor_t &operator = (const monitor_t&);
};
diff --git a/src/object.cpp b/src/object.cpp
index c9eb3dd..e4e94d9 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -159,6 +159,11 @@ void zmq::object_t::log (int sid_, const char *text_)
ctx->log (sid_, text_);
}
+void zmq::object_t::publish_logs (const char *text_)
+{
+ ctx->publish_logs (text_);
+}
+
void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
diff --git a/src/object.hpp b/src/object.hpp
index d6d55ae..6ea3bb2 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -64,8 +64,9 @@ namespace zmq
// Chooses least loaded I/O thread.
zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
- // Log a message.
+ // Logging related functions.
void log (int sid_, const char *text_);
+ void publish_logs (const char *textr_);
// Derived object can use these functions to send commands
// to other objects.