summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:06:36 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:06:36 +0900
commita376c91494c954491fd424f3b51ab4579b9846a2 (patch)
treee306cef422138378720daa843703fc94331ad08e /src
parentfb1209c72bbfd431b61fa4588785586d24ef67c9 (diff)
XS_CTX_MAX_SOCKETS option implemented
To implement context options properly, initialisation of context is postponed till creation of the first socket. In the meantime it is possible to set socket options. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/config.hpp3
-rw-r--r--src/ctx.cpp210
-rw-r--r--src/ctx.hpp16
-rw-r--r--src/xs.cpp11
4 files changed, 149 insertions, 91 deletions
diff --git a/src/config.hpp b/src/config.hpp
index e4d948e..8107e17 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -29,9 +29,6 @@ namespace xs
enum
{
- // Maximum number of sockets that can be opened at the same time.
- max_sockets = 512,
-
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
// memory allocation by approximately 99.6%
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 97a3e62..b6298e2 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -40,59 +40,11 @@
xs::ctx_t::ctx_t (uint32_t io_threads_) :
tag (0xbadcafe0),
- terminating (false)
+ starting (true),
+ terminating (false),
+ max_sockets (512),
+ io_thread_count (io_threads_)
{
- // Initialise the array of mailboxes. Additional three slots are for
- // xs_term thread and reaper thread.
- slot_count = max_sockets + io_threads_ + 3;
- slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
- alloc_assert (slots);
-
- // Initialise the infrastructure for xs_term thread.
- slots [term_tid] = &term_mailbox;
-
- // Create the reaper thread.
- reaper = new (std::nothrow) reaper_t (this, reaper_tid);
- alloc_assert (reaper);
- slots [reaper_tid] = reaper->get_mailbox ();
- reaper->start ();
-
- // Create I/O thread objects and launch them.
- for (uint32_t i = 2; i != io_threads_ + 2; i++) {
- io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
- alloc_assert (io_thread);
- io_threads.push_back (io_thread);
- slots [i] = io_thread->get_mailbox ();
- io_thread->start ();
- }
-
- // In the unused part of the slot array, create a list of empty slots.
- for (int32_t i = (int32_t) slot_count - 1;
- i >= (int32_t) io_threads_ + 2; i--) {
- empty_slots.push_back (i);
- slots [i] = NULL;
- }
-
- // Create the socket to send logs to.
- log_socket = create_socket (XS_PUB);
- xs_assert (log_socket);
- int linger = 0;
- int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger));
- errno_assert (rc == 0);
- int hwm = 1;
- rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm));
- errno_assert (rc == 0);
-#if !defined XS_HAVE_WINDOWS
- rc = log_socket->connect ("ipc:///tmp/xslogs.ipc");
- errno_assert (rc == 0);
-#endif
-
- // Create the monitor object.
- io_thread_t *io_thread = choose_io_thread (0);
- xs_assert (io_thread);
- monitor = new (std::nothrow) monitor_t (io_thread);
- alloc_assert (monitor);
- monitor->start ();
}
bool xs::ctx_t::check_tag ()
@@ -128,60 +80,142 @@ xs::ctx_t::~ctx_t ()
int xs::ctx_t::terminate ()
{
- // Check whether termination was already underway, but interrupted and now
- // restarted.
- slot_sync.lock ();
- bool restarted = terminating;
- terminating = true;
- slot_sync.unlock ();
+ if (!starting) {
- // First attempt to terminate the context.
- if (!restarted) {
+ // Check whether termination was already underway, but interrupted and now
+ // restarted.
+ slot_sync.lock ();
+ bool restarted = terminating;
+ terminating = true;
+ slot_sync.unlock ();
- // Close the monitor object. Wait for done command from the monitor.
- monitor->stop ();
+ // First attempt to terminate the context.
+ if (!restarted) {
+
+ // Close the monitor object. Wait for done command from the monitor.
+ monitor->stop ();
+ command_t cmd;
+ int rc = term_mailbox.recv (&cmd, -1);
+ xs_assert (rc == 0);
+ xs_assert (cmd.type == command_t::done);
+
+ // Close the logging socket.
+ log_sync.lock ();
+ rc = log_socket->close ();
+ xs_assert (rc == 0);
+ log_socket = NULL;
+ log_sync.unlock ();
+
+ // First send stop command to sockets so that any blocking calls
+ // can be interrupted. If there are no sockets we can ask reaper
+ // thread to stop.
+ slot_sync.lock ();
+ for (sockets_t::size_type i = 0; i != sockets.size (); i++)
+ sockets [i]->stop ();
+ if (sockets.empty ())
+ reaper->stop ();
+ slot_sync.unlock ();
+ }
+
+ // Wait till reaper thread closes all the sockets.
command_t cmd;
int rc = term_mailbox.recv (&cmd, -1);
+ if (rc == -1 && errno == EINTR)
+ return -1;
xs_assert (rc == 0);
xs_assert (cmd.type == command_t::done);
-
- // Close the logging socket.
- log_sync.lock ();
- rc = log_socket->close ();
- xs_assert (rc == 0);
- log_socket = NULL;
- log_sync.unlock ();
-
- // First send stop command to sockets so that any blocking calls
- // can be interrupted. If there are no sockets we can ask reaper
- // thread to stop.
slot_sync.lock ();
- for (sockets_t::size_type i = 0; i != sockets.size (); i++)
- sockets [i]->stop ();
- if (sockets.empty ())
- reaper->stop ();
+ xs_assert (sockets.empty ());
slot_sync.unlock ();
}
- // Wait till reaper thread closes all the sockets.
- command_t cmd;
- int rc = term_mailbox.recv (&cmd, -1);
- if (rc == -1 && errno == EINTR)
- return -1;
- xs_assert (rc == 0);
- xs_assert (cmd.type == command_t::done);
- slot_sync.lock ();
- xs_assert (sockets.empty ());
- slot_sync.unlock ();
-
// Deallocate the resources.
delete this;
return 0;
}
+int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_)
+{
+ switch (option_) {
+ case XS_CTX_MAX_SOCKETS:
+ if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ opt_sync.lock ();
+ max_sockets = *((int*) optval_);
+ opt_sync.unlock ();
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+ return 0;
+}
+
xs::socket_base_t *xs::ctx_t::create_socket (int type_)
{
+ if (unlikely (starting)) {
+
+ starting = false;
+
+ // Initialise the array of mailboxes. Additional three slots are for
+ // xs_term thread and reaper thread.
+ opt_sync.lock ();
+ int maxs = max_sockets;
+ opt_sync.unlock ();
+ slot_count = maxs + io_thread_count + 3;
+ slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
+ alloc_assert (slots);
+
+ // Initialise the infrastructure for xs_term thread.
+ slots [term_tid] = &term_mailbox;
+
+ // Create the reaper thread.
+ reaper = new (std::nothrow) reaper_t (this, reaper_tid);
+ alloc_assert (reaper);
+ slots [reaper_tid] = reaper->get_mailbox ();
+ reaper->start ();
+
+ // Create I/O thread objects and launch them.
+ for (uint32_t i = 2; i != io_thread_count + 2; i++) {
+ io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
+ alloc_assert (io_thread);
+ io_threads.push_back (io_thread);
+ slots [i] = io_thread->get_mailbox ();
+ io_thread->start ();
+ }
+
+ // In the unused part of the slot array, create a list of empty slots.
+ for (int32_t i = (int32_t) slot_count - 1;
+ i >= (int32_t) io_thread_count + 2; i--) {
+ empty_slots.push_back (i);
+ slots [i] = NULL;
+ }
+
+ // Create the socket to send logs to.
+ log_socket = create_socket (XS_PUB);
+ xs_assert (log_socket);
+ int linger = 0;
+ int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger));
+ errno_assert (rc == 0);
+ int hwm = 1;
+ rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm));
+ errno_assert (rc == 0);
+ #if !defined XS_HAVE_WINDOWS
+ rc = log_socket->connect ("ipc:///tmp/xslogs.ipc");
+ errno_assert (rc == 0);
+ #endif
+
+ // Create the monitor object.
+ io_thread_t *io_thread = choose_io_thread (0);
+ xs_assert (io_thread);
+ monitor = new (std::nothrow) monitor_t (io_thread);
+ alloc_assert (monitor);
+ monitor->start ();
+ }
+
slot_sync.lock ();
// Once xs_term() was called, we can't create new sockets.
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 8c3941f..56b5d4c 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -73,6 +73,9 @@ namespace xs
// after the last one is closed.
int terminate ();
+ // Set context option.
+ int setctxopt (int option_, const void *optval_, size_t optvallen_);
+
// Create and destroy a socket.
xs::socket_base_t *create_socket (int type_);
void destroy_socket (xs::socket_base_t *socket_);
@@ -119,6 +122,10 @@ namespace xs
typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots;
+ // If true, xs_init has been called but no socket have been created
+ // yes. Launching of I/O threads is delayed.
+ bool starting;
+
// If true, xs_term was already called.
bool terminating;
@@ -160,6 +167,15 @@ namespace xs
xs::socket_base_t *log_socket;
mutex_t log_sync;
+ // Maximum number of sockets that can be opened at the same time.
+ int max_sockets;
+
+ // Number of I/O threads to launch.
+ uint32_t io_thread_count;
+
+ // Synchronisation of access to context options.
+ mutex_t opt_sync;
+
ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&);
};
diff --git a/src/xs.cpp b/src/xs.cpp
index 8fc8dd2..ad5bd97 100644
--- a/src/xs.cpp
+++ b/src/xs.cpp
@@ -169,6 +169,17 @@ int xs_term (void *ctx_)
return rc;
}
+int xs_setctxopt (void *ctx_, int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ return ((xs::ctx_t*) ctx_)->setctxopt (option_, optval_, optvallen_);
+}
+
void *xs_socket (void *ctx_, int type_)
{
if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) {