summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-04-29 17:20:23 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-04-29 17:20:23 +0200
commitc193fd146661b39027c5e3fa0776dcdf8c6af5e2 (patch)
tree0ec001869a7e6519cc1b910de0b26c46308eba96
parent7cb076e56a18cb76c49f17bd34bc73c11e01b705 (diff)
lock-free polling removed; ZMQ_POLL flag removed
-rw-r--r--doc/zmq_init.txt8
-rw-r--r--include/zmq.h1
-rw-r--r--src/Makefile.am5
-rw-r--r--src/app_thread.cpp22
-rw-r--r--src/app_thread.hpp8
-rw-r--r--src/atomic_bitmap.hpp310
-rw-r--r--src/dispatcher.cpp7
-rw-r--r--src/dispatcher.hpp6
-rw-r--r--src/fd_signaler.hpp3
-rw-r--r--src/i_signaler.hpp55
-rw-r--r--src/io_thread.cpp6
-rw-r--r--src/io_thread.hpp5
-rw-r--r--src/object.cpp1
-rw-r--r--src/queue.cpp5
-rw-r--r--src/simple_semaphore.hpp242
-rw-r--r--src/ypollset.cpp65
-rw-r--r--src/ypollset.hpp69
-rw-r--r--src/zmq.cpp11
18 files changed, 28 insertions, 801 deletions
diff --git a/doc/zmq_init.txt b/doc/zmq_init.txt
index 9df3e67..c1a5877 100644
--- a/doc/zmq_init.txt
+++ b/doc/zmq_init.txt
@@ -25,12 +25,8 @@ The 'io_threads' argument specifies the size of the 0MQ thread pool to handle
I/O operations. If your application is using 'inproc' messaging exclusively you
may set this to zero, otherwise set it to at least one.
-The 'flags' argument is a combination of the flags defined below:
-
-*ZMQ_POLL*::
-Specifies that sockets within this 'context' should support multiplexing using
-_zmq_poll()_. Enabling this functionality may add a small amount of latency to
-message transfers compared to leaving it disabled.
+There are no flags defined at the moment and the 'flags' argument should be set
+to zero.
RETURN VALUE
diff --git a/include/zmq.h b/include/zmq.h
index e860146..9745adf 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -144,6 +144,7 @@ ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg);
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
/******************************************************************************/
+/* This flag is obsolete and has no effect. To be removed in next version. */
#define ZMQ_POLL 1
ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags);
diff --git a/src/Makefile.am b/src/Makefile.am
index 8cd94dd..837cd5f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -50,7 +50,6 @@ endif
nodist_libzmq_la_SOURCES = $(pgm_sources)
libzmq_la_SOURCES = app_thread.hpp \
- atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
blob.hpp \
@@ -74,7 +73,6 @@ libzmq_la_SOURCES = app_thread.hpp \
i_endpoint.hpp \
i_engine.hpp \
i_poll_events.hpp \
- i_signaler.hpp \
kqueue.hpp \
lb.hpp \
likely.hpp \
@@ -98,7 +96,6 @@ libzmq_la_SOURCES = app_thread.hpp \
req.hpp \
select.hpp \
session.hpp \
- simple_semaphore.hpp \
socket_base.hpp \
stdint.hpp \
streamer.hpp \
@@ -116,7 +113,6 @@ libzmq_la_SOURCES = app_thread.hpp \
yarray.hpp \
yarray_item.hpp \
ypipe.hpp \
- ypollset.hpp \
yqueue.hpp \
zmq_connecter.hpp \
zmq_decoder.hpp \
@@ -166,7 +162,6 @@ libzmq_la_SOURCES = app_thread.hpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
- ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
zmq_decoder.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 9ff2112..0dad660 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -36,7 +36,6 @@
#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "fd_signaler.hpp"
-#include "ypollset.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
@@ -59,27 +58,16 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
- int flags_) :
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (dispatcher_, thread_slot_),
last_processing_time (0),
terminated (false)
{
- if (flags_ & ZMQ_POLL) {
- signaler = new (std::nothrow) fd_signaler_t;
- zmq_assert (signaler);
- }
- else {
- signaler = new (std::nothrow) ypollset_t;
- zmq_assert (signaler);
- }
}
zmq::app_thread_t::~app_thread_t ()
{
zmq_assert (sockets.empty ());
- zmq_assert (signaler);
- delete signaler;
}
void zmq::app_thread_t::stop ()
@@ -87,16 +75,16 @@ void zmq::app_thread_t::stop ()
send_stop ();
}
-zmq::i_signaler *zmq::app_thread_t::get_signaler ()
+zmq::fd_signaler_t *zmq::app_thread_t::get_signaler ()
{
- return signaler;
+ return &signaler;
}
bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
uint64_t signals;
if (block_)
- signals = signaler->poll ();
+ signals = signaler.poll ();
else {
#if defined ZMQ_DELAY_COMMANDS
@@ -129,7 +117,7 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
#endif
// Check whether there are any commands pending for this thread.
- signals = signaler->check ();
+ signals = signaler.check ();
}
if (signals) {
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 1c2f47a..b7572da 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -25,6 +25,7 @@
#include "stdint.hpp"
#include "object.hpp"
#include "yarray.hpp"
+#include "fd_signaler.hpp"
namespace zmq
{
@@ -33,8 +34,7 @@ namespace zmq
{
public:
- app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_,
- int flags_);
+ app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
~app_thread_t ();
@@ -43,7 +43,7 @@ namespace zmq
void stop ();
// Returns signaler associated with this application thread.
- struct i_signaler *get_signaler ();
+ fd_signaler_t *get_signaler ();
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
@@ -71,7 +71,7 @@ namespace zmq
sockets_t sockets;
// App thread's signaler object.
- struct i_signaler *signaler;
+ fd_signaler_t signaler;
// Timestamp of when commands were processed the last time.
uint64_t last_processing_time;
diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp
deleted file mode 100644
index 29bdc13..0000000
--- a/src/atomic_bitmap.hpp
+++ /dev/null
@@ -1,310 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__
-#define __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__
-
-#include "stdint.hpp"
-#include "platform.hpp"
-
-// These are the conditions to choose between different implementations
-// of atomic_bitmap.
-
-#if defined ZMQ_FORCE_MUTEXES
-#define ZMQ_ATOMIC_BITMAP_MUTEX
-#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__
-#define ZMQ_ATOMIC_BITMAP_X86
-#elif 0 && defined __sparc__ && defined __GNUC__
-#define ZMQ_ATOMIC_BITMAP_SPARC
-#elif defined ZMQ_HAVE_WINDOWS
-#define ZMQ_ATOMIC_BITMAP_WINDOWS
-#elif defined sun
-#define ZMQ_ATOMIC_COUNTER_SUN
-#elif defined( __GNUC__ ) && ( __GNUC__ * 100 + __GNUC_MINOR__ >= 401 )
-#define ZMQ_ATOMIC_COUNTER_GNU
-#else
-#define ZMQ_ATOMIC_BITMAP_MUTEX
-#endif
-
-#if defined ZMQ_ATOMIC_BITMAP_MUTEX
-#include "mutex.hpp"
-#elif defined ZMQ_ATOMIC_BITMAP_WINDOWS
-#include "windows.hpp"
-#elif defined ZMQ_ATOMIC_BITMAP_SUN
-#include <atomic.h>
-#endif
-
-namespace zmq
-{
-
- // This class encapuslates several bitwise atomic operations on unsigned
- // integer. Selection of operations is driven specifically by the needs
- // of ypollset implementation.
-
- class atomic_bitmap_t
- {
- public:
-
-#if (defined ZMQ_ATOMIC_BITMAP_X86 || defined ZMQ_FORCE_MUTEXES) \
- && defined __x86_64__
- typedef uint64_t bitmap_t;
-#else
- typedef uint32_t bitmap_t;
-#endif
-
- inline atomic_bitmap_t (bitmap_t value_ = 0) :
- value (value_)
- {
- }
-
- inline ~atomic_bitmap_t ()
- {
- }
-
- // Bit-test-set-and-reset. Sets one bit of the value and resets
- // another one. Returns the original value of the reset bit.
- inline bool btsr (int set_index_, int reset_index_)
- {
-#if defined ZMQ_ATOMIC_BITMAP_WINDOWS
- while (true) {
- bitmap_t oldval = value;
- bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) &
- ~(bitmap_t (1) << reset_index_);
- if (InterlockedCompareExchange ((volatile LONG*) &value, newval,
- oldval) == (LONG) oldval)
- return (oldval & (bitmap_t (1) << reset_index_)) ?
- true : false;
- }
-#elif defined ZMQ_ATOMIC_BITMAP_GNU
- while (true) {
- bitmap_t oldval = value;
- bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) &
- ~(bitmap_t (1) << reset_index_);
- if (__sync_val_compare_and_swap (&value, oldval, newval) == oldval)
- return (oldval & (bitmap_t (1) << reset_index_)) ?
- true : false;
- }
-#elif defined ZMQ_ATOMIC_BITMAP_SUN
- while (true) {
- bitmap_t oldval = value;
- bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) &
- ~(bitmap_t (1) << reset_index_);
- if (atomic_cas_32 (&value, oldval, newval) == oldval)
- return (oldval & (bitmap_t (1) << reset_index_)) ?
- true : false;
- }
-#elif defined ZMQ_ATOMIC_BITMAP_X86
- bitmap_t oldval, dummy;
- __asm__ volatile (
- "mov %0, %1\n\t"
- "1:"
- "mov %1, %2\n\t"
- "bts %3, %2\n\t"
- "btr %4, %2\n\t"
- "lock cmpxchg %2, %0\n\t"
- "jnz 1b\n\t"
- : "+m" (value), "=&a" (oldval), "=&r" (dummy)
- : "r" (bitmap_t(set_index_)), "r" (bitmap_t(reset_index_))
- : "cc");
- return (bool) (oldval & (bitmap_t(1) << reset_index_));
-#elif defined ZMQ_ATOMIC_BITMAP_SPARC
- volatile bitmap_t* valptr = &value;
- bitmap_t set_val = bitmap_t(1) << set_index_;
- bitmap_t reset_val = ~(bitmap_t(1) << reset_index_);
- bitmap_t tmp;
- bitmap_t oldval;
- __asm__ volatile(
- "ld [%5], %2 \n\t"
- "1: \n\t"
- "or %2, %0, %3 \n\t"
- "and %3, %1, %3 \n\t"
- "cas [%5], %2, %3 \n\t"
- "cmp %2, %3 \n\t"
- "bne,a,pn %%icc, 1b \n\t"
- "mov %3, %2 \n\t"
- : "+r" (set_val), "+r" (reset_val), "=&r" (tmp),
- "=&r" (oldval), "+m" (*valptr)
- : "r" (valptr)
- : "cc");
- return oldval;
-#elif defined ZMQ_ATOMIC_BITMAP_MUTEX
- sync.lock ();
- bitmap_t oldval = value;
- value = (oldval | (bitmap_t (1) << set_index_)) &
- ~(bitmap_t (1) << reset_index_);
- sync.unlock ();
- return (oldval & (bitmap_t (1) << reset_index_)) ? true : false;
-#else
-#error
-#endif
- }
-
- // Sets value to newval. Returns the original value.
- inline bitmap_t xchg (bitmap_t newval_)
- {
- bitmap_t oldval;
-#if defined ZMQ_ATOMIC_BITMAP_WINDOWS
- oldval = InterlockedExchange ((volatile LONG*) &value, newval_);
-#elif defined ZMQ_ATOMIC_BITMAP_GNU
- oldval = __sync_lock_test_and_set (&value, newval_);
-#elif defined ZMQ_ATOMIC_BITMAP_SUN
- oldval = atomic_swap_32 (&value, newval_);
-#elif defined ZMQ_ATOMIC_BITMAP_X86
- oldval = newval_;
- __asm__ volatile (
- "lock; xchg %0, %1"
- : "=r" (oldval)
- : "m" (value), "0" (oldval)
- : "memory");
-#elif defined ZMQ_ATOMIC_BITMAP_SPARC
- oldval = value;
- volatile bitmap_t* ptrin = &value;
- bitmap_t tmp;
- bitmap_t prev;
- __asm__ __volatile__(
- "ld [%4], %1\n\t"
- "1:\n\t"
- "mov %0, %2\n\t"
- "cas [%4], %1, %2\n\t"
- "cmp %1, %2\n\t"
- "bne,a,pn %%icc, 1b\n\t"
- "mov %2, %1\n\t"
- : "+r" (newval_), "=&r" (tmp), "=&r" (prev), "+m" (*ptrin)
- : "r" (ptrin)
- : "cc");
- return prev;
-#elif defined ZMQ_ATOMIC_BITMAP_MUTEX
- sync.lock ();
- oldval = value;
- value = newval_;
- sync.unlock ();
-#else
-#error
-#endif
- return oldval;
- }
-
- // izte is "if-zero-then-else" atomic operation - if the value is zero
- // it substitutes it by 'thenval' else it rewrites it by 'elseval'.
- // Original value of the integer is returned from this function.
- inline bitmap_t izte (bitmap_t thenval_,
- bitmap_t elseval_)
- {
-#if defined ZMQ_ATOMIC_BITMAP_WINDOWS
- while (true) {
- bitmap_t oldval = value;
- bitmap_t newval = oldval == 0 ? thenval_ : elseval_;
- if (InterlockedCompareExchange ((volatile LONG*) &value,
- newval, oldval) == (LONG) oldval)
- return oldval;
- }
-#elif defined ZMQ_ATOMIC_BITMAP_GNU
- while (true) {
- bitmap_t oldval = value;
- bitmap_t newval = oldval == 0 ? thenval_ : elseval_;
- if (__sync_val_compare_and_swap (&value, oldval, newval) == oldval)
- return oldval;
- }
-#elif defined ZMQ_ATOMIC_BITMAP_SUN
- while (true) {
- bitmap_t oldval = value;
- bitmap_t newval = oldval == 0 ? thenval_ : elseval_;
- if (atomic_cas_32 (&value, oldval, newval) == oldval)
- return oldval;
- }
-#elif defined ZMQ_ATOMIC_BITMAP_X86
- bitmap_t oldval;
- bitmap_t dummy;
- __asm__ volatile (
- "mov %0, %1\n\t"
- "1:"
- "mov %3, %2\n\t"
- "test %1, %1\n\t"
- "jz 2f\n\t"
- "mov %4, %2\n\t"
- "2:"
- "lock cmpxchg %2, %0\n\t"
- "jnz 1b\n\t"
- : "+m" (value), "=&a" (oldval), "=&r" (dummy)
- : "r" (thenval_), "r" (elseval_)
- : "cc");
- return oldval;
-#elif defined ZMQ_ATOMIC_BITMAP_SPARC
- volatile bitmap_t* ptrin = &value;
- bitmap_t tmp;
- bitmap_t prev;
- __asm__ __volatile__(
- "ld [%3], %0 \n\t"
- "mov 0, %1 \n\t"
- "cas [%3], %1, %4 \n\t"
- "cmp %0, %1 \n\t"
- "be,a,pn %%icc,1f \n\t"
- "ld [%3], %0 \n\t"
- "cas [%3], %0, %5 \n\t"
- "1: \n\t"
- : "=&r" (tmp), "=&r" (prev), "+m" (*ptrin)
- : "r" (ptrin), "r" (thenval_), "r" (elseval_)
- : "cc");
- return prev;
-#elif defined ZMQ_ATOMIC_BITMAP_MUTEX
- sync.lock ();
- bitmap_t oldval = value;
- value = oldval ? elseval_ : thenval_;
- sync.unlock ();
- return oldval;
-#else
-#error
-#endif
- }
-
- private:
-
- volatile bitmap_t value;
-#if defined ZMQ_ATOMIC_BITMAP_MUTEX
- mutex_t sync;
-#endif
-
- atomic_bitmap_t (const atomic_bitmap_t&);
- void operator = (const atomic_bitmap_t&);
- };
-
-}
-
-// Remove macros local to this file.
-#if defined ZMQ_ATOMIC_BITMAP_WINDOWS
-#undef ZMQ_ATOMIC_BITMAP_WINDOWS
-#endif
-#if defined ZMQ_ATOMIC_BITMAP_GNU
-#undef ZMQ_ATOMIC_BITMAP_GNU
-#endif
-#if defined ZMQ_ATOMIC_BITMAP_SUN
-#undef ZMQ_ATOMIC_BITMAP_SUN
-#endif
-#if defined ZMQ_ATOMIC_BITMAP_X86
-#undef ZMQ_ATOMIC_BITMAP_X86
-#endif
-#if defined ZMQ_ATOMIC_BITMAP_SPARC
-#undef ZMQ_ATOMIC_BITMAP_SPARC
-#endif
-#if defined ZMQ_ATOMIC_BITMAP_MUTEX
-#undef ZMQ_ATOMIC_BITMAP_MUTEX
-#endif
-
-#endif
-
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index a1154de..1e11619 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -33,8 +33,7 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
- int flags_) :
+zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
sockets (0),
terminated (false)
{
@@ -53,7 +52,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
for (int i = 0; i != app_threads_; i++) {
app_thread_info_t info;
info.associated = false;
- info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_);
+ info.app_thread = new (std::nothrow) app_thread_t (this, i);
zmq_assert (info.app_thread);
app_threads.push_back (info);
signalers.push_back (info.app_thread->get_signaler ());
@@ -62,7 +61,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create I/O thread objects.
for (int i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
- i + app_threads_, flags_);
+ i + app_threads_);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers.push_back (io_thread->get_signaler ());
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 1cd5207..6648f5d 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -25,7 +25,7 @@
#include <map>
#include <string>
-#include "i_signaler.hpp"
+#include "fd_signaler.hpp"
#include "ypipe.hpp"
#include "command.hpp"
#include "config.hpp"
@@ -51,7 +51,7 @@ namespace zmq
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
- dispatcher_t (int app_threads_, int io_threads_, int flags_);
+ dispatcher_t (int app_threads_, int io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
@@ -125,7 +125,7 @@ namespace zmq
io_threads_t io_threads;
// Signalers for both application and I/O threads.
- std::vector <i_signaler*> signalers;
+ std::vector <fd_signaler_t*> signalers;
// Pipe to hold the commands.
typedef ypipe_t <command_t, true,
diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp
index f2037a9..a6ccbba 100644
--- a/src/fd_signaler.hpp
+++ b/src/fd_signaler.hpp
@@ -21,7 +21,6 @@
#define __ZMQ_FD_SIGNALER_HPP_INCLUDED__
#include "platform.hpp"
-#include "i_signaler.hpp"
#include "fd.hpp"
#include "stdint.hpp"
@@ -33,7 +32,7 @@ namespace zmq
// descriptor and so it can be polled on. Same signal cannot be sent twice
// unless signals are retrieved by the reader side in the meantime.
- class fd_signaler_t : public i_signaler
+ class fd_signaler_t
{
public:
diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp
deleted file mode 100644
index 822ab8e..0000000
--- a/src/i_signaler.hpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__
-#define __ZMQ_I_SIGNALER_HPP_INCLUDED__
-
-#include "stdint.hpp"
-#include "fd.hpp"
-
-namespace zmq
-{
- // Virtual interface used to send signals. Individual implementations
- // may restrict the number of possible signal types to send.
-
- struct i_signaler
- {
- virtual ~i_signaler () {};
-
- // Send a signal with a specific ID.
- virtual void signal (int signal_) = 0;
-
- // Wait for signal. Returns a set of signals in form of a bitmap.
- // Signal with index 0 corresponds to value 1, index 1 to value 2,
- // index 2 to value 3 etc.
- virtual uint64_t poll () = 0;
-
- // Same as poll, however, if there is no signal available,
- // function returns zero immediately instead of waiting for a signal.
- virtual uint64_t check () = 0;
-
- // Returns file descriptor that allows waiting for signals. Specific
- // signalers may not support this functionality. If so, the function
- // returns retired_fd.
- virtual fd_t get_fd () = 0;
- };
-
-}
-
-#endif
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index ff45478..41f7f7d 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -27,10 +27,8 @@
#include "err.hpp"
#include "command.hpp"
#include "dispatcher.hpp"
-#include "simple_semaphore.hpp"
-zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
- int flags_) :
+zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (dispatcher_, thread_slot_)
{
poller = new (std::nothrow) poller_t;
@@ -56,7 +54,7 @@ void zmq::io_thread_t::stop ()
send_stop ();
}
-zmq::i_signaler *zmq::io_thread_t::get_signaler ()
+zmq::fd_signaler_t *zmq::io_thread_t::get_signaler ()
{
return &signaler;
}
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index 9377515..84b9319 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -38,8 +38,7 @@ namespace zmq
{
public:
- io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_,
- int flags_);
+ io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
@@ -52,7 +51,7 @@ namespace zmq
void stop ();
// Returns signaler associated with this I/O thread.
- i_signaler *get_signaler ();
+ fd_signaler_t *get_signaler ();
// i_poll_events implementation.
void in_event ();
diff --git a/src/object.cpp b/src/object.cpp
index 9221280..113a456 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -24,7 +24,6 @@
#include "err.hpp"
#include "pipe.hpp"
#include "io_thread.hpp"
-#include "simple_semaphore.hpp"
#include "owned.hpp"
#include "session.hpp"
#include "socket_base.hpp"
diff --git a/src/queue.cpp b/src/queue.cpp
index 165e360..05cd125 100644
--- a/src/queue.cpp
+++ b/src/queue.cpp
@@ -51,8 +51,9 @@ int zmq::queue (class socket_base_t *insocket_,
errno_assert (rc > 0);
// The algorithm below asumes ratio of request and replies processed
- // under full load to be 1:1. The alternative would be to process
- // replies first, handle request only when there are no more replies.
+ // under full load to be 1:1. While processing requests replies first
+ // is tempting it is suspectible to DoS attacks (overloading the system
+ // with unsolicited replies).
// Receive a new request.
if (items [0].revents & ZMQ_POLLIN) {
diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp
deleted file mode 100644
index 78d72e5..0000000
--- a/src/simple_semaphore.hpp
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__
-#define __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__
-
-#include "platform.hpp"
-#include "err.hpp"
-
-#if 0 //defined ZMQ_HAVE_LINUX
-#include <sys/syscall.h>
-#include <unistd.h>
-#include <linux/futex.h>
-#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
-#include <pthread.h>
-#elif defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#else
-#include <semaphore.h>
-#endif
-
-namespace zmq
-{
- // Simple semaphore. Only single thread may be waiting at any given time.
- // Also, the semaphore may not be posted before the previous post
- // was matched by corresponding wait and the waiting thread was
- // released.
-
-#if 0 //defined ZMQ_HAVE_LINUX
-
- // In theory, using private futexes should be more efficient on Linux
- // platform than using mutexes. However, in uncontended cases of TCP
- // transport on loopback interface we haven't seen any latency improvement.
- // The code is commented out waiting for more thorough testing.
-
- class simple_semaphore_t
- {
- public:
-
- // Initialise the semaphore.
- inline simple_semaphore_t () :
- dummy (0)
- {
- }
-
- // Destroy the semaphore.
- inline ~simple_semaphore_t ()
- {
- }
-
- // Wait for the semaphore.
- inline void wait ()
- {
- int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE,
- (int) 0, NULL, NULL, (int) 0);
- zmq_assert (rc == 0);
- }
-
- // Post the semaphore.
- inline void post ()
- {
- while (true) {
- int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE,
- (int) 1, NULL, NULL, (int) 0);
- zmq_assert (rc != -1 && rc <= 1);
- if (rc == 1)
- break;
- }
- }
-
- private:
-
- int dummy;
-
- // Disable copying of the object.
- simple_semaphore_t (const simple_semaphore_t&);
- void operator = (const simple_semaphore_t&);
- };
-
-#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
-
- // On platforms that allow for double locking of a mutex from the same
- // thread, simple semaphore is implemented using mutex, as it is more
- // efficient than full-blown semaphore.
-
- class simple_semaphore_t
- {
- public:
-
- // Initialise the semaphore.
- inline simple_semaphore_t ()
- {
- int rc = pthread_mutex_init (&mutex, NULL);
- posix_assert (rc);
- rc = pthread_mutex_lock (&mutex);
- posix_assert (rc);
- }
-
- // Destroy the semaphore.
- inline ~simple_semaphore_t ()
- {
- int rc = pthread_mutex_unlock (&mutex);
- posix_assert (rc);
- rc = pthread_mutex_destroy (&mutex);
- posix_assert (rc);
- }
-
- // Wait for the semaphore.
- inline void wait ()
- {
- int rc = pthread_mutex_lock (&mutex);
- posix_assert (rc);
- }
-
- // Post the semaphore.
- inline void post ()
- {
- int rc = pthread_mutex_unlock (&mutex);
- posix_assert (rc);
- }
-
- private:
-
- pthread_mutex_t mutex;
-
- // Disable copying of the object.
- simple_semaphore_t (const simple_semaphore_t&);
- void operator = (const simple_semaphore_t&);
- };
-
-#elif defined ZMQ_HAVE_WINDOWS
-
- // On Windows platform simple semaphore is implemeted using event object.
-
- class simple_semaphore_t
- {
- public:
-
- // Initialise the semaphore.
- inline simple_semaphore_t ()
- {
- ev = CreateEvent (NULL, FALSE, FALSE, NULL);
- win_assert (ev != NULL);
- }
-
- // Destroy the semaphore.
- inline ~simple_semaphore_t ()
- {
- int rc = CloseHandle (ev);
- win_assert (rc != 0);
- }
-
- // Wait for the semaphore.
- inline void wait ()
- {
- DWORD rc = WaitForSingleObject (ev, INFINITE);
- win_assert (rc != WAIT_FAILED);
- }
-
- // Post the semaphore.
- inline void post ()
- {
- int rc = SetEvent (ev);
- win_assert (rc != 0);
- }
-
- private:
-
- HANDLE ev;
-
- // Disable copying of the object.
- simple_semaphore_t (const simple_semaphore_t&);
- void operator = (const simple_semaphore_t&);
- };
-
-#else
-
- // Default implementation maps simple semaphore to standard semaphore.
-
- class simple_semaphore_t
- {
- public:
-
- // Initialise the semaphore.
- inline simple_semaphore_t ()
- {
- int rc = sem_init (&sem, 0, 0);
- errno_assert (rc != -1);
- }
-
- // Destroy the semaphore.
- inline ~simple_semaphore_t ()
- {
- int rc = sem_destroy (&sem);
- errno_assert (rc != -1);
- }
-
- // Wait for the semaphore.
- inline void wait ()
- {
- int rc = sem_wait (&sem);
- errno_assert (rc != -1);
- }
-
- // Post the semaphore.
- inline void post ()
- {
- int rc = sem_post (&sem);
- errno_assert (rc != -1);
- }
-
- private:
-
- // Underlying system semaphore object.
- sem_t sem;
-
- // Disable copying of the object.
- simple_semaphore_t (const simple_semaphore_t&);
- void operator = (const simple_semaphore_t&);
- };
-
-#endif
-
-}
-
-#endif
diff --git a/src/ypollset.cpp b/src/ypollset.cpp
deleted file mode 100644
index 51284a8..0000000
--- a/src/ypollset.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "ypollset.hpp"
-
-zmq::ypollset_t::ypollset_t ()
-{
-}
-
-zmq::ypollset_t::~ypollset_t ()
-{
-}
-
-void zmq::ypollset_t::signal (int signal_)
-{
- zmq_assert (signal_ >= 0 && signal_ < wait_signal);
- if (bits.btsr (signal_, wait_signal))
- sem.post ();
-}
-
-uint64_t zmq::ypollset_t::poll ()
-{
- signals_t result = 0;
- while (!result) {
- result = bits.izte (signals_t (1) << wait_signal, 0);
- if (!result) {
- sem.wait ();
- result = bits.xchg (0);
- }
-
- // If btsr was really atomic, result would never be 0 at this
- // point, i.e. no looping would be possible. However, to
- // support even CPU architectures without CAS instruction
- // we allow btsr to be composed of two independent atomic
- // operation (set and reset). In such case looping can occur
- // sporadically.
- }
- return (uint64_t) result;
-}
-
-uint64_t zmq::ypollset_t::check ()
-{
- return (uint64_t) bits.xchg (0);
-}
-
-zmq::fd_t zmq::ypollset_t::get_fd ()
-{
- return retired_fd;
-}
diff --git a/src/ypollset.hpp b/src/ypollset.hpp
deleted file mode 100644
index c75b149..0000000
--- a/src/ypollset.hpp
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_YPOLLSET_HPP_INCLUDED__
-#define __ZMQ_YPOLLSET_HPP_INCLUDED__
-
-#include "i_signaler.hpp"
-#include "simple_semaphore.hpp"
-#include "atomic_bitmap.hpp"
-
-namespace zmq
-{
-
- // ypollset allows for rapid polling for up to constant number of
- // different signals each produced by a different thread. The number of
- // possible signals is platform-dependent.
-
- class ypollset_t : public i_signaler
- {
- public:
-
- ypollset_t ();
- ~ypollset_t ();
-
- // i_signaler interface implementation.
- void signal (int signal_);
- uint64_t poll ();
- uint64_t check ();
- fd_t get_fd ();
-
- private:
-
- // Internal representation of signal bitmap.
- typedef atomic_bitmap_t::bitmap_t signals_t;
-
- // Wait signal is carried in the most significant bit of integer.
- enum {wait_signal = sizeof (signals_t) * 8 - 1};
-
- // The bits of the pollset.
- atomic_bitmap_t bits;
-
- // Used by thread waiting for signals to sleep if there are no
- // signals available.
- simple_semaphore_t sem;
-
- // Disable copying of ypollset object.
- ypollset_t (const ypollset_t&);
- void operator = (const ypollset_t&);
- };
-
-}
-
-#endif
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 74d52f6..c05af25 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -228,14 +228,7 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
void *zmq_init (int app_threads_, int io_threads_, int flags_)
{
- // There should be at least a single application thread managed
- // by the dispatcher. There's no need for I/O threads if 0MQ is used
- // only for inproc messaging
- if (app_threads_ < 1 || io_threads_ < 0 ||
- app_threads_ > 63 || io_threads_ > 63) {
- errno = EINVAL;
- return NULL;
- }
+ // There are no context flags defined at the moment, so flags_ is ignored.
#if defined ZMQ_HAVE_OPENPGM
// Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus,
@@ -269,7 +262,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
// Create 0MQ context.
zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
- app_threads_, io_threads_, flags_);
+ app_threads_, io_threads_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}