summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Makefile.am2
-rw-r--r--src/devpoll.cpp13
-rw-r--r--src/devpoll.hpp9
-rw-r--r--src/epoll.cpp12
-rw-r--r--src/epoll.hpp9
-rw-r--r--src/kqueue.cpp15
-rw-r--r--src/kqueue.hpp9
-rw-r--r--src/poll.cpp12
-rw-r--r--src/poll.hpp9
-rw-r--r--src/poller_base.cpp44
-rw-r--r--src/poller_base.hpp56
-rw-r--r--src/select.cpp12
-rw-r--r--src/select.hpp9
13 files changed, 125 insertions, 86 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index d2cc174..89fc44a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -89,6 +89,7 @@ libzmq_la_SOURCES = \
platform.hpp \
poll.hpp \
poller.hpp \
+ poller_base.hpp \
pair.hpp \
pub.hpp \
pull.hpp \
@@ -148,6 +149,7 @@ libzmq_la_SOURCES = \
pgm_socket.cpp \
pipe.cpp \
poll.cpp \
+ poller_base.cpp \
pull.cpp \
push.cpp \
pub.cpp \
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index 7054c2b..32aca50 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -56,10 +56,6 @@ zmq::devpoll_t::devpoll_t () :
zmq::devpoll_t::~devpoll_t ()
{
worker.stop ();
-
- // Make sure there are no fds registered on shutdown.
- zmq_assert (load.get () == 0);
-
close (devpoll_fd);
}
@@ -84,7 +80,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
pending_list.push_back (fd_);
// Increase the load metric of the thread.
- load.add (1);
+ adjust_load (1);
return fd_;
}
@@ -97,7 +93,7 @@ void zmq::devpoll_t::rm_fd (handle_t handle_)
fd_table [handle_].valid = false;
// Decrease the load metric of the thread.
- load.sub (1);
+ adjust_load (-1);
}
void zmq::devpoll_t::set_pollin (handle_t handle_)
@@ -140,11 +136,6 @@ void zmq::devpoll_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it);
}
-int zmq::devpoll_t::get_load ()
-{
- return load.get ();
-}
-
void zmq::devpoll_t::start ()
{
worker.start (worker_routine, this);
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index 00be385..ae16583 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -28,14 +28,14 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "atomic_counter.hpp"
+#include "poller_base.hpp"
namespace zmq
{
// Implements socket polling mechanism using the "/dev/poll" interface.
- class devpoll_t
+ class devpoll_t : public poller_base_t
{
public:
@@ -53,7 +53,6 @@ namespace zmq
void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_);
- int get_load ();
void start ();
void stop ();
@@ -94,10 +93,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work.
thread_t worker;
- // Load of the poller. Currently number of file descriptors
- // registered with the poller.
- atomic_counter_t load;
-
devpoll_t (const devpoll_t&);
void operator = (const devpoll_t&);
};
diff --git a/src/epoll.cpp b/src/epoll.cpp
index bbad639..584a13f 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -45,9 +45,6 @@ zmq::epoll_t::~epoll_t ()
// Wait till the worker thread exits.
worker.stop ();
- // Make sure there are no fds registered on shutdown.
- zmq_assert (load.get () == 0);
-
close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++)
delete *it;
@@ -71,7 +68,7 @@ zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_)
errno_assert (rc != -1);
// Increase the load metric of the thread.
- load.add (1);
+ adjust_load (1);
return pe;
}
@@ -85,7 +82,7 @@ void zmq::epoll_t::rm_fd (handle_t handle_)
retired.push_back (pe);
// Decrease the load metric of the thread.
- load.sub (1);
+ adjust_load (-1);
}
void zmq::epoll_t::set_pollin (handle_t handle_)
@@ -133,11 +130,6 @@ void zmq::epoll_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it);
}
-int zmq::epoll_t::get_load ()
-{
- return load.get ();
-}
-
void zmq::epoll_t::start ()
{
worker.start (worker_routine, this);
diff --git a/src/epoll.hpp b/src/epoll.hpp
index a68f055..a19fc0d 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -29,7 +29,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "atomic_counter.hpp"
+#include "poller_base.hpp"
namespace zmq
{
@@ -37,7 +37,7 @@ namespace zmq
// This class implements socket polling mechanism using the Linux-specific
// epoll mechanism.
- class epoll_t
+ class epoll_t : public poller_base_t
{
public:
@@ -55,7 +55,6 @@ namespace zmq
void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_);
- int get_load ();
void start ();
void stop ();
@@ -91,10 +90,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work.
thread_t worker;
- // Load of the poller. Currently number of file descriptors
- // registered with the poller.
- atomic_counter_t load;
-
epoll_t (const epoll_t&);
void operator = (const epoll_t&);
};
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index f76a08f..47178d3 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -54,10 +54,6 @@ zmq::kqueue_t::kqueue_t () :
zmq::kqueue_t::~kqueue_t ()
{
worker.stop ();
-
- // Make sure there are no fds registered on shutdown.
- zmq_assert (load.get () == 0);
-
close (kqueue_fd);
}
@@ -74,7 +70,7 @@ void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
{
struct kevent ev;
- EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t)NULL);
+ EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, (kevent_udata_t) NULL);
int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
errno_assert (rc != -1);
}
@@ -90,6 +86,8 @@ zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
pe->flag_pollout = 0;
pe->reactor = reactor_;
+ adjust_load (1);
+
return pe;
}
@@ -102,6 +100,8 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
kevent_delete (pe->fd, EVFILT_WRITE);
pe->fd = retired_fd;
retired.push_back (pe);
+
+ adjust_load (-1);
}
void zmq::kqueue_t::set_pollin (handle_t handle_)
@@ -144,11 +144,6 @@ void zmq::kqueue_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it);
}
-int zmq::kqueue_t::get_load ()
-{
- return load.get ();
-}
-
void zmq::kqueue_t::start ()
{
worker.start (worker_routine, this);
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index 43c2a39..6a27260 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -29,7 +29,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "atomic_counter.hpp"
+#include "poller_base.hpp"
namespace zmq
{
@@ -37,7 +37,7 @@ namespace zmq
// Implements socket polling mechanism using the BSD-specific
// kqueue interface.
- class kqueue_t
+ class kqueue_t : public poller_base_t
{
public:
@@ -55,7 +55,6 @@ namespace zmq
void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_);
- int get_load ();
void start ();
void stop ();
@@ -98,10 +97,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work.
thread_t worker;
- // Load of the poller. Currently number of file descriptors
- // registered with the poller.
- atomic_counter_t load;
-
kqueue_t (const kqueue_t&);
void operator = (const kqueue_t&);
};
diff --git a/src/poll.cpp b/src/poll.cpp
index 513c405..9afa6b5 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -54,9 +54,6 @@ zmq::poll_t::poll_t () :
zmq::poll_t::~poll_t ()
{
worker.stop ();
-
- // Make sure there are no fds registered on shutdown.
- zmq_assert (load.get () == 0);
}
zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
@@ -69,7 +66,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
fd_table [fd_].events = events_;
// Increase the load metric of the thread.
- load.add (1);
+ adjust_load (1);
return fd_;
}
@@ -85,7 +82,7 @@ void zmq::poll_t::rm_fd (handle_t handle_)
retired = true;
// Decrease the load metric of the thread.
- load.sub (1);
+ adjust_load (-1);
}
void zmq::poll_t::set_pollin (handle_t handle_)
@@ -124,11 +121,6 @@ void zmq::poll_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it);
}
-int zmq::poll_t::get_load ()
-{
- return load.get ();
-}
-
void zmq::poll_t::start ()
{
worker.start (worker_routine, this);
diff --git a/src/poll.hpp b/src/poll.hpp
index 96d18dd..e88c39d 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -34,7 +34,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "atomic_counter.hpp"
+#include "poller_base.hpp"
namespace zmq
{
@@ -42,7 +42,7 @@ namespace zmq
// Implements socket polling mechanism using the POSIX.1-2001
// poll() system call.
- class poll_t
+ class poll_t : public poller_base_t
{
public:
@@ -60,7 +60,6 @@ namespace zmq
void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_);
- int get_load ();
void start ();
void stop ();
@@ -98,10 +97,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work.
thread_t worker;
- // Load of the poller. Currently number of file descriptors
- // registered with the poller.
- atomic_counter_t load;
-
poll_t (const poll_t&);
void operator = (const poll_t&);
};
diff --git a/src/poller_base.cpp b/src/poller_base.cpp
new file mode 100644
index 0000000..f1de7e9
--- /dev/null
+++ b/src/poller_base.cpp
@@ -0,0 +1,44 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "poller_base.hpp"
+#include "err.hpp"
+
+zmq::poller_base_t::poller_base_t ()
+{
+}
+
+zmq::poller_base_t::~poller_base_t ()
+{
+ // Make sure there are no fds registered on shutdown.
+ zmq_assert (get_load () == 0);
+}
+
+int zmq::poller_base_t::get_load ()
+{
+ return load.get ();
+}
+
+void zmq::poller_base_t::adjust_load (int amount_)
+{
+ if (amount_ > 0)
+ load.add (amount_);
+ else if (amount_ < 0)
+ load.sub (-amount_);
+}
diff --git a/src/poller_base.hpp b/src/poller_base.hpp
new file mode 100644
index 0000000..0b0d53d
--- /dev/null
+++ b/src/poller_base.hpp
@@ -0,0 +1,56 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_POLLER_BASE_HPP_INCLUDED__
+#define __ZMQ_POLLER_BASE_HPP_INCLUDED__
+
+#include "atomic_counter.hpp"
+
+namespace zmq
+{
+
+ class poller_base_t
+ {
+ public:
+
+ poller_base_t ();
+ ~poller_base_t ();
+
+ // Returns load of the poller. Note that this function can be
+ // invoked from a different thread!
+ int get_load ();
+
+ protected:
+
+ // Called by individual poller implementations to manage the load.
+ void adjust_load (int amount_);
+
+ private:
+
+ // Load of the poller. Currently the number of file descriptors
+ // registered.
+ atomic_counter_t load;
+
+ poller_base_t (const poller_base_t&);
+ void operator = (const poller_base_t&);
+ };
+
+}
+
+#endif
diff --git a/src/select.cpp b/src/select.cpp
index f169239..a04ee66 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -54,9 +54,6 @@ zmq::select_t::select_t () :
zmq::select_t::~select_t ()
{
worker.stop ();
-
- // Make sure there are no fds registered on shutdown.
- zmq_assert (load.get () == 0);
}
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
@@ -77,7 +74,7 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
maxfd = fd_;
// Increase the load metric of the thread.
- load.add (1);
+ adjust_load (1);
return fd_;
}
@@ -113,7 +110,7 @@ void zmq::select_t::rm_fd (handle_t handle_)
}
// Decrease the load metric of the thread.
- load.sub (1);
+ adjust_load (-1);
}
void zmq::select_t::set_pollin (handle_t handle_)
@@ -148,11 +145,6 @@ void zmq::select_t::cancel_timer (i_poll_events *events_, int id_)
timers.erase (it);
}
-int zmq::select_t::get_load ()
-{
- return load.get ();
-}
-
void zmq::select_t::start ()
{
worker.start (worker_routine, this);
diff --git a/src/select.hpp b/src/select.hpp
index ae19fe1..c6c7f51 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -36,7 +36,7 @@
#include "fd.hpp"
#include "thread.hpp"
-#include "atomic_counter.hpp"
+#include "poller_base.hpp"
namespace zmq
{
@@ -44,7 +44,7 @@ namespace zmq
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
- class select_t
+ class select_t : public poller_base_t
{
public:
@@ -62,7 +62,6 @@ namespace zmq
void reset_pollout (handle_t handle_);
void add_timer (int timeout_, struct i_poll_events *events_, int id_);
void cancel_timer (struct i_poll_events *events_, int id_);
- int get_load ();
void start ();
void stop ();
@@ -109,10 +108,6 @@ namespace zmq
// Handle of the physical thread doing the I/O work.
thread_t worker;
- // Load of the poller. Currently number of file descriptors
- // registered with the poller.
- atomic_counter_t load;
-
select_t (const select_t&);
void operator = (const select_t&);
};