diff options
-rw-r--r-- | include/xs.h | 6 | ||||
-rw-r--r-- | src/config.hpp | 3 | ||||
-rw-r--r-- | src/ctx.cpp | 210 | ||||
-rw-r--r-- | src/ctx.hpp | 16 | ||||
-rw-r--r-- | src/xs.cpp | 11 | ||||
-rw-r--r-- | tests/Makefile.am | 4 | ||||
-rw-r--r-- | tests/linger.cpp | 2 | ||||
-rw-r--r-- | tests/max_sockets.cpp | 51 |
8 files changed, 209 insertions, 94 deletions
diff --git a/include/xs.h b/include/xs.h index 05d98ab..3aa1045 100644 --- a/include/xs.h +++ b/include/xs.h @@ -143,11 +143,15 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval, size_t *optvallen); /******************************************************************************/ -/* Crossroads infrastructure initialisation & termination. */ +/* Crossroads context definition. */ /******************************************************************************/ +#define XS_CTX_MAX_SOCKETS 1 + XS_EXPORT void *xs_init (int io_threads); XS_EXPORT int xs_term (void *context); +XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval, + size_t optvallen); /******************************************************************************/ /* Crossroads socket definition. */ diff --git a/src/config.hpp b/src/config.hpp index e4d948e..8107e17 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -29,9 +29,6 @@ namespace xs enum { - // Maximum number of sockets that can be opened at the same time. - max_sockets = 512, - // Number of new messages in message pipe needed to trigger new memory // allocation. Setting this parameter to 256 decreases the impact of // memory allocation by approximately 99.6% diff --git a/src/ctx.cpp b/src/ctx.cpp index 97a3e62..b6298e2 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -40,59 +40,11 @@ xs::ctx_t::ctx_t (uint32_t io_threads_) : tag (0xbadcafe0), - terminating (false) + starting (true), + terminating (false), + max_sockets (512), + io_thread_count (io_threads_) { - // Initialise the array of mailboxes. Additional three slots are for - // xs_term thread and reaper thread. - slot_count = max_sockets + io_threads_ + 3; - slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); - alloc_assert (slots); - - // Initialise the infrastructure for xs_term thread. - slots [term_tid] = &term_mailbox; - - // Create the reaper thread. - reaper = new (std::nothrow) reaper_t (this, reaper_tid); - alloc_assert (reaper); - slots [reaper_tid] = reaper->get_mailbox (); - reaper->start (); - - // Create I/O thread objects and launch them. - for (uint32_t i = 2; i != io_threads_ + 2; i++) { - io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); - alloc_assert (io_thread); - io_threads.push_back (io_thread); - slots [i] = io_thread->get_mailbox (); - io_thread->start (); - } - - // In the unused part of the slot array, create a list of empty slots. - for (int32_t i = (int32_t) slot_count - 1; - i >= (int32_t) io_threads_ + 2; i--) { - empty_slots.push_back (i); - slots [i] = NULL; - } - - // Create the socket to send logs to. - log_socket = create_socket (XS_PUB); - xs_assert (log_socket); - int linger = 0; - int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger)); - errno_assert (rc == 0); - int hwm = 1; - rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm)); - errno_assert (rc == 0); -#if !defined XS_HAVE_WINDOWS - rc = log_socket->connect ("ipc:///tmp/xslogs.ipc"); - errno_assert (rc == 0); -#endif - - // Create the monitor object. - io_thread_t *io_thread = choose_io_thread (0); - xs_assert (io_thread); - monitor = new (std::nothrow) monitor_t (io_thread); - alloc_assert (monitor); - monitor->start (); } bool xs::ctx_t::check_tag () @@ -128,60 +80,142 @@ xs::ctx_t::~ctx_t () int xs::ctx_t::terminate () { - // Check whether termination was already underway, but interrupted and now - // restarted. - slot_sync.lock (); - bool restarted = terminating; - terminating = true; - slot_sync.unlock (); + if (!starting) { - // First attempt to terminate the context. - if (!restarted) { + // Check whether termination was already underway, but interrupted and now + // restarted. + slot_sync.lock (); + bool restarted = terminating; + terminating = true; + slot_sync.unlock (); - // Close the monitor object. Wait for done command from the monitor. - monitor->stop (); + // First attempt to terminate the context. + if (!restarted) { + + // Close the monitor object. Wait for done command from the monitor. + monitor->stop (); + command_t cmd; + int rc = term_mailbox.recv (&cmd, -1); + xs_assert (rc == 0); + xs_assert (cmd.type == command_t::done); + + // Close the logging socket. + log_sync.lock (); + rc = log_socket->close (); + xs_assert (rc == 0); + log_socket = NULL; + log_sync.unlock (); + + // First send stop command to sockets so that any blocking calls + // can be interrupted. If there are no sockets we can ask reaper + // thread to stop. + slot_sync.lock (); + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + if (sockets.empty ()) + reaper->stop (); + slot_sync.unlock (); + } + + // Wait till reaper thread closes all the sockets. command_t cmd; int rc = term_mailbox.recv (&cmd, -1); + if (rc == -1 && errno == EINTR) + return -1; xs_assert (rc == 0); xs_assert (cmd.type == command_t::done); - - // Close the logging socket. - log_sync.lock (); - rc = log_socket->close (); - xs_assert (rc == 0); - log_socket = NULL; - log_sync.unlock (); - - // First send stop command to sockets so that any blocking calls - // can be interrupted. If there are no sockets we can ask reaper - // thread to stop. slot_sync.lock (); - for (sockets_t::size_type i = 0; i != sockets.size (); i++) - sockets [i]->stop (); - if (sockets.empty ()) - reaper->stop (); + xs_assert (sockets.empty ()); slot_sync.unlock (); } - // Wait till reaper thread closes all the sockets. - command_t cmd; - int rc = term_mailbox.recv (&cmd, -1); - if (rc == -1 && errno == EINTR) - return -1; - xs_assert (rc == 0); - xs_assert (cmd.type == command_t::done); - slot_sync.lock (); - xs_assert (sockets.empty ()); - slot_sync.unlock (); - // Deallocate the resources. delete this; return 0; } +int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) +{ + switch (option_) { + case XS_CTX_MAX_SOCKETS: + if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { + errno = EINVAL; + return -1; + } + opt_sync.lock (); + max_sockets = *((int*) optval_); + opt_sync.unlock (); + break; + default: + errno = EINVAL; + return -1; + } + return 0; +} + xs::socket_base_t *xs::ctx_t::create_socket (int type_) { + if (unlikely (starting)) { + + starting = false; + + // Initialise the array of mailboxes. Additional three slots are for + // xs_term thread and reaper thread. + opt_sync.lock (); + int maxs = max_sockets; + opt_sync.unlock (); + slot_count = maxs + io_thread_count + 3; + slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); + alloc_assert (slots); + + // Initialise the infrastructure for xs_term thread. + slots [term_tid] = &term_mailbox; + + // Create the reaper thread. + reaper = new (std::nothrow) reaper_t (this, reaper_tid); + alloc_assert (reaper); + slots [reaper_tid] = reaper->get_mailbox (); + reaper->start (); + + // Create I/O thread objects and launch them. + for (uint32_t i = 2; i != io_thread_count + 2; i++) { + io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); + alloc_assert (io_thread); + io_threads.push_back (io_thread); + slots [i] = io_thread->get_mailbox (); + io_thread->start (); + } + + // In the unused part of the slot array, create a list of empty slots. + for (int32_t i = (int32_t) slot_count - 1; + i >= (int32_t) io_thread_count + 2; i--) { + empty_slots.push_back (i); + slots [i] = NULL; + } + + // Create the socket to send logs to. + log_socket = create_socket (XS_PUB); + xs_assert (log_socket); + int linger = 0; + int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger)); + errno_assert (rc == 0); + int hwm = 1; + rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm)); + errno_assert (rc == 0); + #if !defined XS_HAVE_WINDOWS + rc = log_socket->connect ("ipc:///tmp/xslogs.ipc"); + errno_assert (rc == 0); + #endif + + // Create the monitor object. + io_thread_t *io_thread = choose_io_thread (0); + xs_assert (io_thread); + monitor = new (std::nothrow) monitor_t (io_thread); + alloc_assert (monitor); + monitor->start (); + } + slot_sync.lock (); // Once xs_term() was called, we can't create new sockets. diff --git a/src/ctx.hpp b/src/ctx.hpp index 8c3941f..56b5d4c 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -73,6 +73,9 @@ namespace xs // after the last one is closed. int terminate (); + // Set context option. + int setctxopt (int option_, const void *optval_, size_t optvallen_); + // Create and destroy a socket. xs::socket_base_t *create_socket (int type_); void destroy_socket (xs::socket_base_t *socket_); @@ -119,6 +122,10 @@ namespace xs typedef std::vector <uint32_t> emtpy_slots_t; emtpy_slots_t empty_slots; + // If true, xs_init has been called but no socket have been created + // yes. Launching of I/O threads is delayed. + bool starting; + // If true, xs_term was already called. bool terminating; @@ -160,6 +167,15 @@ namespace xs xs::socket_base_t *log_socket; mutex_t log_sync; + // Maximum number of sockets that can be opened at the same time. + int max_sockets; + + // Number of I/O threads to launch. + uint32_t io_thread_count; + + // Synchronisation of access to context options. + mutex_t opt_sync; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; @@ -169,6 +169,17 @@ int xs_term (void *ctx_) return rc; } +int xs_setctxopt (void *ctx_, int option_, const void *optval_, + size_t optvallen_) +{ + if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((xs::ctx_t*) ctx_)->setctxopt (option_, optval_, optvallen_); +} + void *xs_socket (void *ctx_, int type_) { if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) { diff --git a/tests/Makefile.am b/tests/Makefile.am index 85c7527..fba6271 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -17,7 +17,8 @@ noinst_PROGRAMS = pair_inproc \ shutdown_stress \ pair_ipc \ reqrep_ipc \ - timeo + timeo \ + max_sockets pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -34,5 +35,6 @@ shutdown_stress_SOURCES = shutdown_stress.cpp pair_ipc_SOURCES = pair_ipc.cpp testutil.hpp reqrep_ipc_SOURCES = reqrep_ipc.cpp testutil.hpp timeo_SOURCES = timeo.cpp +max_sockets_SOURCES = max_sockets.cpp TESTS = $(noinst_PROGRAMS) diff --git a/tests/linger.cpp b/tests/linger.cpp index 5c0480c..35de303 100644 --- a/tests/linger.cpp +++ b/tests/linger.cpp @@ -24,7 +24,7 @@ int XS_TEST_MAIN () { fprintf (stderr, "linger test running...\n"); - // Create REQ/XREP wiring. + // Create socket. void *ctx = xs_init (1); assert (ctx); void *s = xs_socket (ctx, XS_PUSH); diff --git a/tests/max_sockets.cpp b/tests/max_sockets.cpp new file mode 100644 index 0000000..4ce6cb9 --- /dev/null +++ b/tests/max_sockets.cpp @@ -0,0 +1,51 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads project. + + Crossroads is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ + fprintf (stderr, "max_sockets test running...\n"); + + // Create context and set MAX_SOCKETS to 1. + void *ctx = xs_init (1); + assert (ctx); + int max_sockets = 1; + int rc = xs_setctxopt (ctx, XS_CTX_MAX_SOCKETS, &max_sockets, + sizeof (max_sockets)); + assert (rc == 0); + + // First socket should be created OK. + void *s1 = xs_socket (ctx, XS_PUSH); + assert (s1); + + // Creation of second socket should fail. + void *s2 = xs_socket (ctx, XS_PUSH); + assert (!s2 && errno == EMFILE); + + // Clean up. + rc = xs_close (s1); + assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + return 0; +} + |