summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-09-01 07:57:38 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-01 07:57:38 +0200
commitce0972dca3982538fd123b61fbae3928fad6d1e7 (patch)
treef15d4de3c7dd2aadc3c25b6f8a602c4fe64a334d
parentdb73c76314d7109da4b400a3edb107c4eda802a2 (diff)
context creates an inproc endpoint ('inproc://log') to distribute 0MQ's log messages
-rw-r--r--src/ctx.cpp25
-rw-r--r--src/ctx.hpp10
-rw-r--r--src/object.cpp5
-rw-r--r--src/object.hpp5
4 files changed, 43 insertions, 2 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 79145eb..f66e1fe 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -20,8 +20,6 @@
#include <new>
#include <string.h>
-#include "../include/zmq.h"
-
#include "ctx.hpp"
#include "socket_base.hpp"
#include "io_thread.hpp"
@@ -68,6 +66,12 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
empty_slots.push_back (i);
slots [i] = NULL;
}
+
+ // Create the logging infrastructure.
+ log_socket = create_socket (ZMQ_PUB);
+ zmq_assert (log_socket);
+ int rc = log_socket->bind ("inproc://log");
+ zmq_assert (rc == 0);
}
zmq::ctx_t::~ctx_t ()
@@ -104,6 +108,13 @@ int zmq::ctx_t::terminate ()
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop ();
+ // Close the logging infrastructure.
+ log_sync.lock ();
+ int rc = log_socket->close ();
+ zmq_assert (rc == 0);
+ log_socket = NULL;
+ log_sync.unlock ();
+
// Find out whether there are any open sockets to care about.
// If so, sleep till they are closed. Note that we can use
// no_sockets_notify safely out of the critical section as once set
@@ -287,6 +298,16 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint;
}
+void zmq::ctx_t::log (zmq_msg_t *msg_)
+{
+ // At this point we migrate the log socket to the current thread.
+ // We rely on mutex for executing the memory barrier.
+ log_sync.lock ();
+ if (log_socket)
+ log_socket->send (msg_, 0);
+ log_sync.unlock ();
+}
+
void zmq::ctx_t::dezombify ()
{
// Try to dezombify each zombie in the list. Note that caller is
diff --git a/src/ctx.hpp b/src/ctx.hpp
index a31e1d8..5f6cc83 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -24,6 +24,8 @@
#include <vector>
#include <string>
+#include "../include/zmq.h"
+
#include "signaler.hpp"
#include "semaphore.hpp"
#include "ypipe.hpp"
@@ -74,6 +76,9 @@ namespace zmq
void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_);
+ // Logging.
+ void log (zmq_msg_t *msg_);
+
private:
~ctx_t ();
@@ -125,6 +130,11 @@ namespace zmq
// Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync;
+ // PUB socket for logging. The socket is shared among all the threads,
+ // thus it is synchronised by a mutex.
+ class socket_base_t *log_socket;
+ mutex_t log_sync;
+
ctx_t (const ctx_t&);
void operator = (const ctx_t&);
};
diff --git a/src/object.cpp b/src/object.cpp
index 5f4a94e..7b5532b 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -137,6 +137,11 @@ zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
return ctx->find_endpoint (addr_);
}
+void zmq::object_t::log (zmq_msg_t *msg_)
+{
+ ctx->log (msg_);
+}
+
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return ctx->choose_io_thread (taskset_);
diff --git a/src/object.hpp b/src/object.hpp
index 8652a86..6b52f4b 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_OBJECT_HPP_INCLUDED__
#define __ZMQ_OBJECT_HPP_INCLUDED__
+#include "../include/zmq.h"
+
#include "stdint.hpp"
#include "blob.hpp"
@@ -48,6 +50,9 @@ namespace zmq
void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_);
+ // Logs an message.
+ void log (zmq_msg_t *msg_);
+
// Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t taskset_);