summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am2
-rw-r--r--src/ctx.cpp22
-rw-r--r--src/ctx.hpp4
-rw-r--r--src/monitor.cpp68
-rw-r--r--src/monitor.hpp60
-rw-r--r--src/zmq.cpp3
-rw-r--r--tests/test_msg_flags.cpp5
-rw-r--r--tests/test_pair_inproc.cpp4
-rw-r--r--tests/test_reqrep_inproc.cpp2
9 files changed, 161 insertions, 9 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 4d3cba3..f38dd0a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -34,6 +34,7 @@ libzmq_la_SOURCES = \
lb.hpp \
likely.hpp \
mailbox.hpp \
+ monitor.hpp \
msg.hpp \
mtrie.hpp \
mutex.hpp \
@@ -94,6 +95,7 @@ libzmq_la_SOURCES = \
kqueue.cpp \
lb.cpp \
mailbox.cpp \
+ monitor.cpp \
msg.cpp \
mtrie.cpp \
object.cpp \
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 54e665a..3ef2d4a 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -32,6 +32,7 @@
#include "ctx.hpp"
#include "socket_base.hpp"
#include "io_thread.hpp"
+#include "monitor.hpp"
#include "reaper.hpp"
#include "pipe.hpp"
#include "err.hpp"
@@ -79,6 +80,13 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
zmq_assert (log_socket);
rc = log_socket->bind ("sys://log");
zmq_assert (rc == 0);
+
+ // Create the monitor object.
+ io_thread_t *io_thread = choose_io_thread (0);
+ zmq_assert (io_thread);
+ monitor = new (std::nothrow) monitor_t (io_thread);
+ alloc_assert (monitor);
+ monitor->start ();
}
bool zmq::ctx_t::check_tag ()
@@ -123,15 +131,23 @@ int zmq::ctx_t::terminate ()
// First attempt to terminate the context.
if (!restarted) {
+ // Close the monitor object. Wait for done command from the monitor.
+ monitor->stop ();
+ command_t cmd;
+ int rc = term_mailbox.recv (&cmd, -1);
+ zmq_assert (rc == 0);
+ zmq_assert (cmd.type == command_t::done);
+
// Close the logging infrastructure.
log_sync.lock ();
- int rc = log_socket->close ();
+ rc = log_socket->close ();
zmq_assert (rc == 0);
log_socket = NULL;
log_sync.unlock ();
- // First send stop command to sockets so that any blocking calls can be
- // interrupted. If there are no sockets we can ask reaper thread to stop.
+ // First send stop command to sockets so that any blocking calls
+ // can be interrupted. If there are no sockets we can ask reaper
+ // thread to stop.
slot_sync.lock ();
terminating = true;
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
diff --git a/src/ctx.hpp b/src/ctx.hpp
index bcffcd7..6a337d5 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -39,6 +39,7 @@ namespace zmq
{
class object_t;
+ class monitor_t;
class io_thread_t;
class socket_base_t;
class reaper_t;
@@ -152,6 +153,9 @@ namespace zmq
zmq::socket_base_t *log_socket;
mutex_t log_sync;
+ // Monitor object attached to the context.
+ zmq::monitor_t *monitor;
+
// Maximum socket ID.
static atomic_counter_t max_socket_id;
diff --git a/src/monitor.cpp b/src/monitor.cpp
new file mode 100644
index 0000000..77bb8f1
--- /dev/null
+++ b/src/monitor.cpp
@@ -0,0 +1,68 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "monitor.hpp"
+#include "io_thread.hpp"
+#include "options.hpp"
+#include "random.hpp"
+#include "err.hpp"
+
+zmq::monitor_t::monitor_t (zmq::io_thread_t *io_thread_) :
+ own_t (io_thread_, options_t ()),
+ io_object_t (io_thread_)
+{
+}
+
+zmq::monitor_t::~monitor_t ()
+{
+}
+
+void zmq::monitor_t::start ()
+{
+ send_plug (this);
+}
+
+void zmq::monitor_t::stop ()
+{
+ send_stop ();
+}
+
+void zmq::monitor_t::process_plug ()
+{
+ // Schedule sending of the first snapshot.
+ add_timer (500 + (generate_random () % 1000), timer_id);
+}
+
+void zmq::monitor_t::process_stop ()
+{
+ cancel_timer (timer_id);
+ send_done ();
+ delete this;
+}
+
+void zmq::monitor_t::timer_event (int id_)
+{
+ zmq_assert (id_ == timer_id);
+
+ // TODO: Send the snapshot here!
+
+ // Wait before sending next snapshot.
+ add_timer (500 + (generate_random () % 1000), timer_id);
+}
diff --git a/src/monitor.hpp b/src/monitor.hpp
new file mode 100644
index 0000000..439939f
--- /dev/null
+++ b/src/monitor.hpp
@@ -0,0 +1,60 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_MONITOR_HPP_INCLUDED__
+#define __ZMQ_MONITOR_HPP_INCLUDED__
+
+#include "own.hpp"
+#include "io_object.hpp"
+
+namespace zmq
+{
+
+ class io_thread_t;
+ class socket_base_t;
+
+ class monitor_t : public own_t, public io_object_t
+ {
+ public:
+
+ monitor_t (zmq::io_thread_t *io_thread_);
+ ~monitor_t ();
+
+ void start ();
+ void stop ();
+
+ private:
+
+ enum {timer_id = 0x44};
+
+ // Handlers for incoming commands.
+ void process_plug ();
+ void process_stop ();
+
+ // Events from the poller.
+ void timer_event (int id_);
+
+ monitor_t (const monitor_t&);
+ const monitor_t &operator = (const monitor_t&);
+ };
+
+}
+
+#endif
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 84dcdd1..6417d8e 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -92,7 +92,8 @@ const char *zmq_strerror (int errnum_)
void *zmq_init (int io_threads_)
{
- if (io_threads_ < 0) {
+ // We need at least one I/O thread to run the monitor object in.
+ if (io_threads_ < 1) {
errno = EINVAL;
return NULL;
}
diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp
index 10fd526..f836184 100644
--- a/tests/test_msg_flags.cpp
+++ b/tests/test_msg_flags.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011-2012 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -25,7 +26,7 @@
int main (int argc, char *argv [])
{
// Create the infrastructure
- void *ctx = zmq_init (0);
+ void *ctx = zmq_init (1);
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_XREP);
assert (sb);
diff --git a/tests/test_pair_inproc.cpp b/tests/test_pair_inproc.cpp
index 6705cc5..75a9720 100644
--- a/tests/test_pair_inproc.cpp
+++ b/tests/test_pair_inproc.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2012 250bpm s.r.o.
Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -26,7 +26,7 @@ int main (int argc, char *argv [])
{
fprintf (stderr, "test_pair_inproc running...\n");
- void *ctx = zmq_init (0);
+ void *ctx = zmq_init (1);
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_PAIR);
diff --git a/tests/test_reqrep_inproc.cpp b/tests/test_reqrep_inproc.cpp
index f710968..ae998eb 100644
--- a/tests/test_reqrep_inproc.cpp
+++ b/tests/test_reqrep_inproc.cpp
@@ -26,7 +26,7 @@ int main (int argc, char *argv [])
{
fprintf (stderr, "test_reqrep_inproc running...\n");
- void *ctx = zmq_init (0);
+ void *ctx = zmq_init (1);
assert (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);