diff options
Diffstat (limited to 'src')
58 files changed, 185 insertions, 319 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index c2c6f65..7d3ee8d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,7 +23,6 @@ libxs_la_SOURCES = \      fd.hpp \      fq.hpp \      io_object.hpp \ -    io_thread.hpp \      ip.hpp \      ipc_address.hpp \      ipc_connecter.hpp \ @@ -86,7 +85,6 @@ libxs_la_SOURCES = \      err.cpp \      fq.cpp \      io_object.cpp \ -    io_thread.cpp \      ip.cpp \      ipc_address.cpp \      ipc_connecter.cpp \ diff --git a/src/ctx.cpp b/src/ctx.cpp index a8df950..8b27b31 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -31,7 +31,6 @@  #include "ctx.hpp"  #include "socket_base.hpp" -#include "io_thread.hpp"  #include "monitor.hpp"  #include "reaper.hpp"  #include "pipe.hpp" @@ -198,8 +197,8 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)          //  Create I/O thread objects and launch them.          for (uint32_t i = 2; i != io_thread_count + 2; i++) { -            io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); -            alloc_assert (io_thread); +            poller_base_t *io_thread = poller_base_t::create (this, i); +            errno_assert (io_thread);              io_threads.push_back (io_thread);              slots [i] = io_thread->get_mailbox ();              io_thread->start (); @@ -227,7 +226,7 @@ xs::socket_base_t *xs::ctx_t::create_socket (int type_)      #endif          //  Create the monitor object. -        io_thread_t *io_thread = choose_io_thread (0); +        poller_base_t *io_thread = choose_io_thread (0);          xs_assert (io_thread);          monitor = new (std::nothrow) monitor_t (io_thread);          alloc_assert (monitor); @@ -302,7 +301,7 @@ void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)      slots [tid_]->send (command_);  } -xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_) +xs::poller_base_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)  {      if (io_threads.empty ())          return NULL; diff --git a/src/ctx.hpp b/src/ctx.hpp index e912443..c12fffc 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -40,7 +40,7 @@ namespace xs      class object_t;      class monitor_t; -    class io_thread_t; +    class poller_base_t;      class socket_base_t;      class reaper_t; @@ -86,7 +86,7 @@ namespace xs          //  Returns the I/O thread that is the least busy at the moment.          //  Affinity specifies which I/O threads are eligible (0 = all).          //  Returns NULL is no I/O thread is available. -        xs::io_thread_t *choose_io_thread (uint64_t affinity_); +        xs::poller_base_t *choose_io_thread (uint64_t affinity_);          //  Returns reaper thread object.          xs::object_t *get_reaper (); @@ -142,7 +142,7 @@ namespace xs          xs::reaper_t *reaper;          //  I/O threads. -        typedef std::vector <xs::io_thread_t*> io_threads_t; +        typedef std::vector <xs::poller_base_t*> io_threads_t;          io_threads_t io_threads;          //  Array of pointers to mailboxes for both application and I/O threads. diff --git a/src/epoll.cpp b/src/epoll.cpp index 7208a57..7ca4608 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -34,7 +34,8 @@  #include "config.hpp"  #include "err.hpp" -xs::epoll_t::epoll_t () : +xs::epoll_t::epoll_t (xs::ctx_t *ctx_, uint32_t tid_) : +    poller_base_t (ctx_, tid_),      stopping (false)  {      epoll_fd = epoll_create (1); diff --git a/src/epoll.hpp b/src/epoll.hpp index bac555b..232d049 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -36,6 +36,7 @@  namespace xs  { +    class ctx_t;      struct i_poll_events;      //  This class implements socket polling mechanism using the Linux-specific @@ -45,7 +46,7 @@ namespace xs      {      public: -        epoll_t (); +        epoll_t (xs::ctx_t *ctx_, uint32_t tid_);          ~epoll_t ();          //  "poller" concept. diff --git a/src/i_engine.hpp b/src/i_engine.hpp index fc03f0e..9106411 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -25,7 +25,7 @@  namespace xs  { -    class io_thread_t; +    class poller_base_t;      //  Abstract interface to be implemented by various engines. @@ -34,7 +34,7 @@ namespace xs          virtual ~i_engine () {}          //  Plug the engine to the session. -        virtual void plug (xs::io_thread_t *io_thread_, +        virtual void plug (xs::poller_base_t *io_thread_,              class session_base_t *session_) = 0;          //  Unplug the engine from the session. diff --git a/src/io_object.cpp b/src/io_object.cpp index abc8204..0c73571 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -20,10 +20,10 @@  */  #include "io_object.hpp" -#include "io_thread.hpp" +#include "poller_base.hpp"  #include "err.hpp" -xs::io_object_t::io_object_t (io_thread_t *io_thread_) : +xs::io_object_t::io_object_t (poller_base_t *io_thread_) :      poller (NULL)  {      if (io_thread_) @@ -34,13 +34,13 @@ xs::io_object_t::~io_object_t ()  {  } -void xs::io_object_t::plug (io_thread_t *io_thread_) +void xs::io_object_t::plug (poller_base_t *io_thread_)  {      xs_assert (io_thread_);      xs_assert (!poller);      //  Retrieve the poller from the thread we are running in. -    poller = io_thread_->get_poller (); +    poller = io_thread_;  }  void xs::io_object_t::unplug () diff --git a/src/io_object.hpp b/src/io_object.hpp index 0abd457..0749ceb 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -30,8 +30,6 @@  namespace xs  { -    class io_thread_t; -      //  Simple base class for objects that live in I/O threads.      //  It makes communication with the poller object easier and      //  makes defining unneeded event handlers unnecessary. @@ -40,12 +38,12 @@ namespace xs      {      public: -        io_object_t (xs::io_thread_t *io_thread_ = NULL); +        io_object_t (xs::poller_base_t *io_thread_ = NULL);          ~io_object_t ();          //  When migrating an object from one I/O thread to another, first          //  unplug it, then migrate it, then plug it to the new thread. -        void plug (xs::io_thread_t *io_thread_); +        void plug (xs::poller_base_t *io_thread_);          void unplug ();      protected: diff --git a/src/io_thread.cpp b/src/io_thread.cpp deleted file mode 100644 index 77cebfe..0000000 --- a/src/io_thread.cpp +++ /dev/null @@ -1,108 +0,0 @@ -/* -    Copyright (c) 2009-2012 250bpm s.r.o. -    Copyright (c) 2007-2009 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of Crossroads project. - -    Crossroads is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    Crossroads is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <new> - -#include "io_thread.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "ctx.hpp" - -xs::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : -    object_t (ctx_, tid_) -{ -    poller = poller_base_t::create (); -    xs_assert (poller); - -    mailbox_handle = poller->add_fd (mailbox.get_fd (), this); -    poller->set_pollin (mailbox_handle); -} - -xs::io_thread_t::~io_thread_t () -{ -    delete poller; -} - -void xs::io_thread_t::start () -{ -    //  Start the underlying I/O thread. -    poller->start (); -} - -void xs::io_thread_t::stop () -{ -    send_stop (); -} - -xs::mailbox_t *xs::io_thread_t::get_mailbox () -{ -    return &mailbox; -} - -int xs::io_thread_t::get_load () -{ -    return poller->get_load (); -} - -void xs::io_thread_t::in_event (fd_t fd_) -{ -    //  TODO: Do we want to limit number of commands I/O thread can -    //  process in a single go? - -    while (true) { - -        //  Get the next command. If there is none, exit. -        command_t cmd; -        int rc = mailbox.recv (&cmd, 0); -        if (rc != 0 && errno == EINTR) -            continue; -        if (rc != 0 && errno == EAGAIN) -            break; -        errno_assert (rc == 0); - -        //  Process the command. -        cmd.destination->process_command (cmd); -    } -} - -void xs::io_thread_t::out_event (fd_t fd_) -{ -    //  We are never polling for POLLOUT here. This function is never called. -    xs_assert (false); -} - -void xs::io_thread_t::timer_event (handle_t handle_) -{ -    //  No timers here. This function is never called. -    xs_assert (false); -} - -xs::poller_base_t *xs::io_thread_t::get_poller () -{ -    xs_assert (poller); -    return poller; -} - -void xs::io_thread_t::process_stop () -{ -    poller->rm_fd (mailbox_handle); -    poller->stop (); -} diff --git a/src/io_thread.hpp b/src/io_thread.hpp deleted file mode 100644 index 379f420..0000000 --- a/src/io_thread.hpp +++ /dev/null @@ -1,90 +0,0 @@ -/* -    Copyright (c) 2009-2012 250bpm s.r.o. -    Copyright (c) 2007-2009 iMatix Corporation -    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - -    This file is part of Crossroads project. - -    Crossroads is free software; you can redistribute it and/or modify it under -    the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or -    (at your option) any later version. - -    Crossroads is distributed in the hope that it will be useful, -    but WITHOUT ANY WARRANTY; without even the implied warranty of -    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -    GNU Lesser General Public License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __XS_IO_THREAD_HPP_INCLUDED__ -#define __XS_IO_THREAD_HPP_INCLUDED__ - -#include <vector> - -#include "stdint.hpp" -#include "object.hpp" -#include "poller_base.hpp" -#include "mailbox.hpp" - -namespace xs -{ - -    class ctx_t; - -    //  Generic part of the I/O thread. Polling-mechanism-specific features -    //  are implemented in separate "polling objects". - -    class io_thread_t : public object_t, public i_poll_events -    { -    public: - -        io_thread_t (xs::ctx_t *ctx_, uint32_t tid_); - -        //  Clean-up. If the thread was started, it's neccessary to call 'stop' -        //  before invoking destructor. Otherwise the destructor would hang up. -        ~io_thread_t (); - -        //  Launch the physical thread. -        void start (); - -        //  Ask underlying thread to stop. -        void stop (); - -        //  Returns mailbox associated with this I/O thread. -        mailbox_t *get_mailbox (); - -        //  i_poll_events implementation. -        void in_event (fd_t fd_); -        void out_event (fd_t fd_); -        void timer_event (handle_t handle_); - -        //  Used by io_objects to retrieve the assciated poller object. -        poller_base_t *get_poller (); - -        //  Command handlers. -        void process_stop (); - -        //  Returns load experienced by the I/O thread. -        int get_load (); - -    private: - -        //  I/O thread accesses incoming commands via this mailbox. -        mailbox_t mailbox; - -        //  Handle associated with mailbox' file descriptor. -        handle_t mailbox_handle; - -        //  I/O multiplexing is performed using a poller object. -        poller_base_t *poller; - -        io_thread_t (const io_thread_t&); -        const io_thread_t &operator = (const io_thread_t&); -    }; - -} - -#endif diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index f6a01c3..b65887a 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -26,7 +26,7 @@  #include <string>  #include "stream_engine.hpp" -#include "io_thread.hpp" +#include "poller_base.hpp"  #include "platform.hpp"  #include "random.hpp"  #include "err.hpp" @@ -37,7 +37,7 @@  #include <sys/socket.h>  #include <sys/un.h> -xs::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, +xs::ipc_connecter_t::ipc_connecter_t (class poller_base_t *io_thread_,        class session_base_t *session_, const options_t &options_,        const char *address_, bool wait_) :      own_t (io_thread_, options_), diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 2a8e8e4..46e260c 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -34,7 +34,7 @@  namespace xs  { -    class io_thread_t; +    class poller_base_t;      class session_base_t;      class ipc_connecter_t : public own_t, public io_object_t @@ -43,7 +43,7 @@ namespace xs          //  If 'delay' is true connecter first waits for a while, then starts          //  connection process. -        ipc_connecter_t (xs::io_thread_t *io_thread_, +        ipc_connecter_t (xs::poller_base_t *io_thread_,              xs::session_base_t *session_, const options_t &options_,              const char *address_, bool delay_);          ~ipc_connecter_t (); diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index db49528..bd55cc7 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -28,7 +28,7 @@  #include "stream_engine.hpp"  #include "ipc_address.hpp" -#include "io_thread.hpp" +#include "poller_base.hpp"  #include "session_base.hpp"  #include "config.hpp"  #include "err.hpp" @@ -39,7 +39,7 @@  #include <fcntl.h>  #include <sys/un.h> -xs::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_, +xs::ipc_listener_t::ipc_listener_t (poller_base_t *io_thread_,        socket_base_t *socket_, const options_t &options_) :      own_t (io_thread_, options_),      io_object_t (io_thread_), @@ -83,7 +83,7 @@ void xs::ipc_listener_t::in_event (fd_t fd_)      //  Choose I/O thread to run connecter in. Given that we are already      //  running in an I/O thread, there must be at least one available. -    io_thread_t *io_thread = choose_io_thread (options.affinity); +    poller_base_t *io_thread = choose_io_thread (options.affinity);      xs_assert (io_thread);      //  Create and launch a session object.  diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index b599bff..24b96fb 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -35,14 +35,14 @@  namespace xs  { -    class io_thread_t; +    class poller_base_t;      class socket_base_t;      class ipc_listener_t : public own_t, public io_object_t      {      public: -        ipc_listener_t (xs::io_thread_t *io_thread_, +        ipc_listener_t (xs::poller_base_t *io_thread_,              xs::socket_base_t *socket_, const options_t &options_);          ~ipc_listener_t (); diff --git a/src/monitor.cpp b/src/monitor.cpp index b29c874..14a31f2 100644 --- a/src/monitor.cpp +++ b/src/monitor.cpp @@ -19,12 +19,12 @@  */  #include "monitor.hpp" -#include "io_thread.hpp" +#include "poller_base.hpp"  #include "options.hpp"  #include "random.hpp"  #include "err.hpp" -xs::monitor_t::monitor_t (xs::io_thread_t *io_thread_) : +xs::monitor_t::monitor_t (xs::poller_base_t *io_thread_) :      own_t (io_thread_, options_t ()),      io_object_t (io_thread_),      timer (NULL) diff --git a/src/monitor.hpp b/src/monitor.hpp index 73bc31c..91d441a 100644 --- a/src/monitor.hpp +++ b/src/monitor.hpp @@ -30,14 +30,14 @@  namespace xs  { -    class io_thread_t; +    class poller_base_t;      class socket_base_t;      class monitor_t : public own_t, public io_object_t      {      public: -        monitor_t (xs::io_thread_t *io_thread_); +        monitor_t (xs::poller_base_t *io_thread_);          ~monitor_t ();          void start (); diff --git a/src/object.cpp b/src/object.cpp index 2d1f5aa..df279f5 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -26,7 +26,7 @@  #include "ctx.hpp"  #include "err.hpp"  #include "pipe.hpp" -#include "io_thread.hpp" +#include "poller_base.hpp"  #include "session_base.hpp"  #include "socket_base.hpp" @@ -149,7 +149,7 @@ void xs::object_t::destroy_socket (socket_base_t *socket_)      ctx->destroy_socket (socket_);  } -xs::io_thread_t *xs::object_t::choose_io_thread (uint64_t affinity_) +xs::poller_base_t *xs::object_t::choose_io_thread (uint64_t affinity_)  {      return ctx->choose_io_thread (affinity_);  } diff --git a/src/object.hpp b/src/object.hpp index b1bc0c3..14dad49 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -34,7 +34,7 @@ namespace xs      class pipe_t;      class socket_base_t;      class session_base_t; -    class io_thread_t; +    class poller_base_t;      class own_t;      //  Base class for all objects that participate in inter-thread @@ -62,7 +62,7 @@ namespace xs          void destroy_socket (xs::socket_base_t *socket_);          //  Chooses least loaded I/O thread. -        xs::io_thread_t *choose_io_thread (uint64_t affinity_); +        xs::poller_base_t *choose_io_thread (uint64_t affinity_);          //  Logging related functions.          void log (int sid_, const char *text_); diff --git a/src/own.cpp b/src/own.cpp index a60d9d0..3047d7b 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -20,7 +20,7 @@  #include "own.hpp"  #include "err.hpp" -#include "io_thread.hpp" +#include "poller_base.hpp"  xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :      object_t (parent_, tid_), @@ -32,7 +32,7 @@ xs::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :  {  } -xs::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) : +xs::own_t::own_t (poller_base_t *io_thread_, const options_t &options_) :      object_t (io_thread_),      options (options_),      terminating (false), diff --git a/src/own.hpp b/src/own.hpp index d0afd62..de0320a 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -33,7 +33,7 @@ namespace xs  {      class ctx_t; -    class io_thread_t; +    class poller_base_t;      //  Base class for objects forming a part of ownership hierarchy.      //  It handles initialisation and destruction of such objects. @@ -50,7 +50,7 @@ namespace xs          own_t (xs::ctx_t *parent_, uint32_t tid_);          //  The object is living within I/O thread. -        own_t (xs::io_thread_t *io_thread_, const options_t &options_); +        own_t (xs::poller_base_t *io_thread_, const options_t &options_);          //  When another owned object wants to send command to this object          //  it calls this function to let it know it should not shut down diff --git a/src/pair.cpp b/src/pair.cpp index dd14d8d..c35084f 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -117,7 +117,7 @@ bool xs::pair_t::xhas_out ()      return result;  } -xs::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_, +xs::pair_session_t::pair_session_t (poller_base_t *io_thread_, bool connect_,        socket_base_t *socket_, const options_t &options_,        const char *protocol_, const char *address_) :      session_base_t (io_thread_, connect_, socket_, options_, protocol_, diff --git a/src/pair.hpp b/src/pair.hpp index 8ae3acf..658e8e9 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -63,7 +63,7 @@ namespace xs      {      public: -        pair_session_t (xs::io_thread_t *io_thread_, bool connect_, +        pair_session_t (xs::poller_base_t *io_thread_, bool connect_, | 
