summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/xs_init.txt9
-rw-r--r--doc/xs_setctxopt.txt10
-rw-r--r--include/xs.h3
-rw-r--r--perf/inproc_lat.cpp2
-rw-r--r--perf/inproc_thr.cpp2
-rw-r--r--perf/local_lat.cpp2
-rw-r--r--perf/local_thr.cpp2
-rw-r--r--perf/remote_lat.cpp2
-rw-r--r--perf/remote_thr.cpp2
-rw-r--r--src/ctx.cpp20
-rw-r--r--src/ctx.hpp8
-rw-r--r--src/xs.cpp10
-rw-r--r--src/xszmq.cpp17
-rw-r--r--tests/emptyctx.cpp2
-rw-r--r--tests/hwm.cpp2
-rw-r--r--tests/invalid_rep.cpp2
-rw-r--r--tests/linger.cpp2
-rw-r--r--tests/max_sockets.cpp2
-rw-r--r--tests/msg_flags.cpp2
-rw-r--r--tests/pair_inproc.cpp2
-rw-r--r--tests/pair_ipc.cpp2
-rw-r--r--tests/pair_tcp.cpp2
-rw-r--r--tests/polltimeo.cpp2
-rw-r--r--tests/reconnect.cpp2
-rw-r--r--tests/reqrep_device.cpp2
-rw-r--r--tests/reqrep_inproc.cpp2
-rw-r--r--tests/reqrep_ipc.cpp2
-rw-r--r--tests/reqrep_tcp.cpp2
-rw-r--r--tests/shutdown_stress.cpp7
-rw-r--r--tests/sub_forward.cpp2
-rw-r--r--tests/timeo.cpp2
31 files changed, 79 insertions, 51 deletions
diff --git a/doc/xs_init.txt b/doc/xs_init.txt
index e642824..373822a 100644
--- a/doc/xs_init.txt
+++ b/doc/xs_init.txt
@@ -9,17 +9,13 @@ xs_init - initialise Crossroads context
SYNOPSIS
--------
-*void *xs_init (int 'io_threads');*
+*void *xs_init ();*
DESCRIPTION
-----------
The _xs_init()_ function initialises a Crossroads 'context'.
-The 'io_threads' argument specifies the size of the thread pool to handle
-I/O operations. If your application is using only the 'inproc' transport for
-messaging you may set this to zero, otherwise set it to at least one.
-
.Thread safety
A 'context' is thread safe and may be shared among as many application
threads as necessary, without any additional locking required on the part of
@@ -35,8 +31,7 @@ of the values defined below.
ERRORS
------
-*EINVAL*::
-An invalid number of 'io_threads' was requested.
+No error values are defined.
SEE ALSO
diff --git a/doc/xs_setctxopt.txt b/doc/xs_setctxopt.txt
index dded922..d26d319 100644
--- a/doc/xs_setctxopt.txt
+++ b/doc/xs_setctxopt.txt
@@ -34,6 +34,16 @@ Option value type:: int
Option value unit:: sockets
Default value:: 512
+XS_IO_THREADS: Set number of worker threads
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'XS_IO_THREADS' option shall set specify the size of the thread pool to
+handle I/O operations. The value should be at least 1.
+
+[horizontal]
+Option value type:: int
+Option value unit:: threads
+Default value:: 1
+
RETURN VALUE
------------
The _xs_setctxopt()_ function shall return zero if successful. Otherwise it
diff --git a/include/xs.h b/include/xs.h
index 0b830fb..a339bb9 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -147,8 +147,9 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval,
/******************************************************************************/
#define XS_MAX_SOCKETS 1
+#define XS_IO_THREADS 2
-XS_EXPORT void *xs_init (int io_threads);
+XS_EXPORT void *xs_init ();
XS_EXPORT int xs_term (void *context);
XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
size_t optvallen);
diff --git a/perf/inproc_lat.cpp b/perf/inproc_lat.cpp
index 3c8334f..802c71a 100644
--- a/perf/inproc_lat.cpp
+++ b/perf/inproc_lat.cpp
@@ -123,7 +123,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [1]);
roundtrip_count = atoi (argv [2]);
- ctx = xs_init (1);
+ ctx = xs_init ();
if (!ctx) {
printf ("error in xs_init: %s\n", xs_strerror (errno));
return -1;
diff --git a/perf/inproc_thr.cpp b/perf/inproc_thr.cpp
index f3f07ad..de4b9ad 100644
--- a/perf/inproc_thr.cpp
+++ b/perf/inproc_thr.cpp
@@ -122,7 +122,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [1]);
message_count = atoi (argv [2]);
- ctx = xs_init (1);
+ ctx = xs_init ();
if (!ctx) {
printf ("error in xs_init: %s\n", xs_strerror (errno));
return -1;
diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp
index f1f4881..ca6a1e3 100644
--- a/perf/local_lat.cpp
+++ b/perf/local_lat.cpp
@@ -44,7 +44,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
roundtrip_count = atoi (argv [3]);
- ctx = xs_init (1);
+ ctx = xs_init ();
if (!ctx) {
printf ("error in xs_init: %s\n", xs_strerror (errno));
return -1;
diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp
index dd6057e..78c8930 100644
--- a/perf/local_thr.cpp
+++ b/perf/local_thr.cpp
@@ -47,7 +47,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
message_count = atoi (argv [3]);
- ctx = xs_init (1);
+ ctx = xs_init ();
if (!ctx) {
printf ("error in xs_init: %s\n", xs_strerror (errno));
return -1;
diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp
index 7bb0c19..03efe73 100644
--- a/perf/remote_lat.cpp
+++ b/perf/remote_lat.cpp
@@ -48,7 +48,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
roundtrip_count = atoi (argv [3]);
- ctx = xs_init (1);
+ ctx = xs_init ();
if (!ctx) {
printf ("error in xs_init: %s\n", xs_strerror (errno));
return -1;
diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp
index 00d5591..f78958c 100644
--- a/perf/remote_thr.cpp
+++ b/perf/remote_thr.cpp
@@ -45,7 +45,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
message_count = atoi (argv [3]);
- ctx = xs_init (1);
+ ctx = xs_init ();
if (!ctx) {
printf ("error in xs_init: %s\n", xs_strerror (errno));
return -1;
diff --git a/src/ctx.cpp b/src/ctx.cpp
index d59e90b..df7f564 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -37,7 +37,7 @@
#include "err.hpp"
#include "msg.hpp"
-xs::ctx_t::ctx_t (uint32_t io_threads_) :
+xs::ctx_t::ctx_t () :
tag (0xbadcafe0),
starting (true),
terminating (false),
@@ -47,7 +47,7 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) :
monitor (NULL),
log_socket (NULL),
max_sockets (512),
- io_thread_count (io_threads_)
+ io_thread_count (1)
{
}
@@ -153,6 +153,15 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
max_sockets = *((int*) optval_);
opt_sync.unlock ();
break;
+ case XS_IO_THREADS:
+ if (optvallen_ != sizeof (int) || *((int*) optval_) < 1) {
+ errno = EINVAL;
+ return -1;
+ }
+ opt_sync.lock ();
+ io_thread_count = *((int*) optval_);
+ opt_sync.unlock ();
+ break;
default:
errno = EINVAL;
return -1;
@@ -170,8 +179,9 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
// xs_term thread and reaper thread.
opt_sync.lock ();
int maxs = max_sockets;
+ int ios = io_thread_count;
opt_sync.unlock ();
- slot_count = maxs + io_thread_count + 3;
+ slot_count = maxs + ios + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);
@@ -185,7 +195,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
reaper->start ();
// Create I/O thread objects and launch them.
- for (uint32_t i = 2; i != io_thread_count + 2; i++) {
+ for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = io_thread_t::create (this, i);
errno_assert (io_thread);
io_threads.push_back (io_thread);
@@ -195,7 +205,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
- i >= (int32_t) io_thread_count + 2; i--) {
+ i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
diff --git a/src/ctx.hpp b/src/ctx.hpp
index df315e4..b83fa1f 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -31,7 +31,6 @@
#include "array.hpp"
#include "config.hpp"
#include "mutex.hpp"
-#include "stdint.hpp"
#include "options.hpp"
#include "atomic_counter.hpp"
@@ -60,9 +59,8 @@ namespace xs
{
public:
- // Create the context object. The argument specifies the size
- // of I/O thread pool to create.
- ctx_t (uint32_t io_threads_);
+ // Create the context object.
+ ctx_t ();
// Returns false if object is not a context.
bool check_tag ();
@@ -171,7 +169,7 @@ namespace xs
int max_sockets;
// Number of I/O threads to launch.
- uint32_t io_thread_count;
+ int io_thread_count;
// Synchronisation of access to context options.
mutex_t opt_sync;
diff --git a/src/xs.cpp b/src/xs.cpp
index 3ee62aa..b90e383 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -63,14 +63,8 @@ const char *xs_strerror (int errnum_)
return xs::errno_to_string (errnum_);
}
-void *xs_init (int io_threads_)
+void *xs_init ()
{
- // We need at least one I/O thread to run the monitor object in.
- if (io_threads_ < 1) {
- errno = EINVAL;
- return NULL;
- }
-
#if defined XS_HAVE_OPENPGM
// Init PGM transport. Ensure threading and timer are enabled. Find PGM
@@ -111,7 +105,7 @@ void *xs_init (int io_threads_)
#endif
// Create the context.
- xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t ((uint32_t) io_threads_);
+ xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t;
alloc_assert (ctx);
return (void*) ctx;
}
diff --git a/src/xszmq.cpp b/src/xszmq.cpp
index d97d9ea..6b3930c 100644
--- a/src/xszmq.cpp
+++ b/src/xszmq.cpp
@@ -112,7 +112,22 @@ size_t zmq_msg_size (zmq_msg_t *msg)
void *zmq_init (int io_threads)
{
- return xs_init (io_threads);
+ void *ctx = xs_init ();
+ if (!ctx)
+ return NULL;
+
+ // Crossroads don't allow for zero I/O threads.
+ if (io_threads < 1)
+ io_threads = 1;
+
+ int rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads,
+ sizeof (io_threads));
+ if (rc != 0) {
+ xs_term (ctx);
+ return NULL;
+ }
+
+ return ctx;
}
int zmq_term (void *context)
diff --git a/tests/emptyctx.cpp b/tests/emptyctx.cpp
index 4208a5a..1448254 100644
--- a/tests/emptyctx.cpp
+++ b/tests/emptyctx.cpp
@@ -26,7 +26,7 @@ int XS_TEST_MAIN ()
// This is a very simple test to check whether everything works OK when
// context is terminated even before I/O threads were launched.
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
int rc = xs_term (ctx);
assert (rc == 0);
diff --git a/tests/hwm.cpp b/tests/hwm.cpp
index e3faa6c..f93dcd9 100644
--- a/tests/hwm.cpp
+++ b/tests/hwm.cpp
@@ -24,7 +24,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "hwm test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
// Create pair of socket, each with high watermark of 2. Thus the total
diff --git a/tests/invalid_rep.cpp b/tests/invalid_rep.cpp
index b2cb976..cd0ad8e 100644
--- a/tests/invalid_rep.cpp
+++ b/tests/invalid_rep.cpp
@@ -26,7 +26,7 @@ int XS_TEST_MAIN ()
fprintf (stderr, "invalid_rep test running...\n");
// Create REQ/XREP wiring.
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *xrep_socket = xs_socket (ctx, XS_XREP);
assert (xrep_socket);
diff --git a/tests/linger.cpp b/tests/linger.cpp
index a016496..8300394 100644
--- a/tests/linger.cpp
+++ b/tests/linger.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
fprintf (stderr, "linger test running...\n");
// Create socket.
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *s = xs_socket (ctx, XS_PUSH);
assert (s);
diff --git a/tests/max_sockets.cpp b/tests/max_sockets.cpp
index 5add485..5890d0f 100644
--- a/tests/max_sockets.cpp
+++ b/tests/max_sockets.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
fprintf (stderr, "max_sockets test running...\n");
// Create context and set MAX_SOCKETS to 1.
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
int max_sockets = 1;
int rc = xs_setctxopt (ctx, XS_MAX_SOCKETS, &max_sockets,
diff --git a/tests/msg_flags.cpp b/tests/msg_flags.cpp
index f2853bb..eade40f 100644
--- a/tests/msg_flags.cpp
+++ b/tests/msg_flags.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
fprintf (stderr, "msg_flags test running...\n");
// Create the infrastructure
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *sb = xs_socket (ctx, XS_XREP);
assert (sb);
diff --git a/tests/pair_inproc.cpp b/tests/pair_inproc.cpp
index 45b2b7b..c02d08b 100644
--- a/tests/pair_inproc.cpp
+++ b/tests/pair_inproc.cpp
@@ -24,7 +24,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "pair_inproc test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *sb = xs_socket (ctx, XS_PAIR);
diff --git a/tests/pair_ipc.cpp b/tests/pair_ipc.cpp
index 4ce2762..46cdf76 100644
--- a/tests/pair_ipc.cpp
+++ b/tests/pair_ipc.cpp
@@ -31,7 +31,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "pair_ipc test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *sb = xs_socket (ctx, XS_PAIR);
diff --git a/tests/pair_tcp.cpp b/tests/pair_tcp.cpp
index 9bc910a..a0decb9 100644
--- a/tests/pair_tcp.cpp
+++ b/tests/pair_tcp.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "pair_tcp test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *sb = xs_socket (ctx, XS_PAIR);
diff --git a/tests/polltimeo.cpp b/tests/polltimeo.cpp
index b8e63cd..3f7a233 100644
--- a/tests/polltimeo.cpp
+++ b/tests/polltimeo.cpp
@@ -41,7 +41,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "polltimeo test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
// Create a disconnected socket.
diff --git a/tests/reconnect.cpp b/tests/reconnect.cpp
index 2014fe0..c23d306 100644
--- a/tests/reconnect.cpp
+++ b/tests/reconnect.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
fprintf (stderr, "reconnect test running...\n");
// Create the basic infrastructure.
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *push = xs_socket (ctx, XS_PUSH);
assert (push);
diff --git a/tests/reqrep_device.cpp b/tests/reqrep_device.cpp
index 4ce6951..54a9ed4 100644
--- a/tests/reqrep_device.cpp
+++ b/tests/reqrep_device.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "reqrep_device test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
// Create a req/rep device.
diff --git a/tests/reqrep_inproc.cpp b/tests/reqrep_inproc.cpp
index 7f38bb3..c66b82b 100644
--- a/tests/reqrep_inproc.cpp
+++ b/tests/reqrep_inproc.cpp
@@ -24,7 +24,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "reqrep_inproc test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *sb = xs_socket (ctx, XS_REP);
diff --git a/tests/reqrep_ipc.cpp b/tests/reqrep_ipc.cpp
index ba7b861..7325cbb 100644
--- a/tests/reqrep_ipc.cpp
+++ b/tests/reqrep_ipc.cpp
@@ -31,7 +31,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "reqrep_ipc test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *sb = xs_socket (ctx, XS_REP);
diff --git a/tests/reqrep_tcp.cpp b/tests/reqrep_tcp.cpp
index cb99a66..b11d5a1 100644
--- a/tests/reqrep_tcp.cpp
+++ b/tests/reqrep_tcp.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "reqrep_tcp test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
void *sb = xs_socket (ctx, XS_REP);
diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp
index c0816cd..d7dffe8 100644
--- a/tests/shutdown_stress.cpp
+++ b/tests/shutdown_stress.cpp
@@ -46,6 +46,7 @@ int XS_TEST_MAIN ()
int i;
int j;
int rc;
+ int io_threads;
void *threads [THREAD_COUNT];
fprintf (stderr, "shutdown_stress test running...\n");
@@ -53,8 +54,12 @@ int XS_TEST_MAIN ()
for (j = 0; j != 10; j++) {
// Check the shutdown with many parallel I/O threads.
- ctx = xs_init (7);
+ ctx = xs_init ();
assert (ctx);
+ io_threads = 7;
+ rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads,
+ sizeof (io_threads));
+ assert (rc == 0);
s1 = xs_socket (ctx, XS_PUB);
assert (s1);
diff --git a/tests/sub_forward.cpp b/tests/sub_forward.cpp
index 5dced71..cfff043 100644
--- a/tests/sub_forward.cpp
+++ b/tests/sub_forward.cpp
@@ -25,7 +25,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "sub_forward test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
// First, create an intermediate device.
diff --git a/tests/timeo.cpp b/tests/timeo.cpp
index 0628b94..000e718 100644
--- a/tests/timeo.cpp
+++ b/tests/timeo.cpp
@@ -41,7 +41,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "timeo test running...\n");
- void *ctx = xs_init (1);
+ void *ctx = xs_init ();
assert (ctx);
// Create a disconnected socket.