diff options
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/ctx.cpp | 22 | ||||
| -rw-r--r-- | src/ctx.hpp | 4 | ||||
| -rw-r--r-- | src/monitor.cpp | 68 | ||||
| -rw-r--r-- | src/monitor.hpp | 60 | ||||
| -rw-r--r-- | src/zmq.cpp | 3 | ||||
| -rw-r--r-- | tests/test_msg_flags.cpp | 5 | ||||
| -rw-r--r-- | tests/test_pair_inproc.cpp | 4 | ||||
| -rw-r--r-- | tests/test_reqrep_inproc.cpp | 2 | 
9 files changed, 161 insertions, 9 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 4d3cba3..f38dd0a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,6 +34,7 @@ libzmq_la_SOURCES = \      lb.hpp \      likely.hpp \      mailbox.hpp \ +    monitor.hpp \      msg.hpp \      mtrie.hpp \      mutex.hpp \ @@ -94,6 +95,7 @@ libzmq_la_SOURCES = \      kqueue.cpp \      lb.cpp \      mailbox.cpp \ +    monitor.cpp \      msg.cpp \      mtrie.cpp \      object.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index 54e665a..3ef2d4a 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -32,6 +32,7 @@  #include "ctx.hpp"  #include "socket_base.hpp"  #include "io_thread.hpp" +#include "monitor.hpp"  #include "reaper.hpp"  #include "pipe.hpp"  #include "err.hpp" @@ -79,6 +80,13 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :      zmq_assert (log_socket);      rc = log_socket->bind ("sys://log");      zmq_assert (rc == 0); + +    //  Create the monitor object. +    io_thread_t *io_thread = choose_io_thread (0); +    zmq_assert (io_thread); +    monitor = new (std::nothrow) monitor_t (io_thread); +    alloc_assert (monitor); +    monitor->start ();  }  bool zmq::ctx_t::check_tag () @@ -123,15 +131,23 @@ int zmq::ctx_t::terminate ()      //  First attempt to terminate the context.      if (!restarted) { +        //  Close the monitor object. Wait for done command from the monitor. +        monitor->stop (); +        command_t cmd; +        int rc = term_mailbox.recv (&cmd, -1); +        zmq_assert (rc == 0); +        zmq_assert (cmd.type == command_t::done); +          //  Close the logging infrastructure.          log_sync.lock (); -        int rc = log_socket->close (); +        rc = log_socket->close ();          zmq_assert (rc == 0);          log_socket = NULL;          log_sync.unlock (); -        //  First send stop command to sockets so that any blocking calls can be -        //  interrupted. If there are no sockets we can ask reaper thread to stop. +        //  First send stop command to sockets so that any blocking calls +        //  can be interrupted. If there are no sockets we can ask reaper +        //  thread to stop.          slot_sync.lock ();          terminating = true;          for (sockets_t::size_type i = 0; i != sockets.size (); i++) diff --git a/src/ctx.hpp b/src/ctx.hpp index bcffcd7..6a337d5 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -39,6 +39,7 @@ namespace zmq  {      class object_t; +    class monitor_t;      class io_thread_t;      class socket_base_t;      class reaper_t; @@ -152,6 +153,9 @@ namespace zmq          zmq::socket_base_t *log_socket;          mutex_t log_sync; +        //  Monitor object attached to the context. +        zmq::monitor_t *monitor; +          //  Maximum socket ID.          static atomic_counter_t max_socket_id; diff --git a/src/monitor.cpp b/src/monitor.cpp new file mode 100644 index 0000000..77bb8f1 --- /dev/null +++ b/src/monitor.cpp @@ -0,0 +1,68 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "monitor.hpp" +#include "io_thread.hpp" +#include "options.hpp" +#include "random.hpp" +#include "err.hpp" + +zmq::monitor_t::monitor_t (zmq::io_thread_t *io_thread_) : +    own_t (io_thread_, options_t ()), +    io_object_t (io_thread_) +{ +} + +zmq::monitor_t::~monitor_t () +{ +} + +void zmq::monitor_t::start () +{ +    send_plug (this); +} + +void zmq::monitor_t::stop () +{ +    send_stop (); +} + +void zmq::monitor_t::process_plug () +{ +    //  Schedule sending of the first snapshot. +    add_timer (500 + (generate_random () % 1000), timer_id); +} + +void zmq::monitor_t::process_stop () +{ +    cancel_timer (timer_id); +    send_done (); +    delete this; +} + +void zmq::monitor_t::timer_event (int id_) +{ +    zmq_assert (id_ == timer_id); + +    //  TODO: Send the snapshot here! + +    //  Wait before sending next snapshot. +    add_timer (500 + (generate_random () % 1000), timer_id); +} diff --git a/src/monitor.hpp b/src/monitor.hpp new file mode 100644 index 0000000..439939f --- /dev/null +++ b/src/monitor.hpp @@ -0,0 +1,60 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_MONITOR_HPP_INCLUDED__ +#define __ZMQ_MONITOR_HPP_INCLUDED__ + +#include "own.hpp" +#include "io_object.hpp" + +namespace zmq +{ + +    class io_thread_t; +    class socket_base_t; + +    class monitor_t : public own_t, public io_object_t +    { +    public: + +        monitor_t (zmq::io_thread_t *io_thread_); +        ~monitor_t (); + +        void start (); +        void stop (); + +    private: + +        enum {timer_id = 0x44}; + +        //  Handlers for incoming commands. +        void process_plug (); +        void process_stop (); + +        //  Events from the poller. +        void timer_event (int id_); + +        monitor_t (const monitor_t&); +        const monitor_t &operator = (const monitor_t&); +    }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 84dcdd1..6417d8e 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -92,7 +92,8 @@ const char *zmq_strerror (int errnum_)  void *zmq_init (int io_threads_)  { -    if (io_threads_ < 0) { +    //  We need at least one I/O thread to run the monitor object in. +    if (io_threads_ < 1) {          errno = EINVAL;          return NULL;      } diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp index 10fd526..f836184 100644 --- a/tests/test_msg_flags.cpp +++ b/tests/test_msg_flags.cpp @@ -1,5 +1,6 @@  /* -    Copyright (c) 2011 250bpm s.r.o. +    Copyright (c) 2011-2012 250bpm s.r.o. +    Copyright (c) 2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ. @@ -25,7 +26,7 @@  int main (int argc, char *argv [])  {      //  Create the infrastructure -    void *ctx = zmq_init (0); +    void *ctx = zmq_init (1);      assert (ctx);      void *sb = zmq_socket (ctx, ZMQ_XREP);      assert (sb); diff --git a/tests/test_pair_inproc.cpp b/tests/test_pair_inproc.cpp index 6705cc5..75a9720 100644 --- a/tests/test_pair_inproc.cpp +++ b/tests/test_pair_inproc.cpp @@ -1,5 +1,5 @@  /* -    Copyright (c) 2010-2011 250bpm s.r.o. +    Copyright (c) 2010-2012 250bpm s.r.o.      Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ. @@ -26,7 +26,7 @@ int main (int argc, char *argv [])  {      fprintf (stderr, "test_pair_inproc running...\n"); -    void *ctx = zmq_init (0); +    void *ctx = zmq_init (1);      assert (ctx);      void *sb = zmq_socket (ctx, ZMQ_PAIR); diff --git a/tests/test_reqrep_inproc.cpp b/tests/test_reqrep_inproc.cpp index f710968..ae998eb 100644 --- a/tests/test_reqrep_inproc.cpp +++ b/tests/test_reqrep_inproc.cpp @@ -26,7 +26,7 @@ int main (int argc, char *argv [])  {      fprintf (stderr, "test_reqrep_inproc running...\n"); -    void *ctx = zmq_init (0); +    void *ctx = zmq_init (1);      assert (ctx);      void *sb = zmq_socket (ctx, ZMQ_REP); | 
