From ae93ed318a450d6d763a5f629d478467f7362b07 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Thu, 29 Apr 2010 20:34:48 +0200
Subject: signaler rewritten in such a way that any number (>64) of threads can
 be used

---
 src/app_thread.cpp |  25 +++----
 src/app_thread.hpp |   2 +-
 src/config.hpp     |   4 ++
 src/dispatcher.cpp |  16 ++---
 src/dispatcher.hpp |  10 +--
 src/io_thread.cpp  |  30 ++++----
 src/io_thread.hpp  |   2 +-
 src/object.cpp     |  13 ++--
 src/object.hpp     |   9 +--
 src/signaler.cpp   | 200 +++++++++++++++++++++++++++++++----------------------
 src/signaler.hpp   |  24 +++++--
 src/zmq.cpp        |   2 +-
 12 files changed, 190 insertions(+), 147 deletions(-)

(limited to 'src')

diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 10068c0..1350248 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -57,7 +57,8 @@
 #define ZMQ_DELAY_COMMANDS
 #endif
 
-zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_,
+        uint32_t thread_slot_) :
     object_t (dispatcher_, thread_slot_),
     last_processing_time (0),
     terminated (false)
@@ -81,9 +82,9 @@ zmq::signaler_t *zmq::app_thread_t::get_signaler ()
 
 bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
 {
-    uint64_t signals;
+    uint32_t signal;
     if (block_)
-        signals = signaler.poll ();
+        signal = signaler.poll ();
     else {
 
 #if defined ZMQ_DELAY_COMMANDS
@@ -116,20 +117,14 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
 #endif
 
         //  Check whether there are any commands pending for this thread.
-        signals = signaler.check ();
+        signal = signaler.check ();
     }
 
-    if (signals) {
-
-        //  Traverse all the possible sources of commands and process
-        //  all the commands from all of them.
-        for (int i = 0; i != thread_slot_count (); i++) {
-            if (signals & (uint64_t (1) << i)) {
-                command_t cmd;
-                while (dispatcher->read (i, get_thread_slot (), &cmd))
-                    cmd.destination->process_command (cmd);
-            }
-        }
+    //  Process all the commands from the signaling source if there is one.
+    if (signal != signaler_t::no_signal) {
+        command_t cmd;
+        while (dispatcher->read (signal, get_thread_slot (), &cmd))
+            cmd.destination->process_command (cmd);
     }
 
     return !terminated;
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 2bca757..bca6947 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -34,7 +34,7 @@ namespace zmq
     {
     public:
 
-        app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+        app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
 
         ~app_thread_t ();
 
diff --git a/src/config.hpp b/src/config.hpp
index 12e29ca..99c9d86 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -37,6 +37,10 @@ namespace zmq
         //  footprint of dispatcher.
         command_pipe_granularity = 4,
 
+        //  Number of signals that can be read by the signaler
+        //  using a single system call.
+        signal_buffer_size = 8,
+
         //  Determines how often does socket poll for new commands when it
         //  still has unprocessed messages to handle. Thus, if it is set to 100,
         //  socket will process 100 inbound messages before doing the poll.
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 1e11619..b1ba11f 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -33,7 +33,7 @@
 #include "windows.h"
 #endif
 
-zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
+zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
     sockets (0),
     terminated (false)
 {
@@ -49,7 +49,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
 #endif
 
     //  Create application thread proxies.
-    for (int i = 0; i != app_threads_; i++) {
+    for (uint32_t i = 0; i != app_threads_; i++) {
         app_thread_info_t info;
         info.associated = false;
         info.app_thread = new (std::nothrow) app_thread_t (this, i);
@@ -59,7 +59,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
     }
 
     //  Create I/O thread objects.
-    for (int i = 0; i != io_threads_; i++) {
+    for (uint32_t i = 0; i != io_threads_; i++) {
         io_thread_t *io_thread = new (std::nothrow) io_thread_t (this,
             i + app_threads_);
         zmq_assert (io_thread);
@@ -79,7 +79,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
     zmq_assert (command_pipes);
 
     //  Launch I/O threads.
-    for (int i = 0; i != io_threads_; i++)
+    for (uint32_t i = 0; i != io_threads_; i++)
         io_threads [i]->start ();
 }
 
@@ -136,9 +136,9 @@ zmq::dispatcher_t::~dispatcher_t ()
 #endif
 }
 
-int zmq::dispatcher_t::thread_slot_count ()
+uint32_t zmq::dispatcher_t::thread_slot_count ()
 {
-    return signalers.size ();
+    return (uint32_t) signalers.size ();
 }
 
 zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
@@ -213,7 +213,7 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
     app_threads_sync.unlock ();
 }
 
-void zmq::dispatcher_t::write (int source_, int destination_,
+void zmq::dispatcher_t::write (uint32_t source_, uint32_t destination_,
     const command_t &command_)
 {
     command_pipe_t &pipe =
@@ -223,7 +223,7 @@ void zmq::dispatcher_t::write (int source_, int destination_,
         signalers [destination_]->signal (source_);
 }
 
-bool zmq::dispatcher_t::read (int source_,  int destination_,
+bool zmq::dispatcher_t::read (uint32_t source_, uint32_t destination_,
     command_t *command_)
 {
     return command_pipes [source_ * signalers.size () +
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index ff08abc..0a1ed5c 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -51,7 +51,7 @@ namespace zmq
         //  Create the dispatcher object. Matrix of pipes to communicate between
         //  each socket and each I/O thread is created along with appropriate
         //  signalers.
-        dispatcher_t (int app_threads_, int io_threads_);
+        dispatcher_t (uint32_t app_threads_, uint32_t io_threads_);
 
         //  This function is called when user invokes zmq_term. If there are
         //  no more sockets open it'll cause all the infrastructure to be shut
@@ -72,14 +72,16 @@ namespace zmq
         //  Returns number of thread slots in the dispatcher. To be used by
         //  individual threads to find out how many distinct signals can be
         //  received.
-        int thread_slot_count ();
+        uint32_t thread_slot_count ();
 
         //  Send command from the source to the destination.
-        void write (int source_, int destination_, const command_t &command_);
+        void write (uint32_t source_, uint32_t destination_,
+            const command_t &command_);
 
         //  Receive command from the source. Returns false if there is no
         //  command available.
-        bool read (int source_,  int destination_, command_t *command_);
+        bool read (uint32_t source_,  uint32_t destination_,
+            command_t *command_);
 
         //  Returns the I/O thread that is the least busy at the moment.
         //  Taskset specifies which I/O threads are eligible (0 = all).
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index 7d997ad..e9f9aa5 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -28,7 +28,8 @@
 #include "command.hpp"
 #include "dispatcher.hpp"
 
-zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_,
+      uint32_t thread_slot_) :
     object_t (dispatcher_, thread_slot_)
 {
     poller = new (std::nothrow) poller_t;
@@ -66,22 +67,17 @@ int zmq::io_thread_t::get_load ()
 
 void zmq::io_thread_t::in_event ()
 {
-    //  Find out which threads are sending us commands.
-    uint64_t signals = signaler.check ();
-    zmq_assert (signals);
-
-    //  Iterate through all the threads in the process and find out
-    //  which of them sent us commands.
-    int slot_count = thread_slot_count ();
-    for (int source_thread_slot = 0;
-          source_thread_slot != slot_count; source_thread_slot++) {
-        if (signals & (uint64_t (1) << source_thread_slot)) {
-
-            //  Read all the commands from particular thread.
-            command_t cmd;
-            while (dispatcher->read (source_thread_slot, thread_slot, &cmd))
-                cmd.destination->process_command (cmd);
-        }
+    while (true) {
+
+        //  Get the next signal.
+        uint32_t signal = signaler.check ();
+        if (signal == signaler_t::no_signal)
+            break;
+
+        //  Process all the commands from the thread that sent the signal.
+        command_t cmd;
+        while (dispatcher->read (signal, thread_slot, &cmd))
+            cmd.destination->process_command (cmd);
     }
 }
 
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index deb03a1..7e105b3 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -38,7 +38,7 @@ namespace zmq
     {
     public:
 
-        io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+        io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
 
         //  Clean-up. If the thread was started, it's neccessary to call 'stop'
         //  before invoking destructor. Otherwise the destructor would hang up.
diff --git a/src/object.cpp b/src/object.cpp
index 113a456..38eb1f8 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -28,7 +28,7 @@
 #include "session.hpp"
 #include "socket_base.hpp"
 
-zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) :
     dispatcher (dispatcher_),
     thread_slot (thread_slot_)
 {
@@ -44,12 +44,7 @@ zmq::object_t::~object_t ()
 {
 }
 
-int zmq::object_t::thread_slot_count ()
-{
-    return dispatcher->thread_slot_count ();
-}
-
-int zmq::object_t::get_thread_slot ()
+uint32_t zmq::object_t::get_thread_slot ()
 {
     return thread_slot;
 }
@@ -162,7 +157,7 @@ void zmq::object_t::send_stop ()
 {
     //  'stop' command goes always from administrative thread to
     //  the current object. 
-    int admin_thread_id = dispatcher->thread_slot_count () - 1;
+    uint32_t admin_thread_id = dispatcher->thread_slot_count () - 1;
     command_t cmd;
     cmd.destination = this;
     cmd.type = command_t::stop;
@@ -375,7 +370,7 @@ void zmq::object_t::process_seqnum ()
 
 void zmq::object_t::send_command (command_t &cmd_)
 {
-    int destination_thread_slot = cmd_.destination->get_thread_slot ();
+    uint32_t destination_thread_slot = cmd_.destination->get_thread_slot ();
     dispatcher->write (thread_slot, destination_thread_slot, cmd_);
 }
 
diff --git a/src/object.hpp b/src/object.hpp
index f29342e..b29c6b8 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -32,11 +32,11 @@ namespace zmq
     {
     public:
 
-        object_t (class dispatcher_t *dispatcher_, int thread_slot_);
+        object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
         object_t (object_t *parent_);
         virtual ~object_t ();
 
-        int get_thread_slot ();
+        uint32_t get_thread_slot ();
         dispatcher_t *get_dispatcher ();
         void process_command (struct command_t &cmd_);
 
@@ -52,9 +52,6 @@ namespace zmq
         void unregister_endpoints (class socket_base_t *socket_);
         class socket_base_t *find_endpoint (const char *addr_);
 
-        //  Returns number of thead slots in the dispatcher.
-        int thread_slot_count ();
-
         //  Chooses least loaded I/O thread.
         class io_thread_t *choose_io_thread (uint64_t taskset_);
 
@@ -106,7 +103,7 @@ namespace zmq
         class dispatcher_t *dispatcher;
 
         //  Slot ID of the thread the object belongs to.
-        int thread_slot;
+        uint32_t thread_slot;
 
     private:
 
diff --git a/src/signaler.cpp b/src/signaler.cpp
index 1d3b32f..2d89eca 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -32,9 +32,60 @@
 #include <fcntl.h>
 #endif
 
+const uint32_t zmq::signaler_t::no_signal = 0xffffffff;
+
+uint32_t zmq::signaler_t::poll ()
+{
+    //  Return next signal.
+    if (current != count) {
+        uint32_t result = buffer [current];
+        current++;
+        return result;
+    }
+
+    //  If there is no signal buffered, poll for new signals.
+    xpoll ();
+
+    //  Return first signal.
+    zmq_assert (current != count);
+    uint32_t result = buffer [current];
+    current++;
+    return result;
+}
+
+uint32_t zmq::signaler_t::check ()
+{
+    //  Return next signal.
+    if (current != count) {
+        uint32_t result = buffer [current];
+        current++;
+        return result;
+    }
+
+    //  If there is no signal buffered, check whether more signals
+    //  can be obtained.
+    xcheck ();
+
+    //  Return first signal if any.
+    if (current != count) {
+        uint32_t result = buffer [current];
+        current++;
+        return result;
+    }
+
+    return no_signal;
+}
+
+zmq::fd_t zmq::signaler_t::get_fd ()
+{
+    return r;
+}
+
 #if defined ZMQ_HAVE_WINDOWS
 
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+    current (0),
+    count (0)
 {
     //  Windows have no 'socketpair' function. CreatePipe is no good as pipe
     //  handles cannot be polled on. Here we create the socketpair by hand.
@@ -95,18 +146,16 @@ zmq::signaler_t::~signaler_t ()
     wsa_assert (rc != SOCKET_ERROR);
 }
 
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
 {
     //  TODO: Note that send is a blocking operation.
     //  How should we behave if the signal cannot be written to the signaler?
-
-    zmq_assert (signal_ >= 0 && signal_ < 64);
-    char c = (char) signal_;
-    int rc = send (w, &c, 1, 0);
+    int rc = send (w, &signal_, sizeof (signal_), 0);
     win_assert (rc != SOCKET_ERROR);
+    zmq_assert (rc == sizeof (signal_));
 }
 
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
 {
     //  Switch to blocking mode.
     unsigned long argp = 0;
@@ -115,8 +164,8 @@ uint64_t zmq::signaler_t::poll ()
 
     //  Get the signals. Given that we are in the blocking mode now,
     //  there should be at least a single signal returned.
-    uint64_t signals = check ();
-    zmq_assert (signals);
+    xcheck ();
+    zmq_assert (current != count);
 
     //  Switch back to non-blocking mode.
     argp = 1;
@@ -126,25 +175,24 @@ uint64_t zmq::signaler_t::poll ()
     return signals;
 }
 
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
 {
-    unsigned char buffer [32];
-    int nbytes = recv (r, (char*) buffer, 32, 0);
-    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
-        return 0;
-    wsa_assert (nbytes != -1);
+    int nbytes = recv (r, (char*) buffer, sizeof (buffer), 0);
 
-    uint64_t signals = 0;
-    for (int pos = 0; pos != nbytes; pos++) {
-        zmq_assert (buffer [pos] < 64);
-        signals |= (uint64_t (1) << (buffer [pos]));
+    //  No signals are available.
+    if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
+        current = 0;
+        count = 0;
+        return;
     }
-    return signals;
-}
 
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
-    return r;
+    wsa_assert (nbytes != -1);
+
+    //  Check whether we haven't got half of a signal.
+    zmq_assert (nbytes % sizeof (uint32_t) == 0);
+
+    current = 0;
+    count = nbytes / sizeof (uint32_t);
 }
 
 #elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
@@ -152,7 +200,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
 #include <sys/types.h>
 #include <sys/socket.h>
 
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+    current (0),
+    count (0)
 {
     int sv [2];
     int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
@@ -174,15 +224,14 @@ zmq::signaler_t::~signaler_t ()
     close (r);
 }
 
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
 {
-    zmq_assert (signal_ >= 0 && signal_ < 64);
-    unsigned char c = (unsigned char) signal_;
-    ssize_t nbytes = send (w, &c, 1, 0);
-    errno_assert (nbytes == 1);
+    ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
+    errno_assert (nbytes != -1);
+    zmq_assert (nbytes == sizeof (signal_);
 }
 
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
 {
     //  Set the reader to blocking mode.
     int flags = fcntl (r, F_GETFL, 0);
@@ -192,7 +241,8 @@ uint64_t zmq::signaler_t::poll ()
     errno_assert (rc != -1);
 
     //  Poll for events.
-    uint64_t signals = check ();
+    xcheck ();
+    zmq_assert (current != count);
 
     //  Set the reader to non-blocking mode.
     flags = fcntl (r, F_GETFL, 0);
@@ -200,29 +250,23 @@ uint64_t zmq::signaler_t::poll ()
         flags = 0;
     rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
     errno_assert (rc != -1);
-
-    return signals;
 }
 
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
 {
-    unsigned char buffer [64];
-    ssize_t nbytes = recv (r, buffer, 64, 0);
-    if (nbytes == -1 && errno == EAGAIN)
-        return 0;
+    ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
+    if (nbytes == -1 && errno == EAGAIN) {
+        current = 0;
+        count = 0;
+        return;
+    }
     zmq_assert (nbytes != -1);
 
-    uint64_t signals = 0;
-    for (int pos = 0; pos != nbytes; pos ++) {
-        zmq_assert (buffer [pos] < 64);
-        signals |= (uint64_t (1) << (buffer [pos]));
-    }
-    return signals;
-}
+    //  Check whether we haven't got half of a signal.
+    zmq_assert (nbytes % sizeof (uint32_t) == 0);
 
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
-    return r;
+    current = 0;
+    count = nbytes / sizeof (uint32_t);
 }
 
 #else
@@ -230,7 +274,9 @@ zmq::fd_t zmq::signaler_t::get_fd ()
 #include <sys/types.h>
 #include <sys/socket.h>
 
-zmq::signaler_t::signaler_t ()
+zmq::signaler_t::signaler_t () :
+    current (0),
+    count (0)
 {
     int sv [2];
     int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
@@ -245,50 +291,42 @@ zmq::signaler_t::~signaler_t ()
     close (r);
 }
 
-void zmq::signaler_t::signal (int signal_)
+void zmq::signaler_t::signal (uint32_t signal_)
 {
     //  TODO: Note that send is a blocking operation.
     //  How should we behave if the signal cannot be written to the signaler?
-
-    zmq_assert (signal_ >= 0 && signal_ < 64);
-    unsigned char c = (unsigned char) signal_;
-    ssize_t nbytes = send (w, &c, 1, 0);
-    errno_assert (nbytes == 1);
+    ssize_t nbytes = send (w, &signal_, sizeof (signal_), 0);
+    errno_assert (nbytes != -1);
+    zmq_assert (nbytes == sizeof (signal_));
 }
 
-uint64_t zmq::signaler_t::poll ()
+void zmq::signaler_t::xpoll ()
 {
-    unsigned char buffer [64];
-    ssize_t nbytes = recv (r, buffer, 64, 0);
-    zmq_assert (nbytes != -1);
+    ssize_t nbytes = recv (r, buffer, sizeof (buffer), 0);
+    errno_assert (nbytes != -1);
+    
+    //  Check whether we haven't got half of a signal.
+    zmq_assert (nbytes % sizeof (uint32_t) == 0);
 
-    uint64_t signals = 0;
-    for (int pos = 0; pos != nbytes; pos ++) {
-        zmq_assert (buffer [pos] < 64);
-        signals |= (uint64_t (1) << (buffer [pos]));
-    }
-    return signals;
+    current = 0;
+    count = nbytes / sizeof (uint32_t);
 }
 
-uint64_t zmq::signaler_t::check ()
+void zmq::signaler_t::xcheck ()
 {
-    unsigned char buffer [64];
     ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT);
-    if (nbytes == -1 && errno == EAGAIN)
-        return 0;
-    zmq_assert (nbytes != -1);
-
-    uint64_t signals = 0;
-    for (int pos = 0; pos != nbytes; pos ++) {
-        zmq_assert (buffer [pos] < 64);
-        signals |= (uint64_t (1) << (buffer [pos]));
+    if (nbytes == -1 && errno == EAGAIN) {
+        current = 0;
+        count = 0;
+        return;
     }
-    return signals;
-}
+    errno_assert (nbytes != -1);
 
-zmq::fd_t zmq::signaler_t::get_fd ()
-{
-    return r;
+    //  Check whether we haven't got half of a signal.
+    zmq_assert (nbytes % sizeof (uint32_t) == 0);
+
+    current = 0;
+    count = nbytes / sizeof (uint32_t);
 }
 
 #endif
diff --git a/src/signaler.hpp b/src/signaler.hpp
index f239771..5509894 100644
--- a/src/signaler.hpp
+++ b/src/signaler.hpp
@@ -20,9 +20,12 @@
 #ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
 #define __ZMQ_SIGNALER_HPP_INCLUDED__
 
+#include <stddef.h>
+
 #include "platform.hpp"
 #include "fd.hpp"
 #include "stdint.hpp"
+#include "config.hpp"
 
 namespace zmq
 {
@@ -39,14 +42,18 @@ namespace zmq
         signaler_t ();
         ~signaler_t ();
 
-        //  i_signaler interface implementation.
-        void signal (int signal_);
-        uint64_t poll ();
-        uint64_t check ();
+        static const uint32_t no_signal;
+
+        void signal (uint32_t signal_);
+        uint32_t poll ();
+        uint32_t check ();
         fd_t get_fd ();
 
     private:
 
+         void xpoll ();
+         void xcheck ();
+
 #if defined ZMQ_HAVE_OPENVMS
 
          //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.
@@ -64,6 +71,15 @@ namespace zmq
         fd_t w;
         fd_t r;
 
+        //  Signal buffer.
+        uint32_t buffer [signal_buffer_size];
+
+        //  Position of the next signal in the buffer to return to the user.
+        size_t current;
+
+        //  Number of signals in the signal buffer.
+        size_t count;
+
         //  Disable copying of fd_signeler object.
         signaler_t (const signaler_t&);
         void operator = (const signaler_t&);
diff --git a/src/zmq.cpp b/src/zmq.cpp
index c05af25..d087d53 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -262,7 +262,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
 
     //  Create 0MQ context.
     zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
-        app_threads_, io_threads_);
+        (uint32_t) app_threads_, (uint32_t) io_threads_);
     zmq_assert (dispatcher);
     return (void*) dispatcher;
 }
-- 
cgit v1.2.3