diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-08 16:01:58 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-08-08 16:01:58 +0200 |
commit | a8b410e66c3c75809c8e9c01dd3e35c579f02347 (patch) | |
tree | 7af63906dce0216f86e5ff0767efaddfd6492cfd /src | |
parent | 0b5cc026fbe7ccc6de66907be29471562a2d344d (diff) |
lockfree interaction patter for 3 theads implemented
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 14 | ||||
-rw-r--r-- | src/app_thread.cpp | 37 | ||||
-rw-r--r-- | src/app_thread.hpp | 13 | ||||
-rw-r--r-- | src/command.hpp | 59 | ||||
-rw-r--r-- | src/dispatcher.cpp (renamed from src/context.cpp) | 22 | ||||
-rw-r--r-- | src/dispatcher.hpp (renamed from src/context.hpp) | 20 | ||||
-rw-r--r-- | src/err.hpp | 6 | ||||
-rw-r--r-- | src/i_api.hpp | 34 | ||||
-rw-r--r-- | src/io_object.cpp | 41 | ||||
-rw-r--r-- | src/io_object.hpp | 62 | ||||
-rw-r--r-- | src/io_thread.cpp | 8 | ||||
-rw-r--r-- | src/io_thread.hpp | 2 | ||||
-rw-r--r-- | src/mutex.hpp | 30 | ||||
-rw-r--r-- | src/object.cpp | 158 | ||||
-rw-r--r-- | src/object.hpp | 42 | ||||
-rw-r--r-- | src/socket_base.cpp | 129 | ||||
-rw-r--r-- | src/socket_base.hpp | 72 | ||||
-rw-r--r-- | src/zmq.cpp | 19 | ||||
-rw-r--r-- | src/zmq_listener.cpp (renamed from src/i_socket.hpp) | 23 | ||||
-rw-r--r-- | src/zmq_listener.hpp | 46 |
20 files changed, 569 insertions, 268 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index bde9c39..47037a2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,14 +7,15 @@ libzmq_la_SOURCES = \ atomic_ptr.hpp \ command.hpp \ config.hpp \ - context.hpp \ decoder.hpp \ devpoll.hpp \ + dispatcher.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ fd.hpp \ fd_signaler.hpp \ + io_object.hpp \ io_thread.hpp \ ip.hpp \ i_api.hpp \ @@ -31,6 +32,7 @@ libzmq_la_SOURCES = \ poll.hpp \ select.hpp \ simple_semaphore.hpp \ + socket_base.hpp \ stdint.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -42,25 +44,29 @@ libzmq_la_SOURCES = \ ypipe.hpp \ ypollset.hpp \ yqueue.hpp \ + zmq_listener.hpp \ app_thread.cpp \ - context.cpp \ - devpoll.hpp \ + devpoll.cpp \ + dispatcher.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ + io_object.cpp \ io_thread.cpp \ ip.cpp \ kqueue.cpp \ object.cpp \ poll.cpp \ select.cpp \ + socket_base.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ uuid.cpp \ ypollset.cpp \ - zmq.cpp + zmq.cpp \ + zmq_listener.cpp libzmq_la_LDFLAGS = -version-info 0:0:0 libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 23a055a..3f76970 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <algorithm> + #include "../include/zmq.h" #if defined ZMQ_HAVE_WINDOWS @@ -26,10 +28,12 @@ #endif #include "app_thread.hpp" -#include "context.hpp" +#include "i_api.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "pipe.hpp" #include "config.hpp" +#include "socket_base.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -39,8 +43,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : - object_t (context_, thread_slot_), +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), tid (0), last_processing_time (0) { @@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : zmq::app_thread_t::~app_thread_t () { - // Ask all the sockets to start termination, then wait till it is complete. - for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) - (*it)->stop (); + // Destroy all the sockets owned by this application thread. for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) delete *it; - - delete this; } zmq::i_signaler *zmq::app_thread_t::get_signaler () @@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_) for (int i = 0; i != thread_slot_count (); i++) { if (signals & (ypollset_t::signals_t (1) << i)) { command_t cmd; - while (context->read (i, get_thread_slot (), &cmd)) + while (dispatcher->read (i, get_thread_slot (), &cmd)) cmd.destination->process_command (cmd); } } } } + +zmq::i_api *zmq::app_thread_t::create_socket (int type_) +{ + // TODO: type is ignored for the time being. + socket_base_t *s = new socket_base_t (this); + zmq_assert (s); + sockets.push_back (s); + return s; +} + +void zmq::app_thread_t::remove_socket (i_api *socket_) +{ + // TODO: To speed this up we can possibly use the system where each socket + // holds its index (see I/O scheduler implementation). + sockets_t::iterator it = std::find (sockets.begin (), sockets.end (), + socket_); + zmq_assert (it != sockets.end ()); + sockets.erase (it); +} diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 31679b8..59e4a25 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -22,7 +22,6 @@ #include <vector> -#include "i_socket.hpp" #include "stdint.hpp" #include "object.hpp" #include "ypollset.hpp" @@ -34,7 +33,7 @@ namespace zmq { public: - app_thread_t (class context_t *context_, int thread_slot_); + app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); ~app_thread_t (); @@ -42,7 +41,7 @@ namespace zmq i_signaler *get_signaler (); // Nota bene: Following two functions are accessed from different - // threads. The caller (context) is responsible for synchronisation + // threads. The caller (dispatcher) is responsible for synchronisation // of accesses. // Returns true is current thread is associated with the app thread. @@ -56,10 +55,16 @@ namespace zmq // set to true, returns only after at least one command was processed. void process_commands (bool block_); + // Create a socket of a specified type. + struct i_api *create_socket (int type_); + + // Unregister the socket from the app_thread (called by socket itself). + void remove_socket (struct i_api *socket_); + private: // All the sockets created from this application thread. - typedef std::vector <i_socket*> sockets_t; + typedef std::vector <struct i_api*> sockets_t; sockets_t sockets; // Thread ID associated with this slot. diff --git a/src/command.hpp b/src/command.hpp index 69c4e57..de94ca3 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -35,60 +35,49 @@ namespace zmq enum type_t { stop, + plug, + own, bind, - head, - tail, - reg, - reg_and_bind, - unreg, - engine, - terminate, - terminate_ack + term_req, + term, + term_ack + } type; union { + // Sent to I/O thread to let it know that it should + // terminate itself. struct { } stop; + // Sent to I/O object to make it register with its I/O thread. struct { - class pipe_reader_t *reader; - class session_t *peer; - } bind; + } plug; + // Sent to socket to let it know about the newly created object. struct { - uint64_t bytes; - } tail; + class object_t *object; + } own; + // Sent between objects to establish pipe(s) between them. struct { - uint64_t bytes; - } head; - - struct { - class simple_semaphore_t *smph; - } reg; - - struct { - class session_t *peer; - bool flow_in; - bool flow_out; - } reg_and_bind; - - struct { - class simple_semaphore_t *smph; - } unreg; + } bind; - // TODO: Engine object won't be deallocated on terminal shutdown - // while the command is still on the fly! + // Sent by I/O object ot the socket to request the shutdown of + // the I/O object. struct { - class i_engine *engine; - } engine; + class object_t *object; + } term_req; + // Sent by socket to I/O object to start its shutdown. struct { - } terminate; + } term; + // Sent by I/O object to the socket to acknowledge it has + // shut down. struct { - } terminate_ack; + } term_ack; } args; }; diff --git a/src/context.cpp b/src/dispatcher.cpp index 6b071cf..0b68880 100644 --- a/src/context.cpp +++ b/src/dispatcher.cpp @@ -19,7 +19,7 @@ #include "../include/zmq.h" -#include "context.hpp" +#include "dispatcher.hpp" #include "i_api.hpp" #include "app_thread.hpp" #include "io_thread.hpp" @@ -31,7 +31,7 @@ #include "windows.h" #endif -zmq::context_t::context_t (int app_threads_, int io_threads_) +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -69,7 +69,7 @@ zmq::context_t::context_t (int app_threads_, int io_threads_) io_threads [i]->start (); } -zmq::context_t::~context_t () +zmq::dispatcher_t::~dispatcher_t () { // Close all application theads, sockets, io_objects etc. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) @@ -93,12 +93,12 @@ zmq::context_t::~context_t () #endif } -int zmq::context_t::thread_slot_count () +int zmq::dispatcher_t::thread_slot_count () { return signalers.size (); } -zmq::i_api *zmq::context_t::create_socket (int type_) +zmq::i_api *zmq::dispatcher_t::create_socket (int type_) { threads_sync.lock (); app_thread_t *thread = choose_app_thread (); @@ -106,16 +106,12 @@ zmq::i_api *zmq::context_t::create_socket (int type_) threads_sync.unlock (); return NULL; } - - zmq_assert (false); - i_api *s = NULL; - //i_api *s = thread->create_socket (type_); - threads_sync.unlock (); - return s; + + return thread->create_socket (type_); } -zmq::app_thread_t *zmq::context_t::choose_app_thread () +zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) @@ -132,7 +128,7 @@ zmq::app_thread_t *zmq::context_t::choose_app_thread () return NULL; } -zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) { zmq_assert (io_threads.size () > 0); diff --git a/src/context.hpp b/src/dispatcher.hpp index f2eab1c..08ffab1 100644 --- a/src/context.hpp +++ b/src/dispatcher.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__ -#define __ZMQ_CONTEXT_HPP_INCLUDED__ +#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ +#define __ZMQ_DISPATCHER_HPP_INCLUDED__ #include <vector> #include <map> @@ -37,27 +37,27 @@ namespace zmq // Dispatcher implements bidirectional thread-safe passing of commands // between N threads. It consists of a ypipes to pass commands and // signalers to wake up the receiver thread when new commands are - // available. Note that context is inefficient for passing messages + // available. Note that dispatcher is inefficient for passing messages // within a thread (sender thread = receiver thread). The optimisation is // not part of the class and should be implemented by individual threads // (presumably by calling the command handling function directly). - class context_t + class dispatcher_t { public: - // Create the context object. Matrix of pipes to communicate between + // Create the dispatcher object. Matrix of pipes to communicate between // each socket and each I/O thread is created along with appropriate // signalers. - context_t (int app_threads_, int io_threads_); + dispatcher_t (int app_threads_, int io_threads_); // To be called to terminate the whole infrastructure (zmq_term). - ~context_t (); + ~dispatcher_t (); // Create a socket. struct i_api *create_socket (int type_); - // Returns number of thread slots in the context. To be used by + // Returns number of thread slots in the dispatcher. To be used by // individual threads to find out how many distinct signals can be // received. int thread_slot_count (); @@ -112,8 +112,8 @@ namespace zmq // Synchronisation of accesses to shared thread data. mutex_t threads_sync; - context_t (const context_t&); - void operator = (const context_t&); + dispatcher_t (const dispatcher_t&); + void operator = (const dispatcher_t&); }; } diff --git a/src/err.hpp b/src/err.hpp index fdfce01..3854d8a 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -80,6 +80,12 @@ namespace zmq abort ();\ }} while (false) +// Provides convenient way to check for POSIX errors. +#define posix_assert(x) do {\ +fprintf (stderr, "%s (%s:%d)\n", strerror (x), __FILE__, __LINE__);\ +abort ();\ +} while (false) + // Provides convenient way to check for errors from getaddrinfo. #define gai_assert(x) do { if (x) {\ const char *errstr = gai_strerror (x);\ diff --git a/src/i_api.hpp b/src/i_api.hpp index a87e41d..36afcea 100644 --- a/src/i_api.hpp +++ b/src/i_api.hpp @@ -1,20 +1,20 @@ /* -Copyright (c) 2007-2009 FastMQ Inc. - -This file is part of 0MQ. - -0MQ is free software; you can redistribute it and/or modify it under -the terms of the Lesser GNU General Public License as published by -the Free Software Foundation; either version 3 of the License, or -(at your option) any later version. - -0MQ is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -Lesser GNU General Public License for more details. - -You should have received a copy of the Lesser GNU General Public License -along with this program. If not, see <http://www.gnu.org/licenses/>. + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. */ #ifndef __ZMQ_I_API_HPP_INCLUDED__ @@ -25,6 +25,8 @@ namespace zmq struct i_api { + virtual ~i_api () {} + virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0; virtual int connect (const char *addr_, struct zmq_opts *opts_) = 0; virtual int subscribe (const char *criteria_) = 0; diff --git a/src/io_object.cpp b/src/io_object.cpp new file mode 100644 index 0000000..41e4717 --- /dev/null +++ b/src/io_object.cpp @@ -0,0 +1,41 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "io_object.hpp" + +zmq::io_object_t::io_object_t (object_t *parent_, object_t *owner_) : + object_t (parent_), + owner (owner_) +{ +} + +zmq::io_object_t::~io_object_t () +{ +} + +void zmq::io_object_t::term () +{ + send_term_req (owner, this); +} + +void zmq::io_object_t::process_term () +{ + send_term_ack (owner); + delete this; +} diff --git a/src/io_object.hpp b/src/io_object.hpp new file mode 100644 index 0000000..5ed1830 --- /dev/null +++ b/src/io_object.hpp @@ -0,0 +1,62 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__ +#define __ZMQ_IO_OBJECT_HPP_INCLUDED__ + +#include "object.hpp" + +namespace zmq +{ + + class io_object_t : public object_t + { + public: + + // I/O object will live in the thread inherited from the parent. + // However, it's lifetime is managed by the owner. + io_object_t (object_t *parent_, object_t *owner_); + + protected: + + // Ask owner socket to terminate this I/O object. This may not happen + void term (); + + // I/O object destroys itself. No point in allowing others to invoke + // the destructor. At the same time, it has to be virtual so that + // generic io_object deallocation mechanism destroys specific type + // of I/O object correctly. + virtual ~io_object_t (); + + private: + + // Handlers for incoming commands. + void process_term (); + + // Socket owning this I/O object. It is responsible for destroying + // it when it's being closed. + object_t *owner; + + io_object_t (const io_object_t&); + void operator = (const io_object_t&); + }; + +} + +#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index f5261a6..1d85292 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -29,11 +29,11 @@ #include "select.hpp" #include "devpoll.hpp" #include "kqueue.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "simple_semaphore.hpp" -zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : - object_t (context_, thread_slot_) +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_) { #if defined ZMQ_FORCE_SELECT poller = new select_t; @@ -115,7 +115,7 @@ void zmq::io_thread_t::in_event () // Read all the commands from particular thread. command_t cmd; - while (context->read (source_thread_slot, thread_slot, &cmd)) + while (dispatcher->read (source_thread_slot, thread_slot, &cmd)) cmd.destination->process_command (cmd); } } diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 43ee19e..6f25627 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -37,7 +37,7 @@ namespace zmq { public: - io_thread_t (class context_t *context_, int thread_slot_); + io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/mutex.hpp b/src/mutex.hpp index 9b51955..e233c9e 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -72,43 +72,47 @@ namespace zmq namespace zmq { - + class mutex_t { public: inline mutex_t () { int rc = pthread_mutex_init (&mutex, NULL); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline ~mutex_t () { int rc = pthread_mutex_destroy (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline void lock () { int rc = pthread_mutex_lock (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + inline void unlock () { int rc = pthread_mutex_unlock (&mutex); - errno_assert (rc == 0); + if (rc) + posix_assert (rc); } - + private: - + pthread_mutex_t mutex; - - // Disable copy construction and assignment. + + // Disable copy construction and assignment. mutex_t (const mutex_t&); void operator = (const mutex_t&); }; - + } #endif diff --git a/src/object.cpp b/src/object.cpp index 36f3937..e2267d6 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -18,19 +18,19 @@ */ #include "object.hpp" -#include "context.hpp" +#include "dispatcher.hpp" #include "err.hpp" #include "io_thread.hpp" #include "simple_semaphore.hpp" -zmq::object_t::object_t (context_t *context_, int thread_slot_) : - context (context_), +zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : + dispatcher (dispatcher_), thread_slot (thread_slot_) { } zmq::object_t::object_t (object_t *parent_) : - context (parent_->context), + dispatcher (parent_->dispatcher), thread_slot (parent_->thread_slot) { } @@ -41,7 +41,7 @@ zmq::object_t::~object_t () int zmq::object_t::thread_slot_count () { - return context->thread_slot_count (); + return dispatcher->thread_slot_count (); } int zmq::object_t::get_thread_slot () @@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { - case command_t::head: - process_head (cmd_.args.head.bytes); + case command_t::stop: + process_stop (); break; - case command_t::tail: - process_tail (cmd_.args.tail.bytes); - break; + case command_t::plug: + process_plug (); + return; - case command_t::engine: - process_engine (cmd_.args.engine.engine); - break; + case command_t::own: + process_own (cmd_.args.own.object); + return; case command_t::bind: - process_bind (cmd_.args.bind.reader, cmd_.args.bind.peer); - break; - - case command_t::reg: - process_reg (cmd_.args.reg.smph); - break; - - case command_t::reg_and_bind: - process_reg_and_bind (cmd_.args.reg_and_bind.peer, - cmd_.args.reg_and_bind.flow_in, cmd_.args.reg_and_bind.flow_out); - break; - - case command_t::unreg: - process_unreg (cmd_.args.unreg.smph); - break; - - case command_t::terminate: - process_terminate (); - break; + process_bind (); + return; - case command_t::terminate_ack: - process_terminate_ack (); - break; + case command_t::term_req: + process_term_req (cmd_.args.term_req.object); + return; + + case command_t::term: + process_term (); + return; - case command_t::stop: - process_stop (); + case command_t::term_ack: + process_term_ack (); return; default: @@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { - return context->choose_io_thread (taskset_); + return dispatcher->choose_io_thread (taskset_); } void zmq::object_t::send_stop () @@ -111,91 +98,56 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - context->write (thread_slot, thread_slot, cmd); + dispatcher->write (thread_slot, thread_slot, cmd); } -void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, - session_t *peer_) +void zmq::object_t::send_plug (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::bind; - cmd.args.bind.reader = reader_; - cmd.args.bind.peer = peer_; + cmd.type = command_t::plug; send_command (cmd); } -void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_own (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::head; - cmd.args.head.bytes = bytes_; + cmd.type = command_t::own; + cmd.args.own.object = object_; send_command (cmd); } -void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_bind (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::tail; - cmd.args.tail.bytes = bytes_; - send_command (cmd); -} - -void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg; - cmd.args.reg.smph = smph_; - send_command (cmd); -} - -void zmq::object_t::send_reg_and_bind (object_t *destination_, - session_t *peer_, bool flow_in_, bool flow_out_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::reg_and_bind; - cmd.args.reg_and_bind.peer = peer_; - cmd.args.reg_and_bind.flow_in = flow_in_; - cmd.args.reg_and_bind.flow_out = flow_out_; - send_command (cmd); -} - -void zmq::object_t::send_unreg (object_t *destination_, - simple_semaphore_t *smph_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::unreg; - cmd.args.unreg.smph = smph_; + cmd.type = command_t::bind; send_command (cmd); } -void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_) +void zmq::object_t::send_term_req (object_t *destination_, object_t *object_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::engine; - cmd.args.engine.engine = engine_; + cmd.type = command_t::term_req; + cmd.args.term_req.object = object_; send_command (cmd); } -void zmq::object_t::send_terminate (object_t *destination_) +void zmq::object_t::send_term (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate; + cmd.type = command_t::term; send_command (cmd); } -void zmq::object_t::send_terminate_ack (object_t *destination_) +void zmq::object_t::send_term_ack (object_t *destination_) { command_t cmd; cmd.destination = destination_; - cmd.type = command_t::terminate_ack; + cmd.type = command_t::term_ack; send_command (cmd); } @@ -204,48 +156,32 @@ void zmq::object_t::process_stop () zmq_assert (false); } -void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_head (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_tail (uint64_t bytes_) -{ - zmq_assert (false); -} - -void zmq::object_t::process_reg (simple_semaphore_t *smph_) +void zmq::object_t::process_plug () { zmq_assert (false); } -void zmq::object_t::process_reg_and_bind (session_t *session_, - bool flow_in_, bool flow_out_) +void zmq::object_t::process_own (object_t *object_) { zmq_assert (false); } -void zmq::object_t::process_unreg (simple_semaphore_t *smph_) +void zmq::object_t::process_bind () { zmq_assert (false); } -void zmq::object_t::process_engine (i_engine *engine_) +void zmq::object_t::process_term_req (object_t *object_) { zmq_assert (false); } |