summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ctx.cpp20
-rw-r--r--src/ctx.hpp8
-rw-r--r--src/xs.cpp10
-rw-r--r--src/xszmq.cpp17
4 files changed, 36 insertions, 19 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index d59e90b..df7f564 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -37,7 +37,7 @@
#include "err.hpp"
#include "msg.hpp"
-xs::ctx_t::ctx_t (uint32_t io_threads_) :
+xs::ctx_t::ctx_t () :
tag (0xbadcafe0),
starting (true),
terminating (false),
@@ -47,7 +47,7 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) :
monitor (NULL),
log_socket (NULL),
max_sockets (512),
- io_thread_count (io_threads_)
+ io_thread_count (1)
{
}
@@ -153,6 +153,15 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
max_sockets = *((int*) optval_);
opt_sync.unlock ();
break;
+ case XS_IO_THREADS:
+ if (optvallen_ != sizeof (int) || *((int*) optval_) < 1) {
+ errno = EINVAL;
+ return -1;
+ }
+ opt_sync.lock ();
+ io_thread_count = *((int*) optval_);
+ opt_sync.unlock ();
+ break;
default:
errno = EINVAL;
return -1;
@@ -170,8 +179,9 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
// xs_term thread and reaper thread.
opt_sync.lock ();
int maxs = max_sockets;
+ int ios = io_thread_count;
opt_sync.unlock ();
- slot_count = maxs + io_thread_count + 3;
+ slot_count = maxs + ios + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);
@@ -185,7 +195,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
reaper->start ();
// Create I/O thread objects and launch them.
- for (uint32_t i = 2; i != io_thread_count + 2; i++) {
+ for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = io_thread_t::create (this, i);
errno_assert (io_thread);
io_threads.push_back (io_thread);
@@ -195,7 +205,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
- i >= (int32_t) io_thread_count + 2; i--) {
+ i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
diff --git a/src/ctx.hpp b/src/ctx.hpp
index df315e4..b83fa1f 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -31,7 +31,6 @@
#include "array.hpp"
#include "config.hpp"
#include "mutex.hpp"
-#include "stdint.hpp"
#include "options.hpp"
#include "atomic_counter.hpp"
@@ -60,9 +59,8 @@ namespace xs
{
public:
- // Create the context object. The argument specifies the size
- // of I/O thread pool to create.
- ctx_t (uint32_t io_threads_);
+ // Create the context object.
+ ctx_t ();
// Returns false if object is not a context.
bool check_tag ();
@@ -171,7 +169,7 @@ namespace xs
int max_sockets;
// Number of I/O threads to launch.
- uint32_t io_thread_count;
+ int io_thread_count;
// Synchronisation of access to context options.
mutex_t opt_sync;
diff --git a/src/xs.cpp b/src/xs.cpp
index 3ee62aa..b90e383 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -63,14 +63,8 @@ const char *xs_strerror (int errnum_)
return xs::errno_to_string (errnum_);
}
-void *xs_init (int io_threads_)
+void *xs_init ()
{
- // We need at least one I/O thread to run the monitor object in.
- if (io_threads_ < 1) {
- errno = EINVAL;
- return NULL;
- }
-
#if defined XS_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
@@ -111,7 +105,7 @@ void *xs_init (int io_threads_)
#endif
// Create the context.
- xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t ((uint32_t) io_threads_);
+ xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t;
alloc_assert (ctx);
return (void*) ctx;
}
diff --git a/src/xszmq.cpp b/src/xszmq.cpp
index d97d9ea..6b3930c 100644
--- a/src/xszmq.cpp
+++ b/src/xszmq.cpp
@@ -112,7 +112,22 @@ size_t zmq_msg_size (zmq_msg_t *msg)
void *zmq_init (int io_threads)
{
- return xs_init (io_threads);
+ void *ctx = xs_init ();
+ if (!ctx)
+ return NULL;
+
+ // Crossroads don't allow for zero I/O threads.
+ if (io_threads < 1)
+ io_threads = 1;
+
+ int rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads,
+ sizeof (io_threads));
+ if (rc != 0) {
+ xs_term (ctx);
+ return NULL;
+ }
+
+ return ctx;
}
int zmq_term (void *context)