diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 5 | ||||
| -rw-r--r-- | src/app_thread.cpp | 22 | ||||
| -rw-r--r-- | src/app_thread.hpp | 8 | ||||
| -rw-r--r-- | src/atomic_bitmap.hpp | 310 | ||||
| -rw-r--r-- | src/dispatcher.cpp | 7 | ||||
| -rw-r--r-- | src/dispatcher.hpp | 6 | ||||
| -rw-r--r-- | src/fd_signaler.hpp | 3 | ||||
| -rw-r--r-- | src/i_signaler.hpp | 55 | ||||
| -rw-r--r-- | src/io_thread.cpp | 6 | ||||
| -rw-r--r-- | src/io_thread.hpp | 5 | ||||
| -rw-r--r-- | src/object.cpp | 1 | ||||
| -rw-r--r-- | src/queue.cpp | 5 | ||||
| -rw-r--r-- | src/simple_semaphore.hpp | 242 | ||||
| -rw-r--r-- | src/ypollset.cpp | 65 | ||||
| -rw-r--r-- | src/ypollset.hpp | 69 | ||||
| -rw-r--r-- | src/zmq.cpp | 11 | 
16 files changed, 25 insertions, 795 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 8cd94dd..837cd5f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -50,7 +50,6 @@ endif  nodist_libzmq_la_SOURCES = $(pgm_sources)  libzmq_la_SOURCES = app_thread.hpp \ -    atomic_bitmap.hpp \      atomic_counter.hpp \      atomic_ptr.hpp \      blob.hpp \ @@ -74,7 +73,6 @@ libzmq_la_SOURCES = app_thread.hpp \      i_endpoint.hpp \      i_engine.hpp \      i_poll_events.hpp \ -    i_signaler.hpp \      kqueue.hpp \      lb.hpp \      likely.hpp \ @@ -98,7 +96,6 @@ libzmq_la_SOURCES = app_thread.hpp \      req.hpp \      select.hpp \      session.hpp \ -    simple_semaphore.hpp \      socket_base.hpp \      stdint.hpp \      streamer.hpp \ @@ -116,7 +113,6 @@ libzmq_la_SOURCES = app_thread.hpp \      yarray.hpp \      yarray_item.hpp \      ypipe.hpp \ -    ypollset.hpp \      yqueue.hpp \      zmq_connecter.hpp \      zmq_decoder.hpp \ @@ -166,7 +162,6 @@ libzmq_la_SOURCES = app_thread.hpp \      uuid.cpp \      xrep.cpp \      xreq.cpp \ -    ypollset.cpp \      zmq.cpp \      zmq_connecter.cpp \      zmq_decoder.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 9ff2112..0dad660 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -36,7 +36,6 @@  #include "app_thread.hpp"  #include "dispatcher.hpp"  #include "fd_signaler.hpp" -#include "ypollset.hpp"  #include "err.hpp"  #include "pipe.hpp"  #include "config.hpp" @@ -59,27 +58,16 @@  #define ZMQ_DELAY_COMMANDS  #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, -      int flags_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :      object_t (dispatcher_, thread_slot_),      last_processing_time (0),      terminated (false)  { -    if (flags_ & ZMQ_POLL) { -        signaler = new (std::nothrow) fd_signaler_t; -        zmq_assert (signaler); -    } -    else { -        signaler = new (std::nothrow) ypollset_t; -        zmq_assert (signaler); -    }  }  zmq::app_thread_t::~app_thread_t ()  {      zmq_assert (sockets.empty ()); -    zmq_assert (signaler); -    delete signaler;  }  void zmq::app_thread_t::stop () @@ -87,16 +75,16 @@ void zmq::app_thread_t::stop ()      send_stop ();  } -zmq::i_signaler *zmq::app_thread_t::get_signaler () +zmq::fd_signaler_t *zmq::app_thread_t::get_signaler ()  { -    return signaler; +    return &signaler;  }  bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)  {      uint64_t signals;      if (block_) -        signals = signaler->poll (); +        signals = signaler.poll ();      else {  #if defined ZMQ_DELAY_COMMANDS @@ -129,7 +117,7 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)  #endif          //  Check whether there are any commands pending for this thread. -        signals = signaler->check (); +        signals = signaler.check ();      }      if (signals) { diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 1c2f47a..b7572da 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -25,6 +25,7 @@  #include "stdint.hpp"  #include "object.hpp"  #include "yarray.hpp" +#include "fd_signaler.hpp"  namespace zmq  { @@ -33,8 +34,7 @@ namespace zmq      {      public: -        app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, -            int flags_); +        app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);          ~app_thread_t (); @@ -43,7 +43,7 @@ namespace zmq          void stop ();          //  Returns signaler associated with this application thread. -        struct i_signaler *get_signaler (); +        fd_signaler_t *get_signaler ();          //  Processes commands sent to this thread (if any). If 'block' is          //  set to true, returns only after at least one command was processed. @@ -71,7 +71,7 @@ namespace zmq          sockets_t sockets;          //  App thread's signaler object. -        struct i_signaler *signaler; +        fd_signaler_t signaler;          //  Timestamp of when commands were processed the last time.          uint64_t last_processing_time; diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp deleted file mode 100644 index 29bdc13..0000000 --- a/src/atomic_bitmap.hpp +++ /dev/null @@ -1,310 +0,0 @@ -/* -    Copyright (c) 2007-2010 iMatix Corporation - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ -#define __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ - -#include "stdint.hpp" -#include "platform.hpp" - -//  These are the conditions to choose between different implementations -//  of atomic_bitmap. - -#if defined ZMQ_FORCE_MUTEXES -#define ZMQ_ATOMIC_BITMAP_MUTEX -#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZMQ_ATOMIC_BITMAP_X86 -#elif 0 && defined __sparc__ && defined __GNUC__ -#define ZMQ_ATOMIC_BITMAP_SPARC -#elif defined ZMQ_HAVE_WINDOWS -#define ZMQ_ATOMIC_BITMAP_WINDOWS -#elif defined sun -#define ZMQ_ATOMIC_COUNTER_SUN -#elif defined( __GNUC__ ) && ( __GNUC__ * 100 + __GNUC_MINOR__ >= 401 ) -#define ZMQ_ATOMIC_COUNTER_GNU -#else -#define ZMQ_ATOMIC_BITMAP_MUTEX -#endif - -#if defined ZMQ_ATOMIC_BITMAP_MUTEX -#include "mutex.hpp" -#elif defined ZMQ_ATOMIC_BITMAP_WINDOWS -#include "windows.hpp" -#elif defined ZMQ_ATOMIC_BITMAP_SUN -#include <atomic.h> -#endif - -namespace zmq -{ - -    //  This class encapuslates several bitwise atomic operations on unsigned -    //  integer. Selection of operations is driven specifically by the needs -    //  of ypollset implementation. - -    class atomic_bitmap_t -    { -    public: - -#if (defined ZMQ_ATOMIC_BITMAP_X86 || defined ZMQ_FORCE_MUTEXES) \ -    && defined __x86_64__ -        typedef uint64_t bitmap_t; -#else -        typedef uint32_t bitmap_t; -#endif - -        inline atomic_bitmap_t (bitmap_t value_ = 0) : -            value (value_) -        { -        } - -        inline ~atomic_bitmap_t () -        { -        } - -        //  Bit-test-set-and-reset. Sets one bit of the value and resets -        //  another one. Returns the original value of the reset bit. -        inline bool btsr (int set_index_, int reset_index_) -        { -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS -            while (true) { -                bitmap_t oldval = value; -                bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & -                    ~(bitmap_t (1) << reset_index_); -                if (InterlockedCompareExchange ((volatile LONG*) &value, newval, -                      oldval) == (LONG) oldval) -                    return (oldval & (bitmap_t (1) << reset_index_)) ? -                        true : false; -            } -#elif defined ZMQ_ATOMIC_BITMAP_GNU -            while (true) { -                bitmap_t oldval = value; -                bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & -                    ~(bitmap_t (1) << reset_index_); -                if (__sync_val_compare_and_swap (&value, oldval, newval) == oldval) -                    return (oldval & (bitmap_t (1) << reset_index_)) ? -                        true : false; -            } -#elif defined ZMQ_ATOMIC_BITMAP_SUN -            while (true) { -                bitmap_t oldval = value; -                bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & -                    ~(bitmap_t (1) << reset_index_); -                if (atomic_cas_32 (&value, oldval, newval) == oldval) -                    return (oldval & (bitmap_t (1) << reset_index_)) ? -                        true : false; -            } -#elif defined ZMQ_ATOMIC_BITMAP_X86 -            bitmap_t oldval, dummy; -            __asm__ volatile ( -                "mov %0, %1\n\t" -                "1:" -                "mov %1, %2\n\t" -                "bts %3, %2\n\t" -                "btr %4, %2\n\t" -                "lock cmpxchg %2, %0\n\t" -                "jnz 1b\n\t" -                : "+m" (value), "=&a" (oldval), "=&r" (dummy) -                : "r" (bitmap_t(set_index_)), "r" (bitmap_t(reset_index_)) -                : "cc"); -            return (bool) (oldval & (bitmap_t(1) << reset_index_)); -#elif defined ZMQ_ATOMIC_BITMAP_SPARC -            volatile bitmap_t* valptr = &value; -            bitmap_t set_val = bitmap_t(1) << set_index_; -            bitmap_t reset_val = ~(bitmap_t(1) << reset_index_); -            bitmap_t tmp; -            bitmap_t oldval; -            __asm__ volatile( -                "ld [%5], %2 \n\t" -                "1: \n\t" -                "or %2, %0, %3 \n\t" -                "and %3, %1, %3 \n\t" -                "cas [%5], %2, %3 \n\t" -                "cmp %2, %3 \n\t" -                "bne,a,pn %%icc, 1b \n\t" -                "mov %3, %2 \n\t" -                : "+r" (set_val), "+r" (reset_val), "=&r" (tmp), -                  "=&r" (oldval), "+m" (*valptr) -                : "r" (valptr) -                : "cc"); -            return oldval; -#elif defined ZMQ_ATOMIC_BITMAP_MUTEX -            sync.lock (); -            bitmap_t oldval = value; -            value = (oldval | (bitmap_t (1) << set_index_)) & -                ~(bitmap_t (1) << reset_index_); -            sync.unlock (); -            return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; -#else -#error -#endif -        } - -        //  Sets value to newval. Returns the original value. -        inline bitmap_t xchg (bitmap_t newval_) -        { -            bitmap_t oldval; -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS -            oldval = InterlockedExchange ((volatile LONG*) &value, newval_); -#elif defined ZMQ_ATOMIC_BITMAP_GNU -            oldval = __sync_lock_test_and_set (&value, newval_); -#elif defined ZMQ_ATOMIC_BITMAP_SUN -            oldval = atomic_swap_32 (&value, newval_); -#elif defined ZMQ_ATOMIC_BITMAP_X86 -            oldval = newval_; -            __asm__ volatile ( -                "lock; xchg %0, %1" -                : "=r" (oldval) -                : "m" (value), "0" (oldval) -                : "memory"); -#elif defined ZMQ_ATOMIC_BITMAP_SPARC -            oldval = value; -            volatile bitmap_t* ptrin = &value; -            bitmap_t tmp; -            bitmap_t prev; -            __asm__ __volatile__( -                "ld [%4], %1\n\t" -                "1:\n\t" -                "mov %0, %2\n\t" -                "cas [%4], %1, %2\n\t" -                "cmp %1, %2\n\t" -                "bne,a,pn %%icc, 1b\n\t" -                "mov %2, %1\n\t" -                : "+r" (newval_), "=&r" (tmp), "=&r" (prev), "+m" (*ptrin) -                : "r" (ptrin) -                : "cc"); -            return prev; -#elif defined ZMQ_ATOMIC_BITMAP_MUTEX -            sync.lock (); -            oldval = value; -            value = newval_; -            sync.unlock (); -#else -#error -#endif -            return oldval; -        } - -        //  izte is "if-zero-then-else" atomic operation - if the value is zero -        //  it substitutes it by 'thenval' else it rewrites it by 'elseval'. -        //  Original value of the integer is returned from this function. -        inline bitmap_t izte (bitmap_t thenval_, -            bitmap_t elseval_) -        { -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS -            while (true) { -                bitmap_t oldval = value; -                bitmap_t newval = oldval == 0 ? thenval_ : elseval_; -                if (InterlockedCompareExchange ((volatile LONG*) &value, -                      newval, oldval) == (LONG) oldval) -                    return oldval; -            } -#elif defined ZMQ_ATOMIC_BITMAP_GNU -            while (true) { -                bitmap_t oldval = value; -                bitmap_t newval = oldval == 0 ? thenval_ : elseval_; -                if (__sync_val_compare_and_swap (&value, oldval, newval) == oldval) -                    return oldval; -            } -#elif defined ZMQ_ATOMIC_BITMAP_SUN -            while (true) { -                bitmap_t oldval = value; -                bitmap_t newval = oldval == 0 ? thenval_ : elseval_; -                if (atomic_cas_32 (&value, oldval, newval) == oldval) -                    return oldval; -            } -#elif defined ZMQ_ATOMIC_BITMAP_X86 -            bitmap_t oldval; -            bitmap_t dummy; -            __asm__ volatile ( -                "mov %0, %1\n\t" -                "1:" -                "mov %3, %2\n\t" -                "test %1, %1\n\t" -                "jz 2f\n\t" -                "mov %4, %2\n\t" -                "2:" -                "lock cmpxchg %2, %0\n\t" -                "jnz 1b\n\t" -                : "+m" (value), "=&a" (oldval), "=&r" (dummy) -                : "r" (thenval_), "r" (elseval_) -                : "cc"); -            return oldval; -#elif defined ZMQ_ATOMIC_BITMAP_SPARC -            volatile bitmap_t* ptrin = &value; -            bitmap_t tmp; -            bitmap_t prev; -            __asm__ __volatile__( -                "ld [%3], %0 \n\t" -                "mov 0, %1 \n\t" -                "cas [%3], %1, %4 \n\t" -                "cmp %0, %1 \n\t" -                "be,a,pn %%icc,1f \n\t" -                "ld [%3], %0 \n\t" -                "cas [%3], %0, %5 \n\t" -                "1: \n\t" -                : "=&r" (tmp), "=&r" (prev), "+m" (*ptrin) -                : "r" (ptrin), "r" (thenval_), "r" (elseval_) -                : "cc"); -            return prev; -#elif defined ZMQ_ATOMIC_BITMAP_MUTEX -            sync.lock (); -            bitmap_t oldval = value; -            value = oldval ? elseval_ : thenval_; -            sync.unlock (); -            return oldval; -#else -#error -#endif -        } - -    private: - -        volatile bitmap_t value; -#if defined ZMQ_ATOMIC_BITMAP_MUTEX -        mutex_t sync; -#endif - -        atomic_bitmap_t (const atomic_bitmap_t&); -        void operator = (const atomic_bitmap_t&); -    }; - -} - -//  Remove macros local to this file. -#if defined ZMQ_ATOMIC_BITMAP_WINDOWS -#undef ZMQ_ATOMIC_BITMAP_WINDOWS -#endif -#if defined ZMQ_ATOMIC_BITMAP_GNU -#undef ZMQ_ATOMIC_BITMAP_GNU -#endif -#if defined ZMQ_ATOMIC_BITMAP_SUN -#undef ZMQ_ATOMIC_BITMAP_SUN -#endif -#if defined ZMQ_ATOMIC_BITMAP_X86 -#undef ZMQ_ATOMIC_BITMAP_X86 -#endif -#if defined ZMQ_ATOMIC_BITMAP_SPARC -#undef ZMQ_ATOMIC_BITMAP_SPARC -#endif -#if defined ZMQ_ATOMIC_BITMAP_MUTEX -#undef ZMQ_ATOMIC_BITMAP_MUTEX -#endif - -#endif - diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index a1154de..1e11619 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -33,8 +33,7 @@  #include "windows.h"  #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, -      int flags_) : +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :      sockets (0),      terminated (false)  { @@ -53,7 +52,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,      for (int i = 0; i != app_threads_; i++) {          app_thread_info_t info;          info.associated = false; -        info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_); +        info.app_thread = new (std::nothrow) app_thread_t (this, i);          zmq_assert (info.app_thread);          app_threads.push_back (info);          signalers.push_back (info.app_thread->get_signaler ()); @@ -62,7 +61,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,      //  Create I/O thread objects.      for (int i = 0; i != io_threads_; i++) {          io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, -            i + app_threads_, flags_); +            i + app_threads_);          zmq_assert (io_thread);          io_threads.push_back (io_thread);          signalers.push_back (io_thread->get_signaler ()); diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 1cd5207..6648f5d 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -25,7 +25,7 @@  #include <map>  #include <string> -#include "i_signaler.hpp" +#include "fd_signaler.hpp"  #include "ypipe.hpp"  #include "command.hpp"  #include "config.hpp" @@ -51,7 +51,7 @@ namespace zmq          //  Create the dispatcher object. Matrix of pipes to communicate between          //  each socket and each I/O thread is created along with appropriate          //  signalers. -        dispatcher_t (int app_threads_, int io_threads_, int flags_); +        dispatcher_t (int app_threads_, int io_threads_);          //  This function is called when user invokes zmq_term. If there are          //  no more sockets open it'll cause all the infrastructure to be shut @@ -125,7 +125,7 @@ namespace zmq          io_threads_t io_threads;          //  Signalers for both application and I/O threads. -        std::vector <i_signaler*> signalers; +        std::vector <fd_signaler_t*> signalers;          //  Pipe to hold the commands.          typedef ypipe_t <command_t, true, diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index f2037a9..a6ccbba 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -21,7 +21,6 @@  #define __ZMQ_FD_SIGNALER_HPP_INCLUDED__  #include "platform.hpp" -#include "i_signaler.hpp"  #include "fd.hpp"  #include "stdint.hpp" @@ -33,7 +32,7 @@ namespace zmq      //  descriptor and so it can be polled on. Same signal cannot be sent twice      //  unless signals are retrieved by the reader side in the meantime. -    class fd_signaler_t : public i_signaler +    class fd_signaler_t      {      public: diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp deleted file mode 100644 index 822ab8e..0000000 --- a/src/i_signaler.hpp +++ /dev/null @@ -1,55 +0,0 @@ -/* -    Copyright (c) 2007-2010 iMatix Corporation - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__ -#define __ZMQ_I_SIGNALER_HPP_INCLUDED__ - -#include "stdint.hpp" -#include "fd.hpp" - -namespace zmq -{ -    //  Virtual interface used to send signals. Individual implementations -    //  may restrict the number of possible signal types to send. - -    struct i_signaler -    { -        virtual ~i_signaler () {}; - -        //  Send a signal with a specific ID. -        virtual void signal (int signal_) = 0; - -        //  Wait for signal. Returns a set of signals in form of a bitmap. -        //  Signal with index 0 corresponds to value 1, index 1 to value 2, -        //  index 2 to value 3 etc. -        virtual uint64_t poll () = 0; - -        //  Same as poll, however, if there is no signal available, -        //  function returns zero immediately instead of waiting for a signal. -        virtual uint64_t check () = 0; - -        //  Returns file descriptor that allows waiting for signals. Specific -        //  signalers may not support this functionality. If so, the function -        //  returns retired_fd. -        virtual fd_t get_fd () = 0; -    }; - -} - -#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index ff45478..41f7f7d 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -27,10 +27,8 @@  #include "err.hpp"  #include "command.hpp"  #include "dispatcher.hpp" -#include "simple_semaphore.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, -      int flags_) : +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :      object_t (dispatcher_, thread_slot_)  {      poller = new (std::nothrow) poller_t; @@ -56,7 +54,7 @@ void zmq::io_thread_t::stop ()      send_stop ();  } -zmq::i_signaler *zmq::io_thread_t::get_signaler () +zmq::fd_signaler_t *zmq::io_thread_t::get_signaler ()  {      return &signaler;  } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 9377515..84b9319 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,8 +38,7 @@ namespace zmq      {      public: -        io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, -            int flags_); +        io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);          //  Clean-up. If the thread was started, it's neccessary to call 'stop'          //  before invoking destructor. Otherwise the destructor would hang up. @@ -52,7 +51,7 @@ namespace zmq          void stop ();          //  Returns signaler associated with this I/O thread. -        i_signaler *get_signaler (); +        fd_signaler_t *get_signaler ();          //  i_poll_events implementation.          void in_event (); diff --git a/src/object.cpp b/src/object.cpp index 9221280..113a456 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -24,7 +24,6 @@  #include "err.hpp"  #include "pipe.hpp"  #include "io_thread.hpp" -#include "simple_semaphore.hpp"  #include "owned.hpp"  #include "session.hpp"  #include "socket_base.hpp" diff --git a/src/queue.cpp b/src/queue.cpp index 165e360..05cd125 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -51,8 +51,9 @@ int zmq::queue (class socket_base_t *insocket_,          errno_assert (rc > 0);          //  The algorithm below asumes ratio of request and replies processed -        //  under full load to be 1:1. The alternative would be to process -        //  replies first, handle request only when there are no more replies. +        //  under full load to be 1:1. While processing requests replies first +        //  is tempting it is suspectible to DoS attacks (overloading the system +        //  with unsolicited replies).          //  Receive a new request.          if (items [0].revents & ZMQ_POLLIN) { diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp deleted file mode 100644 index 78d72e5..0000000 --- a/src/simple_semaphore.hpp +++ /dev/null @@ -1,242 +0,0 @@ -/* -    Copyright (c) 2007-2010 iMatix Corporation - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__ -#define __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__ - -#include "platform.hpp" -#include "err.hpp" - -#if 0 //defined ZMQ_HAVE_LINUX -#include <sys/syscall.h> -#include <unistd.h> -#include <linux/futex.h> -#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS -#include <pthread.h> -#elif defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include <semaphore.h> -#endif - -namespace zmq -{ -    //  Simple semaphore. Only single thread may be waiting at any given time. -    //  Also, the semaphore may not be posted before the previous post -    //  was matched by corresponding wait and the waiting thread was -    //  released. - -#if 0 //defined ZMQ_HAVE_LINUX - -    //  In theory, using private futexes should be more efficient on Linux -    //  platform than using mutexes. However, in uncontended cases of TCP -    //  transport on loopback interface we haven't seen any latency improvement. -    //  The code is commented out waiting for more thorough testing. - -    class simple_semaphore_t -    {  -    public: - -        //  Initialise the semaphore. -        inline simple_semaphore_t () : -            dummy (0) -        { -        } - -        //  Destroy the semaphore. -        inline ~simple_semaphore_t () -        { -        } - -        //  Wait for the semaphore. -        inline void wait () -        { -            int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE, -                (int) 0, NULL, NULL, (int) 0); -            zmq_assert (rc == 0); -        } - -        //  Post the semaphore. -        inline void post () -        { -            while (true) { -                int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE, -                    (int) 1, NULL, NULL, (int) 0); -                zmq_assert (rc != -1 && rc <= 1); -                if (rc == 1) -                    break; -            } -        } - -    private: - -        int dummy; - -        //  Disable copying of the object. -        simple_semaphore_t (const simple_semaphore_t&); -        void operator = (const simple_semaphore_t&); -    }; - -#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS - -    //  On platforms that allow for double locking of a mutex from the same -    //  thread, simple semaphore is implemented using mutex, as it is more -    //  efficient than full-blown semaphore. - -    class simple_semaphore_t -    {  -    public: - -        //  Initialise the semaphore. -        inline simple_semaphore_t () -        { -            int rc = pthread_mutex_init (&mutex, NULL); -            posix_assert (rc); -            rc = pthread_mutex_lock (&mutex); -            posix_assert (rc); -        } - -        //  Destroy the semaphore. -        inline ~simple_semaphore_t () -        { -            int rc = pthread_mutex_unlock (&mutex); -            posix_assert (rc); -            rc = pthread_mutex_destroy (&mutex); -            posix_assert (rc); -        } - -        //  Wait for the semaphore. -        inline void wait () -        { -             int rc = pthread_mutex_lock (&mutex); -             posix_assert (rc); -        } - -        //  Post the semaphore. -        inline void post () -        { -            int rc = pthread_mutex_unlock (&mutex); -            posix_assert (rc); -        } - -    private: - -        pthread_mutex_t mutex; - -        //  Disable copying of the object. -        simple_semaphore_t (const simple_semaphore_t&); -        void operator = (const simple_semaphore_t&); -    }; - -#elif defined ZMQ_HAVE_WINDOWS - -    //  On Windows platform simple semaphore is implemeted using event object. - -    class simple_semaphore_t -    {  -    public: - -        //  Initialise the semaphore. -        inline simple_semaphore_t () -        { -            ev = CreateEvent (NULL, FALSE, FALSE, NULL); -            win_assert (ev != NULL); -        } - -        //  Destroy the semaphore. -        inline ~simple_semaphore_t () -        { -            int rc = CloseHandle (ev); -            win_assert (rc != 0);     -        } - -        //  Wait for the semaphore. -        inline void wait () -        { -            DWORD rc = WaitForSingleObject (ev, INFINITE); -            win_assert (rc != WAIT_FAILED); -        } - -        //  Post the semaphore. -        inline void post () -        { -            int rc = SetEvent (ev); -            win_assert (rc != 0); -        } - -    private: - -        HANDLE ev; - -        //  Disable copying of the object. -        simple_semaphore_t (const simple_semaphore_t&); -        void operator = (const simple_semaphore_t&); -    }; - -#else - -    //  Default implementation maps simple semaphore to standard semaphore. - -    class simple_semaphore_t -    {  -    public: - -        //  Initialise the semaphore. -        inline simple_semaphore_t () -        { -             int rc = sem_init (&sem, 0, 0); -             errno_assert (rc != -1); -        } - -        //  Destroy the semaphore. -        inline ~simple_semaphore_t () -        { -             int rc = sem_destroy (&sem); -             errno_assert (rc != -1); -        } - -        //  Wait for the semaphore. -        inline void wait () -        { -             int rc = sem_wait (&sem); -             errno_assert (rc != -1); -        } - -        //  Post the semaphore. -        inline void post () -        { -            int rc = sem_post (&sem); -            errno_assert (rc != -1); -        } - -    private: - -        //  Underlying system semaphore object. -        sem_t sem; - -        //  Disable copying of the object. -        simple_semaphore_t (const simple_semaphore_t&); -        void operator = (const simple_semaphore_t&); -    }; - -#endif - -} - -#endif diff --git a/src/ypollset.cpp b/src/ypollset.cpp deleted file mode 100644 index 51284a8..0000000 --- a/src/ypollset.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* -    Copyright (c) 2007-2010 iMatix Corporation - -    This file is part of 0MQ. - -    0MQ is free software; you can redistribute it and/or modify it under -    the terms of the Lesser GNU General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    0MQ is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    Lesser GNU General Public License for more details. - -    You should have received a copy of the Lesser GNU General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "ypollset.hpp" - -zmq::ypollset_t::ypollset_t () -{ -} - -zmq::ypollset_t::~ypollset_t () -{ -} - -void zmq::ypollset_t::signal (int signal_) -{ -    zmq_assert (signal_ >= 0 && signal_ < wait_signal); -    if (bits.btsr (signal_, wait_signal)) -        sem.post ();  -} - -uint64_t zmq::ypollset_t::poll () -{ -    signals_t result = 0;  | 
