From 921da22147e201455837bcd38df1af33aceff26f Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 13 Mar 2012 11:10:33 +0100 Subject: io_threads argument removed from xs_init() The argument was changed to a context option (XS_IO_THREADS). 0MQ compatibility mode sets the option and ensures that there's at least one I/O thread present. Signed-off-by: Martin Sustrik --- doc/xs_init.txt | 9 ++------- doc/xs_setctxopt.txt | 10 ++++++++++ include/xs.h | 3 ++- perf/inproc_lat.cpp | 2 +- perf/inproc_thr.cpp | 2 +- perf/local_lat.cpp | 2 +- perf/local_thr.cpp | 2 +- perf/remote_lat.cpp | 2 +- perf/remote_thr.cpp | 2 +- src/ctx.cpp | 20 +++++++++++++++----- src/ctx.hpp | 8 +++----- src/xs.cpp | 10 ++-------- src/xszmq.cpp | 17 ++++++++++++++++- tests/emptyctx.cpp | 2 +- tests/hwm.cpp | 2 +- tests/invalid_rep.cpp | 2 +- tests/linger.cpp | 2 +- tests/max_sockets.cpp | 2 +- tests/msg_flags.cpp | 2 +- tests/pair_inproc.cpp | 2 +- tests/pair_ipc.cpp | 2 +- tests/pair_tcp.cpp | 2 +- tests/polltimeo.cpp | 2 +- tests/reconnect.cpp | 2 +- tests/reqrep_device.cpp | 2 +- tests/reqrep_inproc.cpp | 2 +- tests/reqrep_ipc.cpp | 2 +- tests/reqrep_tcp.cpp | 2 +- tests/shutdown_stress.cpp | 7 ++++++- tests/sub_forward.cpp | 2 +- tests/timeo.cpp | 2 +- 31 files changed, 79 insertions(+), 51 deletions(-) diff --git a/doc/xs_init.txt b/doc/xs_init.txt index e642824..373822a 100644 --- a/doc/xs_init.txt +++ b/doc/xs_init.txt @@ -9,17 +9,13 @@ xs_init - initialise Crossroads context SYNOPSIS -------- -*void *xs_init (int 'io_threads');* +*void *xs_init ();* DESCRIPTION ----------- The _xs_init()_ function initialises a Crossroads 'context'. -The 'io_threads' argument specifies the size of the thread pool to handle -I/O operations. If your application is using only the 'inproc' transport for -messaging you may set this to zero, otherwise set it to at least one. - .Thread safety A 'context' is thread safe and may be shared among as many application threads as necessary, without any additional locking required on the part of @@ -35,8 +31,7 @@ of the values defined below. ERRORS ------ -*EINVAL*:: -An invalid number of 'io_threads' was requested. +No error values are defined. SEE ALSO diff --git a/doc/xs_setctxopt.txt b/doc/xs_setctxopt.txt index dded922..d26d319 100644 --- a/doc/xs_setctxopt.txt +++ b/doc/xs_setctxopt.txt @@ -34,6 +34,16 @@ Option value type:: int Option value unit:: sockets Default value:: 512 +XS_IO_THREADS: Set number of worker threads +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'XS_IO_THREADS' option shall set specify the size of the thread pool to +handle I/O operations. The value should be at least 1. + +[horizontal] +Option value type:: int +Option value unit:: threads +Default value:: 1 + RETURN VALUE ------------ The _xs_setctxopt()_ function shall return zero if successful. Otherwise it diff --git a/include/xs.h b/include/xs.h index 0b830fb..a339bb9 100644 --- a/include/xs.h +++ b/include/xs.h @@ -147,8 +147,9 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, /******************************************************************************/ #define XS_MAX_SOCKETS 1 +#define XS_IO_THREADS 2 -XS_EXPORT void *xs_init (int io_threads); +XS_EXPORT void *xs_init (); XS_EXPORT int xs_term (void *context); XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval, size_t optvallen); diff --git a/perf/inproc_lat.cpp b/perf/inproc_lat.cpp index 3c8334f..802c71a 100644 --- a/perf/inproc_lat.cpp +++ b/perf/inproc_lat.cpp @@ -123,7 +123,7 @@ int main (int argc, char *argv []) message_size = atoi (argv [1]); roundtrip_count = atoi (argv [2]); - ctx = xs_init (1); + ctx = xs_init (); if (!ctx) { printf ("error in xs_init: %s\n", xs_strerror (errno)); return -1; diff --git a/perf/inproc_thr.cpp b/perf/inproc_thr.cpp index f3f07ad..de4b9ad 100644 --- a/perf/inproc_thr.cpp +++ b/perf/inproc_thr.cpp @@ -122,7 +122,7 @@ int main (int argc, char *argv []) message_size = atoi (argv [1]); message_count = atoi (argv [2]); - ctx = xs_init (1); + ctx = xs_init (); if (!ctx) { printf ("error in xs_init: %s\n", xs_strerror (errno)); return -1; diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp index f1f4881..ca6a1e3 100644 --- a/perf/local_lat.cpp +++ b/perf/local_lat.cpp @@ -44,7 +44,7 @@ int main (int argc, char *argv []) message_size = atoi (argv [2]); roundtrip_count = atoi (argv [3]); - ctx = xs_init (1); + ctx = xs_init (); if (!ctx) { printf ("error in xs_init: %s\n", xs_strerror (errno)); return -1; diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp index dd6057e..78c8930 100644 --- a/perf/local_thr.cpp +++ b/perf/local_thr.cpp @@ -47,7 +47,7 @@ int main (int argc, char *argv []) message_size = atoi (argv [2]); message_count = atoi (argv [3]); - ctx = xs_init (1); + ctx = xs_init (); if (!ctx) { printf ("error in xs_init: %s\n", xs_strerror (errno)); return -1; diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp index 7bb0c19..03efe73 100644 --- a/perf/remote_lat.cpp +++ b/perf/remote_lat.cpp @@ -48,7 +48,7 @@ int main (int argc, char *argv []) message_size = atoi (argv [2]); roundtrip_count = atoi (argv [3]); - ctx = xs_init (1); + ctx = xs_init (); if (!ctx) { printf ("error in xs_init: %s\n", xs_strerror (errno)); return -1; diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index 00d5591..f78958c 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -45,7 +45,7 @@ int main (int argc, char *argv []) message_size = atoi (argv [2]); message_count = atoi (argv [3]); - ctx = xs_init (1); + ctx = xs_init (); if (!ctx) { printf ("error in xs_init: %s\n", xs_strerror (errno)); return -1; diff --git a/src/ctx.cpp b/src/ctx.cpp index d59e90b..df7f564 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -37,7 +37,7 @@ #include "err.hpp" #include "msg.hpp" -xs::ctx_t::ctx_t (uint32_t io_threads_) : +xs::ctx_t::ctx_t () : tag (0xbadcafe0), starting (true), terminating (false), @@ -47,7 +47,7 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) : monitor (NULL), log_socket (NULL), max_sockets (512), - io_thread_count (io_threads_) + io_thread_count (1) { } @@ -153,6 +153,15 @@ int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) max_sockets = *((int*) optval_); opt_sync.unlock (); break; + case XS_IO_THREADS: + if (optvallen_ != sizeof (int) || *((int*) optval_) < 1) { + errno = EINVAL; + return -1; + } + opt_sync.lock (); + io_thread_count = *((int*) optval_); + opt_sync.unlock (); + break; default: errno = EINVAL; return -1; @@ -170,8 +179,9 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) // xs_term thread and reaper thread. opt_sync.lock (); int maxs = max_sockets; + int ios = io_thread_count; opt_sync.unlock (); - slot_count = maxs + io_thread_count + 3; + slot_count = maxs + ios + 3; slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); alloc_assert (slots); @@ -185,7 +195,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) reaper->start (); // Create I/O thread objects and launch them. - for (uint32_t i = 2; i != io_thread_count + 2; i++) { + for (int i = 2; i != ios + 2; i++) { io_thread_t *io_thread = io_thread_t::create (this, i); errno_assert (io_thread); io_threads.push_back (io_thread); @@ -195,7 +205,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_) // In the unused part of the slot array, create a list of empty slots. for (int32_t i = (int32_t) slot_count - 1; - i >= (int32_t) io_thread_count + 2; i--) { + i >= (int32_t) ios + 2; i--) { empty_slots.push_back (i); slots [i] = NULL; } diff --git a/src/ctx.hpp b/src/ctx.hpp index df315e4..b83fa1f 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -31,7 +31,6 @@ #include "array.hpp" #include "config.hpp" #include "mutex.hpp" -#include "stdint.hpp" #include "options.hpp" #include "atomic_counter.hpp" @@ -60,9 +59,8 @@ namespace xs { public: - // Create the context object. The argument specifies the size - // of I/O thread pool to create. - ctx_t (uint32_t io_threads_); + // Create the context object. + ctx_t (); // Returns false if object is not a context. bool check_tag (); @@ -171,7 +169,7 @@ namespace xs int max_sockets; // Number of I/O threads to launch. - uint32_t io_thread_count; + int io_thread_count; // Synchronisation of access to context options. mutex_t opt_sync; diff --git a/src/xs.cpp b/src/xs.cpp index 3ee62aa..b90e383 100644 --- a/src/xs.cpp +++ b/src/xs.cpp @@ -63,14 +63,8 @@ const char *xs_strerror (int errnum_) return xs::errno_to_string (errnum_); } -void *xs_init (int io_threads_) +void *xs_init () { - // We need at least one I/O thread to run the monitor object in. - if (io_threads_ < 1) { - errno = EINVAL; - return NULL; - } - #if defined XS_HAVE_OPENPGM // Init PGM transport. Ensure threading and timer are enabled. Find PGM @@ -111,7 +105,7 @@ void *xs_init (int io_threads_) #endif // Create the context. - xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t ((uint32_t) io_threads_); + xs::ctx_t *ctx = new (std::nothrow) xs::ctx_t; alloc_assert (ctx); return (void*) ctx; } diff --git a/src/xszmq.cpp b/src/xszmq.cpp index d97d9ea..6b3930c 100644 --- a/src/xszmq.cpp +++ b/src/xszmq.cpp @@ -112,7 +112,22 @@ size_t zmq_msg_size (zmq_msg_t *msg) void *zmq_init (int io_threads) { - return xs_init (io_threads); + void *ctx = xs_init (); + if (!ctx) + return NULL; + + // Crossroads don't allow for zero I/O threads. + if (io_threads < 1) + io_threads = 1; + + int rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads, + sizeof (io_threads)); + if (rc != 0) { + xs_term (ctx); + return NULL; + } + + return ctx; } int zmq_term (void *context) diff --git a/tests/emptyctx.cpp b/tests/emptyctx.cpp index 4208a5a..1448254 100644 --- a/tests/emptyctx.cpp +++ b/tests/emptyctx.cpp @@ -26,7 +26,7 @@ int XS_TEST_MAIN () // This is a very simple test to check whether everything works OK when // context is terminated even before I/O threads were launched. - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); int rc = xs_term (ctx); assert (rc == 0); diff --git a/tests/hwm.cpp b/tests/hwm.cpp index e3faa6c..f93dcd9 100644 --- a/tests/hwm.cpp +++ b/tests/hwm.cpp @@ -24,7 +24,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "hwm test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); // Create pair of socket, each with high watermark of 2. Thus the total diff --git a/tests/invalid_rep.cpp b/tests/invalid_rep.cpp index b2cb976..cd0ad8e 100644 --- a/tests/invalid_rep.cpp +++ b/tests/invalid_rep.cpp @@ -26,7 +26,7 @@ int XS_TEST_MAIN () fprintf (stderr, "invalid_rep test running...\n"); // Create REQ/XREP wiring. - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *xrep_socket = xs_socket (ctx, XS_XREP); assert (xrep_socket); diff --git a/tests/linger.cpp b/tests/linger.cpp index a016496..8300394 100644 --- a/tests/linger.cpp +++ b/tests/linger.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () fprintf (stderr, "linger test running...\n"); // Create socket. - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *s = xs_socket (ctx, XS_PUSH); assert (s); diff --git a/tests/max_sockets.cpp b/tests/max_sockets.cpp index 5add485..5890d0f 100644 --- a/tests/max_sockets.cpp +++ b/tests/max_sockets.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () fprintf (stderr, "max_sockets test running...\n"); // Create context and set MAX_SOCKETS to 1. - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); int max_sockets = 1; int rc = xs_setctxopt (ctx, XS_MAX_SOCKETS, &max_sockets, diff --git a/tests/msg_flags.cpp b/tests/msg_flags.cpp index f2853bb..eade40f 100644 --- a/tests/msg_flags.cpp +++ b/tests/msg_flags.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () fprintf (stderr, "msg_flags test running...\n"); // Create the infrastructure - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *sb = xs_socket (ctx, XS_XREP); assert (sb); diff --git a/tests/pair_inproc.cpp b/tests/pair_inproc.cpp index 45b2b7b..c02d08b 100644 --- a/tests/pair_inproc.cpp +++ b/tests/pair_inproc.cpp @@ -24,7 +24,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "pair_inproc test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *sb = xs_socket (ctx, XS_PAIR); diff --git a/tests/pair_ipc.cpp b/tests/pair_ipc.cpp index 4ce2762..46cdf76 100644 --- a/tests/pair_ipc.cpp +++ b/tests/pair_ipc.cpp @@ -31,7 +31,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "pair_ipc test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *sb = xs_socket (ctx, XS_PAIR); diff --git a/tests/pair_tcp.cpp b/tests/pair_tcp.cpp index 9bc910a..a0decb9 100644 --- a/tests/pair_tcp.cpp +++ b/tests/pair_tcp.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "pair_tcp test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *sb = xs_socket (ctx, XS_PAIR); diff --git a/tests/polltimeo.cpp b/tests/polltimeo.cpp index b8e63cd..3f7a233 100644 --- a/tests/polltimeo.cpp +++ b/tests/polltimeo.cpp @@ -41,7 +41,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "polltimeo test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); // Create a disconnected socket. diff --git a/tests/reconnect.cpp b/tests/reconnect.cpp index 2014fe0..c23d306 100644 --- a/tests/reconnect.cpp +++ b/tests/reconnect.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () fprintf (stderr, "reconnect test running...\n"); // Create the basic infrastructure. - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *push = xs_socket (ctx, XS_PUSH); assert (push); diff --git a/tests/reqrep_device.cpp b/tests/reqrep_device.cpp index 4ce6951..54a9ed4 100644 --- a/tests/reqrep_device.cpp +++ b/tests/reqrep_device.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "reqrep_device test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); // Create a req/rep device. diff --git a/tests/reqrep_inproc.cpp b/tests/reqrep_inproc.cpp index 7f38bb3..c66b82b 100644 --- a/tests/reqrep_inproc.cpp +++ b/tests/reqrep_inproc.cpp @@ -24,7 +24,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "reqrep_inproc test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *sb = xs_socket (ctx, XS_REP); diff --git a/tests/reqrep_ipc.cpp b/tests/reqrep_ipc.cpp index ba7b861..7325cbb 100644 --- a/tests/reqrep_ipc.cpp +++ b/tests/reqrep_ipc.cpp @@ -31,7 +31,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "reqrep_ipc test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *sb = xs_socket (ctx, XS_REP); diff --git a/tests/reqrep_tcp.cpp b/tests/reqrep_tcp.cpp index cb99a66..b11d5a1 100644 --- a/tests/reqrep_tcp.cpp +++ b/tests/reqrep_tcp.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "reqrep_tcp test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); void *sb = xs_socket (ctx, XS_REP); diff --git a/tests/shutdown_stress.cpp b/tests/shutdown_stress.cpp index c0816cd..d7dffe8 100644 --- a/tests/shutdown_stress.cpp +++ b/tests/shutdown_stress.cpp @@ -46,6 +46,7 @@ int XS_TEST_MAIN () int i; int j; int rc; + int io_threads; void *threads [THREAD_COUNT]; fprintf (stderr, "shutdown_stress test running...\n"); @@ -53,8 +54,12 @@ int XS_TEST_MAIN () for (j = 0; j != 10; j++) { // Check the shutdown with many parallel I/O threads. - ctx = xs_init (7); + ctx = xs_init (); assert (ctx); + io_threads = 7; + rc = xs_setctxopt (ctx, XS_IO_THREADS, &io_threads, + sizeof (io_threads)); + assert (rc == 0); s1 = xs_socket (ctx, XS_PUB); assert (s1); diff --git a/tests/sub_forward.cpp b/tests/sub_forward.cpp index 5dced71..cfff043 100644 --- a/tests/sub_forward.cpp +++ b/tests/sub_forward.cpp @@ -25,7 +25,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "sub_forward test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); // First, create an intermediate device. diff --git a/tests/timeo.cpp b/tests/timeo.cpp index 0628b94..000e718 100644 --- a/tests/timeo.cpp +++ b/tests/timeo.cpp @@ -41,7 +41,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "timeo test running...\n"); - void *ctx = xs_init (1); + void *ctx = xs_init (); assert (ctx); // Create a disconnected socket. -- cgit v1.2.3