summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/xs.h6
-rw-r--r--src/config.hpp3
-rw-r--r--src/ctx.cpp210
-rw-r--r--src/ctx.hpp16
-rw-r--r--src/xs.cpp11
-rw-r--r--tests/Makefile.am4
-rw-r--r--tests/linger.cpp2
-rw-r--r--tests/max_sockets.cpp51
8 files changed, 209 insertions, 94 deletions
diff --git a/include/xs.h b/include/xs.h
index 05d98ab..3aa1045 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -143,11 +143,15 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval,
size_t *optvallen);
/******************************************************************************/
-/* Crossroads infrastructure initialisation & termination. */
+/* Crossroads context definition. */
/******************************************************************************/
+#define XS_CTX_MAX_SOCKETS 1
+
XS_EXPORT void *xs_init (int io_threads);
XS_EXPORT int xs_term (void *context);
+XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
+ size_t optvallen);
/******************************************************************************/
/* Crossroads socket definition. */
diff --git a/src/config.hpp b/src/config.hpp
index e4d948e..8107e17 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -29,9 +29,6 @@ namespace xs
enum
{
- // Maximum number of sockets that can be opened at the same time.
- max_sockets = 512,
-
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
// memory allocation by approximately 99.6%
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 97a3e62..b6298e2 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -40,59 +40,11 @@
xs::ctx_t::ctx_t (uint32_t io_threads_) :
tag (0xbadcafe0),
- terminating (false)
+ starting (true),
+ terminating (false),
+ max_sockets (512),
+ io_thread_count (io_threads_)
{
- // Initialise the array of mailboxes. Additional three slots are for
- // xs_term thread and reaper thread.
- slot_count = max_sockets + io_threads_ + 3;
- slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
- alloc_assert (slots);
-
- // Initialise the infrastructure for xs_term thread.
- slots [term_tid] = &term_mailbox;
-
- // Create the reaper thread.
- reaper = new (std::nothrow) reaper_t (this, reaper_tid);
- alloc_assert (reaper);
- slots [reaper_tid] = reaper->get_mailbox ();
- reaper->start ();
-
- // Create I/O thread objects and launch them.
- for (uint32_t i = 2; i != io_threads_ + 2; i++) {
- io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
- alloc_assert (io_thread);
- io_threads.push_back (io_thread);
- slots [i] = io_thread->get_mailbox ();
- io_thread->start ();
- }
-
- // In the unused part of the slot array, create a list of empty slots.
- for (int32_t i = (int32_t) slot_count - 1;
- i >= (int32_t) io_threads_ + 2; i--) {
- empty_slots.push_back (i);
- slots [i] = NULL;
- }
-
- // Create the socket to send logs to.
- log_socket = create_socket (XS_PUB);
- xs_assert (log_socket);
- int linger = 0;
- int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger));
- errno_assert (rc == 0);
- int hwm = 1;
- rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm));
- errno_assert (rc == 0);
-#if !defined XS_HAVE_WINDOWS
- rc = log_socket->connect ("ipc:///tmp/xslogs.ipc");
- errno_assert (rc == 0);
-#endif
-
- // Create the monitor object.
- io_thread_t *io_thread = choose_io_thread (0);
- xs_assert (io_thread);
- monitor = new (std::nothrow) monitor_t (io_thread);
- alloc_assert (monitor);
- monitor->start ();
}
bool xs::ctx_t::check_tag ()
@@ -128,60 +80,142 @@ xs::ctx_t::~ctx_t ()
int xs::ctx_t::terminate ()
{
- // Check whether termination was already underway, but interrupted and now
- // restarted.
- slot_sync.lock ();
- bool restarted = terminating;
- terminating = true;
- slot_sync.unlock ();
+ if (!starting) {
- // First attempt to terminate the context.
- if (!restarted) {
+ // Check whether termination was already underway, but interrupted and now
+ // restarted.
+ slot_sync.lock ();
+ bool restarted = terminating;
+ terminating = true;
+ slot_sync.unlock ();
- // Close the monitor object. Wait for done command from the monitor.
- monitor->stop ();
+ // First attempt to terminate the context.
+ if (!restarted) {
+
+ // Close the monitor object. Wait for done command from the monitor.
+ monitor->stop ();
+ command_t cmd;
+ int rc = term_mailbox.recv (&cmd, -1);
+ xs_assert (rc == 0);
+ xs_assert (cmd.type == command_t::done);
+
+ // Close the logging socket.
+ log_sync.lock ();
+ rc = log_socket->close ();
+ xs_assert (rc == 0);
+ log_socket = NULL;
+ log_sync.unlock ();
+
+ // First send stop command to sockets so that any blocking calls
+ // can be interrupted. If there are no sockets we can ask reaper
+ // thread to stop.
+ slot_sync.lock ();
+ for (sockets_t::size_type i = 0; i != sockets.size (); i++)
+ sockets [i]->stop ();
+ if (sockets.empty ())
+ reaper->stop ();
+ slot_sync.unlock ();
+ }
+
+ // Wait till reaper thread closes all the sockets.
command_t cmd;
int rc = term_mailbox.recv (&cmd, -1);
+ if (rc == -1 && errno == EINTR)
+ return -1;
xs_assert (rc == 0);
xs_assert (cmd.type == command_t::done);
-
- // Close the logging socket.
- log_sync.lock ();
- rc = log_socket->close ();
- xs_assert (rc == 0);
- log_socket = NULL;
- log_sync.unlock ();
-
- // First send stop command to sockets so that any blocking calls
- // can be interrupted. If there are no sockets we can ask reaper
- // thread to stop.
slot_sync.lock ();
- for (sockets_t::size_type i = 0; i != sockets.size (); i++)
- sockets [i]->stop ();
- if (sockets.empty ())
- reaper->stop ();
+ xs_assert (sockets.empty ());
slot_sync.unlock ();
}
- // Wait till reaper thread closes all the sockets.
- command_t cmd;
- int rc = term_mailbox.recv (&cmd, -1);
- if (rc == -1 && errno == EINTR)
- return -1;
- xs_assert (rc == 0);
- xs_assert (cmd.type == command_t::done);
- slot_sync.lock ();
- xs_assert (sockets.empty ());
- slot_sync.unlock ();
-
// Deallocate the resources.
delete this;
return 0;
}
+int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
+{
+ switch (option_) {
+ case XS_CTX_MAX_SOCKETS:
+ if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ opt_sync.lock ();
+ max_sockets = *((int*) optval_);
+ opt_sync.unlock ();
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+ return 0;
+}
+
xs::socket_base_t *xs::ctx_t::create_socket (int type_)
{
+ if (unlikely (starting)) {
+
+ starting = false;
+
+ // Initialise the array of mailboxes. Additional three slots are for
+ // xs_term thread and reaper thread.
+ opt_sync.lock ();
+ int maxs = max_sockets;
+ opt_sync.unlock ();
+ slot_count = maxs + io_thread_count + 3;
+ slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
+ alloc_assert (slots);
+
+ // Initialise the infrastructure for xs_term thread.
+ slots [term_tid] = &term_mailbox;
+
+ // Create the reaper thread.
+ reaper = new (std::nothrow) reaper_t (this, reaper_tid);
+ alloc_assert (reaper);
+ slots [reaper_tid] = reaper->get_mailbox ();
+ reaper->start ();
+
+ // Create I/O thread objects and launch them.
+ for (uint32_t i = 2; i != io_thread_count + 2; i++) {
+ io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
+ alloc_assert (io_thread);
+ io_threads.push_back (io_thread);
+ slots [i] = io_thread->get_mailbox ();
+ io_thread->start ();
+ }
+
+ // In the unused part of the slot array, create a list of empty slots.
+ for (int32_t i = (int32_t) slot_count - 1;
+ i >= (int32_t) io_thread_count + 2; i--) {
+ empty_slots.push_back (i);
+ slots [i] = NULL;
+ }
+
+ // Create the socket to send logs to.
+ log_socket = create_socket (XS_PUB);
+ xs_assert (log_socket);
+ int linger = 0;
+ int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger));
+ errno_assert (rc == 0);
+ int hwm = 1;
+ rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm));
+ errno_assert (rc == 0);
+ #if !defined XS_HAVE_WINDOWS
+ rc = log_socket->connect ("ipc:///tmp/xslogs.ipc");
+ errno_assert (rc == 0);
+ #endif
+
+ // Create the monitor object.
+ io_thread_t *io_thread = choose_io_thread (0);
+ xs_assert (io_thread);
+ monitor = new (std::nothrow) monitor_t (io_thread);
+ alloc_assert (monitor);
+ monitor->start ();
+ }
+
slot_sync.lock ();
// Once xs_term() was called, we can't create new sockets.
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 8c3941f..56b5d4c 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -73,6 +73,9 @@ namespace xs
// after the last one is closed.
int terminate ();
+ // Set context option.
+ int setctxopt (int option_, const void *optval_, size_t optvallen_);
+
// Create and destroy a socket.
xs::socket_base_t *create_socket (int type_);
void destroy_socket (xs::socket_base_t *socket_);
@@ -119,6 +122,10 @@ namespace xs
typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots;
+ // If true, xs_init has been called but no socket have been created
+ // yes. Launching of I/O threads is delayed.
+ bool starting;
+
// If true, xs_term was already called.
bool terminating;
@@ -160,6 +167,15 @@ namespace xs
xs::socket_base_t *log_socket;
mutex_t log_sync;
+ // Maximum number of sockets that can be opened at the same time.
+ int max_sockets;
+
+ // Number of I/O threads to launch.
+ uint32_t io_thread_count;
+
+ // Synchronisation of access to context options.
+ mutex_t opt_sync;
+
ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&);
};
diff --git a/src/xs.cpp b/src/xs.cpp
index 8fc8dd2..ad5bd97 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -169,6 +169,17 @@ int xs_term (void *ctx_)
return rc;
}
+int xs_setctxopt (void *ctx_, int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ return ((xs::ctx_t*) ctx_)->setctxopt (option_, optval_, optvallen_);
+}
+
void *xs_socket (void *ctx_, int type_)
{
if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) {
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 85c7527..fba6271 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -17,7 +17,8 @@ noinst_PROGRAMS = pair_inproc \
shutdown_stress \
pair_ipc \
reqrep_ipc \
- timeo
+ timeo \
+ max_sockets
pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp
pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp
@@ -34,5 +35,6 @@ shutdown_stress_SOURCES = shutdown_stress.cpp
pair_ipc_SOURCES = pair_ipc.cpp testutil.hpp
reqrep_ipc_SOURCES = reqrep_ipc.cpp testutil.hpp
timeo_SOURCES = timeo.cpp
+max_sockets_SOURCES = max_sockets.cpp
TESTS = $(noinst_PROGRAMS)
diff --git a/tests/linger.cpp b/tests/linger.cpp
index 5c0480c..35de303 100644
--- a/tests/linger.cpp
+++ b/tests/linger.cpp
@@ -24,7 +24,7 @@ int XS_TEST_MAIN ()
{
fprintf (stderr, "linger test running...\n");
- // Create REQ/XREP wiring.
+ // Create socket.
void *ctx = xs_init (1);
assert (ctx);
void *s = xs_socket (ctx, XS_PUSH);
diff --git a/tests/max_sockets.cpp b/tests/max_sockets.cpp
new file mode 100644
index 0000000..4ce6cb9
--- /dev/null
+++ b/tests/max_sockets.cpp
@@ -0,0 +1,51 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads project.
+
+ Crossroads is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "testutil.hpp"
+
+int XS_TEST_MAIN ()
+{
+ fprintf (stderr, "max_sockets test running...\n");
+
+ // Create context and set MAX_SOCKETS to 1.
+ void *ctx = xs_init (1);
+ assert (ctx);
+ int max_sockets = 1;
+ int rc = xs_setctxopt (ctx, XS_CTX_MAX_SOCKETS, &max_sockets,
+ sizeof (max_sockets));
+ assert (rc == 0);
+
+ // First socket should be created OK.
+ void *s1 = xs_socket (ctx, XS_PUSH);
+ assert (s1);
+
+ // Creation of second socket should fail.
+ void *s2 = xs_socket (ctx, XS_PUSH);
+ assert (!s2 && errno == EMFILE);
+
+ // Clean up.
+ rc = xs_close (s1);
+ assert (rc == 0);
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
+ return 0;
+}
+