diff options
| -rw-r--r-- | include/xs.h | 6 | ||||
| -rw-r--r-- | src/config.hpp | 3 | ||||
| -rw-r--r-- | src/ctx.cpp | 210 | ||||
| -rw-r--r-- | src/ctx.hpp | 16 | ||||
| -rw-r--r-- | src/xs.cpp | 11 | ||||
| -rw-r--r-- | tests/Makefile.am | 4 | ||||
| -rw-r--r-- | tests/linger.cpp | 2 | ||||
| -rw-r--r-- | tests/max_sockets.cpp | 51 | 
8 files changed, 209 insertions, 94 deletions
| diff --git a/include/xs.h b/include/xs.h index 05d98ab..3aa1045 100644 --- a/include/xs.h +++ b/include/xs.h @@ -143,11 +143,15 @@ XS_EXPORT int xs_getmsgopt (xs_msg_t *msg, int option, void *optval,      size_t *optvallen);  /******************************************************************************/ -/*  Crossroads infrastructure initialisation & termination.                   */ +/*  Crossroads context definition.                                            */  /******************************************************************************/ +#define XS_CTX_MAX_SOCKETS 1 +  XS_EXPORT void *xs_init (int io_threads);  XS_EXPORT int xs_term (void *context); +XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval, +    size_t optvallen);   /******************************************************************************/  /*  Crossroads socket definition.                                             */ diff --git a/src/config.hpp b/src/config.hpp index e4d948e..8107e17 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -29,9 +29,6 @@ namespace xs      enum       { -        //  Maximum number of sockets that can be opened at the same time. -        max_sockets = 512, -          //  Number of new messages in message pipe needed to trigger new memory          //  allocation. Setting this parameter to 256 decreases the impact of          //  memory allocation by approximately 99.6% diff --git a/src/ctx.cpp b/src/ctx.cpp index 97a3e62..b6298e2 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -40,59 +40,11 @@  xs::ctx_t::ctx_t (uint32_t io_threads_) :      tag (0xbadcafe0), -    terminating (false) +    starting (true), +    terminating (false), +    max_sockets (512), +    io_thread_count (io_threads_)  { -    //  Initialise the array of mailboxes. Additional three slots are for -    //  xs_term thread and reaper thread. -    slot_count = max_sockets + io_threads_ + 3; -    slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); -    alloc_assert (slots); - -    //  Initialise the infrastructure for xs_term thread. -    slots [term_tid] = &term_mailbox; - -    //  Create the reaper thread. -    reaper = new (std::nothrow) reaper_t (this, reaper_tid); -    alloc_assert (reaper); -    slots [reaper_tid] = reaper->get_mailbox (); -    reaper->start (); - -    //  Create I/O thread objects and launch them. -    for (uint32_t i = 2; i != io_threads_ + 2; i++) { -        io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); -        alloc_assert (io_thread); -        io_threads.push_back (io_thread); -        slots [i] = io_thread->get_mailbox (); -        io_thread->start (); -    } - -    //  In the unused part of the slot array, create a list of empty slots. -    for (int32_t i = (int32_t) slot_count - 1; -          i >= (int32_t) io_threads_ + 2; i--) { -        empty_slots.push_back (i); -        slots [i] = NULL; -    } - -    //  Create the socket to send logs to. -    log_socket = create_socket (XS_PUB); -    xs_assert (log_socket); -    int linger = 0; -    int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger)); -    errno_assert (rc == 0); -    int hwm = 1; -    rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm)); -    errno_assert (rc == 0); -#if !defined XS_HAVE_WINDOWS -    rc = log_socket->connect ("ipc:///tmp/xslogs.ipc"); -    errno_assert (rc == 0); -#endif - -    //  Create the monitor object. -    io_thread_t *io_thread = choose_io_thread (0); -    xs_assert (io_thread); -    monitor = new (std::nothrow) monitor_t (io_thread); -    alloc_assert (monitor); -    monitor->start ();  }  bool xs::ctx_t::check_tag () @@ -128,60 +80,142 @@ xs::ctx_t::~ctx_t ()  int xs::ctx_t::terminate ()  { -    //  Check whether termination was already underway, but interrupted and now -    //  restarted. -    slot_sync.lock (); -    bool restarted = terminating; -    terminating = true; -    slot_sync.unlock (); +    if (!starting) { -    //  First attempt to terminate the context. -    if (!restarted) { +        //  Check whether termination was already underway, but interrupted and now +        //  restarted. +        slot_sync.lock (); +        bool restarted = terminating; +        terminating = true; +        slot_sync.unlock (); -        //  Close the monitor object. Wait for done command from the monitor. -        monitor->stop (); +        //  First attempt to terminate the context. +        if (!restarted) { + +            //  Close the monitor object. Wait for done command from the monitor. +            monitor->stop (); +            command_t cmd; +            int rc = term_mailbox.recv (&cmd, -1); +            xs_assert (rc == 0); +            xs_assert (cmd.type == command_t::done); + +            //  Close the logging socket. +            log_sync.lock (); +            rc = log_socket->close (); +            xs_assert (rc == 0); +            log_socket = NULL; +            log_sync.unlock (); + +            //  First send stop command to sockets so that any blocking calls +            //  can be interrupted. If there are no sockets we can ask reaper +            //  thread to stop. +            slot_sync.lock (); +            for (sockets_t::size_type i = 0; i != sockets.size (); i++) +                sockets [i]->stop (); +            if (sockets.empty ()) +                reaper->stop (); +            slot_sync.unlock (); +        } + +        //  Wait till reaper thread closes all the sockets.          command_t cmd;          int rc = term_mailbox.recv (&cmd, -1); +        if (rc == -1 && errno == EINTR) +            return -1;          xs_assert (rc == 0);          xs_assert (cmd.type == command_t::done); - -        //  Close the logging socket. -        log_sync.lock (); -        rc = log_socket->close (); -        xs_assert (rc == 0); -        log_socket = NULL; -        log_sync.unlock (); - -        //  First send stop command to sockets so that any blocking calls -        //  can be interrupted. If there are no sockets we can ask reaper -        //  thread to stop.          slot_sync.lock (); -        for (sockets_t::size_type i = 0; i != sockets.size (); i++) -            sockets [i]->stop (); -        if (sockets.empty ()) -            reaper->stop (); +        xs_assert (sockets.empty ());          slot_sync.unlock ();      } -    //  Wait till reaper thread closes all the sockets. -    command_t cmd; -    int rc = term_mailbox.recv (&cmd, -1); -    if (rc == -1 && errno == EINTR) -        return -1; -    xs_assert (rc == 0); -    xs_assert (cmd.type == command_t::done); -    slot_sync.lock (); -    xs_assert (sockets.empty ()); -    slot_sync.unlock (); -      //  Deallocate the resources.      delete this;      return 0;  } +int xs::ctx_t::setctxopt (int option_, const void *optval_, size_t optvallen_) +{ +    switch (option_) { +    case XS_CTX_MAX_SOCKETS: +        if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) { +            errno = EINVAL; +            return -1; +        } +        opt_sync.lock (); +        max_sockets = *((int*) optval_); +        opt_sync.unlock (); +        break; +    default: +        errno = EINVAL; +        return -1; +    } +    return 0; +} +  xs::socket_base_t *xs::ctx_t::create_socket (int type_)  { +    if (unlikely (starting)) { + +        starting = false; + +        //  Initialise the array of mailboxes. Additional three slots are for +        //  xs_term thread and reaper thread. +        opt_sync.lock (); +        int maxs = max_sockets; +        opt_sync.unlock (); +        slot_count = maxs + io_thread_count + 3; +        slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); +        alloc_assert (slots); + +        //  Initialise the infrastructure for xs_term thread. +        slots [term_tid] = &term_mailbox; + +        //  Create the reaper thread. +        reaper = new (std::nothrow) reaper_t (this, reaper_tid); +        alloc_assert (reaper); +        slots [reaper_tid] = reaper->get_mailbox (); +        reaper->start (); + +        //  Create I/O thread objects and launch them. +        for (uint32_t i = 2; i != io_thread_count + 2; i++) { +            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); +            alloc_assert (io_thread); +            io_threads.push_back (io_thread); +            slots [i] = io_thread->get_mailbox (); +            io_thread->start (); +        } + +        //  In the unused part of the slot array, create a list of empty slots. +        for (int32_t i = (int32_t) slot_count - 1; +              i >= (int32_t) io_thread_count + 2; i--) { +            empty_slots.push_back (i); +            slots [i] = NULL; +        } + +        //  Create the socket to send logs to. +        log_socket = create_socket (XS_PUB); +        xs_assert (log_socket); +        int linger = 0; +        int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger)); +        errno_assert (rc == 0); +        int hwm = 1; +        rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm)); +        errno_assert (rc == 0); +    #if !defined XS_HAVE_WINDOWS +        rc = log_socket->connect ("ipc:///tmp/xslogs.ipc"); +        errno_assert (rc == 0); +    #endif + +        //  Create the monitor object. +        io_thread_t *io_thread = choose_io_thread (0); +        xs_assert (io_thread); +        monitor = new (std::nothrow) monitor_t (io_thread); +        alloc_assert (monitor); +        monitor->start (); +    } +      slot_sync.lock ();      //  Once xs_term() was called, we can't create new sockets. diff --git a/src/ctx.hpp b/src/ctx.hpp index 8c3941f..56b5d4c 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -73,6 +73,9 @@ namespace xs          //  after the last one is closed.          int terminate (); +        //  Set context option. +        int setctxopt (int option_, const void *optval_, size_t optvallen_); +          //  Create and destroy a socket.          xs::socket_base_t *create_socket (int type_);          void destroy_socket (xs::socket_base_t *socket_); @@ -119,6 +122,10 @@ namespace xs          typedef std::vector <uint32_t> emtpy_slots_t;          emtpy_slots_t empty_slots; +        //  If true, xs_init has been called but no socket have been created +        //  yes. Launching of I/O threads is delayed. +        bool starting; +          //  If true, xs_term was already called.          bool terminating; @@ -160,6 +167,15 @@ namespace xs          xs::socket_base_t *log_socket;          mutex_t log_sync; +        //  Maximum number of sockets that can be opened at the same time. +        int max_sockets; + +        //  Number of I/O threads to launch. +        uint32_t io_thread_count; + +        //  Synchronisation of access to context options. +        mutex_t opt_sync; +          ctx_t (const ctx_t&);          const ctx_t &operator = (const ctx_t&);      }; @@ -169,6 +169,17 @@ int xs_term (void *ctx_)      return rc;  } +int xs_setctxopt (void *ctx_, int option_, const void *optval_, +    size_t optvallen_) +{ +    if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) { +        errno = EFAULT; +        return -1; +    } + +    return ((xs::ctx_t*) ctx_)->setctxopt (option_, optval_, optvallen_); +} +  void *xs_socket (void *ctx_, int type_)  {      if (!ctx_ || !((xs::ctx_t*) ctx_)->check_tag ()) { diff --git a/tests/Makefile.am b/tests/Makefile.am index 85c7527..fba6271 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -17,7 +17,8 @@ noinst_PROGRAMS = pair_inproc \                    shutdown_stress \                    pair_ipc \                    reqrep_ipc \ -                  timeo +                  timeo \ +                  max_sockets  pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp  pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -34,5 +35,6 @@ shutdown_stress_SOURCES = shutdown_stress.cpp  pair_ipc_SOURCES = pair_ipc.cpp testutil.hpp  reqrep_ipc_SOURCES = reqrep_ipc.cpp testutil.hpp  timeo_SOURCES = timeo.cpp +max_sockets_SOURCES = max_sockets.cpp  TESTS = $(noinst_PROGRAMS) diff --git a/tests/linger.cpp b/tests/linger.cpp index 5c0480c..35de303 100644 --- a/tests/linger.cpp +++ b/tests/linger.cpp @@ -24,7 +24,7 @@ int XS_TEST_MAIN ()  {      fprintf (stderr, "linger test running...\n"); -    //  Create REQ/XREP wiring. +    //  Create socket.      void *ctx = xs_init (1);      assert (ctx);      void *s = xs_socket (ctx, XS_PUSH); diff --git a/tests/max_sockets.cpp b/tests/max_sockets.cpp new file mode 100644 index 0000000..4ce6cb9 --- /dev/null +++ b/tests/max_sockets.cpp @@ -0,0 +1,51 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of Crossroads project. + +    Crossroads is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    Crossroads is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ +    fprintf (stderr, "max_sockets test running...\n"); + +    //  Create context and set MAX_SOCKETS to 1. +    void *ctx = xs_init (1); +    assert (ctx); +    int max_sockets = 1; +    int rc = xs_setctxopt (ctx, XS_CTX_MAX_SOCKETS, &max_sockets, +        sizeof (max_sockets)); +    assert (rc == 0); + +    //  First socket should be created OK. +    void *s1 = xs_socket (ctx, XS_PUSH); +    assert (s1); + +    //  Creation of second socket should fail. +    void *s2 = xs_socket (ctx, XS_PUSH); +    assert (!s2 && errno == EMFILE); + +    //  Clean up. +    rc = xs_close (s1); +    assert (rc == 0); +    rc = xs_term (ctx); +    assert (rc == 0); + +    return 0; +} + | 
