diff options
-rw-r--r-- | include/zmq.hpp | 8 | ||||
-rw-r--r-- | src/Makefile.am | 14 | ||||
-rw-r--r-- | src/app_thread.cpp | 37 | ||||
-rw-r--r-- | src/app_thread.hpp | 13 | ||||
-rw-r--r-- | src/command.hpp | 59 | ||||
-rw-r--r-- | src/dispatcher.cpp (renamed from src/context.cpp) | 22 | ||||
-rw-r--r-- | src/dispatcher.hpp (renamed from src/context.hpp) | 20 | ||||
-rw-r--r-- | src/err.hpp | 6 | ||||
-rw-r--r-- | src/i_api.hpp | 34 | ||||
-rw-r--r-- | src/io_object.cpp | 41 | ||||
-rw-r--r-- | src/io_object.hpp | 62 | ||||
-rw-r--r-- | src/io_thread.cpp | 8 | ||||
-rw-r--r-- | src/io_thread.hpp | 2 | ||||
-rw-r--r-- | src/mutex.hpp | 30 | ||||
-rw-r--r-- | src/object.cpp | 158 | ||||
-rw-r--r-- | src/object.hpp | 42 | ||||
-rw-r--r-- | src/socket_base.cpp | 129 | ||||
-rw-r--r-- | src/socket_base.hpp | 72 | ||||
-rw-r--r-- | src/zmq.cpp | 19 | ||||
-rw-r--r-- | src/zmq_listener.cpp (renamed from src/i_socket.hpp) | 23 | ||||
-rw-r--r-- | src/zmq_listener.hpp | 46 |
21 files changed, 573 insertions, 272 deletions
diff --git a/include/zmq.hpp b/include/zmq.hpp index ccc234d..004706b 100644 --- a/include/zmq.hpp +++ b/include/zmq.hpp @@ -38,7 +38,7 @@ namespace zmq message_delimiter = 1 << ZMQ_DELIMITER }; - class no_memory : public exception + class no_memory : public std::exception { virtual const char *what () { @@ -46,7 +46,7 @@ namespace zmq } }; - class invalid_argument : public exception + class invalid_argument : public std::exception { virtual const char *what () { @@ -54,7 +54,7 @@ namespace zmq } }; - class too_many_threads : public exception + class too_many_threads : public std::exception { virtual const char *what () { @@ -62,7 +62,7 @@ namespace zmq } }; - class address_in_use : public exception + class address_in_use : public std::exception { virtual const char *what () { diff --git a/src/Makefile.am b/src/Makefile.am index bde9c39..47037a2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,14 +7,15 @@ libzmq_la_SOURCES = \ atomic_ptr.hpp \ command.hpp \ config.hpp \ - context.hpp \ decoder.hpp \ devpoll.hpp \ + dispatcher.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ fd.hpp \ fd_signaler.hpp \ + io_object.hpp \ io_thread.hpp \ ip.hpp \ i_api.hpp \ @@ -31,6 +32,7 @@ libzmq_la_SOURCES = \ poll.hpp \ select.hpp \ simple_semaphore.hpp \ + socket_base.hpp \ stdint.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -42,25 +44,29 @@ libzmq_la_SOURCES = \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ + zmq_listener.hpp \ app_thread.cpp \ - context.cpp \ - devpoll.hpp \ + devpoll.cpp \ + dispatcher.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ + io_object.cpp \ io_thread.cpp \ ip.cpp \ kqueue.cpp \ object.cpp \ poll.cpp \ select.cpp \ + socket_base.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ uuid.cpp \ ypollset.cpp \ - zmq.cpp + zmq.cpp \ + zmq_listener.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 23a055a..3f76970 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <algorithm> + #include "../include/zmq.h" #if defined ZMQ_HAVE_WINDOWS @@ -26,10 +28,12 @@ #endif #include "app_thread.hpp" -#include "context.hpp" +#include "i_api.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" +#include "socket_base.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -39,8 +43,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : - object_t (context_, thread_slot_), +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), tid (0), last_processing_time (0) { @@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : zmq::app_thread_t::~app_thread_t () { - // Ask all the sockets to start termination, then wait till it is complete. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) - (*it)->stop (); + // Destroy all the sockets owned by this application thread. for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) delete *it; - - delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_) for (int i = 0; i != thread_slot_count (); i++) { if (signals & (ypollset_t::signals_t (1) << i)) { command_t cmd; - while (context->read (i, get_thread_slot (), &cmd)) + while (dispatcher->read (i, get_thread_slot (), &cmd)) cmd.destination->process_command (cmd); } } } } + +zmq::i_api *zmq::app_thread_t::create_socket (int type_) +{ + // TODO: type is ignored for the time being. + socket_base_t *s = new socket_base_t (this); + zmq_assert (s); + sockets.push_back (s); + return s; +} + +void zmq::app_thread_t::remove_socket (i_api *socket_) +{ + // TODO: To speed this up we can possibly use the system where each socket + // holds its index (see I/O scheduler implementation). + sockets_t::iterator it = std::find (sockets.begin (), sockets.end (), + socket_); + zmq_assert (it != sockets.end ()); + sockets.erase (it); +} diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 31679b8..59e4a25 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -22,7 +22,6 @@ #include <vector> -#include "i_socket.hpp" #include "stdint.hpp" #include "object.hpp" #include "ypollset.hpp" @@ -34,7 +33,7 @@ namespace zmq { public: - app_thread_t (class context_t *context_, int thread_slot_); + app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); ~app_thread_t (); @@ -42,7 +41,7 @@ namespace zmq i_signaler *get_signaler (); // Nota bene: Following two functions are accessed from different - // threads. The caller (context) is responsible for synchronisation + // threads. The caller (dispatcher) is responsible for synchronisation // of accesses. // Returns true is current thread is associated with the app thread. @@ -56,10 +55,16 @@ namespace zmq // set to true, returns only after at least one command was processed. void process_commands (bool block_); + // Create a socket of a specified type. + struct i_api *create_socket (int type_); + + // Unregister the socket from the app_thread (called by socket itself). + void remove_socket (struct i_api *socket_); + private: // All the sockets created from this application thread. - typedef std::vector <i_socket*> sockets_t; + typedef std::vector <struct i_api*> sockets_t; sockets_t sockets; // Thread ID associated with this slot. diff --git a/src/command.hpp b/src/command.hpp index 69c4e57..de94ca3 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -35,60 +35,49 @@ namespace zmq enum type_t { stop, + plug, + own, bind, - head, - tail, - reg, - reg_and_bind, - unreg, - engine, - terminate, - terminate_ack + term_req, + term, + term_ack + } type; union { + // Sent to I/O thread to let it know that it should + // terminate itself. struct { } stop; + // Sent to I/O object to make it register with its I/O thread. struct { - class pipe_reader_t *reader; - class session_t *peer; - } bind; + } plug; + // Sent to socket to let it know about the newly created object. struct { - uint64_t bytes; - } tail; + class object_t *object; + } own; + // Sent between objects to establish pipe(s) between them. struct { - uint64_t bytes; - } head; - - struct { - class simple_semaphore_t *smph; - } reg; - - struct { - class session_t *peer; - bool flow_in; - bool flow_out; - } reg_and_bind; - - struct { - class simple_semaphore_t *smph; - } unreg; + } bind; - // TODO: Engine object won't be deallocated on terminal shutdown - // while the command is still on the fly! + // Sent by I/O object ot the socket to request the shutdown of + // the I/O object. struct { - class i_engine *engine; - } engine; + class object_t *object; + } term_req; + // Sent by socket to I/O object to start its shutdown. struct { - } terminate; + } term; + // Sent by I/O object to the socket to acknowledge it has + // shut down. struct { - } terminate_ack; + } term_ack; } args; }; diff --git a/src/context.cpp b/src/dispatcher.cpp index 6b071cf..0b68880 100644 --- a/src/context.cpp +++ b/src/dispatcher.cpp @@ -19,7 +19,7 @@ #include "../include/zmq.h" -#include "context.hpp" +#include "dispatcher.hpp" #include "i_api.hpp" #include "app_thread.hpp" #include "io_thread.hpp" @@ -31,7 +31,7 @@ #include "windows.h" #endif -zmq::context_t::context_t (int app_threads_, int io_threads_) +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -69,7 +69,7 @@ zmq::context_t::context_t (int app_threads_, int io_threads_) io_threads [i]->start (); } -zmq::context_t::~context_t () +zmq::dispatcher_t::~dispatcher_t () { // Close all application theads, sockets, io_objects etc. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) @@ -93,12 +93,12 @@ zmq::context_t::~context_t () #endif } -int zmq::context_t::thread_slot_count () +int zmq::dispatcher_t::thread_slot_count () { return signalers.size (); } -zmq::i_api *zmq::context_t::create_socket (int type_) +zmq::i_api *zmq::dispatcher_t::create_socket (int type_) { threads_sync.lock (); app_thread_t *thread = choose_app_thread (); @@ -106,16 +106,12 @@ zmq::i_api *zmq::context_t::create_socket (int type_) threads_sync.unlock (); return NULL; } - - zmq_assert (false); - i_api *s = NULL; - //i_api *s = thread->create_socket (type_); - threads_sync.unlock (); - return s; + + return thread->create_socket (type_); } -zmq::app_thread_t *zmq::context_t::choose_app_thread () +zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) @@ -132,7 +128,7 @@ zmq::app_thread_t *zmq::context_t::choose_app_thread () return NULL; } -zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) { zmq_assert (io_threads.size () > 0); diff --git a/src/context.hpp b/src/dispatcher.hpp index f2eab1c..08ffab1 100644 --- a/src/context.hpp +++ b/src/dispatcher.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__ -#define __ZMQ_CONTEXT_HPP_INCLUDED__ +#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ +#define __ZMQ_DISPATCHER_HPP_INCLUDED__ #include <vector> #include <map> @@ -37,27 +37,27 @@ namespace zmq // Dispatcher implements bidirectional thread-safe passing of commands // between N threads. It consists of a ypipes to pass commands and // signalers to wake up the receiver thread when new commands are - // available. Note that context is inefficient for passing messages + // available. Note that dispatcher is inefficient for passing messages // within a thread (sender thread = receiver thread). The optimisation is // not part of the class and should be implemented by individual threads // (presumably by calling the command handling function directly). - class context_t + class dispatcher_t { public: - // Create the context object. Matrix of pipes to communicate between + // Create the dispatcher object. Matrix of pipes to communicate between // each socket and each I/O thread is created along with appropriate // signalers. - context_t (int app_threads_, int io_threads_); + dispatcher_t (int app_threads_, int io_threads_); // To be called to terminate the whole infrastructure (zmq_term). - ~context_t (); + ~dispatcher_t (); // Create a socket. struct i_api *create_socket (int type_); - // Returns number of thread slots in the context. To be used by + // Returns number of thread slots in the dispatcher. To be used by // individual threads to find out how many distinct signals can be // received. int thread_slot_count (); @@ -112,8 +112,8 @@ namespace zmq // Synchronisation of accesses to shared thread data. mutex_t threads_sync; - context_t (const context_t&); - void operator = (const context_t&); + dispatcher_t (const dispatcher_t&); + void operator = (const dispatcher_t&); }; } diff --git a/src/err.hpp b/src/err.hpp index fdfce01..3854d8a 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -80,6 +80,12 @@ namespace zmq abort ();\ }} while (false) +// Provides convenient way to check for POSIX errors. +#define posix_assert(x) do {\ +fprintf (stderr, "%s (%s:%d)\n", strerror (x), __FILE__, __LINE__);\ +abort ();\ +} while (false) + // Provides convenient way to check for errors from getaddrinfo. #define gai_assert(x) do { if (x) {\ const char *errstr = gai_strerror (x);\ diff --git a/src/i_api.hpp b/src/i_api.hpp index a87e41d..36afcea 100644 --- a/src/i_api.hpp +++ b/src/i_api.hpp @@ -1,20 +1,20 @@ /* -Copyright (c) 2007-2009 FastMQ Inc. - -This file is part of 0MQ. - -0MQ is free software; you can redistribute it and/or modify it under -the terms of the Lesser GNU General Public License as published by -the Free Software Foundation; either version 3 of the License, or -(at your option) any later version. - -0MQ is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -Lesser GNU General Public License for more details. - -You should have received a copy of the Lesser GNU General Public License -along with this program. If not, see <http://www.gnu.org/licenses/>. + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. */ #ifndef __ZMQ_I_API_HPP_INCLUDED__ @@ -25,6 +25,8 @@ namespace zmq struct i_api { + virtual ~i_api () {} + virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0; virtual int connect (const char *addr_, struct zmq_opts *opts_) = 0; virtual int subscribe (const char *criteria_) = 0; diff --git a/src/io_object.cpp b/src/io_object.cpp new file mode 100644 index 0000000..41e4717 --- /dev/null +++ b/src/io_object.cpp @@ -0,0 +1,41 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "io_object.hpp" + +zmq::io_object_t::io_object_t (object_t *parent_, object_t *owner_) : + object_t (parent_), + owner (owner_) +{ +} + +zmq::io_object_t::~io_object_t () +{ +} + +void zmq::io_object_t::term () +{ + send_term_req (owner, this); +} + +void zmq::io_object_t::process_term () +{ + send_term_ack (owner); + delete this; +} diff --git a/src/io_object.hpp b/src/io_object.hpp new file mode 100644 index 0000000..5ed1830 --- /dev/null +++ b/src/io_object.hpp @@ -0,0 +1,62 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__ +#define __ZMQ_IO_OBJECT_HPP_INCLUDED__ + +#include "object.hpp" + +namespace zmq +{ + + class io_object_t : public object_t + { + public: + + // I/O object will live in the thread inherited from the parent. + // However, it's lifetime is managed by the owner. + io_object_t (object_t *parent_, object_t *owner_); + + protected: + + // Ask owner socket to terminate this I/O object. This may not happen + void term (); + + // I/O object destroys itself. No point in allowing others to invoke + // the destructor. At the same time, it has to be virtual so that + // generic io_object deallocation mechanism destroys specific type + // of I/O object correctly. + virtual ~io_object_t (); + + private: + + // Handlers for incoming commands. + void process_term (); + + // Socket owning this I/O object. It is responsible for destroying + // it when it's being closed. + object_t *owner; + + io_object_t (const io_object_t&); + void operator = (const io_object_t&); + }; + +} + +#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index f5261a6..1d85292 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -29,11 +29,11 @@ #include "select.hpp" #include "devpoll.hpp" #include "kqueue.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "simple_semaphore.hpp" -zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : - object_t (context_, thread_slot_) +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_) { #if defined ZMQ_FORCE_SELECT poller = new select_t; @@ -115,7 +115,7 @@ void zmq::io_thread_t::in_event () // Read all the commands from particular thread. command_t cmd; - while (context->read (source_thread_slot, thread_slot, &cmd)) + while (dispatcher->read (source_thread_slot, thread_slot, &cmd)) cmd.destination->process_command (cmd); } } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 43ee19e..6f25627 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -37,7 +37,7 @@ namespace zmq { public: - io_thread_t (class context_t *context_, int thread_slot_); + io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/mutex.hpp b/src/mutex.hpp index 9b51955..e233c9e 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -72,43 +72,47 @@ namespace zmq namespace zmq { - + class mutex_t { public: inline mutex_t () { int rc = pthread_mutex_init (&mutex, NULL); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline ~mutex_t () { int rc = pthread_mutex_destroy (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline void lock () { int rc = pthread_mutex_lock (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline void unlock () { int rc = pthread_mutex_unlock (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + private: - + pthread_mutex_t mutex; - - // Disable copy construction and assignment. + + // Disable copy construction and assignment. mutex_t (const mutex_t&); void operator = (const mutex_t&); }; - + } #endif diff --git a/src/object.cpp b/src/object.cpp index 36f3937..e2267d6 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -18,19 +18,19 @@ */ #include "object.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" -zmq::object_t::object_t (context_t *context_, int thread_slot_) : - context (context_), +zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : + dispatcher (dispatcher_), thread_slot (thread_slot_) { } zmq::object_t::object_t (object_t *parent_) : - context (parent_->context), + dispatcher (parent_->dispatcher), thread_slot (parent_->thread_slot) { } @@ -41,7 +41,7 @@ zmq::object_t::~object_t () int zmq::object_t::thread_slot_count () { - return context->thread_slot_count (); + return dispatcher->thread_slot_count (); } int zmq::object_t::get_thread_slot () @@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { - case command_t::head: - process_head (cmd_.args.head.bytes); + case command_t::stop: + process_stop (); break; - case command_t::tail: - process_tail (cmd_.args.tail.bytes); - break; + case command_t::plug: + process_plug (); + return; - case command_t::engine: - process_engine (cmd_.args.engine.engine); - break; + case command_t::own: + process_own (cmd_.args.own.object); + return; case command_t::bind: - process_bind (cmd_.args.bind.reader, cmd_.args.bind.peer); - break; - - case command_t::reg: - process_reg (cmd_.args.reg.smph); - break; - - case command_t::reg_and_bind: - process_reg_and_bind (cmd_.args.reg_and_bind.peer, - cmd_.args.reg_and_bind.flow_in, cmd_.args.reg_and_bind.flow_out); - break; - - case command_t::unreg: - process_unreg (cmd_.args.unreg.smph); - break; - - case command_t::terminate: - process_terminate (); - break; + process_bind (); + return; - case command_t::terminate_ack: - process_terminate_ack (); - break; + case command_t::term_req: + process_term_req (cmd_.args.term_req.object); + return; + + case command_t::term: + process_term (); + return; - case command_t::stop: - process_stop (); + case command_t::term_ack: + process_term_ack (); return; default: @@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { - return context->choose_io_thread (taskset_); + return dispatcher->choose_io_thread (taskset_); } void zmq::object_t::send_stop () @@ -111,91 +98,56 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - context->write (thread_slot, thread_slot, cmd); + dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, - session_t *peer_) +void zmq::object_t::send_plug (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::bind; - cmd.args.bind.reader = reader_; - cmd.args.bind.peer = peer_; + cmd.type = command_t::plug; send_command (cmd); } -void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_own (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::head; - cmd.args.head.bytes = bytes_; + cmd.type = command_t::own; + cmd.args.own.object = object_; send_command (cmd); } -void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_bind (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::tail; - cmd.args.tail.bytes = bytes_; - send_command (cmd); -} - -void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg; - cmd.args.reg.smph = smph_; - send_command (cmd); -} - -void zmq::object_t::send_reg_and_bind (object_t *destination_, - session_t *peer_, bool flow_in_, bool flow_out_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg_and_bind; - cmd.args.reg_and_bind.peer = peer_; - cmd.args.reg_and_bind.flow_in = flow_in_; - cmd.args.reg_and_bind.flow_out = flow_out_; - send_command (cmd); -} - -void zmq::object_t::send_unreg (object_t *destination_, - simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::unreg; - cmd.args.unreg.smph = smph_; + cmd.type = command_t::bind; send_command (cmd); } -void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_) +void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::engine; - cmd.args.engine.engine = engine_; + cmd.type = command_t::term_req; + cmd.args.term_req.object = object_; send_command (cmd); } -void zmq::object_t::send_terminate (object_t *destination_) +void zmq::object_t::send_term (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate; + cmd.type = command_t::term; send_command (cmd); } -void zmq::object_t::send_terminate_ack (object_t *destination_) +void zmq::object_t::send_term_ack (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate_ack; + cmd.type = command_t::term_ack; send_command (cmd); } @@ -204,48 +156,32 @@ void zmq::object_t::process_stop () zmq_assert (false); } -void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_head (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_tail (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_reg (simple_semaphore_t *smph_) +void zmq::object_t::process_plug () { zmq_assert (false); } -void zmq::object_t::process_reg_and_bind (session_t *session_, - bool flow_in_, bool flow_out_) +void zmq::object_t::process_own (object_t *object_) { zmq_assert (false); } -void zmq::object_t::process_unreg (simple_semaphore_t *smph_) +void zmq::object_t::process_bind () { zmq_assert (false); } -void zmq::object_t::process_engine (i_engine *engine_) +void zmq::object_t::process_term_req (object_t *object_) { zmq_assert (false); } -void zmq::object_t::process_terminate () +void zmq::object_t::process_term () { zmq_assert (false); } -void zmq::object_t::process_terminate_ack () +void zmq::object_t::process_term_ack () { zmq_assert (false); } @@ -256,6 +192,6 @@ void zmq::object_t::send_command (command_t &cmd_) if (destination_thread_slot == thread_slot) cmd_.destination->process_command (cmd_); else - context->write (thread_slot, destination_thread_slot, cmd_); + dispatcher->write (thread_slot, destination_thread_slot, cmd_); } diff --git a/src/object.hpp b/src/object.hpp index 5851c68..7357549 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,7 +32,7 @@ namespace zmq { public: - object_t (class context_t *context_, int thread_slot_); + object_t (class dispatcher_t *dispatcher_, int thread_slot_); object_t (object_t *parent_); ~object_t (); @@ -42,44 +42,32 @@ namespace zmq protected: // Derived object can use following functions to interact with - // global repositories. See context.hpp for function details. + // global repositories. See dispatcher.hpp for function details. int thread_slot_count (); class io_thread_t *choose_io_thread (uint64_t taskset_); // Derived object can use these functions to send commands // to other objects. void send_stop (); - void send_bind (object_t *destination_, class pipe_reader_t *reader_, - class session_t *peer_); - void send_head (object_t *destination_, uint64_t bytes_); - void send_tail (object_t *destination_, uint64_t bytes_); - void send_reg (object_t *destination_, - class simple_semaphore_t *smph_); - void send_reg_and_bind (object_t *destination_, class session_t *peer_, - bool flow_in_, bool flow_out_); - void send_unreg (object_t *destination_, - class simple_semaphore_t *smph_); - void send_engine (object_t *destination_, struct i_engine *engine_); - void send_terminate (object_t *destination_); - void send_terminate_ack (object_t *destination_); + void send_plug (object_t *destination_); + void send_own (object_t *destination_, object_t *object_); + void send_bind (object_t *destination_); + void send_term_req (object_t *destination_, object_t *object_); + void send_term (object_t *destination_); + void send_term_ack (object_t *destination_); // These handlers can be overloaded by the derived objects. They are // called when command arrives from another thread. virtual void process_stop (); - virtual void process_bind (class pipe_reader_t *reader_, - class session_t *peer_); - virtual void process_head (uint64_t bytes_); - virtual void process_tail (uint64_t bytes_); - virtual void process_reg (class simple_semaphore_t *smph_); - virtual void process_reg_and_bind (class session_t *peer_, - bool flow_in_, bool flow_out_); - virtual void process_unreg (class simple_semaphore_t *smph_); - virtual void process_engine (struct i_engine *engine_); - virtual void process_terminate (); - virtual void process_terminate_ack (); + virtual void process_plug (); + virtual void process_own (object_t *object_); + virtual void process_bind (); + virtual void process_term_req (object_t *object_); + virtual void process_term (); + virtual void process_term_ack (); // Pointer to the root of the infrastructure. - class context_t *context; + class dispatcher_t *dispatcher; // Slot ID of the thread the object belongs to. int thread_slot; diff --git a/src/socket_base.cpp b/src/socket_base.cpp new file mode 100644 index 0000000..3737410 --- /dev/null +++ b/src/socket_base.cpp @@ -0,0 +1,129 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <algorithm> + +#include "../include/zmq.h" + +#include "socket_base.hpp" +#include "app_thread.hpp" +#include "err.hpp" +#include "zmq_listener.hpp" +#include "io_thread.hpp" + +zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : + object_t (parent_), + pending_term_acks (0), + app_thread (parent_) +{ +} + +zmq::socket_base_t::~socket_base_t () +{ + while (true) { + + // On third pass of the loop there should be no more I/O objects + // because all connecters and listerners were destroyed during + // the first pass and all engines delivered by delayed 'own' commands + // are destroyed during the second pass. + if (io_objects.empty () && !pending_term_acks) + break; + + // Send termination request to all associated I/O objects. + for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) + send_term (io_objects [i]); + + // Move the objects to the list of pending term acks. + pending_term_acks += io_objects.size (); + io_objects.clear (); + + // Process commands till we get all the termination acknowledgements. + while (pending_term_acks) + app_thread->process_commands (true); + } +} + +int zmq::socket_base_t::bind (const char *addr_, struct zmq_opts *opts_) +{ + uint64_t taskset = opts_ ? opts_->taskset : 0; + object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this); + send_plug (listener); + send_own (this, listener); + return 0; +} + +int zmq::socket_base_t::connect (const char *addr_, struct zmq_opts *opts_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::subscribe (const char *criteria_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::flush () +{ + zmq_assert (false); +} + +int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::socket_base_t::close () +{ + app_thread->remove_socket (this); + delete this; + return 0; +} + +void zmq::socket_base_t::process_own (object_t *object_) +{ + io_objects.push_back (object_); +} + +void zmq::socket_base_t::process_term_req (object_t *object_) +{ + // If I/O object is well and alive ask it to terminate. + // TODO: Following find may produce an unacceptable jitter in + // C10K-style applications. If so, use set instead of vector. + io_objects_t::iterator it = std::find (io_objects.begin (), + io_objects.end (), object_); + if (it != io_objects.end ()) { + pending_term_acks++; + io_objects.erase (it); + send_term (object_); + } + + // If not found, we assume that termination request was already sent to + // the object so we can sagely ignore the request. +} + +void zmq::socket_base_t::process_term_ack () +{ + zmq_assert (pending_term_acks); + pending_term_acks--; +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp new file mode 100644 index 0000000..633f003 --- /dev/null +++ b/src/socket_base.hpp @@ -0,0 +1,72 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ +#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ + +#include <vector> + +#include "i_api.hpp" +#include "object.hpp" + +namespace zmq +{ + + class socket_base_t : public object_t, public i_api + { + public: + + socket_base_t (class app_thread_t *parent_); + ~socket_base_t (); + + // i_api interface implementation. + int bind (const char *addr_, struct zmq_opts *opts_); + int connect (const char *addr_, struct zmq_opts *opts_); + int subscribe (const char *criteria_); + int send (struct zmq_msg *msg_, int flags_); + int flush (); + int recv (struct zmq_msg *msg_, int flags_); + int close (); + + private: + + // Handlers for incoming commands. + void process_own (object_t *object_); + void process_term_req (object_t *object_); + void process_term_ack (); + + // List of all I/O objects owned by this socket. The socket is + // responsible for deallocating them before it quits. + typedef std::vector <object_t*> io_objects_t; + io_objects_t io_objects; + + // Number of I/O objects that were already asked to terminate + // but haven't acknowledged it yet. + int pending_term_acks; + + // Application thread the socket lives in. + class app_thread_t *app_thread; + + socket_base_t (const socket_base_t&); + void operator = (const socket_base_t&); + }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index d19b229..149a7e2 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -25,7 +25,7 @@ #include "i_api.hpp" #include "err.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "msg.hpp" int zmq_msg_init (zmq_msg *msg_) @@ -162,27 +162,28 @@ int zmq_msg_type (zmq_msg *msg_) void *zmq_init (int app_threads_, int io_threads_) { - // There should be at least a single thread managed by the context. + // There should be at least a single thread managed by the dispatcher. if (app_threads_ < 0 || io_threads_ < 0 || app_threads_ + io_threads_ == 0) { errno = EINVAL; return NULL; } - zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_); - zmq_assert (context); - return (void*) context; + zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_, + io_threads_); + zmq_assert (dispatcher); + return (void*) dispatcher; } -int zmq_term (void *context_) +int zmq_term (void *dispatcher_) { - delete (zmq::context_t*) context_; + delete (zmq::dispatcher_t*) dispatcher_; return 0; } -void *zmq_socket (void *context_, int type_) +void *zmq_socket (void *dispatcher_, int type_) { - return (void*) (((zmq::context_t*) context_)->create_socket (type_)); + return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_)); } int zmq_close (void *s_) diff --git a/src/i_socket.hpp b/src/zmq_listener.cpp index 99ade8a..1f1e012 100644 --- a/src/i_socket.hpp +++ b/src/zmq_listener.cpp @@ -17,20 +17,19 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_I_SOCKET_HPP_INCLUDED__ -#define __ZMQ_I_SOCKET_HPP_INCLUDED__ +#include "zmq_listener.hpp" +#include "err.hpp" -namespace zmq +zmq::zmq_listener_t::zmq_listener_t (object_t *parent_, object_t *owner_) : + io_object_t (parent_, owner_) { +} - struct i_socket - { - virtual ~i_socket () {}; - - // Start shutting down the socket. - virtual void stop () = 0; - }; - +zmq::zmq_listener_t::~zmq_listener_t () +{ } -#endif +void zmq::zmq_listener_t::process_plug () +{ + // TODO: Register with the I/O thread here. +} diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp new file mode 100644 index 0000000..12192b2 --- /dev/null +++ b/src/zmq_listener.hpp @@ -0,0 +1,46 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ +#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ + +#include "io_object.hpp" + +namespace zmq +{ + + class zmq_listener_t : public io_object_t + { + public: + + zmq_listener_t (object_t *parent_, object_t *owner_); + ~zmq_listener_t (); + + private: + + // Handlers for incoming commands. + void process_plug (); + + zmq_listener_t (const zmq_listener_t&); + void operator = (const zmq_listener_t&); + }; + +} + +#endif |