diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ctx.cpp | 20 | ||||
-rw-r--r-- | src/ctx.hpp | 8 | ||||
-rw-r--r-- | src/xs.cpp | 10 | ||||
-rw-r--r-- | src/xszmq.cpp | 17 |
4 files changed, 36 insertions, 19 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp index d59e90b..df7f564 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -37,7 +37,7 @@ #include "err.hpp" #include "msg.hpp" -xs::ctx_t::ctx_t (uint32_t io_threads_) : +xs::ctx_t::ctx_t () : tag (0xbadcafe0), starting (true), terminating (false), @@ -47,7 +47,7 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) : monitor (NULL), log_socket (NULL), max_sockets (512), - io_thread_count (io_threads_) + io_thread_count (1) { } @@ -153,6 +153,15 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) max_sockets = *((int*) optval_); opt_sync.unlock (); break; + case XS_IO_THREADS: + if (optvallen_ != sizeof (int) || *((int*) optval_) < 1) { + errno = EINVAL; + return -1; + } + opt_sync.lock (); + io_thread_count = *((int*) optval_); + opt_sync.unlock (); + break; default: errno = EINVAL; return -1; @@ -170,8 +179,9 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) // xs_term thread and reaper thread. opt_sync.lock (); int maxs = max_sockets; + int ios = io_thread_count; opt_sync.unlock (); - slot_count = maxs + io_thread_count + 3; + slot_count = maxs + ios + 3; slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); alloc_assert (slots); @@ -185,7 +195,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) reaper->start (); // Create I/O thread objects and launch them. - for (uint32_t i = 2; i != io_thread_count + 2; i++) { + for (int i = 2; i != ios + 2; i++) { io_thread_t *io_thread = io_thread_t::create (this, i); errno_assert (io_thread); io_threads.push_back (io_thread); @@ -195,7 +205,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) // In the unused part of the slot array, create a list of empty slots. for (int32_t i = (int32_t) slot_count - 1; - i >= (int32_t) io_thread_count + 2; i--) { + i >= (int32_t) ios + 2; i--) { empty_slots.push_back (i); slots [i] = NULL; } diff --git a/src/ctx.hpp b/src/ctx.hpp index df315e4..b83fa1f 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -31,7 +31,6 @@ #include "array.hpp" #include "config.hpp" #include "mutex.hpp" -#include "stdint.hpp" #include "options.hpp" #include "atomic_counter.hpp" @@ -60,9 +59,8 @@ namespace xs { public: - // Create the context object. The argument specifies the size - // of I/O thread pool to create. - ctx_t (uint32_t io_threads_); + // Create the context object. + ctx_t (); // Returns false if object is not a context. bool check_tag (); @@ -171,7 +169,7 @@ namespace xs int max_sockets; // Number of I/O threads to launch. - uint32_t io_thread_count; + int io_thread_count; // Synchronisation of access to context options. mutex_t opt_sync; @@ -63,14 +63,8 @@ const char *xs_strerror (int errnum_) return xs::errno_to_string (errnum_); } -void *xs_init (int io_threads_) +void *xs_init () { - // We need at least one I/O thread to run the monitor object in. - if (io_threads_ < 1) { - errno = EINVAL; - return NULL; - } - #if defined XS_HAVE_OPENPGM // Init PGM transport. Ensure threading and timer are enabled. Find PGM @@ -111,7 +105,7 @@ void *xs_init (int io_threads_) #endif // Create the context. - xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t ((uint32_t) io_threads_); + xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t; alloc_assert (ctx); return (void*) ctx; } diff --git a/src/xszmq.cpp b/src/xszmq.cpp index d97d9ea..6b3930c 100644 --- a/src/xszmq.cpp +++ b/src/xszmq.cpp @@ -112,7 +112,22 @@ size_t zmq_msg_size (zmq_msg_t *msg) void *zmq_init (int io_threads) { - return xs_init (io_threads); + void *ctx = xs_init (); + if (!ctx) + return NULL; + + // Crossroads don't allow for zero I/O threads. + if (io_threads < 1) + io_threads = 1; + + int rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads, + sizeof (io_threads)); + if (rc != 0) { + xs_term (ctx); + return NULL; + } + + return ctx; } int zmq_term (void *context) |