From b8ba042912233045b6ca74f732bcc7265dfeb5b0 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:01:22 +0900 Subject: Monitor object added Signed-off-by: Martin Sustrik --- src/Makefile.am | 2 ++ src/ctx.cpp | 22 ++++++++++++-- src/ctx.hpp | 4 +++ src/monitor.cpp | 68 ++++++++++++++++++++++++++++++++++++++++++++ src/monitor.hpp | 60 ++++++++++++++++++++++++++++++++++++++ src/zmq.cpp | 3 +- tests/test_msg_flags.cpp | 5 ++-- tests/test_pair_inproc.cpp | 4 +-- tests/test_reqrep_inproc.cpp | 2 +- 9 files changed, 161 insertions(+), 9 deletions(-) create mode 100644 src/monitor.cpp create mode 100644 src/monitor.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 4d3cba3..f38dd0a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,6 +34,7 @@ libzmq_la_SOURCES = \ lb.hpp \ likely.hpp \ mailbox.hpp \ + monitor.hpp \ msg.hpp \ mtrie.hpp \ mutex.hpp \ @@ -94,6 +95,7 @@ libzmq_la_SOURCES = \ kqueue.cpp \ lb.cpp \ mailbox.cpp \ + monitor.cpp \ msg.cpp \ mtrie.cpp \ object.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index 54e665a..3ef2d4a 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -32,6 +32,7 @@ #include "ctx.hpp" #include "socket_base.hpp" #include "io_thread.hpp" +#include "monitor.hpp" #include "reaper.hpp" #include "pipe.hpp" #include "err.hpp" @@ -79,6 +80,13 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : zmq_assert (log_socket); rc = log_socket->bind ("sys://log"); zmq_assert (rc == 0); + + // Create the monitor object. + io_thread_t *io_thread = choose_io_thread (0); + zmq_assert (io_thread); + monitor = new (std::nothrow) monitor_t (io_thread); + alloc_assert (monitor); + monitor->start (); } bool zmq::ctx_t::check_tag () @@ -123,15 +131,23 @@ int zmq::ctx_t::terminate () // First attempt to terminate the context. if (!restarted) { + // Close the monitor object. Wait for done command from the monitor. + monitor->stop (); + command_t cmd; + int rc = term_mailbox.recv (&cmd, -1); + zmq_assert (rc == 0); + zmq_assert (cmd.type == command_t::done); + // Close the logging infrastructure. log_sync.lock (); - int rc = log_socket->close (); + rc = log_socket->close (); zmq_assert (rc == 0); log_socket = NULL; log_sync.unlock (); - // First send stop command to sockets so that any blocking calls can be - // interrupted. If there are no sockets we can ask reaper thread to stop. + // First send stop command to sockets so that any blocking calls + // can be interrupted. If there are no sockets we can ask reaper + // thread to stop. slot_sync.lock (); terminating = true; for (sockets_t::size_type i = 0; i != sockets.size (); i++) diff --git a/src/ctx.hpp b/src/ctx.hpp index bcffcd7..6a337d5 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -39,6 +39,7 @@ namespace zmq { class object_t; + class monitor_t; class io_thread_t; class socket_base_t; class reaper_t; @@ -152,6 +153,9 @@ namespace zmq zmq::socket_base_t *log_socket; mutex_t log_sync; + // Monitor object attached to the context. + zmq::monitor_t *monitor; + // Maximum socket ID. static atomic_counter_t max_socket_id; diff --git a/src/monitor.cpp b/src/monitor.cpp new file mode 100644 index 0000000..77bb8f1 --- /dev/null +++ b/src/monitor.cpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "monitor.hpp" +#include "io_thread.hpp" +#include "options.hpp" +#include "random.hpp" +#include "err.hpp" + +zmq::monitor_t::monitor_t (zmq::io_thread_t *io_thread_) : + own_t (io_thread_, options_t ()), + io_object_t (io_thread_) +{ +} + +zmq::monitor_t::~monitor_t () +{ +} + +void zmq::monitor_t::start () +{ + send_plug (this); +} + +void zmq::monitor_t::stop () +{ + send_stop (); +} + +void zmq::monitor_t::process_plug () +{ + // Schedule sending of the first snapshot. + add_timer (500 + (generate_random () % 1000), timer_id); +} + +void zmq::monitor_t::process_stop () +{ + cancel_timer (timer_id); + send_done (); + delete this; +} + +void zmq::monitor_t::timer_event (int id_) +{ + zmq_assert (id_ == timer_id); + + // TODO: Send the snapshot here! + + // Wait before sending next snapshot. + add_timer (500 + (generate_random () % 1000), timer_id); +} diff --git a/src/monitor.hpp b/src/monitor.hpp new file mode 100644 index 0000000..439939f --- /dev/null +++ b/src/monitor.hpp @@ -0,0 +1,60 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_MONITOR_HPP_INCLUDED__ +#define __ZMQ_MONITOR_HPP_INCLUDED__ + +#include "own.hpp" +#include "io_object.hpp" + +namespace zmq +{ + + class io_thread_t; + class socket_base_t; + + class monitor_t : public own_t, public io_object_t + { + public: + + monitor_t (zmq::io_thread_t *io_thread_); + ~monitor_t (); + + void start (); + void stop (); + + private: + + enum {timer_id = 0x44}; + + // Handlers for incoming commands. + void process_plug (); + void process_stop (); + + // Events from the poller. + void timer_event (int id_); + + monitor_t (const monitor_t&); + const monitor_t &operator = (const monitor_t&); + }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 84dcdd1..6417d8e 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -92,7 +92,8 @@ const char *zmq_strerror (int errnum_) void *zmq_init (int io_threads_) { - if (io_threads_ < 0) { + // We need at least one I/O thread to run the monitor object in. + if (io_threads_ < 1) { errno = EINVAL; return NULL; } diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp index 10fd526..f836184 100644 --- a/tests/test_msg_flags.cpp +++ b/tests/test_msg_flags.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011-2012 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -25,7 +26,7 @@ int main (int argc, char *argv []) { // Create the infrastructure - void *ctx = zmq_init (0); + void *ctx = zmq_init (1); assert (ctx); void *sb = zmq_socket (ctx, ZMQ_XREP); assert (sb); diff --git a/tests/test_pair_inproc.cpp b/tests/test_pair_inproc.cpp index 6705cc5..75a9720 100644 --- a/tests/test_pair_inproc.cpp +++ b/tests/test_pair_inproc.cpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2012 250bpm s.r.o. Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -26,7 +26,7 @@ int main (int argc, char *argv []) { fprintf (stderr, "test_pair_inproc running...\n"); - void *ctx = zmq_init (0); + void *ctx = zmq_init (1); assert (ctx); void *sb = zmq_socket (ctx, ZMQ_PAIR); diff --git a/tests/test_reqrep_inproc.cpp b/tests/test_reqrep_inproc.cpp index f710968..ae998eb 100644 --- a/tests/test_reqrep_inproc.cpp +++ b/tests/test_reqrep_inproc.cpp @@ -26,7 +26,7 @@ int main (int argc, char *argv []) { fprintf (stderr, "test_reqrep_inproc running...\n"); - void *ctx = zmq_init (0); + void *ctx = zmq_init (1); assert (ctx); void *sb = zmq_socket (ctx, ZMQ_REP); -- cgit v1.2.3