diff options
| -rw-r--r-- | src/dispatcher.cpp | 23 | ||||
| -rw-r--r-- | src/dispatcher.hpp | 16 | ||||
| -rw-r--r-- | src/object.cpp | 12 | 
3 files changed, 30 insertions, 21 deletions
| diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 806866c..167b823 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -67,6 +67,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,          signalers.push_back (io_thread->get_signaler ());      } +    //  Create the administrative thread. Nothing special is needed. NULL +    //  is used instead of signaler given that as for now, administrative +    //  thread doesn't receive any commands. The only thing it is used for +    //  is sending 'stop' command to I/O threads on shutdown. +    signalers.push_back (NULL); +      //  Create command pipe matrix.      command_pipes = new  (std::nothrow) command_pipe_t [signalers.size () *           signalers.size ()]; @@ -159,6 +165,23 @@ void zmq::dispatcher_t::destroy_socket ()         delete this;  } +void zmq::dispatcher_t::write (int source_, int destination_, +    const command_t &command_) +{ +    command_pipe_t &pipe = +        command_pipes [source_ * signalers.size () + destination_]; +    pipe.write (command_); +    if (!pipe.flush ()) +        signalers [destination_]->signal (source_); +} + +bool zmq::dispatcher_t::read (int source_,  int destination_, +    command_t *command_) +{ +    return command_pipes [source_ * signalers.size () + +        destination_].read (command_); +} +  zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()  {      //  Check whether thread ID is already assigned. If so, return it. diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 799e65b..2c11c57 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -70,23 +70,11 @@ namespace zmq          int thread_slot_count ();          //  Send command from the source to the destination. -        inline void write (int source_, int destination_, -            const command_t &command_) -        { -            command_pipe_t &pipe = -                command_pipes [source_ * signalers.size () + destination_]; -            pipe.write (command_); -            if (!pipe.flush ()) -                signalers [destination_]->signal (source_); -        } +        void write (int source_, int destination_, const command_t &command_);          //  Receive command from the source. Returns false if there is no          //  command available. -        inline bool read (int source_,  int destination_, command_t *command_) -        { -            return command_pipes [source_ * signalers.size () + -                destination_].read (command_); -        } +        bool read (int source_,  int destination_, command_t *command_);          //  Returns the I/O thread that is the least busy at the moment.          //  Taskset specifies which I/O threads are eligible (0 = all). diff --git a/src/object.cpp b/src/object.cpp index 20e712a..a977f39 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -146,12 +146,13 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)  void zmq::object_t::send_stop ()  { -    //  Send command goes always to the current object. To-self pipe is -    //  used exclusively for sending this command. +    //  'stop' command goes always from administrative thread to +    //  the current object.  +    int admin_thread_id = dispatcher->thread_slot_count () - 1;      command_t cmd;      cmd.destination = this;      cmd.type = command_t::stop; -    dispatcher->write (thread_slot, thread_slot, cmd); +    dispatcher->write (admin_thread_id, thread_slot, cmd);  }  void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) @@ -314,9 +315,6 @@ void zmq::object_t::process_seqnum ()  void zmq::object_t::send_command (command_t &cmd_)  {      int destination_thread_slot = cmd_.destination->get_thread_slot (); -    if (destination_thread_slot == thread_slot) -        cmd_.destination->process_command (cmd_); -    else  -        dispatcher->write (thread_slot, destination_thread_slot, cmd_); +    dispatcher->write (thread_slot, destination_thread_slot, cmd_);  } | 
