diff options
-rw-r--r-- | doc/zmq_init.txt | 8 | ||||
-rw-r--r-- | include/zmq.h | 1 | ||||
-rw-r--r-- | src/Makefile.am | 5 | ||||
-rw-r--r-- | src/app_thread.cpp | 22 | ||||
-rw-r--r-- | src/app_thread.hpp | 8 | ||||
-rw-r--r-- | src/atomic_bitmap.hpp | 310 | ||||
-rw-r--r-- | src/dispatcher.cpp | 7 | ||||
-rw-r--r-- | src/dispatcher.hpp | 6 | ||||
-rw-r--r-- | src/fd_signaler.hpp | 3 | ||||
-rw-r--r-- | src/i_signaler.hpp | 55 | ||||
-rw-r--r-- | src/io_thread.cpp | 6 | ||||
-rw-r--r-- | src/io_thread.hpp | 5 | ||||
-rw-r--r-- | src/object.cpp | 1 | ||||
-rw-r--r-- | src/queue.cpp | 5 | ||||
-rw-r--r-- | src/simple_semaphore.hpp | 242 | ||||
-rw-r--r-- | src/ypollset.cpp | 65 | ||||
-rw-r--r-- | src/ypollset.hpp | 69 | ||||
-rw-r--r-- | src/zmq.cpp | 11 |
18 files changed, 28 insertions, 801 deletions
diff --git a/doc/zmq_init.txt b/doc/zmq_init.txt index 9df3e67..c1a5877 100644 --- a/doc/zmq_init.txt +++ b/doc/zmq_init.txt @@ -25,12 +25,8 @@ The 'io_threads' argument specifies the size of the 0MQ thread pool to handle I/O operations. If your application is using 'inproc' messaging exclusively you may set this to zero, otherwise set it to at least one. -The 'flags' argument is a combination of the flags defined below: - -*ZMQ_POLL*:: -Specifies that sockets within this 'context' should support multiplexing using -_zmq_poll()_. Enabling this functionality may add a small amount of latency to -message transfers compared to leaving it disabled. +There are no flags defined at the moment and the 'flags' argument should be set +to zero. RETURN VALUE diff --git a/include/zmq.h b/include/zmq.h index e860146..9745adf 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -144,6 +144,7 @@ ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg); /* 0MQ infrastructure (a.k.a. context) initialisation & termination. */ /******************************************************************************/ +/* This flag is obsolete and has no effect. To be removed in next version. */ #define ZMQ_POLL 1 ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags); diff --git a/src/Makefile.am b/src/Makefile.am index 8cd94dd..837cd5f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -50,7 +50,6 @@ endif nodist_libzmq_la_SOURCES = $(pgm_sources) libzmq_la_SOURCES = app_thread.hpp \ - atomic_bitmap.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ blob.hpp \ @@ -74,7 +73,6 @@ libzmq_la_SOURCES = app_thread.hpp \ i_endpoint.hpp \ i_engine.hpp \ i_poll_events.hpp \ - i_signaler.hpp \ kqueue.hpp \ lb.hpp \ likely.hpp \ @@ -98,7 +96,6 @@ libzmq_la_SOURCES = app_thread.hpp \ req.hpp \ select.hpp \ session.hpp \ - simple_semaphore.hpp \ socket_base.hpp \ stdint.hpp \ streamer.hpp \ @@ -116,7 +113,6 @@ libzmq_la_SOURCES = app_thread.hpp \ yarray.hpp \ yarray_item.hpp \ ypipe.hpp \ - ypollset.hpp \ yqueue.hpp \ zmq_connecter.hpp \ zmq_decoder.hpp \ @@ -166,7 +162,6 @@ libzmq_la_SOURCES = app_thread.hpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ - ypollset.cpp \ zmq.cpp \ zmq_connecter.cpp \ zmq_decoder.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 9ff2112..0dad660 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -36,7 +36,6 @@ #include "app_thread.hpp" #include "dispatcher.hpp" #include "fd_signaler.hpp" -#include "ypollset.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" @@ -59,27 +58,16 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, - int flags_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : object_t (dispatcher_, thread_slot_), last_processing_time (0), terminated (false) { - if (flags_ & ZMQ_POLL) { - signaler = new (std::nothrow) fd_signaler_t; - zmq_assert (signaler); - } - else { - signaler = new (std::nothrow) ypollset_t; - zmq_assert (signaler); - } } zmq::app_thread_t::~app_thread_t () { zmq_assert (sockets.empty ()); - zmq_assert (signaler); - delete signaler; } void zmq::app_thread_t::stop () @@ -87,16 +75,16 @@ void zmq::app_thread_t::stop () send_stop (); } -zmq::i_signaler *zmq::app_thread_t::get_signaler () +zmq::fd_signaler_t *zmq::app_thread_t::get_signaler () { - return signaler; + return &signaler; } bool zmq::app_thread_t::process_commands (bool block_, bool throttle_) { uint64_t signals; if (block_) - signals = signaler->poll (); + signals = signaler.poll (); else { #if defined ZMQ_DELAY_COMMANDS @@ -129,7 +117,7 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_) #endif // Check whether there are any commands pending for this thread. - signals = signaler->check (); + signals = signaler.check (); } if (signals) { diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 1c2f47a..b7572da 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -25,6 +25,7 @@ #include "stdint.hpp" #include "object.hpp" #include "yarray.hpp" +#include "fd_signaler.hpp" namespace zmq { @@ -33,8 +34,7 @@ namespace zmq { public: - app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, - int flags_); + app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); ~app_thread_t (); @@ -43,7 +43,7 @@ namespace zmq void stop (); // Returns signaler associated with this application thread. - struct i_signaler *get_signaler (); + fd_signaler_t *get_signaler (); // Processes commands sent to this thread (if any). If 'block' is // set to true, returns only after at least one command was processed. @@ -71,7 +71,7 @@ namespace zmq sockets_t sockets; // App thread's signaler object. - struct i_signaler *signaler; + fd_signaler_t signaler; // Timestamp of when commands were processed the last time. uint64_t last_processing_time; diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp deleted file mode 100644 index 29bdc13..0000000 --- a/src/atomic_bitmap.hpp +++ /dev/null @@ -1,310 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ -#define __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ - -#include "stdint.hpp" -#include "platform.hpp" - -// These are the conditions to choose between different implementations -// of atomic_bitmap. - -#if defined ZMQ_FORCE_MUTEXES -#define ZMQ_ATOMIC_BITMAP_MUTEX -#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZMQ_ATOMIC_BITMAP_X86 -#elif 0 && defined __sparc__ && defined __GNUC__ -#define ZMQ_ATOMIC_BITMAP_SPARC -#elif defined ZMQ_HAVE_WINDOWS -#define ZMQ_ATOMIC_BITMAP_WINDOWS -#elif defined sun -#define ZMQ_ATOMIC_COUNTER_SUN -#elif defined( __GNUC__ ) && ( __GNUC__ * 100 + __GNUC_MINOR__ >= 401 ) -#define ZMQ_ATOMIC_COUNTER_GNU -#else -#define ZMQ_ATOMIC_BITMAP_MUTEX -#endif - -#if defined ZMQ_ATOMIC_BITMAP_MUTEX -#include "mutex.hpp" -#elif defined ZMQ_ATOMIC_BITMAP_WINDOWS -#include "windows.hpp" -#elif defined ZMQ_ATOMIC_BITMAP_SUN -#include <atomic.h> -#endif - -namespace zmq -{ - - // This class encapuslates several bitwise atomic operations on unsigned - // integer. Selection of operations is driven specifically by the needs - // of ypollset implementation. - - class atomic_bitmap_t - { - public: - -#if (defined ZMQ_ATOMIC_BITMAP_X86 || defined ZMQ_FORCE_MUTEXES) \ - && defined __x86_64__ - typedef uint64_t bitmap_t; -#else - typedef uint32_t bitmap_t; -#endif - - inline atomic_bitmap_t (bitmap_t value_ = 0) : - value (value_) - { - } - - inline ~atomic_bitmap_t () - { - } - - // Bit-test-set-and-reset. Sets one bit of the value and resets - // another one. Returns the original value of the reset bit. - inline bool btsr (int set_index_, int reset_index_) - { -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS - while (true) { - bitmap_t oldval = value; - bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & - ~(bitmap_t (1) << reset_index_); - if (InterlockedCompareExchange ((volatile LONG*) &value, newval, - oldval) == (LONG) oldval) - return (oldval & (bitmap_t (1) << reset_index_)) ? - true : false; - } -#elif defined ZMQ_ATOMIC_BITMAP_GNU - while (true) { - bitmap_t oldval = value; - bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & - ~(bitmap_t (1) << reset_index_); - if (__sync_val_compare_and_swap (&value, oldval, newval) == oldval) - return (oldval & (bitmap_t (1) << reset_index_)) ? - true : false; - } -#elif defined ZMQ_ATOMIC_BITMAP_SUN - while (true) { - bitmap_t oldval = value; - bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & - ~(bitmap_t (1) << reset_index_); - if (atomic_cas_32 (&value, oldval, newval) == oldval) - return (oldval & (bitmap_t (1) << reset_index_)) ? - true : false; - } -#elif defined ZMQ_ATOMIC_BITMAP_X86 - bitmap_t oldval, dummy; - __asm__ volatile ( - "mov %0, %1\n\t" - "1:" - "mov %1, %2\n\t" - "bts %3, %2\n\t" - "btr %4, %2\n\t" - "lock cmpxchg %2, %0\n\t" - "jnz 1b\n\t" - : "+m" (value), "=&a" (oldval), "=&r" (dummy) - : "r" (bitmap_t(set_index_)), "r" (bitmap_t(reset_index_)) - : "cc"); - return (bool) (oldval & (bitmap_t(1) << reset_index_)); -#elif defined ZMQ_ATOMIC_BITMAP_SPARC - volatile bitmap_t* valptr = &value; - bitmap_t set_val = bitmap_t(1) << set_index_; - bitmap_t reset_val = ~(bitmap_t(1) << reset_index_); - bitmap_t tmp; - bitmap_t oldval; - __asm__ volatile( - "ld [%5], %2 \n\t" - "1: \n\t" - "or %2, %0, %3 \n\t" - "and %3, %1, %3 \n\t" - "cas [%5], %2, %3 \n\t" - "cmp %2, %3 \n\t" - "bne,a,pn %%icc, 1b \n\t" - "mov %3, %2 \n\t" - : "+r" (set_val), "+r" (reset_val), "=&r" (tmp), - "=&r" (oldval), "+m" (*valptr) - : "r" (valptr) - : "cc"); - return oldval; -#elif defined ZMQ_ATOMIC_BITMAP_MUTEX - sync.lock (); - bitmap_t oldval = value; - value = (oldval | (bitmap_t (1) << set_index_)) & - ~(bitmap_t (1) << reset_index_); - sync.unlock (); - return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; -#else -#error -#endif - } - - // Sets value to newval. Returns the original value. - inline bitmap_t xchg (bitmap_t newval_) - { - bitmap_t oldval; -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS - oldval = InterlockedExchange ((volatile LONG*) &value, newval_); -#elif defined ZMQ_ATOMIC_BITMAP_GNU - oldval = __sync_lock_test_and_set (&value, newval_); -#elif defined ZMQ_ATOMIC_BITMAP_SUN - oldval = atomic_swap_32 (&value, newval_); -#elif defined ZMQ_ATOMIC_BITMAP_X86 - oldval = newval_; - __asm__ volatile ( - "lock; xchg %0, %1" - : "=r" (oldval) - : "m" (value), "0" (oldval) - : "memory"); -#elif defined ZMQ_ATOMIC_BITMAP_SPARC - oldval = value; - volatile bitmap_t* ptrin = &value; - bitmap_t tmp; - bitmap_t prev; - __asm__ __volatile__( - "ld [%4], %1\n\t" - "1:\n\t" - "mov %0, %2\n\t" - "cas [%4], %1, %2\n\t" - "cmp %1, %2\n\t" - "bne,a,pn %%icc, 1b\n\t" - "mov %2, %1\n\t" - : "+r" (newval_), "=&r" (tmp), "=&r" (prev), "+m" (*ptrin) - : "r" (ptrin) - : "cc"); - return prev; -#elif defined ZMQ_ATOMIC_BITMAP_MUTEX - sync.lock (); - oldval = value; - value = newval_; - sync.unlock (); -#else -#error -#endif - return oldval; - } - - // izte is "if-zero-then-else" atomic operation - if the value is zero - // it substitutes it by 'thenval' else it rewrites it by 'elseval'. - // Original value of the integer is returned from this function. - inline bitmap_t izte (bitmap_t thenval_, - bitmap_t elseval_) - { -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS - while (true) { - bitmap_t oldval = value; - bitmap_t newval = oldval == 0 ? thenval_ : elseval_; - if (InterlockedCompareExchange ((volatile LONG*) &value, - newval, oldval) == (LONG) oldval) - return oldval; - } -#elif defined ZMQ_ATOMIC_BITMAP_GNU - while (true) { - bitmap_t oldval = value; - bitmap_t newval = oldval == 0 ? thenval_ : elseval_; - if (__sync_val_compare_and_swap (&value, oldval, newval) == oldval) - return oldval; - } -#elif defined ZMQ_ATOMIC_BITMAP_SUN - while (true) { - bitmap_t oldval = value; - bitmap_t newval = oldval == 0 ? thenval_ : elseval_; - if (atomic_cas_32 (&value, oldval, newval) == oldval) - return oldval; - } -#elif defined ZMQ_ATOMIC_BITMAP_X86 - bitmap_t oldval; - bitmap_t dummy; - __asm__ volatile ( - "mov %0, %1\n\t" - "1:" - "mov %3, %2\n\t" - "test %1, %1\n\t" - "jz 2f\n\t" - "mov %4, %2\n\t" - "2:" - "lock cmpxchg %2, %0\n\t" - "jnz 1b\n\t" - : "+m" (value), "=&a" (oldval), "=&r" (dummy) - : "r" (thenval_), "r" (elseval_) - : "cc"); - return oldval; -#elif defined ZMQ_ATOMIC_BITMAP_SPARC - volatile bitmap_t* ptrin = &value; - bitmap_t tmp; - bitmap_t prev; - __asm__ __volatile__( - "ld [%3], %0 \n\t" - "mov 0, %1 \n\t" - "cas [%3], %1, %4 \n\t" - "cmp %0, %1 \n\t" - "be,a,pn %%icc,1f \n\t" - "ld [%3], %0 \n\t" - "cas [%3], %0, %5 \n\t" - "1: \n\t" - : "=&r" (tmp), "=&r" (prev), "+m" (*ptrin) - : "r" (ptrin), "r" (thenval_), "r" (elseval_) - : "cc"); - return prev; -#elif defined ZMQ_ATOMIC_BITMAP_MUTEX - sync.lock (); - bitmap_t oldval = value; - value = oldval ? elseval_ : thenval_; - sync.unlock (); - return oldval; -#else -#error -#endif - } - - private: - - volatile bitmap_t value; -#if defined ZMQ_ATOMIC_BITMAP_MUTEX - mutex_t sync; -#endif - - atomic_bitmap_t (const atomic_bitmap_t&); - void operator = (const atomic_bitmap_t&); - }; - -} - -// Remove macros local to this file. -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS -#undef ZMQ_ATOMIC_BITMAP_WINDOWS -#endif -#if defined ZMQ_ATOMIC_BITMAP_GNU -#undef ZMQ_ATOMIC_BITMAP_GNU -#endif -#if defined ZMQ_ATOMIC_BITMAP_SUN -#undef ZMQ_ATOMIC_BITMAP_SUN -#endif -#if defined ZMQ_ATOMIC_BITMAP_X86 -#undef ZMQ_ATOMIC_BITMAP_X86 -#endif -#if defined ZMQ_ATOMIC_BITMAP_SPARC -#undef ZMQ_ATOMIC_BITMAP_SPARC -#endif -#if defined ZMQ_ATOMIC_BITMAP_MUTEX -#undef ZMQ_ATOMIC_BITMAP_MUTEX -#endif - -#endif - diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index a1154de..1e11619 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -33,8 +33,7 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, - int flags_) : +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : sockets (0), terminated (false) { @@ -53,7 +52,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, for (int i = 0; i != app_threads_; i++) { app_thread_info_t info; info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_); + info.app_thread = new (std::nothrow) app_thread_t (this, i); zmq_assert (info.app_thread); app_threads.push_back (info); signalers.push_back (info.app_thread->get_signaler ()); @@ -62,7 +61,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, // Create I/O thread objects. for (int i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, - i + app_threads_, flags_); + i + app_threads_); zmq_assert (io_thread); io_threads.push_back (io_thread); signalers.push_back (io_thread->get_signaler ()); diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 1cd5207..6648f5d 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -25,7 +25,7 @@ #include <map> #include <string> -#include "i_signaler.hpp" +#include "fd_signaler.hpp" #include "ypipe.hpp" #include "command.hpp" #include "config.hpp" @@ -51,7 +51,7 @@ namespace zmq // Create the dispatcher object. Matrix of pipes to communicate between // each socket and each I/O thread is created along with appropriate // signalers. - dispatcher_t (int app_threads_, int io_threads_, int flags_); + dispatcher_t (int app_threads_, int io_threads_); // This function is called when user invokes zmq_term. If there are // no more sockets open it'll cause all the infrastructure to be shut @@ -125,7 +125,7 @@ namespace zmq io_threads_t io_threads; // Signalers for both application and I/O threads. - std::vector <i_signaler*> signalers; + std::vector <fd_signaler_t*> signalers; // Pipe to hold the commands. typedef ypipe_t <command_t, true, diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index f2037a9..a6ccbba 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -21,7 +21,6 @@ #define __ZMQ_FD_SIGNALER_HPP_INCLUDED__ #include "platform.hpp" -#include "i_signaler.hpp" #include "fd.hpp" #include "stdint.hpp" @@ -33,7 +32,7 @@ namespace zmq // descriptor and so it can be polled on. Same signal cannot be sent twice // unless signals are retrieved by the reader side in the meantime. - class fd_signaler_t : public i_signaler + class fd_signaler_t { public: diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp deleted file mode 100644 index 822ab8e..0000000 --- a/src/i_signaler.hpp +++ /dev/null @@ -1,55 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__ -#define __ZMQ_I_SIGNALER_HPP_INCLUDED__ - -#include "stdint.hpp" -#include "fd.hpp" - -namespace zmq -{ - // Virtual interface used to send signals. Individual implementations - // may restrict the number of possible signal types to send. - - struct i_signaler - { - virtual ~i_signaler () {}; - - // Send a signal with a specific ID. - virtual void signal (int signal_) = 0; - - // Wait for signal. Returns a set of signals in form of a bitmap. - // Signal with index 0 corresponds to value 1, index 1 to value 2, - // index 2 to value 3 etc. - virtual uint64_t poll () = 0; - - // Same as poll, however, if there is no signal available, - // function returns zero immediately instead of waiting for a signal. - virtual uint64_t check () = 0; - - // Returns file descriptor that allows waiting for signals. Specific - // signalers may not support this functionality. If so, the function - // returns retired_fd. - virtual fd_t get_fd () = 0; - }; - -} - -#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index ff45478..41f7f7d 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -27,10 +27,8 @@ #include "err.hpp" #include "command.hpp" #include "dispatcher.hpp" -#include "simple_semaphore.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, - int flags_) : +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : object_t (dispatcher_, thread_slot_) { poller = new (std::nothrow) poller_t; @@ -56,7 +54,7 @@ void zmq::io_thread_t::stop () send_stop (); } -zmq::i_signaler *zmq::io_thread_t::get_signaler () +zmq::fd_signaler_t *zmq::io_thread_t::get_signaler () { return &signaler; } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 9377515..84b9319 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,8 +38,7 @@ namespace zmq { public: - io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, - int flags_); + io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. @@ -52,7 +51,7 @@ namespace zmq void stop (); // Returns signaler associated with this I/O thread. - i_signaler *get_signaler (); + fd_signaler_t *get_signaler (); // i_poll_events implementation. void in_event (); diff --git a/src/object.cpp b/src/object.cpp index 9221280..113a456 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -24,7 +24,6 @@ #include "err.hpp" #include "pipe.hpp" #include "io_thread.hpp" -#include "simple_semaphore.hpp" #include "owned.hpp" #include "session.hpp" #include "socket_base.hpp" diff --git a/src/queue.cpp b/src/queue.cpp index 165e360..05cd125 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -51,8 +51,9 @@ int zmq::queue (class socket_base_t *insocket_, errno_assert (rc > 0); // The algorithm below asumes ratio of request and replies processed - // under full load to be 1:1. The alternative would be to process - // replies first, handle request only when there are no more replies. + // under full load to be 1:1. While processing requests replies first + // is tempting it is suspectible to DoS attacks (overloading the system + // with unsolicited replies). // Receive a new request. if (items [0].revents & ZMQ_POLLIN) { diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp deleted file mode 100644 index 78d72e5..0000000 --- a/src/simple_semaphore.hpp +++ /dev/null @@ -1,242 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__ -#define __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__ - -#include "platform.hpp" -#include "err.hpp" - -#if 0 //defined ZMQ_HAVE_LINUX -#include <sys/syscall.h> -#include <unistd.h> -#include <linux/futex.h> -#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS -#include <pthread.h> -#elif defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include <semaphore.h> -#endif - -namespace zmq -{ - // Simple semaphore. Only single thread may be waiting at any given time. - // Also, the semaphore may not be posted before the previous post - // was matched by corresponding wait and the waiting thread was - // released. - -#if 0 //defined ZMQ_HAVE_LINUX - - // In theory, using private futexes should be more efficient on Linux - // platform than using mutexes. However, in uncontended cases of TCP - // transport on loopback interface we haven't seen any latency improvement. - // The code is commented out waiting for more thorough testing. - - class simple_semaphore_t - { - public: - - // Initialise the semaphore. - inline simple_semaphore_t () : - dummy (0) - { - } - - // Destroy the semaphore. - inline ~simple_semaphore_t () - { - } - - // Wait for the semaphore. - inline void wait () - { - int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE, - (int) 0, NULL, NULL, (int) 0); - zmq_assert (rc == 0); - } - - // Post the semaphore. - inline void post () - { - while (true) { - int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE, - (int) 1, NULL, NULL, (int) 0); - zmq_assert (rc != -1 && rc <= 1); - if (rc == 1) - break; - } - } - - private: - - int dummy; - - // Disable copying of the object. - simple_semaphore_t (const simple_semaphore_t&); - void operator = (const simple_semaphore_t&); - }; - -#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS - - // On platforms that allow for double locking of a mutex from the same - // thread, simple semaphore is implemented using mutex, as it is more - // efficient than full-blown semaphore. - - class simple_semaphore_t - { - public: - - // Initialise the semaphore. - inline simple_semaphore_t () - { - int rc = pthread_mutex_init (&mutex, NULL); - posix_assert (rc); - rc = pthread_mutex_lock (&mutex); - posix_assert (rc); - } - - // Destroy the semaphore. - inline ~simple_semaphore_t () - { - int rc = pthread_mutex_unlock (&mutex); - posix_assert (rc); - rc = pthread_mutex_destroy (&mutex); - posix_assert (rc); - } - - // Wait for the semaphore. - inline void wait () - { - int rc = pthread_mutex_lock (&mutex); - posix_assert (rc); - } - - // Post the semaphore. - inline void post () - { - int rc = pthread_mutex_unlock (&mutex); - posix_assert (rc); - } - - private: - - pthread_mutex_t mutex; - - // Disable copying of the object. - simple_semaphore_t (const simple_semaphore_t&); - void operator = (const simple_semaphore_t&); - }; - -#elif defined ZMQ_HAVE_WINDOWS - - // On Windows platform simple semaphore is implemeted using event object. - - class simple_semaphore_t - { - public: - - // Initialise the semaphore. - inline simple_semaphore_t () - { - ev = CreateEvent (NULL, FALSE, FALSE, NULL); - win_assert (ev != NULL); - } - - // Destroy the semaphore. - inline ~simple_semaphore_t () - { - int rc = CloseHandle (ev); - win_assert (rc != 0); - } - - // Wait for the semaphore. - inline void wait () - { - DWORD rc = WaitForSingleObject (ev, INFINITE); - win_assert (rc != WAIT_FAILED); - } - - // Post the semaphore. - inline void post () - { - int rc = SetEvent (ev); - win_assert (rc != 0); - } - - private: - - HANDLE ev; - - // Disable copying of the object. - simple_semaphore_t (const simple_semaphore_t&); - void operator = (const simple_semaphore_t&); - }; - -#else - - // Default implementation maps simple semaphore to standard semaphore. - - class simple_semaphore_t - { - public: - - // Initialise the semaphore. - inline simple_semaphore_t () - { - int rc = sem_init (&sem, 0, 0); - errno_assert (rc != -1); - } - - // Destroy the semaphore. - inline ~simple_semaphore_t () - { - int rc = sem_destroy (&sem); - errno_assert (rc != -1); - } - - // Wait for the semaphore. - inline void wait () - { - int rc = sem_wait (&sem); - errno_assert (rc != -1); - } - - // Post the semaphore. - inline void post () - { - int rc = sem_post (&sem); - errno_assert (rc != -1); - } - - private: - - // Underlying system semaphore object. - sem_t sem; - - // Disable copying of the object. - simple_semaphore_t (const simple_semaphore_t&); - void operator = (const simple_semaphore_t&); - }; - -#endif - -} - -#endif diff --git a/src/ypollset.cpp b/src/ypollset.cpp deleted file mode 100644 index 51284a8..0000000 --- a/src/ypollset.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "ypollset.hpp" - -zmq::ypollset_t::ypollset_t () -{ -} - -zmq::ypollset_t::~ypollset_t () -{ -} - -void zmq::ypollset_t::signal (int signal_) -{ - zmq_assert (signal_ >= 0 && signal_ < wait_signal); - if (bits.btsr (signal_, wait_signal)) - sem.post (); -} - -uint64_t zmq::ypollset_t::poll () -{ - signals_t result = 0; - while (!result) { - result = bits.izte (signals_t (1) << wait_signal, 0); - if (!result) { - sem.wait (); - result = bits.xchg (0); - } - - // If btsr was really atomic, result would never be 0 at this - // point, i.e. no looping would be possible. However, to - // support even CPU architectures without CAS instruction - // we allow btsr to be composed of two independent atomic - // operation (set and reset). In such case looping can occur - // sporadically. - } - return (uint64_t) result; -} - -uint64_t zmq::ypollset_t::check () -{ - return (uint64_t) bits.xchg (0); -} - -zmq::fd_t zmq::ypollset_t::get_fd () -{ - return retired_fd; -} diff --git a/src/ypollset.hpp b/src/ypollset.hpp deleted file mode 100644 index c75b149..0000000 --- a/src/ypollset.hpp +++ /dev/null @@ -1,69 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_YPOLLSET_HPP_INCLUDED__ -#define __ZMQ_YPOLLSET_HPP_INCLUDED__ - -#include "i_signaler.hpp" -#include "simple_semaphore.hpp" -#include "atomic_bitmap.hpp" - -namespace zmq -{ - - // ypollset allows for rapid polling for up to constant number of - // different signals each produced by a different thread. The number of - // possible signals is platform-dependent. - - class ypollset_t : public i_signaler - { - public: - - ypollset_t (); - ~ypollset_t (); - - // i_signaler interface implementation. - void signal (int signal_); - uint64_t poll (); - uint64_t check (); - fd_t get_fd (); - - private: - - // Internal representation of signal bitmap. - typedef atomic_bitmap_t::bitmap_t signals_t; - - // Wait signal is carried in the most significant bit of integer. - enum {wait_signal = sizeof (signals_t) * 8 - 1}; - - // The bits of the pollset. - atomic_bitmap_t bits; - - // Used by thread waiting for signals to sleep if there are no - // signals available. - simple_semaphore_t sem; - - // Disable copying of ypollset object. - ypollset_t (const ypollset_t&); - void operator = (const ypollset_t&); - }; - -} - -#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 74d52f6..c05af25 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -228,14 +228,7 @@ size_t zmq_msg_size (zmq_msg_t *msg_) void *zmq_init (int app_threads_, int io_threads_, int flags_) { - // There should be at least a single application thread managed - // by the dispatcher. There's no need for I/O threads if 0MQ is used - // only for inproc messaging - if (app_threads_ < 1 || io_threads_ < 0 || - app_threads_ > 63 || io_threads_ > 63) { - errno = EINVAL; - return NULL; - } + // There are no context flags defined at the moment, so flags_ is ignored. #if defined ZMQ_HAVE_OPENPGM // Unfortunately, OpenPGM doesn't support refcounted init/shutdown, thus, @@ -269,7 +262,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_) // Create 0MQ context. zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t ( - app_threads_, io_threads_, flags_); + app_threads_, io_threads_); zmq_assert (dispatcher); return (void*) dispatcher; } |