From a8b410e66c3c75809c8e9c01dd3e35c579f02347 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 8 Aug 2009 16:01:58 +0200 Subject: lockfree interaction patter for 3 theads implemented --- src/Makefile.am | 14 +++-- src/app_thread.cpp | 37 +++++++++--- src/app_thread.hpp | 13 +++-- src/command.hpp | 59 ++++++++----------- src/context.cpp | 153 ------------------------------------------------- src/context.hpp | 122 --------------------------------------- src/dispatcher.cpp | 149 ++++++++++++++++++++++++++++++++++++++++++++++++ src/dispatcher.hpp | 122 +++++++++++++++++++++++++++++++++++++++ src/err.hpp | 6 ++ src/i_api.hpp | 34 +++++------ src/i_socket.hpp | 36 ------------ src/io_object.cpp | 41 +++++++++++++ src/io_object.hpp | 62 ++++++++++++++++++++ src/io_thread.cpp | 8 +-- src/io_thread.hpp | 2 +- src/mutex.hpp | 30 +++++----- src/object.cpp | 158 +++++++++++++++------------------------------------ src/object.hpp | 42 +++++--------- src/socket_base.cpp | 129 +++++++++++++++++++++++++++++++++++++++++ src/socket_base.hpp | 72 +++++++++++++++++++++++ src/zmq.cpp | 19 ++++--- src/zmq_listener.cpp | 35 ++++++++++++ src/zmq_listener.hpp | 46 +++++++++++++++ 23 files changed, 845 insertions(+), 544 deletions(-) delete mode 100644 src/context.cpp delete mode 100644 src/context.hpp create mode 100644 src/dispatcher.cpp create mode 100644 src/dispatcher.hpp delete mode 100644 src/i_socket.hpp create mode 100644 src/io_object.cpp create mode 100644 src/io_object.hpp create mode 100644 src/socket_base.cpp create mode 100644 src/socket_base.hpp create mode 100644 src/zmq_listener.cpp create mode 100644 src/zmq_listener.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index bde9c39..47037a2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,14 +7,15 @@ libzmq_la_SOURCES = \ atomic_ptr.hpp \ command.hpp \ config.hpp \ - context.hpp \ decoder.hpp \ devpoll.hpp \ + dispatcher.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ fd.hpp \ fd_signaler.hpp \ + io_object.hpp \ io_thread.hpp \ ip.hpp \ i_api.hpp \ @@ -31,6 +32,7 @@ libzmq_la_SOURCES = \ poll.hpp \ select.hpp \ simple_semaphore.hpp \ + socket_base.hpp \ stdint.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -42,25 +44,29 @@ libzmq_la_SOURCES = \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ + zmq_listener.hpp \ app_thread.cpp \ - context.cpp \ - devpoll.hpp \ + devpoll.cpp \ + dispatcher.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ + io_object.cpp \ io_thread.cpp \ ip.cpp \ kqueue.cpp \ object.cpp \ poll.cpp \ select.cpp \ + socket_base.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ uuid.cpp \ ypollset.cpp \ - zmq.cpp + zmq.cpp \ + zmq_listener.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 23a055a..3f76970 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include "../include/zmq.h" #if defined ZMQ_HAVE_WINDOWS @@ -26,10 +28,12 @@ #endif #include "app_thread.hpp" -#include "context.hpp" +#include "i_api.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" +#include "socket_base.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -39,8 +43,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : - object_t (context_, thread_slot_), +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), tid (0), last_processing_time (0) { @@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : zmq::app_thread_t::~app_thread_t () { - // Ask all the sockets to start termination, then wait till it is complete. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) - (*it)->stop (); + // Destroy all the sockets owned by this application thread. for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) delete *it; - - delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_) for (int i = 0; i != thread_slot_count (); i++) { if (signals & (ypollset_t::signals_t (1) << i)) { command_t cmd; - while (context->read (i, get_thread_slot (), &cmd)) + while (dispatcher->read (i, get_thread_slot (), &cmd)) cmd.destination->process_command (cmd); } } } } + +zmq::i_api *zmq::app_thread_t::create_socket (int type_) +{ + // TODO: type is ignored for the time being. + socket_base_t *s = new socket_base_t (this); + zmq_assert (s); + sockets.push_back (s); + return s; +} + +void zmq::app_thread_t::remove_socket (i_api *socket_) +{ + // TODO: To speed this up we can possibly use the system where each socket + // holds its index (see I/O scheduler implementation). + sockets_t::iterator it = std::find (sockets.begin (), sockets.end (), + socket_); + zmq_assert (it != sockets.end ()); + sockets.erase (it); +} diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 31679b8..59e4a25 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -22,7 +22,6 @@ #include -#include "i_socket.hpp" #include "stdint.hpp" #include "object.hpp" #include "ypollset.hpp" @@ -34,7 +33,7 @@ namespace zmq { public: - app_thread_t (class context_t *context_, int thread_slot_); + app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); ~app_thread_t (); @@ -42,7 +41,7 @@ namespace zmq i_signaler *get_signaler (); // Nota bene: Following two functions are accessed from different - // threads. The caller (context) is responsible for synchronisation + // threads. The caller (dispatcher) is responsible for synchronisation // of accesses. // Returns true is current thread is associated with the app thread. @@ -56,10 +55,16 @@ namespace zmq // set to true, returns only after at least one command was processed. void process_commands (bool block_); + // Create a socket of a specified type. + struct i_api *create_socket (int type_); + + // Unregister the socket from the app_thread (called by socket itself). + void remove_socket (struct i_api *socket_); + private: // All the sockets created from this application thread. - typedef std::vector sockets_t; + typedef std::vector sockets_t; sockets_t sockets; // Thread ID associated with this slot. diff --git a/src/command.hpp b/src/command.hpp index 69c4e57..de94ca3 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -35,60 +35,49 @@ namespace zmq enum type_t { stop, + plug, + own, bind, - head, - tail, - reg, - reg_and_bind, - unreg, - engine, - terminate, - terminate_ack + term_req, + term, + term_ack + } type; union { + // Sent to I/O thread to let it know that it should + // terminate itself. struct { } stop; + // Sent to I/O object to make it register with its I/O thread. struct { - class pipe_reader_t *reader; - class session_t *peer; - } bind; + } plug; + // Sent to socket to let it know about the newly created object. struct { - uint64_t bytes; - } tail; + class object_t *object; + } own; + // Sent between objects to establish pipe(s) between them. struct { - uint64_t bytes; - } head; - - struct { - class simple_semaphore_t *smph; - } reg; - - struct { - class session_t *peer; - bool flow_in; - bool flow_out; - } reg_and_bind; - - struct { - class simple_semaphore_t *smph; - } unreg; + } bind; - // TODO: Engine object won't be deallocated on terminal shutdown - // while the command is still on the fly! + // Sent by I/O object ot the socket to request the shutdown of + // the I/O object. struct { - class i_engine *engine; - } engine; + class object_t *object; + } term_req; + // Sent by socket to I/O object to start its shutdown. struct { - } terminate; + } term; + // Sent by I/O object to the socket to acknowledge it has + // shut down. struct { - } terminate_ack; + } term_ack; } args; }; diff --git a/src/context.cpp b/src/context.cpp deleted file mode 100644 index 6b071cf..0000000 --- a/src/context.cpp +++ /dev/null @@ -1,153 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "context.hpp" -#include "i_api.hpp" -#include "app_thread.hpp" -#include "io_thread.hpp" -#include "platform.hpp" -#include "err.hpp" -#include "pipe.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.h" -#endif - -zmq::context_t::context_t (int app_threads_, int io_threads_) -{ -#ifdef ZMQ_HAVE_WINDOWS - // Intialise Windows sockets. Note that WSAStartup can be called multiple - // times given that WSACleanup will be called for each WSAStartup. - WORD version_requested = MAKEWORD (2, 2); - WSADATA wsa_data; - int rc = WSAStartup (version_requested, &wsa_data); - zmq_assert (rc == 0); - zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && - HIBYTE (wsa_data.wVersion) == 2); -#endif - - // Create application thread proxies. - for (int i = 0; i != app_threads_; i++) { - app_thread_t *app_thread = new app_thread_t (this, i); - zmq_assert (app_thread); - app_threads.push_back (app_thread); - signalers.push_back (app_thread->get_signaler ()); - } - - // Create I/O thread objects. - for (int i = 0; i != io_threads_; i++) { - io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); - zmq_assert (io_thread); - io_threads.push_back (io_thread); - signalers.push_back (io_thread->get_signaler ()); - } - - // Create command pipe matrix. - command_pipes = new command_pipe_t [signalers.size () * signalers.size ()]; - zmq_assert (command_pipes); - - // Launch I/O threads. - for (int i = 0; i != io_threads_; i++) - io_threads [i]->start (); -} - -zmq::context_t::~context_t () -{ - // Close all application theads, sockets, io_objects etc. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i]; - - // Ask I/O threads to terminate. If stop signal wasn't sent to I/O - // thread subsequent invocation of destructor would hang-up. - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - io_threads [i]->stop (); - - // Wait till I/O threads actually terminate. - for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) - delete io_threads [i]; - - delete [] command_pipes; - -#ifdef ZMQ_HAVE_WINDOWS - // On Windows, uninitialise socket layer. - int rc = WSACleanup (); - wsa_assert (rc != SOCKET_ERROR); -#endif -} - -int zmq::context_t::thread_slot_count () -{ - return signalers.size (); -} - -zmq::i_api *zmq::context_t::create_socket (int type_) -{ - threads_sync.lock (); - app_thread_t *thread = choose_app_thread (); - if (!thread) { - threads_sync.unlock (); - return NULL; - } - - zmq_assert (false); - i_api *s = NULL; - //i_api *s = thread->create_socket (type_); - - threads_sync.unlock (); - return s; -} - -zmq::app_thread_t *zmq::context_t::choose_app_thread () -{ - // Check whether thread ID is already assigned. If so, return it. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - if (app_threads [i]->is_current ()) - return app_threads [i]; - - // Check whether there's an unused thread slot in the cotext. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - if (app_threads [i]->make_current ()) - return app_threads [i]; - - // Thread limit was exceeded. - errno = EMFILE; - return NULL; -} - -zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) -{ - zmq_assert (io_threads.size () > 0); - - // Find the I/O thread with minimum load. - int min_load = io_threads [0]->get_load (); - io_threads_t::size_type result = 0; - for (io_threads_t::size_type i = 1; i != io_threads.size (); i++) { - if (!taskset_ || (taskset_ & (uint64_t (1) << i))) { - int load = io_threads [i]->get_load (); - if (load < min_load) { - min_load = load; - result = i; - } - } - } - - return io_threads [result]; -} diff --git a/src/context.hpp b/src/context.hpp deleted file mode 100644 index f2eab1c..0000000 --- a/src/context.hpp +++ /dev/null @@ -1,122 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__ -#define __ZMQ_CONTEXT_HPP_INCLUDED__ - -#include -#include -#include - -#include "i_signaler.hpp" -#include "ypipe.hpp" -#include "command.hpp" -#include "config.hpp" -#include "mutex.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - // Dispatcher implements bidirectional thread-safe passing of commands - // between N threads. It consists of a ypipes to pass commands and - // signalers to wake up the receiver thread when new commands are - // available. Note that context is inefficient for passing messages - // within a thread (sender thread = receiver thread). The optimisation is - // not part of the class and should be implemented by individual threads - // (presumably by calling the command handling function directly). - - class context_t - { - public: - - // Create the context object. Matrix of pipes to communicate between - // each socket and each I/O thread is created along with appropriate - // signalers. - context_t (int app_threads_, int io_threads_); - - // To be called to terminate the whole infrastructure (zmq_term). - ~context_t (); - - // Create a socket. - struct i_api *create_socket (int type_); - - // Returns number of thread slots in the context. To be used by - // individual threads to find out how many distinct signals can be - // received. - int thread_slot_count (); - - // Send command from the source to the destination. - inline void write (int source_, int destination_, - const command_t &command_) - { - command_pipe_t &pipe = - command_pipes [source_ * signalers.size () + destination_]; - pipe.write (command_); - if (!pipe.flush ()) - signalers [destination_]->signal (source_); - } - - // Receive command from the source. Returns false if there is no - // command available. - inline bool read (int source_, int destination_, command_t *command_) - { - return command_pipes [source_ * signalers.size () + - destination_].read (command_); - } - - // Returns the I/O thread that is the least busy at the moment. - // Taskset specifies which I/O threads are eligible (0 = all). - class io_thread_t *choose_io_thread (uint64_t taskset_); - - private: - - // Returns the app thread associated with the current thread. - // NULL if we are out of app thread slots. - class app_thread_t *choose_app_thread (); - - // Application threads. - typedef std::vector app_threads_t; - app_threads_t app_threads; - - // I/O threads. - typedef std::vector io_threads_t; - io_threads_t io_threads; - - // Signalers for both application and I/O threads. - std::vector signalers; - - // Pipe to hold the commands. - typedef ypipe_t command_pipe_t; - - // NxN matrix of command pipes. - command_pipe_t *command_pipes; - - // Synchronisation of accesses to shared thread data. - mutex_t threads_sync; - - context_t (const context_t&); - void operator = (const context_t&); - }; - -} - -#endif - diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp new file mode 100644 index 0000000..0b68880 --- /dev/null +++ b/src/dispatcher.cpp @@ -0,0 +1,149 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "dispatcher.hpp" +#include "i_api.hpp" +#include "app_thread.hpp" +#include "io_thread.hpp" +#include "platform.hpp" +#include "err.hpp" +#include "pipe.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.h" +#endif + +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) +{ +#ifdef ZMQ_HAVE_WINDOWS + // Intialise Windows sockets. Note that WSAStartup can be called multiple + // times given that WSACleanup will be called for each WSAStartup. + WORD version_requested = MAKEWORD (2, 2); + WSADATA wsa_data; + int rc = WSAStartup (version_requested, &wsa_data); + zmq_assert (rc == 0); + zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && + HIBYTE (wsa_data.wVersion) == 2); +#endif + + // Create application thread proxies. + for (int i = 0; i != app_threads_; i++) { + app_thread_t *app_thread = new app_thread_t (this, i); + zmq_assert (app_thread); + app_threads.push_back (app_thread); + signalers.push_back (app_thread->get_signaler ()); + } + + // Create I/O thread objects. + for (int i = 0; i != io_threads_; i++) { + io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); + zmq_assert (io_thread); + io_threads.push_back (io_thread); + signalers.push_back (io_thread->get_signaler ()); + } + + // Create command pipe matrix. + command_pipes = new command_pipe_t [signalers.size () * signalers.size ()]; + zmq_assert (command_pipes); + + // Launch I/O threads. + for (int i = 0; i != io_threads_; i++) + io_threads [i]->start (); +} + +zmq::dispatcher_t::~dispatcher_t () +{ + // Close all application theads, sockets, io_objects etc. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + delete app_threads [i]; + + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O + // thread subsequent invocation of destructor would hang-up. + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) + io_threads [i]->stop (); + + // Wait till I/O threads actually terminate. + for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) + delete io_threads [i]; + + delete [] command_pipes; + +#ifdef ZMQ_HAVE_WINDOWS + // On Windows, uninitialise socket layer. + int rc = WSACleanup (); + wsa_assert (rc != SOCKET_ERROR); +#endif +} + +int zmq::dispatcher_t::thread_slot_count () +{ + return signalers.size (); +} + +zmq::i_api *zmq::dispatcher_t::create_socket (int type_) +{ + threads_sync.lock (); + app_thread_t *thread = choose_app_thread (); + if (!thread) { + threads_sync.unlock (); + return NULL; + } + threads_sync.unlock (); + + return thread->create_socket (type_); +} + +zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () +{ + // Check whether thread ID is already assigned. If so, return it. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + if (app_threads [i]->is_current ()) + return app_threads [i]; + + // Check whether there's an unused thread slot in the cotext. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + if (app_threads [i]->make_current ()) + return app_threads [i]; + + // Thread limit was exceeded. + errno = EMFILE; + return NULL; +} + +zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) +{ + zmq_assert (io_threads.size () > 0); + + // Find the I/O thread with minimum load. + int min_load = io_threads [0]->get_load (); + io_threads_t::size_type result = 0; + for (io_threads_t::size_type i = 1; i != io_threads.size (); i++) { + if (!taskset_ || (taskset_ & (uint64_t (1) << i))) { + int load = io_threads [i]->get_load (); + if (load < min_load) { + min_load = load; + result = i; + } + } + } + + return io_threads [result]; +} diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp new file mode 100644 index 0000000..08ffab1 --- /dev/null +++ b/src/dispatcher.hpp @@ -0,0 +1,122 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ +#define __ZMQ_DISPATCHER_HPP_INCLUDED__ + +#include +#include +#include + +#include "i_signaler.hpp" +#include "ypipe.hpp" +#include "command.hpp" +#include "config.hpp" +#include "mutex.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // Dispatcher implements bidirectional thread-safe passing of commands + // between N threads. It consists of a ypipes to pass commands and + // signalers to wake up the receiver thread when new commands are + // available. Note that dispatcher is inefficient for passing messages + // within a thread (sender thread = receiver thread). The optimisation is + // not part of the class and should be implemented by individual threads + // (presumably by calling the command handling function directly). + + class dispatcher_t + { + public: + + // Create the dispatcher object. Matrix of pipes to communicate between + // each socket and each I/O thread is created along with appropriate + // signalers. + dispatcher_t (int app_threads_, int io_threads_); + + // To be called to terminate the whole infrastructure (zmq_term). + ~dispatcher_t (); + + // Create a socket. + struct i_api *create_socket (int type_); + + // Returns number of thread slots in the dispatcher. To be used by + // individual threads to find out how many distinct signals can be + // received. + int thread_slot_count (); + + // Send command from the source to the destination. + inline void write (int source_, int destination_, + const command_t &command_) + { + command_pipe_t &pipe = + command_pipes [source_ * signalers.size () + destination_]; + pipe.write (command_); + if (!pipe.flush ()) + signalers [destination_]->signal (source_); + } + + // Receive command from the source. Returns false if there is no + // command available. + inline bool read (int source_, int destination_, command_t *command_) + { + return command_pipes [source_ * signalers.size () + + destination_].read (command_); + } + + // Returns the I/O thread that is the least busy at the moment. + // Taskset specifies which I/O threads are eligible (0 = all). + class io_thread_t *choose_io_thread (uint64_t taskset_); + + private: + + // Returns the app thread associated with the current thread. + // NULL if we are out of app thread slots. + class app_thread_t *choose_app_thread (); + + // Application threads. + typedef std::vector app_threads_t; + app_threads_t app_threads; + + // I/O threads. + typedef std::vector io_threads_t; + io_threads_t io_threads; + + // Signalers for both application and I/O threads. + std::vector signalers; + + // Pipe to hold the commands. + typedef ypipe_t command_pipe_t; + + // NxN matrix of command pipes. + command_pipe_t *command_pipes; + + // Synchronisation of accesses to shared thread data. + mutex_t threads_sync; + + dispatcher_t (const dispatcher_t&); + void operator = (const dispatcher_t&); + }; + +} + +#endif + diff --git a/src/err.hpp b/src/err.hpp index fdfce01..3854d8a 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -80,6 +80,12 @@ namespace zmq abort ();\ }} while (false) +// Provides convenient way to check for POSIX errors. +#define posix_assert(x) do {\ +fprintf (stderr, "%s (%s:%d)\n", strerror (x), __FILE__, __LINE__);\ +abort ();\ +} while (false) + // Provides convenient way to check for errors from getaddrinfo. #define gai_assert(x) do { if (x) {\ const char *errstr = gai_strerror (x);\ diff --git a/src/i_api.hpp b/src/i_api.hpp index a87e41d..36afcea 100644 --- a/src/i_api.hpp +++ b/src/i_api.hpp @@ -1,20 +1,20 @@ /* -Copyright (c) 2007-2009 FastMQ Inc. - -This file is part of 0MQ. - -0MQ is free software; you can redistribute it and/or modify it under -the terms of the Lesser GNU General Public License as published by -the Free Software Foundation; either version 3 of the License, or -(at your option) any later version. - -0MQ is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -Lesser GNU General Public License for more details. - -You should have received a copy of the Lesser GNU General Public License -along with this program. If not, see . + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . */ #ifndef __ZMQ_I_API_HPP_INCLUDED__ @@ -25,6 +25,8 @@ namespace zmq struct i_api { + virtual ~i_api () {} + virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0; virtual int connect (const char *addr_, struct zmq_opts *opts_) = 0; virtual int subscribe (const char *criteria_) = 0; diff --git a/src/i_socket.hpp b/src/i_socket.hpp deleted file mode 100644 index 99ade8a..0000000 --- a/src/i_socket.hpp +++ /dev/null @@ -1,36 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_I_SOCKET_HPP_INCLUDED__ -#define __ZMQ_I_SOCKET_HPP_INCLUDED__ - -namespace zmq -{ - - struct i_socket - { - virtual ~i_socket () {}; - - // Start shutting down the socket. - virtual void stop () = 0; - }; - -} - -#endif diff --git a/src/io_object.cpp b/src/io_object.cpp new file mode 100644 index 0000000..41e4717 --- /dev/null +++ b/src/io_object.cpp @@ -0,0 +1,41 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "io_object.hpp" + +zmq::io_object_t::io_object_t (object_t *parent_, object_t *owner_) : + object_t (parent_), + owner (owner_) +{ +} + +zmq::io_object_t::~io_object_t () +{ +} + +void zmq::io_object_t::term () +{ + send_term_req (owner, this); +} + +void zmq::io_object_t::process_term () +{ + send_term_ack (owner); + delete this; +} diff --git a/src/io_object.hpp b/src/io_object.hpp new file mode 100644 index 0000000..5ed1830 --- /dev/null +++ b/src/io_object.hpp @@ -0,0 +1,62 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__ +#define __ZMQ_IO_OBJECT_HPP_INCLUDED__ + +#include "object.hpp" + +namespace zmq +{ + + class io_object_t : public object_t + { + public: + + // I/O object will live in the thread inherited from the parent. + // However, it's lifetime is managed by the owner. + io_object_t (object_t *parent_, object_t *owner_); + + protected: + + // Ask owner socket to terminate this I/O object. This may not happen + void term (); + + // I/O object destroys itself. No point in allowing others to invoke + // the destructor. At the same time, it has to be virtual so that + // generic io_object deallocation mechanism destroys specific type + // of I/O object correctly. + virtual ~io_object_t (); + + private: + + // Handlers for incoming commands. + void process_term (); + + // Socket owning this I/O object. It is responsible for destroying + // it when it's being closed. + object_t *owner; + + io_object_t (const io_object_t&); + void operator = (const io_object_t&); + }; + +} + +#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index f5261a6..1d85292 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -29,11 +29,11 @@ #include "select.hpp" #include "devpoll.hpp" #include "kqueue.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "simple_semaphore.hpp" -zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : - object_t (context_, thread_slot_) +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_) { #if defined ZMQ_FORCE_SELECT poller = new select_t; @@ -115,7 +115,7 @@ void zmq::io_thread_t::in_event () // Read all the commands from particular thread. command_t cmd; - while (context->read (source_thread_slot, thread_slot, &cmd)) + while (dispatcher->read (source_thread_slot, thread_slot, &cmd)) cmd.destination->process_command (cmd); } } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 43ee19e..6f25627 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -37,7 +37,7 @@ namespace zmq { public: - io_thread_t (class context_t *context_, int thread_slot_); + io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/mutex.hpp b/src/mutex.hpp index 9b51955..e233c9e 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -72,43 +72,47 @@ namespace zmq namespace zmq { - + class mutex_t { public: inline mutex_t () { int rc = pthread_mutex_init (&mutex, NULL); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline ~mutex_t () { int rc = pthread_mutex_destroy (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline void lock () { int rc = pthread_mutex_lock (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline void unlock () { int rc = pthread_mutex_unlock (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + private: - + pthread_mutex_t mutex; - - // Disable copy construction and assignment. + + // Disable copy construction and assignment. mutex_t (const mutex_t&); void operator = (const mutex_t&); }; - + } #endif diff --git a/src/object.cpp b/src/object.cpp index 36f3937..e2267d6 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -18,19 +18,19 @@ */ #include "object.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" -zmq::object_t::object_t (context_t *context_, int thread_slot_) : - context (context_), +zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : + dispatcher (dispatcher_), thread_slot (thread_slot_) { } zmq::object_t::object_t (object_t *parent_) : - context (parent_->context), + dispatcher (parent_->dispatcher), thread_slot (parent_->thread_slot) { } @@ -41,7 +41,7 @@ zmq::object_t::~object_t () int zmq::object_t::thread_slot_count () { - return context->thread_slot_count (); + return dispatcher->thread_slot_count (); } int zmq::object_t::get_thread_slot () @@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { - case command_t::head: - process_head (cmd_.args.head.bytes); + case command_t::stop: + process_stop (); break; - case command_t::tail: - process_tail (cmd_.args.tail.bytes); - break; + case command_t::plug: + process_plug (); + return; - case command_t::engine: - process_engine (cmd_.args.engine.engine); - break; + case command_t::own: + process_own (cmd_.args.own.object); + return; case command_t::bind: - process_bind (cmd_.args.bind.reader, cmd_.args.bind.peer); - break; - - case command_t::reg: - process_reg (cmd_.args.reg.smph); - break; - - case command_t::reg_and_bind: - process_reg_and_bind (cmd_.args.reg_and_bind.peer, - cmd_.args.reg_and_bind.flow_in, cmd_.args.reg_and_bind.flow_out); - break; - - case command_t::unreg: - process_unreg (cmd_.args.unreg.smph); - break; - - case command_t::terminate: - process_terminate (); - break; + process_bind (); + return; - case command_t::terminate_ack: - process_terminate_ack (); - break; + case command_t::term_req: + process_term_req (cmd_.args.term_req.object); + return; + + case command_t::term: + process_term (); + return; - case command_t::stop: - process_stop (); + case command_t::term_ack: + process_term_ack (); return; default: @@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { - return context->choose_io_thread (taskset_); + return dispatcher->choose_io_thread (taskset_); } void zmq::object_t::send_stop () @@ -111,91 +98,56 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - context->write (thread_slot, thread_slot, cmd); + dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, - session_t *peer_) +void zmq::object_t::send_plug (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::bind; - cmd.args.bind.reader = reader_; - cmd.args.bind.peer = peer_; + cmd.type = command_t::plug; send_command (cmd); } -void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_own (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::head; - cmd.args.head.bytes = bytes_; + cmd.type = command_t::own; + cmd.args.own.object = object_; send_command (cmd); } -void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_bind (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::tail; - cmd.args.tail.bytes = bytes_; - send_command (cmd); -} - -void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg; - cmd.args.reg.smph = smph_; - send_command (cmd); -} - -void zmq::object_t::send_reg_and_bind (object_t *destination_, - session_t *peer_, bool flow_in_, bool flow_out_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg_and_bind; - cmd.args.reg_and_bind.peer = peer_; - cmd.args.reg_and_bind.flow_in = flow_in_; - cmd.args.reg_and_bind.flow_out = flow_out_; - send_command (cmd); -} - -void zmq::object_t::send_unreg (object_t *destination_, - simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::unreg; - cmd.args.unreg.smph = smph_; + cmd.type = command_t::bind; send_command (cmd); } -void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_) +void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::engine; - cmd.args.engine.engine = engine_; + cmd.type = command_t::term_req; + cmd.args.term_req.object = object_; send_command (cmd); } -void zmq::object_t::send_terminate (object_t *destination_) +void zmq::object_t::send_term (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate; + cmd.type = command_t::term; send_command (cmd); } -void zmq::object_t::send_terminate_ack (object_t *destination_) +void zmq::object_t::send_term_ack (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate_ack; + cmd.type = command_t::term_ack; send_command (cmd); } @@ -204,48 +156,32 @@ void zmq::object_t::process_stop () zmq_assert (false); } -void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_head (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_tail (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_reg (simple_semaphore_t *smph_) +void zmq::object_t::process_plug () { zmq_assert (false); } -void zmq::object_t::process_reg_and_bind (session_t *session_, - bool flow_in_, bool flow_out_) +void zmq::object_t::process_own (object_t *object_) { zmq_assert (false); } -void zmq::object_t::process_unreg (simple_semaphore_t *smph_) +void zmq::object_t::process_bind () { zmq_assert (false); } -void zmq::object_t::process_engine (i_engine *engine_) +void zmq::object_t::process_term_req (object_t *object_) { zmq_assert (false); } -void zmq::object_t::process_terminate () +void zmq::object_t::process_term () { zmq_assert (false); } -void zmq::object_t::process_terminate_ack () +void zmq::object_t::process_term_ack () { zmq_assert (false); } @@ -256,6 +192,6 @@ void zmq::object_t::send_command (command_t &cmd_) if (destination_thread_slot == thread_slot) cmd_.destination->process_command (cmd_); else - context->write (thread_slot, destination_thread_slot, cmd_); + dispatcher->write (thread_slot, destination_thread_slot, cmd_); } diff --git a/src/object.hpp b/src/object.hpp index 5851c68..7357549 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - object_t (class context_t *context_, int thread_slot_); + object_t (class dispatcher_t *dispatcher_, int thread_slot_); object_t (object_t *parent_); ~object_t (); @@ -42,44 +42,32 @@ namespace zmq protected: // Derived object can use following functions to interact with - // global repositories. See context.hpp for function details. + // global repositories. See dispatcher.hpp for function details. int thread_slot_count (); class io_thread_t *choose_io_thread (uint64_t taskset_); // Derived object can use these functions to send commands // to other objects. void send_stop (); - void send_bind (object_t *destination_, class pipe_reader_t *reader_, - class session_t *peer_); - void send_head (object_t *destination_, uint64_t bytes_); - void send_tail (object_t *destination_, uint64_t bytes_); - void send_reg (object_t *destination_, - class simple_semaphore_t *smph_); - void send_reg_and_bind (object_t *destination_, class session_t *peer_, - bool flow_in_, bool flow_out_); - void send_unreg (object_t *destination_, - class simple_semaphore_t *smph_); - void send_engine (object_t *destination_, struct i_engine *engine_); - void send_terminate (object_t *destination_); - void send_terminate_ack (object_t *destination_); + void send_plug (object_t *destination_); + void send_own (object_t *destination_, object_t *object_); + void send_bind (object_t *destination_); + void send_term_req (object_t *destination_, object_t *object_); + void send_term (object_t *destination_); + void send_term_ack (object_t *destination_); // These handlers can be overloaded by the derived objects. They are // called when command arrives from another thread. virtual void process_stop (); - virtual void process_bind (class pipe_reader_t *reader_, - class session_t *peer_); - virtual void process_head (uint64_t bytes_); - virtual void process_tail (uint64_t bytes_); - virtual void process_reg (class simple_semaphore_t *smph_); - virtual void process_reg_and_bind (class session_t *peer_, - bool flow_in_, bool flow_out_); - virtual void process_unreg (class simple_semaphore_t *smph_); - virtual void process_engine (struct i_engine *engine_); - virtual void process_terminate (); - virtual void process_terminate_ack (); + virtual void process_plug (); + virtual void process_own (object_t *object_); + virtual void process_bind (); + virtual void process_term_req (object_t *object_); + virtual void process_term (); + virtual void process_term_ack (); // Pointer to the root of the infrastructure. - class context_t *context; + class dispatcher_t *dispatcher; // Slot ID of the thread the object belongs to. int thread_slot; diff --git a/src/socket_base.cpp b/src/socket_base.cpp new file mode 100644 index 0000000..3737410 --- /dev/null +++ b/src/socket_base.cpp @@ -0,0 +1,129 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include + +#include "../include/zmq.h" + +#include "socket_base.hpp" +#include "app_thread.hpp" +#include "err.hpp" +#include "zmq_listener.hpp" +#include "io_thread.hpp" + +zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : + object_t (parent_), + pending_term_acks (0), + app_thread (parent_) +{ +} + +zmq::socket_base_t::~socket_base_t () +{ + while (true) { + + // On third pass of the loop there should be no more I/O objects + // because all connecters and listerners were destroyed during + // the first pass and all engines delivered by delayed 'own' commands + // are destroyed during the second pass. + if (io_objects.empty () && !pending_term_acks) + break; + + // Send termination request to all associated I/O objects. + for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) + send_term (io_objects [i]); + + // Move the objects to the list of pending term acks. + pending_term_acks += io_objects.size (); + io_objects.clear (); + + // Process commands till we get all the termination acknowledgements. + while (pending_term_acks) + app_thread->process_commands (true); + } +} + +int zmq::socket_base_t::bind (const char *addr_, struct zmq_opts *opts_) +{ + uint64_t taskset = opts_ ? opts_->taskset : 0; + object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this); + send_plug (listener); + send_own (this, listener); + return 0; +} + +int zmq::socket_base_t::connect (const char *addr_, struct zmq_opts *opts_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::subscribe (const char *criteria_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::flush () +{ + zmq_assert (false); +} + +int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::close () +{ + app_thread->remove_socket (this); + delete this; + return 0; +} + +void zmq::socket_base_t::process_own (object_t *object_) +{ + io_objects.push_back (object_); +} + +void zmq::socket_base_t::process_term_req (object_t *object_) +{ + // If I/O object is well and alive ask it to terminate. + // TODO: Following find may produce an unacceptable jitter in + // C10K-style applications. If so, use set instead of vector. + io_objects_t::iterator it = std::find (io_objects.begin (), + io_objects.end (), object_); + if (it != io_objects.end ()) { + pending_term_acks++; + io_objects.erase (it); + send_term (object_); + } + + // If not found, we assume that termination request was already sent to + // the object so we can sagely ignore the request. +} + +void zmq::socket_base_t::process_term_ack () +{ + zmq_assert (pending_term_acks); + pending_term_acks--; +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp new file mode 100644 index 0000000..633f003 --- /dev/null +++ b/src/socket_base.hpp @@ -0,0 +1,72 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ +#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ + +#include + +#include "i_api.hpp" +#include "object.hpp" + +namespace zmq +{ + + class socket_base_t : public object_t, public i_api + { + public: + + socket_base_t (class app_thread_t *parent_); + ~socket_base_t (); + + // i_api interface implementation. + int bind (const char *addr_, struct zmq_opts *opts_); + int connect (const char *addr_, struct zmq_opts *opts_); + int subscribe (const char *criteria_); + int send (struct zmq_msg *msg_, int flags_); + int flush (); + int recv (struct zmq_msg *msg_, int flags_); + int close (); + + private: + + // Handlers for incoming commands. + void process_own (object_t *object_); + void process_term_req (object_t *object_); + void process_term_ack (); + + // List of all I/O objects owned by this socket. The socket is + // responsible for deallocating them before it quits. + typedef std::vector io_objects_t; + io_objects_t io_objects; + + // Number of I/O objects that were already asked to terminate + // but haven't acknowledged it yet. + int pending_term_acks; + + // Application thread the socket lives in. + class app_thread_t *app_thread; + + socket_base_t (const socket_base_t&); + void operator = (const socket_base_t&); + }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index d19b229..149a7e2 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,7 +25,7 @@ #include "i_api.hpp" #include "err.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "msg.hpp" int zmq_msg_init (zmq_msg *msg_) @@ -162,27 +162,28 @@ int zmq_msg_type (zmq_msg *msg_) void *zmq_init (int app_threads_, int io_threads_) { - // There should be at least a single thread managed by the context. + // There should be at least a single thread managed by the dispatcher. if (app_threads_ < 0 || io_threads_ < 0 || app_threads_ + io_threads_ == 0) { errno = EINVAL; return NULL; } - zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_); - zmq_assert (context); - return (void*) context; + zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_, + io_threads_); + zmq_assert (dispatcher); + return (void*) dispatcher; } -int zmq_term (void *context_) +int zmq_term (void *dispatcher_) { - delete (zmq::context_t*) context_; + delete (zmq::dispatcher_t*) dispatcher_; return 0; } -void *zmq_socket (void *context_, int type_) +void *zmq_socket (void *dispatcher_, int type_) { - return (void*) (((zmq::context_t*) context_)->create_socket (type_)); + return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_)); } int zmq_close (void *s_) diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp new file mode 100644 index 0000000..1f1e012 --- /dev/null +++ b/src/zmq_listener.cpp @@ -0,0 +1,35 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "zmq_listener.hpp" +#include "err.hpp" + +zmq::zmq_listener_t::zmq_listener_t (object_t *parent_, object_t *owner_) : + io_object_t (parent_, owner_) +{ +} + +zmq::zmq_listener_t::~zmq_listener_t () +{ +} + +void zmq::zmq_listener_t::process_plug () +{ + // TODO: Register with the I/O thread here. +} diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp new file mode 100644 index 0000000..12192b2 --- /dev/null +++ b/src/zmq_listener.hpp @@ -0,0 +1,46 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ +#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ + +#include "io_object.hpp" + +namespace zmq +{ + + class zmq_listener_t : public io_object_t + { + public: + + zmq_listener_t (object_t *parent_, object_t *owner_); + ~zmq_listener_t (); + + private: + + // Handlers for incoming commands. + void process_plug (); + + zmq_listener_t (const zmq_listener_t&); + void operator = (const zmq_listener_t&); + }; + +} + +#endif -- cgit v1.2.3