From 921da22147e201455837bcd38df1af33aceff26f Mon Sep 17 00:00:00 2001
From: Martin Sustrik <sustrik@250bpm.com>
Date: Tue, 13 Mar 2012 11:10:33 +0100
Subject: io_threads argument removed from xs_init()

The argument was changed to a context option (XS_IO_THREADS).
0MQ compatibility mode sets the option and ensures that
there's at least one I/O thread present.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
---
 src/ctx.cpp   | 20 +++++++++++++++-----
 src/ctx.hpp   |  8 +++-----
 src/xs.cpp    | 10 ++--------
 src/xszmq.cpp | 17 ++++++++++++++++-
 4 files changed, 36 insertions(+), 19 deletions(-)

(limited to 'src')

diff --git a/src/ctx.cpp b/src/ctx.cpp
index d59e90b..df7f564 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -37,7 +37,7 @@
 #include "err.hpp"
 #include "msg.hpp"
 
-xs::ctx_t::ctx_t (uint32_t io_threads_) :
+xs::ctx_t::ctx_t () :
     tag (0xbadcafe0),
     starting (true),
     terminating (false),
@@ -47,7 +47,7 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) :
     monitor (NULL),
     log_socket (NULL),
     max_sockets (512),
-    io_thread_count (io_threads_)
+    io_thread_count (1)
 {
 }
 
@@ -153,6 +153,15 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
         max_sockets = *((int*) optval_);
         opt_sync.unlock ();
         break;
+    case XS_IO_THREADS:
+        if (optvallen_ != sizeof (int) || *((int*) optval_) < 1) {
+            errno = EINVAL;
+            return -1;
+        }
+        opt_sync.lock ();
+        io_thread_count = *((int*) optval_);
+        opt_sync.unlock ();
+        break;
     default:
         errno = EINVAL;
         return -1;
@@ -170,8 +179,9 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
         //  xs_term thread and reaper thread.
         opt_sync.lock ();
         int maxs = max_sockets;
+        int ios = io_thread_count;
         opt_sync.unlock ();
-        slot_count = maxs + io_thread_count + 3;
+        slot_count = maxs + ios + 3;
         slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
         alloc_assert (slots);
 
@@ -185,7 +195,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
         reaper->start ();
 
         //  Create I/O thread objects and launch them.
-        for (uint32_t i = 2; i != io_thread_count + 2; i++) {
+        for (int i = 2; i != ios + 2; i++) {
             io_thread_t *io_thread = io_thread_t::create (this, i);
             errno_assert (io_thread);
             io_threads.push_back (io_thread);
@@ -195,7 +205,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
 
         //  In the unused part of the slot array, create a list of empty slots.
         for (int32_t i = (int32_t) slot_count - 1;
-              i >= (int32_t) io_thread_count + 2; i--) {
+              i >= (int32_t) ios + 2; i--) {
             empty_slots.push_back (i);
             slots [i] = NULL;
         }
diff --git a/src/ctx.hpp b/src/ctx.hpp
index df315e4..b83fa1f 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -31,7 +31,6 @@
 #include "array.hpp"
 #include "config.hpp"
 #include "mutex.hpp"
-#include "stdint.hpp"
 #include "options.hpp"
 #include "atomic_counter.hpp"
 
@@ -60,9 +59,8 @@ namespace xs
     {
     public:
 
-        //  Create the context object. The argument specifies the size
-        //  of I/O thread pool to create.
-        ctx_t (uint32_t io_threads_);
+        //  Create the context object.
+        ctx_t ();
 
         //  Returns false if object is not a context.
         bool check_tag ();
@@ -171,7 +169,7 @@ namespace xs
         int max_sockets;
 
         //  Number of I/O threads to launch.
-        uint32_t io_thread_count;
+        int io_thread_count;
 
         //  Synchronisation of access to context options.
         mutex_t opt_sync;
diff --git a/src/xs.cpp b/src/xs.cpp
index 3ee62aa..b90e383 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -63,14 +63,8 @@ const char *xs_strerror (int errnum_)
     return xs::errno_to_string (errnum_);
 }
 
-void *xs_init (int io_threads_)
+void *xs_init ()
 {
-    //  We need at least one I/O thread to run the monitor object in.
-    if (io_threads_ < 1) {
-        errno = EINVAL;
-        return NULL;
-    }
-
 #if defined XS_HAVE_OPENPGM
 
     //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
@@ -111,7 +105,7 @@ void *xs_init (int io_threads_)
 #endif
 
     //  Create the context.
-    xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t ((uint32_t) io_threads_);
+    xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t;
     alloc_assert (ctx);
     return (void*) ctx;
 }
diff --git a/src/xszmq.cpp b/src/xszmq.cpp
index d97d9ea..6b3930c 100644
--- a/src/xszmq.cpp
+++ b/src/xszmq.cpp
@@ -112,7 +112,22 @@ size_t zmq_msg_size (zmq_msg_t *msg)
 
 void *zmq_init (int io_threads)
 {
-    return xs_init (io_threads);
+    void *ctx = xs_init ();
+    if (!ctx)
+        return NULL;
+
+    //  Crossroads don't allow for zero I/O threads.
+    if (io_threads < 1)
+        io_threads = 1;
+
+    int rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads,
+        sizeof (io_threads));
+    if (rc != 0) {
+        xs_term (ctx);
+        return NULL;
+    }
+
+    return ctx;
 }
 
 int zmq_term (void *context)
-- 
cgit v1.2.3