diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/ctx.cpp | 53 | ||||
-rw-r--r-- | src/ctx.hpp | 13 | ||||
-rw-r--r-- | src/monitor.cpp | 80 | ||||
-rw-r--r-- | src/monitor.hpp | 70 | ||||
-rw-r--r-- | src/object.cpp | 10 | ||||
-rw-r--r-- | src/object.hpp | 4 |
7 files changed, 1 insertions, 231 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index c30c6cb..fdfa8d1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -43,7 +43,6 @@ libxs_la_SOURCES = \ lb.hpp \ likely.hpp \ mailbox.hpp \ - monitor.hpp \ msg.hpp \ mtrie.hpp \ mutex.hpp \ @@ -103,7 +102,6 @@ libxs_la_SOURCES = \ kqueue.cpp \ lb.cpp \ mailbox.cpp \ - monitor.cpp \ msg.cpp \ mtrie.cpp \ object.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index df7f564..6fb2b36 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -31,7 +31,6 @@ #include "ctx.hpp" #include "socket_base.hpp" -#include "monitor.hpp" #include "reaper.hpp" #include "pipe.hpp" #include "err.hpp" @@ -44,8 +43,6 @@ xs::ctx_t::ctx_t () : reaper (NULL), slot_count (0), slots (NULL), - monitor (NULL), - log_socket (NULL), max_sockets (512), io_thread_count (1) { @@ -98,20 +95,6 @@ int xs::ctx_t::terminate () // First attempt to terminate the context. if (!restarted) { - // Close the monitor object. Wait for done command from the monitor. - monitor->stop (); - command_t cmd; - int rc = term_mailbox.recv (&cmd, -1); - xs_assert (rc == 0); - xs_assert (cmd.type == command_t::done); - - // Close the logging socket. - log_sync.lock (); - rc = log_socket->close (); - xs_assert (rc == 0); - log_socket = NULL; - log_sync.unlock (); - // First send stop command to sockets so that any blocking calls // can be interrupted. If there are no sockets we can ask reaper // thread to stop. @@ -181,7 +164,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) int maxs = max_sockets; int ios = io_thread_count; opt_sync.unlock (); - slot_count = maxs + ios + 3; + slot_count = maxs + ios + 2; slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); alloc_assert (slots); @@ -209,23 +192,6 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) empty_slots.push_back (i); slots [i] = NULL; } - - // Create the socket to send logs to. - log_socket = create_socket (XS_PUB); - xs_assert (log_socket); - int linger = 0; - int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger)); - errno_assert (rc == 0); - int hwm = 1; - rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm)); - errno_assert (rc == 0); - - // Create the monitor object. - io_thread_t *io_thread = choose_io_thread (0); - xs_assert (io_thread); - monitor = new (std::nothrow) monitor_t (io_thread); - alloc_assert (monitor); - monitor->start (); } slot_sync.lock (); @@ -374,23 +340,6 @@ xs::endpoint_t xs::ctx_t::find_endpoint (const char *addr_) return *endpoint; } -void xs::ctx_t::log (int sid_, const char *text_) -{ - monitor->log (sid_, text_); -} - -void xs::ctx_t::publish_logs (const char *text_) -{ - log_sync.lock (); - msg_t msg; - msg.init_size (strlen (text_) + 1); - memcpy (msg.data (), text_, strlen (text_) + 1); - int rc = log_socket->send (&msg, XS_DONTWAIT); - errno_assert (rc == 0); - msg.close (); - log_sync.unlock (); -} - // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index b83fa1f..c4fa96a 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -38,7 +38,6 @@ namespace xs { class object_t; - class monitor_t; class io_thread_t; class socket_base_t; class reaper_t; @@ -94,10 +93,6 @@ namespace xs void unregister_endpoints (xs::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); - // Logging related functions. - void log (int sid_, const char *text_); - void publish_logs (const char *text_); - enum { term_tid = 0, reaper_tid = 1 @@ -154,17 +149,9 @@ namespace xs // Synchronisation of access to the list of inproc endpoints. mutex_t endpoints_sync; - // Monitor object attached to the context. - xs::monitor_t *monitor; - // Maximum socket ID. static atomic_counter_t max_socket_id; - // PUB socket for logging. The socket is shared among all the threads, - // thus it is synchronised by a mutex. - xs::socket_base_t *log_socket; - mutex_t log_sync; - // Maximum number of sockets that can be opened at the same time. int max_sockets; diff --git a/src/monitor.cpp b/src/monitor.cpp deleted file mode 100644 index deab9e0..0000000 --- a/src/monitor.cpp +++ /dev/null @@ -1,80 +0,0 @@ -/* - Copyright (c) 2012 250bpm s.r.o. - Copyright (c) 2012 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "monitor.hpp" -#include "io_thread.hpp" -#include "options.hpp" -#include "random.hpp" -#include "err.hpp" - -xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) : - own_t (io_thread_, options_t ()), - io_object_t (io_thread_), - timer (NULL) -{ -} - -xs::monitor_t::~monitor_t () -{ -} - -void xs::monitor_t::start () -{ - send_plug (this); -} - -void xs::monitor_t::stop () -{ - send_stop (); -} - -void xs::monitor_t::log (int sid_, const char *text_) -{ - sync.lock (); - text = text_; - sync.unlock (); -} - -void xs::monitor_t::process_plug () -{ - // Schedule sending of the first snapshot. - timer = add_timer (500 + (generate_random () % 1000)); -} - -void xs::monitor_t::process_stop () -{ - rm_timer (timer); - timer = NULL; - send_done (); - delete this; -} - -void xs::monitor_t::timer_event (handle_t handle_) -{ - xs_assert (handle_ == timer); - - // Send the snapshot here! - sync.lock (); - publish_logs (text.c_str ()); - sync.unlock (); - - // Wait before sending next snapshot. - timer = add_timer (500 + (generate_random () % 1000)); -} diff --git a/src/monitor.hpp b/src/monitor.hpp deleted file mode 100644 index 5bb0831..0000000 --- a/src/monitor.hpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - Copyright (c) 2012 250bpm s.r.o. - Copyright (c) 2012 Other contributors as noted in the AUTHORS file - - This file is part of Crossroads I/O project. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - Crossroads is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __XS_MONITOR_HPP_INCLUDED__ -#define __XS_MONITOR_HPP_INCLUDED__ - -#include <string> - -#include "own.hpp" -#include "mutex.hpp" -#include "io_object.hpp" - -namespace xs -{ - - class io_thread_t; - class socket_base_t; - - class monitor_t : public own_t, public io_object_t - { - public: - - monitor_t (xs::io_thread_t *io_thread_); - ~monitor_t (); - - void start (); - void stop (); - - void log (int sid_, const char *text_); - - private: - - // Handlers for incoming commands. - void process_plug (); - void process_stop (); - - // Events from the I/O thread. - void timer_event (handle_t handle_); - - // Actual monitoring data to send and the related critical section. - std::string text; - mutex_t sync; - - // Handle of the timer. - handle_t timer; - - monitor_t (const monitor_t&); - const monitor_t &operator = (const monitor_t&); - }; - -} - -#endif diff --git a/src/object.cpp b/src/object.cpp index 7739a75..5c1ed84 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -154,16 +154,6 @@ xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_) return ctx->choose_io_thread (affinity_); } -void xs::object_t::log (int sid_, const char *text_) -{ - ctx->log (sid_, text_); -} - -void xs::object_t::publish_logs (const char *text_) -{ - ctx->publish_logs (text_); -} - void xs::object_t::send_stop () { // 'stop' command goes always from administrative thread to diff --git a/src/object.hpp b/src/object.hpp index b6b77cb..5b855a5 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -64,10 +64,6 @@ namespace xs // Chooses least loaded I/O thread. xs::io_thread_t *choose_io_thread (uint64_t affinity_); - // Logging related functions. - void log (int sid_, const char *text_); - void publish_logs (const char *textr_); - // Derived object can use these functions to send commands // to other objects. void send_stop (); |