diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-08-06 17:49:37 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-08-25 15:39:20 +0200 |
commit | 05d908492dc382941fc633ad7082b5bd86e84e67 (patch) | |
tree | ae10e49766152e42521a6c100e622dc616998143 /src | |
parent | b7e0fa972f45d21e45cacb93a1a92d38fdc11f40 (diff) |
WIP: Socket migration between threads, new zmq_close() semantics
Sockets may now be migrated between OS threads; sockets may not be used by
more than one thread at any time. To migrate a socket to another thread the
caller must ensure that a full memory barrier is called before using the
socket from the target thread.
The new zmq_close() semantics implement the behaviour discussed at:
http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html
Specifically, zmq_close() is now deterministic and while it still returns
immediately, it does not discard any data that may still be queued for
sending. Further, zmq_term() will now block until all outstanding data has
been sent.
TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or
an equivalent mechanism (possibly a configurable timeout to zmq_term())
needs to be implemented.
Diffstat (limited to 'src')
46 files changed, 1392 insertions, 1371 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 19a80d0..937372f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -49,7 +49,7 @@ endif nodist_libzmq_la_SOURCES = $(pgm_sources) -libzmq_la_SOURCES = app_thread.hpp \ +libzmq_la_SOURCES = \ atomic_counter.hpp \ atomic_ptr.hpp \ blob.hpp \ @@ -58,7 +58,6 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -69,7 +68,6 @@ libzmq_la_SOURCES = app_thread.hpp \ io_object.hpp \ io_thread.hpp \ ip.hpp \ - i_endpoint.hpp \ i_engine.hpp \ i_poll_events.hpp \ kqueue.hpp \ @@ -91,10 +89,13 @@ libzmq_la_SOURCES = app_thread.hpp \ pair.hpp \ prefix_tree.hpp \ pub.hpp \ + pull.hpp \ + push.hpp \ queue.hpp \ rep.hpp \ req.hpp \ select.hpp \ + semaphore.hpp \ session.hpp \ signaler.hpp \ socket_base.hpp \ @@ -105,7 +106,6 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -121,11 +121,9 @@ libzmq_la_SOURCES = app_thread.hpp \ zmq_engine.hpp \ zmq_init.hpp \ zmq_listener.hpp \ - app_thread.cpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -139,13 +137,15 @@ libzmq_la_SOURCES = app_thread.hpp \ object.cpp \ options.cpp \ owned.cpp \ + pair.cpp \ pgm_receiver.cpp \ pgm_sender.cpp \ pgm_socket.cpp \ - pair.cpp \ prefix_tree.cpp \ pipe.cpp \ poll.cpp \ + pull.cpp \ + push.cpp \ pub.cpp \ queue.cpp \ rep.cpp \ @@ -160,7 +160,6 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp deleted file mode 100644 index ac59464..0000000 --- a/src/app_thread.cpp +++ /dev/null @@ -1,195 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <new> -#include <algorithm> - -#include "../include/zmq.h" - -#include "platform.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#if defined _MSC_VER -#include <intrin.h> -#endif -#else -#include <unistd.h> -#endif - -#include "app_thread.hpp" -#include "ctx.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "config.hpp" -#include "socket_base.hpp" -#include "pair.hpp" -#include "pub.hpp" -#include "sub.hpp" -#include "req.hpp" -#include "rep.hpp" -#include "xreq.hpp" -#include "xrep.hpp" -#include "pull.hpp" -#include "push.hpp" - -// If the RDTSC is available we use it to prevent excessive -// polling for commands. The nice thing here is that it will work on any -// system with x86 architecture and gcc or MSVC compiler. -#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ - (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) -#define ZMQ_DELAY_COMMANDS -#endif - -zmq::app_thread_t::app_thread_t (ctx_t *ctx_, - uint32_t thread_slot_) : - object_t (ctx_, thread_slot_), - last_processing_time (0), - terminated (false) -{ -} - -zmq::app_thread_t::~app_thread_t () -{ - zmq_assert (sockets.empty ()); -} - -void zmq::app_thread_t::stop () -{ - send_stop (); -} - -zmq::signaler_t *zmq::app_thread_t::get_signaler () -{ - return &signaler; -} - -bool zmq::app_thread_t::process_commands (bool block_, bool throttle_) -{ - bool received; - command_t cmd; - if (block_) { - received = signaler.recv (&cmd, true); - zmq_assert (received); - } - else { - -#if defined ZMQ_DELAY_COMMANDS - // Optimised version of command processing - it doesn't have to check - // for incoming commands each time. It does so only if certain time - // elapsed since last command processing. Command delay varies - // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU - // etc. The optimisation makes sense only on platforms where getting - // a timestamp is a very cheap operation (tens of nanoseconds). - if (throttle_) { - - // Get timestamp counter. -#if defined __GNUC__ - uint32_t low; - uint32_t high; - __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); - uint64_t current_time = (uint64_t) high << 32 | low; -#elif defined _MSC_VER - uint64_t current_time = __rdtsc (); -#else -#error -#endif - - // Check whether certain time have elapsed since last command - // processing. - if (current_time - last_processing_time <= max_command_delay) - return !terminated; - last_processing_time = current_time; - } -#endif - - // Check whether there are any commands pending for this thread. - received = signaler.recv (&cmd, false); - } - - // Process all the commands available at the moment. - while (received) { - cmd.destination->process_command (cmd); - received = signaler.recv (&cmd, false); - } - - return !terminated; -} - -zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) -{ - socket_base_t *s = NULL; - switch (type_) { - case ZMQ_PAIR: - s = new (std::nothrow) pair_t (this); - break; - case ZMQ_PUB: - s = new (std::nothrow) pub_t (this); - break; - case ZMQ_SUB: - s = new (std::nothrow) sub_t (this); - break; - case ZMQ_REQ: - s = new (std::nothrow) req_t (this); - break; - case ZMQ_REP: - s = new (std::nothrow) rep_t (this); - break; - case ZMQ_XREQ: - s = new (std::nothrow) xreq_t (this); - break; - case ZMQ_XREP: - s = new (std::nothrow) xrep_t (this); - break; - case ZMQ_PULL: - s = new (std::nothrow) pull_t (this); - break; - case ZMQ_PUSH: - s = new (std::nothrow) push_t (this); - break; - default: - if (sockets.empty ()) - get_ctx ()->no_sockets (this); - errno = EINVAL; - return NULL; - } - zmq_assert (s); - - sockets.push_back (s); - - return s; -} - -void zmq::app_thread_t::remove_socket (socket_base_t *socket_) -{ - sockets.erase (socket_); - if (sockets.empty ()) - get_ctx ()->no_sockets (this); -} - -void zmq::app_thread_t::process_stop () -{ - terminated = true; -} - -bool zmq::app_thread_t::is_terminated () -{ - return terminated; -} - diff --git a/src/app_thread.hpp b/src/app_thread.hpp deleted file mode 100644 index f0deaab..0000000 --- a/src/app_thread.hpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__ -#define __ZMQ_APP_THREAD_HPP_INCLUDED__ - -#include <vector> - -#include "stdint.hpp" -#include "object.hpp" -#include "yarray.hpp" -#include "signaler.hpp" - -namespace zmq -{ - - class app_thread_t : public object_t - { - public: - - app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_); - - ~app_thread_t (); - - // Interrupt blocking call if the app thread is stuck in one. - // This function is is called from a different thread! - void stop (); - - // Returns signaler associated with this application thread. - signaler_t *get_signaler (); - - // Processes commands sent to this thread (if any). If 'block' is - // set to true, returns only after at least one command was processed. - // If throttle argument is true, commands are processed at most once - // in a predefined time period. The function returns false is the - // associated context was terminated, true otherwise. - bool process_commands (bool block_, bool throttle_); - - // Create a socket of a specified type. - class socket_base_t *create_socket (int type_); - - // Unregister the socket from the app_thread (called by socket itself). - void remove_socket (class socket_base_t *socket_); - - // Returns true is the associated context was already terminated. - bool is_terminated (); - - private: - - // Command handlers. - void process_stop (); - - // All the sockets created from this application thread. - typedef yarray_t <socket_base_t> sockets_t; - sockets_t sockets; - - // App thread's signaler object. - signaler_t signaler; - - // Timestamp of when commands were processed the last time. - uint64_t last_processing_time; - - // If true, 'stop' command was already received. - bool terminated; - - app_thread_t (const app_thread_t&); - void operator = (const app_thread_t&); - }; - -} - -#endif diff --git a/src/config.hpp b/src/config.hpp index 2c0ac2d..83e612a 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -27,9 +27,8 @@ namespace zmq enum { - // Maximal number of OS threads that can own 0MQ sockets - // at the same time. - max_app_threads = 512, + // Maximum number of sockets that can be opened at the same time. + max_sockets = 512, // Number of new messages in message pipe needed to trigger new memory // allocation. Setting this parameter to 256 decreases the impact of diff --git a/src/ctx.cpp b/src/ctx.cpp index 397f692..91157a5 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -24,7 +24,6 @@ #include "ctx.hpp" #include "socket_base.hpp" -#include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" @@ -32,11 +31,12 @@ #if defined ZMQ_HAVE_WINDOWS #include "windows.h" +#else +#include "unistd.h" #endif zmq::ctx_t::ctx_t (uint32_t io_threads_) : - sockets (0), - terminated (false) + no_sockets_notify (false) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -50,44 +50,32 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : #endif // Initialise the array of signalers. - signalers_count = max_app_threads + io_threads_; - signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); - zmq_assert (signalers); - memset (signalers, 0, sizeof (signaler_t*) * signalers_count); + slot_count = max_sockets + io_threads_; + slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count); + zmq_assert (slots); // Create I/O thread objects and launch them. for (uint32_t i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); - signalers [i] = io_thread->get_signaler (); + slots [i] = io_thread->get_signaler (); io_thread->start (); } -} - -int zmq::ctx_t::term () -{ - // First send stop command to application threads so that any - // blocking calls are interrupted. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i].app_thread->stop (); - - // Then mark context as terminated. - term_sync.lock (); - zmq_assert (!terminated); - terminated = true; - bool destroy = (sockets == 0); - term_sync.unlock (); - - // If there are no sockets open, destroy the context immediately. - if (destroy) - delete this; - return 0; + // In the unused part of the slot array, create a list of empty slots. + for (uint32_t i = slot_count - 1; i >= io_threads_; i--) { + empty_slots.push_back (i); + slots [i] = NULL; + } } zmq::ctx_t::~ctx_t () { + // Check that there are no remaining open or zombie sockets. + zmq_assert (sockets.empty ()); + zmq_assert (zombies.empty ()); + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O // thread subsequent invocation of destructor would hang-up. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -97,18 +85,10 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; - // Close all application theads, sockets, io_objects etc. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i].app_thread; - - // Deallocate all the orphaned pipes. - while (!pipes.empty ()) - delete *pipes.begin (); - - // Deallocate the array of pointers to signalers. No special work is + // Deallocate the array of slot. No special work is // needed as signalers themselves were deallocated with their - // corresponding (app_/io_) thread objects. - free (signalers); + // corresponding io_thread/socket objects. + free (slots); #ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. @@ -117,110 +97,113 @@ zmq::ctx_t::~ctx_t () #endif } -zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +int zmq::ctx_t::term () { - app_threads_sync.lock (); - - // Find whether the calling thread has app_thread_t object associated - // already. At the same time find an unused app_thread_t so that it can - // be used if there's no associated object for the calling thread. - // Check whether thread ID is already assigned. If so, return it. - app_threads_t::size_type unused = app_threads.size (); - app_threads_t::size_type current; - for (current = 0; current != app_threads.size (); current++) { - if (app_threads [current].associated && - thread_t::equal (thread_t::id (), app_threads [current].tid)) - break; - if (!app_threads [current].associated) - unused = current; + // First send stop command to sockets so that any + // blocking calls are interrupted. + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + + // Find out whether there are any open sockets to care about. + // If so, sleep till they are closed. Note that we can use + // no_sockets_notify safely out of the critical section as once set + // its value is never changed again. + slot_sync.lock (); + if (!sockets.empty ()) + no_sockets_notify = true; + slot_sync.unlock (); + if (no_sockets_notify) + no_sockets_sync.wait (); + + // At this point there's only one application thread (this one) remaining. + // We don't even have to synchronise access to data. + zmq_assert (sockets.empty ()); + + // Get rid of remaining zombie sockets. + while (!zombies.empty ()) { + dezombify (); + + // Sleep for 1ms not to end up busy-looping in the case the I/O threads + // are still busy sending data. We can possibly add a grand poll here + // (polling for fds associated with all the zombie sockets), but it's + // probably not worth of implementing it. +#if defined ZMQ_HAVE_WINDOWS + Sleep (1); +#else + usleep (1000); +#endif } - // If no app_thread_t is associated with the calling thread, - // associate it with one of the unused app_thread_t objects. - if (current == app_threads.size ()) { + // Deallocate the resources. + delete this; - // If all the existing app_threads are already used, create one more. - if (unused == app_threads.size ()) { - - // If max_app_threads limit was reached, return error. - if (app_threads.size () == max_app_threads) { - app_threads_sync.unlock (); - errno = EMTHREAD; - return NULL; - } + return 0; +} - // Create the new application thread proxy object. - app_thread_info_t info; - memset (&info, 0, sizeof (info)); - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, - io_threads.size () + app_threads.size ()); - zmq_assert (info.app_thread); - signalers [io_threads.size () + app_threads.size ()] = - info.app_thread->get_signaler (); - app_threads.push_back (info); - } +zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +{ + slot_sync.lock (); - // Incidentally, this works both when there is an unused app_thread - // and when a new one is created. - current = unused; + // Free the slots, if possible. + dezombify (); - // Associate the selected app_thread with the OS thread. - app_threads [current].associated = true; - app_threads [current].tid = thread_t::id (); + // If max_sockets limit was reached, return error. + if (empty_slots.empty ()) { + slot_sync.unlock (); + errno = EMFILE; + return NULL; } - app_thread_t *thread = app_threads [current].app_thread; - app_threads_sync.unlock (); + // Choose a slot for the socket. + uint32_t slot = empty_slots.back (); + empty_slots.pop_back (); - socket_base_t *s = thread->create_socket (type_); - if (!s) + // Create the socket and register its signaler. + socket_base_t *s = socket_base_t::create (type_, this, slot); + if (!s) { + empty_slots.push_back (slot); + slot_sync.unlock (); return NULL; + } + sockets.push_back (s); + slots [slot] = s->get_signaler (); - term_sync.lock (); - sockets++; - term_sync.unlock (); + slot_sync.unlock (); return s; } -void zmq::ctx_t::destroy_socket () +void zmq::ctx_t::zombify (socket_base_t *socket_) { - // If zmq_term was already called and there are no more sockets, - // terminate the whole 0MQ infrastructure. - term_sync.lock (); - zmq_assert (sockets > 0); - sockets--; - bool destroy = (sockets == 0 && terminated); - term_sync.unlock (); - - if (destroy) - delete this; -} + // Zombification of socket basically means that its ownership is tranferred + // from the application that created it to the context. -void zmq::ctx_t::no_sockets (app_thread_t *thread_) -{ - app_threads_sync.lock (); - app_threads_t::size_type i; - for (i = 0; i != app_threads.size (); i++) - if (app_threads [i].app_thread == thread_) { - app_threads [i].associated = false; - break; - } - zmq_assert (i != app_threads.size ()); - app_threads_sync.unlock (); + // Note that the lock provides the memory barrier needed to migrate + // zombie-to-be socket from it's native thread to shared data area + // synchronised by slot_sync. + slot_sync.lock (); + sockets.erase (socket_); + zombies.push_back (socket_); + + // Try to get rid of at least some zombie sockets at this point. + dezombify (); + + // If shutdown thread is interested in notification about no more + // open sockets, notify it now. + if (sockets.empty () && no_sockets_notify) + no_sockets_sync.post (); + + slot_sync.unlock (); } -void zmq::ctx_t::send_command (uint32_t destination_, - const command_t &command_) +void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) { - signalers [destination_]->send (command_); + slots [slot_]->send (command_); } -bool zmq::ctx_t::recv_command (uint32_t thread_slot_, - command_t *command_, bool block_) +bool zmq::ctx_t::recv_command (uint32_t slot_, command_t *command_, bool block_) { - return signalers [thread_slot_]->recv (command_, block_); + return slots [slot_]->recv (command_, block_); } zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) @@ -242,22 +225,6 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) return io_threads [result]; } -void zmq::ctx_t::register_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - bool inserted = pipes.insert (pipe_).second; - zmq_assert (inserted); - pipes_sync.unlock (); -} - -void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_) -{ - pipes_sync.lock (); - pipes_t::size_type erased = pipes.erase (pipe_); - zmq_assert (erased == 1); - pipes_sync.unlock (); -} - int zmq::ctx_t::register_endpoint (const char *addr_, socket_base_t *socket_) { @@ -315,3 +282,15 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } +void zmq::ctx_t::dezombify () +{ + // Try to dezombify each zombie in the list. + for (zombies_t::size_type i = 0; i != zombies.size ();) + if (zombies [i]->dezombify ()) { + empty_slots.push_back (zombies [i]->get_slot ()); + zombies.erase (zombies [i]); + } + else + i++; +} + diff --git a/src/ctx.hpp b/src/ctx.hpp index c96a923..cb9a2d9 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -26,7 +26,9 @@ #include <string> #include "signaler.hpp" +#include "semaphore.hpp" #include "ypipe.hpp" +#include "yarray.hpp" #include "config.hpp" #include "mutex.hpp" #include "stdint.hpp" @@ -55,29 +57,19 @@ namespace zmq // Create a socket. class socket_base_t *create_socket (int type_); - // Destroy a socket. - void destroy_socket (); + // Make socket a zombie. + void zombify (socket_base_t *socket_); - // Called by app_thread_t when it has no more sockets. The function - // should disassociate the object from the current OS thread. - void no_sockets (class app_thread_t *thread_); + // Send command to the destination slot. + void send_command (uint32_t slot_, const command_t &command_); - // Send command to the destination thread. - void send_command (uint32_t destination_, const command_t &command_); - - // Receive command from another thread. - bool recv_command (uint32_t thread_slot_, command_t *command_, - bool block_); + // Receive command from the source slot. + bool recv_command (uint32_t slot_, command_t *command_, bool block_); // Returns the I/O thread that is the least busy at the moment. // Taskset specifies which I/O threads are eligible (0 = all). class io_thread_t *choose_io_thread (uint64_t taskset_); - // All pipes are registered with the context so that even the - // orphaned pipes can be deallocated on the terminal shutdown. - void register_pipe (class pipe_t *pipe_); - void unregister_pipe (class pipe_t *pipe_); - // Management of inproc endpoints. int register_endpoint (const char *addr_, class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_); @@ -87,57 +79,45 @@ namespace zmq ~ctx_t (); - struct app_thread_info_t - { - // If false, 0MQ application thread is free, there's no associated - // OS thread. - bool associated; + // Sockets belonging to this context. + typedef yarray_t <socket_base_t> sockets_t; + sockets_t sockets; + + // Array of sockets that were already closed but not yet deallocated. + // These sockets still have some pipes and I/O objects attached. + typedef yarray_t <socket_base_t> zombies_t; + zombies_t zombies; + + // List of unused slots. + typedef std::vector <uint32_t> emtpy_slots_t; + emtpy_slots_t empty_slots; - // ID of the associated OS thread. If 'associated' is false, - // this field contains bogus data. - thread_t::id_t tid; + // If true, shutdown thread wants to be informed when there are no + // more open sockets. Do so by posting no_sockets_sync semaphore. + // Note that this variable is synchronised by slot_sync mutex. + bool no_sockets_notify; - // Pointer to the 0MQ application thread object. - class app_thread_t *app_thread; - }; + // Object used by zmq_term to wait while all the sockets are closed + // by different application threads. + semaphore_t no_sockets_sync; - // Application threads. - typedef std::vector <app_thread_info_t> app_threads_t; - app_threads_t app_threads; + // Synchronisation of accesses to global slot-related data: + // sockets, zombies, empty_slots, terminated. It also synchronises + // access to zombie sockets as such (as oposed to slots) and provides + // a memory barrier to ensure that all CPU cores see the same data. + mutex_t slot_sync; - // Synchronisation of accesses to shared application thread data. - mutex_t app_threads_sync; + // This function attempts to deallocate as many zombie sockets as + // possible. It must be called within a slot_sync critical section. + void dezombify (); // I/O threads. typedef std::vector <class io_thread_t*> io_threads_t; io_threads_t io_threads; // Array of pointers to signalers for both application and I/O threads. - int signalers_count; - signaler_t **signalers; - - // As pipes may reside in orphaned state in particular moments - // of the pipe shutdown process, i.e. neither pipe reader nor - // pipe writer hold reference to the pipe, we have to hold references - // to all pipes in context so that we can deallocate them - // during terminal shutdown even though it conincides with the - // pipe being in the orphaned state. - typedef std::set <class pipe_t*> pipes_t; - pipes_t pipes; - - // Synchronisation of access to the pipes repository. - mutex_t pipes_sync; - - // Number of sockets alive. - int sockets; - - // If true, zmq_term was already called. When last socket is closed - // the whole 0MQ infrastructure should be deallocated. - bool terminated; - - // Synchronisation of access to the termination data (socket count - // and 'terminated' flag). - mutex_t term_sync; + uint32_t slot_count; + signaler_t **slots; // List of inproc endpoints within this context. typedef std::map <std::string, class socket_base_t*> endpoints_t; @@ -32,18 +32,19 @@ zmq::fq_t::fq_t () : zmq::fq_t::~fq_t () { - for (pipes_t::size_type i = 0; i != pipes.size (); i++) - pipes [i]->term (); + zmq_assert (pipes.empty ()); } void zmq::fq_t::attach (reader_t *pipe_) { + pipe_->set_event_sink (this); + pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; } -void zmq::fq_t::detach (reader_t *pipe_) +void zmq::fq_t::terminated (reader_t *pipe_) { zmq_assert (!more || pipes [current] != pipe_); @@ -57,16 +58,18 @@ void zmq::fq_t::detach (reader_t *pipe_) pipes.erase (pipe_); } -void zmq::fq_t::kill (reader_t *pipe_) +bool zmq::fq_t::has_pipes () { - // Move the pipe to the list of inactive pipes. - active--; - if (current == active) - current = 0; - pipes.swap (pipes.index (pipe_), active); + return !pipes.empty (); +} + +void zmq::fq_t::term_pipes () +{ + for (pipes_t::size_type i = 0; i != pipes.size (); i++) + pipes [i]->terminate (); } -void zmq::fq_t::revive (reader_t *pipe_) +void zmq::fq_t::activated (reader_t *pipe_) { // Move the pipe to the list of active pipes. pipes.swap (pipes.index (pipe_), active); @@ -98,6 +101,12 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) } return 0; } + else { + active--; + pipes.swap (current, active); + if (current == active) + current = 0; + } } // No message is available. Initialise the output parameter @@ -21,6 +21,7 @@ #define __ZMQ_FQ_HPP_INCLUDED__ #include "yarray.hpp" +#include "pipe.hpp" namespace zmq { @@ -28,24 +29,28 @@ namespace zmq // Class manages a set of inbound pipes. On receive it performs fair // queueing (RFC970) so that senders gone berserk won't cause denial of // service for decent senders. - class fq_t + class fq_t : public i_reader_events { public: fq_t (); ~fq_t (); - void attach (class reader_t *pipe_); - void detach (class reader_t *pipe_); - void kill (class reader_t *pipe_); - void revive (class reader_t *pipe_); + void attach (reader_t *pipe_); + bool has_pipes (); + void term_pipes (); + int recv (zmq_msg_t *msg_, int flags_); bool has_in (); + // i_reader_events implementation. + void activated (reader_t *pipe_); + void terminated (reader_t *pipe_); + private: // Inbound pipes. - typedef yarray_t <class reader_t> pipes_t; + typedef yarray_t <reader_t> pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp deleted file mode 100644 index 0d14224..0000000 --- a/src/i_endpoint.hpp +++ /dev/null @@ -1,43 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__ -#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ - -#include "blob.hpp" - -namespace zmq -{ - - struct i_endpoint - { - virtual ~i_endpoint () {} - - virtual void attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) = 0; - virtual void detach_inpipe (class reader_t *pipe_) = 0; - virtual void detach_outpipe (class writer_t *pipe_) = 0; - virtual void kill (class reader_t *pipe_) = 0; - virtual void revive (class reader_t *pipe_) = 0; - virtual void revive (class writer_t *pipe_) = 0; - }; - -} - -#endif diff --git a/src/io_thread.cpp b/src/io_thread.cpp index fac6961..3d202cf 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -26,9 +26,8 @@ #include "err.hpp" #include "ctx.hpp" -zmq::io_thread_t::io_thread_t (ctx_t *ctx_, - uint32_t thread_slot_) : - object_t (ctx_, thread_slot_) +zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t slot_) : + object_t (ctx_, slot_) { poller = new (std::nothrow) poller_t; zmq_assert (poller); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 3d832c0..9e7c2ea 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -38,7 +38,7 @@ namespace zmq { public: - io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_); + io_thread_t (class ctx_t *ctx_, uint32_t slot_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. @@ -32,19 +32,27 @@ zmq::lb_t::lb_t () : zmq::lb_t::~lb_t () { - for (pipes_t::size_type i = 0; i != pipes.size (); i++) - pipes [i]->term (); + zmq_assert (pipes.empty ()); } void zmq::lb_t::attach (writer_t *pipe_) { + pipe_->set_event_sink (this); + pipes.push_back (pipe_); pipes.swap (active, pipes.size () - 1); active++; } -void zmq::lb_t::detach (writer_t *pipe_) +void zmq::lb_t::term_pipes () { + for (pipes_t::size_type i = 0; i != pipes.size (); i++) + pipes [i]->terminate (); +} + +void zmq::lb_t::terminated (writer_t *pipe_) +{ + // ??? zmq_assert (!more || pipes [current] != pipe_); // Remove the pipe from the list; adjust number of active pipes @@ -57,7 +65,12 @@ void zmq::lb_t::detach (writer_t *pipe_) pipes.erase (pipe_); } -void zmq::lb_t::revive (writer_t *pipe_) +bool zmq::lb_t::has_pipes () +{ + return !pipes.empty (); +} + +void zmq::lb_t::activated (writer_t *pipe_) { // Move the pipe to the list of active pipes. pipes.swap (pipes.index (pipe_), active); @@ -21,25 +21,30 @@ #define __ZMQ_LB_HPP_INCLUDED__ #include "yarray.hpp" +#include "pipe.hpp" namespace zmq { // Class manages a set of outbound pipes. On send it load balances // messages fairly among the pipes. - class lb_t + class lb_t : public i_writer_events { public: lb_t (); ~lb_t (); - void attach (class writer_t *pipe_); - void detach (class writer_t *pipe_); - void revive (class writer_t *pipe_); + void attach (writer_t *pipe_); + void term_pipes (); + bool has_pipes (); int send (zmq_msg_t *msg_, int flags_); bool has_out (); + // i_writer_events interface implementation. + void activated (writer_t *pipe_); + void terminated (writer_t *pipe_); + private: // List of outbound pipes. diff --git a/src/object.cpp b/src/object.cpp index 324450f..cdb177f 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -28,15 +28,15 @@ #include "session.hpp" #include "socket_base.hpp" -zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) : +zmq::object_t::object_t (ctx_t *ctx_, uint32_t slot_) : ctx (ctx_), - thread_slot (thread_slot_) + slot (slot_) { } zmq::object_t::object_t (object_t *parent_) : ctx (parent_->ctx), - thread_slot (parent_->thread_slot) + slot (parent_->slot) { } @@ -44,9 +44,9 @@ zmq::object_t::~object_t () { } -uint32_t zmq::object_t::get_thread_slot () +uint32_t zmq::object_t::get_slot () { - return thread_slot; + return slot; } zmq::ctx_t *zmq::object_t::get_ctx () @@ -123,16 +123,6 @@ void zmq::object_t::process_command (command_t &cmd_) deallocate_command (&cmd_); } -void zmq::object_t::register_pipe (class pipe_t *pipe_) -{ - ctx->register_pipe (pipe_); -} - -void zmq::object_t::unregister_pipe (class pipe_t *pipe_) -{ - ctx->unregister_pipe (pipe_); -} - int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) { return ctx->register_endpoint (addr_, socket_); @@ -153,6 +143,11 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) return ctx->choose_io_thread (taskset_); } +void zmq::object_t::zombify (socket_base_t *socket_) +{ + ctx->zombify (socket_); +} + void zmq::object_t::send_stop () { // 'stop' command goes always from administrative thread to @@ -160,7 +155,7 @@ void zmq::object_t::send_stop () command_t cmd; cmd.destination = this; cmd.type = command_t::stop; - ctx->send_command (thread_slot, cmd); + ctx->send_command (slot, cmd); } void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_) @@ -369,6 +364,6 @@ void zmq::object_t::process_seqnum () void zmq::object_t::send_command (command_t &cmd_) { - ctx->send_command (cmd_.destination->get_thread_slot (), cmd_); + ctx->send_command (cmd_.destination->get_slot (), cmd_); } diff --git a/src/object.hpp b/src/object.hpp index a38b0a6..c75a95a 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,18 +32,14 @@ namespace zmq { public: - object_t (class ctx_t *ctx_, uint32_t thread_slot_); + object_t (class ctx_t *ctx_, uint32_t slot_); object_t (object_t *parent_); virtual ~object_t (); - uint32_t get_thread_slot (); + uint32_t get_slot (); ctx_t *get_ctx (); void process_command (struct command_t &cmd_); - // Allow pipe to access corresponding context functions. - void register_pipe (class pipe_t *pipe_); - void unregister_pipe (class pipe_t *pipe_); - protected: // Using following function, socket is able to access global @@ -55,6 +51,10 @@ namespace zmq // Chooses least loaded I/O thread. class io_thread_t *choose_io_thread (uint64_t taskset_); + // Zombify particular socket. In other words, pass the ownership to + // the context. + void zombify (class socket_base_t *socket_); + // Derived object can use these functions to send commands // to other objects. void send_stop (); @@ -105,7 +105,7 @@ namespace zmq class ctx_t *ctx; // Slot ID of the thread the object belongs to. - uint32_t thread_slot; + uint32_t slot; void send_command (command_t &cmd_); diff --git a/src/owned.cpp b/src/owned.cpp index d6be444..7d1cf5e 100644 --- a/src/owned.cpp +++ b/src/owned.cpp @@ -35,7 +35,7 @@ zmq::owned_t::~owned_t () void zmq::owned_t::inc_seqnum () { - // NB: This function may be called from a different thread! + // This function may be called from a different thread! sent_seqnum.add (1); } @@ -62,10 +62,16 @@ void zmq::owned_t::finalise () { // If termination request was already received and there are no more // commands to wait for, terminate the object. - if (shutting_down && processed_seqnum == sent_seqnum.get ()) { + if (shutting_down && processed_seqnum == sent_seqnum.get () + && is_terminable ()) { process_unplug (); send_term_ack (owner); delete this; } } +bool zmq::owned_t::is_terminable () +{ + return true; +} + diff --git a/src/owned.hpp b/src/owned.hpp index 91189a1..80cf42f 100644 --- a/src/owned.hpp +++ b/src/owned.hpp @@ -45,6 +45,13 @@ namespace zmq protected: + // A mechanism allowing derived owned objects to postpone the + // termination process. Default implementation defines no such delay. + // Note that the derived object has to call finalise method when the + // delay is over. + virtual bool is_terminable (); + void finalise (); + // Ask owner socket to terminate this object. void term (); @@ -69,8 +76,6 @@ namespace zmq void process_term (); void process_seqnum (); - void finalise (); - // Sequence number of the last command sent to this object. atomic_counter_t sent_seqnum; diff --git a/src/pair.cpp b/src/pair.cpp index 3872b28..1ff2e1a 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -23,11 +23,12 @@ #include "err.hpp" #include "pipe.hpp" -zmq::pair_t::pair_t (class app_thread_t *parent_) : - socket_base_t (parent_), +zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_), inpipe (NULL), outpipe (NULL), - alive (true) + inpipe_alive (false), + outpipe_alive (false) { options.requires_in = true; options.requires_out = true; @@ -35,56 +36,61 @@ zmq::pair_t::pair_t (class app_thread_t *parent_) : zmq::pair_t::~pair_t () { - if (inpipe) - inpipe->term (); - if (outpipe) - outpipe->term (); + zmq_assert (!inpipe); + zmq_assert (!outpipe); } void zmq::pair_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe && !outpipe); + inpipe = inpipe_; + inpipe_alive = true; + inpipe->set_event_sink (this); + outpipe = outpipe_; outpipe_alive = true; + outpipe->set_event_sink (this); } -void zmq::pair_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::pair_t::terminated (class reader_t *pipe_) { zmq_assert (pipe_ == inpipe); inpipe = NULL; + inpipe_alive = false; } -void zmq::pair_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::pair_t::terminated (class writer_t *pipe_) { zmq_assert (pipe_ == outpipe); outpipe = NULL; + outpipe_alive = false; } -void zmq::pair_t::xkill (class reader_t *pipe_) +void zmq::pair_t::xterm_pipes () { - zmq_assert (alive); - alive = false; + if (inpipe) + inpipe->terminate (); + if (outpipe) + outpipe->terminate (); } -void zmq::pair_t::xrevive (class reader_t *pipe_) +bool zmq::pair_t::xhas_pipes () { - zmq_assert (!alive); - alive = true; + return inpipe != NULL || outpipe != NULL; } -void zmq::pair_t::xrevive (class writer_t *pipe_) +void zmq::pair_t::activated (class reader_t *pipe_) { - zmq_assert (!outpipe_alive); - outpipe_alive = true; + zmq_assert (!inpipe_alive); + inpipe_alive = true; } -int zmq::pair_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) +void zmq::pair_t::activated (class writer_t *pipe_) { - errno = EINVAL; - return -1; + zmq_assert (!outpipe_alive); + outpipe_alive = true; } int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) @@ -100,7 +106,8 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } - outpipe->flush (); + if (!(flags_ & ZMQ_SNDMORE)) + outpipe->flush (); // Detach the original message from the data buffer. int rc = zmq_msg_init (msg_); @@ -114,9 +121,12 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); - if (!alive || !inpipe || !inpipe->read (msg_)) { - // No message is available. Initialise the output parameter - // to be a 0-byte message. + if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { + + // No message is available. + inpipe_alive = false; + + // Initialise the output parameter to be a 0-byte message. zmq_msg_init (msg_); errno = EAGAIN; return -1; @@ -126,14 +136,16 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) bool zmq::pair_t::xhas_in () { - if (alive && inpipe && inpipe->check_read ()) - return true; - return false; + if (!inpipe || !inpipe_alive) + return false; + + inpipe_alive = inpipe->check_read (); + return inpipe_alive; } bool zmq::pair_t::xhas_out () { - if (outpipe == NULL || !outpipe_alive) + if (!outpipe || !outpipe_alive) return false; outpipe_alive = outpipe->check_write (); diff --git a/src/pair.hpp b/src/pair.hpp index aea249f..0c484d7 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -21,37 +21,45 @@ #define __ZMQ_PAIR_HPP_INCLUDED__ #include "socket_base.hpp" +#include "pipe.hpp" namespace zmq { - class pair_t : public socket_base_t + class pair_t : + public socket_base_t, + public i_reader_events, + public i_writer_events { public: - pair_t (class app_thread_t *parent_); + pair_t (class ctx_t *parent_, uint32_t slot_); ~pair_t (); // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + // i_reader_events interface implementation. + void activated (class reader_t *pipe_); + void terminated (class reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (class writer_t *pipe_); + void terminated (class writer_t *pipe_); + private: class reader_t *inpipe; class writer_t *outpipe; - bool alive; + bool inpipe_alive; bool outpipe_alive; pair_t (const pair_t&); diff --git a/src/pipe.cpp b/src/pipe.cpp index 200beb0..1903422 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -17,31 +17,54 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <new> + #include "../include/zmq.h" #include "pipe.hpp" +#include "likely.hpp" -zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, + uint64_t lwm_) : object_t (parent_), - pipe (NULL), - peer (NULL), + pipe (pipe_), + writer (NULL), lwm (lwm_), msgs_read (0), - endpoint (NULL) -{} + sink (NULL), + terminating (false) +{ + // Note that writer is not set here. Writer will inform reader about its + // address once it is created (via set_writer method). +} + +void zmq::reader_t::set_writer (writer_t *writer_) +{ + zmq_assert (!writer); + writer = writer_; +} zmq::reader_t::~reader_t () { - if (pipe) - unregister_pipe (pipe); + // Pipe as such is owned and deallocated by reader object. + // The point is that reader processes the last step of terminal + // handshaking (term_ack). + zmq_assert (pipe); + + // First delete all the unread messages in the pipe. We have to do it by + // hand because zmq_msg_t is a POD, not a class, so there's no associated + // destructor. + zmq_msg_t msg; + while (pipe->read (&msg)) + zmq_msg_close (&msg); + + delete pipe; } -void zmq::reader_t::set_pipe (pipe_t *pipe_) +void zmq::reader_t::set_event_sink (i_reader_events *sink_) { - zmq_assert (!pipe); - pipe = pipe_; - peer = &pipe->writer; - register_pipe (pipe); + zmq_assert (!sink); + sink = sink_; } bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) @@ -53,19 +76,20 @@ bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) bool zmq::reader_t::check_read () { + if (unlikely (terminating)) + return false; + // Check if there's an item in the pipe. // If not, deactivate the pipe. if (!pipe->check_read ()) { - endpoint->kill (this); + terminate (); return false; } // If the next item in the pipe is message delimiter, // initiate its termination. if (pipe->probe (is_delimiter)) { - if (endpoint) - endpoint->detach_inpipe (this); - term (); + terminate (); return false; } @@ -74,17 +98,16 @@ bool zmq::reader_t::check_read () bool zmq::reader_t::read (zmq_msg_t *msg_) { - if (!pipe->read (msg_)) { - endpoint->kill (this); + if (unlikely (terminating)) + return false; + + if (!pipe->read (msg_)) return false; - } // If delimiter was read, start termination process of the pipe. unsigned char *offset = 0; if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) { - if (endpoint) - endpoint->detach_inpipe (this); - term (); + terminate (); return false; } @@ -92,51 +115,64 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) - send_reader_info (peer, msgs_read); + send_reader_info (writer, msgs_read); return true; } -void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) +void zmq::reader_t::terminate () { - endpoint = endpoint_; + // If termination was already started by the peer, do nothing. + if (terminating) + return; + + terminating = true; + send_pipe_term (writer); } -void zmq::reader_t::term () +bool zmq::reader_t::is_terminating () { - endpoint = NULL; - send_pipe_term (peer); + return terminating; } void zmq::reader_t::process_revive () { - // Beacuse of command throttling mechanism, incoming termination request - // may not have been processed before subsequent send. - // In that case endpoint is NULL. - if (endpoint) - endpoint->revive (this); + // Forward the event to the sink (either socket or session). + sink->activated (this); } void zmq::reader_t::process_pipe_term_ack () { - peer = NULL; - delete pipe; + // At this point writer may already be deallocated. + // For safety's sake drop the reference to it. + writer = NULL; + + // Notify owner about the termination. + zmq_assert (sink); + sink->terminated (this); + + // Deallocate resources. + delete this; } -zmq::writer_t::writer_t (object_t *parent_, +zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, uint64_t hwm_, int64_t swap_size_) : object_t (parent_), - pipe (NULL), - peer (NULL), + pipe (pipe_), + reader (reader_), hwm (hwm_), msgs_read (0), msgs_written (0), msg_store (NULL), extra_msg_flag (false), stalled (false), - pending_close (false), - endpoint (NULL) + sink (NULL), + terminating (false), + pending_close (false) { + // Inform reader about the writer. + reader->set_writer (this); + if (swap_size_ > 0) { msg_store = new (std::nothrow) msg_store_t (swap_size_); if (msg_store != NULL) { @@ -148,11 +184,6 @@ zmq::writer_t::writer_t (object_t *parent_, } } -void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) -{ - endpoint = endpoint_; -} - zmq::writer_t::~writer_t () { if (extra_msg_flag) @@ -161,15 +192,17 @@ zmq::writer_t::~writer_t () delete msg_store; } -void zmq::writer_t::set_pipe (pipe_t *pipe_) +void zmq::writer_t::set_event_sink (i_writer_events *sink_) { - zmq_assert (!pipe); - pipe = pipe_; - peer = &pipe->reader; + zmq_assert (!sink); + sink = sink_; } bool zmq::writer_t::check_write () { + if (terminating) + return false; + if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) { stalled = true; return false; @@ -180,6 +213,9 @@ bool zmq::writer_t::check_write () bool zmq::writer_t::write (zmq_msg_t *msg_) { + if (terminating) + return false; + if (!check_write ()) return false; @@ -216,23 +252,27 @@ void zmq::writer_t::rollback () while (pipe->unwrite (&msg)) { zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_msg_close (&msg); + msgs_written--; } - if (stalled && endpoint != NULL && check_write ()) { + if (stalled && check_write ()) { stalled = false; - endpoint->revive (this); + zmq_assert (sink); + sink->activated (this); } } void zmq::writer_t::flush () { if (!pipe->flush ()) - send_revive (peer); + send_revive (reader); } -void zmq::writer_t::term () +void zmq::writer_t::terminate () { - endpoint = NULL; + // Prevent double termination. + if (terminating) + return; // Rollback any unfinished messages. rollback (); @@ -293,71 +333,69 @@ void zmq::writer_t::process_reader_info (uint64_t msgs_read_) flush (); } - if (stalled && endpoint != NULL) { + if (stalled) { stalled = false; - endpoint->revive (this); + zmq_assert (sink); + sink->activated (this); } } void zmq::writer_t::process_pipe_term () { - if (endpoint) - endpoint->detach_outpipe (this); + send_pipe_term_ack (reader); - reader_t *p = peer; - peer = NULL; - send_pipe_term_ack (p); -} + // The above command allows reader to deallocate itself and the pipe. + // For safety's sake we'll drop the pointers here. + reader = NULL; + pipe = NULL; -bool zmq::writer_t::pipe_full () -{ - return hwm > 0 && msgs_written - msgs_read == hwm; -} + // Notify owner about the termination. + zmq_assert (sink); + sink->terminated (this); -zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, int64_t swap_size_) : - reader (reader_parent_, compute_lwm (hwm_)), - writer (writer_parent_, hwm_, swap_size_) -{ - reader.set_pipe (this); - writer.set_pipe (this); + // Deallocate the resources. + delete this; } -zmq::pipe_t::~pipe_t () +bool zmq::writer_t::pipe_full () { - // Deallocate all the unread messages in the pipe. We have to do it by - // hand because zmq_msg_t is a POD, not a class, so there's no associated - // destructor. - zmq_msg_t msg; - while (read (&msg)) - zmq_msg_close (&msg); + return hwm > 0 && msgs_written - msgs_read == hwm; } -uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_) +void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_) { - // Following point should be taken into consideration when computing - // low watermark: - // - // 1. LWM has to be less than HWM. - // 2. LWM cannot be set to very low value (such as zero) as after filling - // the queue it would start to refill only after all the messages are - // read from it and thus unnecessarily hold the progress back. - // 3. LWM cannot be set to very high value (such as HWM-1) as it would - // result in lock-step filling of the queue - if a single message is read - // from a full queue, writer thread is resumed to write exactly one - // message to the queue and go back to sleep immediately. This would - // result in low performance. - // - // Given the 3. it would be good to keep HWM and LWM as far apart as - // possible to reduce the thread switching overhead to almost zero, - // say HWM-LWM should be 500 (max_wm_delta). - // - // That done, we still we have to account for the cases where HWM<500 thus - // driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases. - - if (hwm_ > max_wm_delta * 2) - return hwm_ - max_wm_delta; - else - return (hwm_ + 1) / 2; + // First compute the low water mark. Following point should be taken + // into consideration: + // + // 1. LWM has to be less than HWM. + // 2. LWM cannot be set to very low value (such as zero) as after filling + // the queue it would start to refill only after all the messages are + // read from it and thus unnecessarily hold the progress back. + // 3. LWM cannot be set to very high value (such as HWM-1) as it would + // result in lock-step filling of the queue - if a single message is + // read from a full queue, writer thread is resumed to write exactly one + // message to the queue and go back to sleep immediately. This would + // result in low performance. + // + // Given the 3. it would be good to keep HWM and LWM as far apart as + // possible to reduce the thread switching overhead to almost zero, + // say HWM-LWM should be max_wm_delta. + // + // That done, we still we have to account for the cases where + // HWM < max_wm_delta thus driving LWM to negative numbers. + // Let's make LWM 1/2 of HWM in such cases. + uint64_t lwm = (hwm_ > max_wm_delta * 2) ? + hwm_ - max_wm_delta : (hwm_ + 1) / 2; + + // Create all three objects pipe consists of: the pipe per se, reader and + // writer. The pipe will be handled by reader and writer, its never passed + // to the user. Reader and writer are returned to the user. + pipe_t *pipe = new (std::nothrow) pipe_t (); + zmq_assert (pipe); + *reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm); + zmq_assert (*reader_); + *writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_, + hwm_, swap_size_); + zmq_assert (*writer_); } - diff --git a/src/pipe.hpp b/src/pipe.hpp index ece678a..34c5600 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -23,7 +23,6 @@ #include "../include/zmq.h" #include "stdint.hpp" -#include "i_endpoint.hpp" #include "yarray_item.hpp" #include "ypipe.hpp" #include "msg_store.hpp" @@ -33,15 +32,31 @@ namespace zmq { + // The shutdown mechanism for pipe works as follows: Either endpoint + // (or even both of them) can ask pipe to terminate by calling 'terminate' + // method. Pipe then terminates in asynchronous manner. When the part of + // the shutdown tied to the endpoint is done it triggers 'terminated' + // event. When endpoint processes the event and returns, associated + // reader/writer object is deallocated. + + typedef ypipe_t <zmq_msg_t, message_pipe_granularity> pipe_t; + + struct i_reader_events + { + virtual void terminated (class reader_t *pipe_) = 0; + virtual void activated (class reader_t *pipe_) = 0; + }; + class reader_t : public object_t, public yarray_item_t { - public: + friend void zmq::create_pipe (object_t*, object_t*, uint64_t, + int64_t, reader_t**, writer_t**); + friend class writer_t; - reader_t (class object_t *parent_, uint64_t lwm_); - ~reader_t (); + public: - void set_pipe (class pipe_t *pipe_); - void set_endpoint (i_endpoint *endpoint_); + // Specifies the object to get events from the reader. + void set_event_sink (i_reader_events *endpoint_); // Returns true if there is at least one message to read in the pipe. bool check_read (); @@ -50,10 +65,20 @@ namespace zmq bool read (zmq_msg_t *msg_); // Ask pipe to terminate. - void term (); + void terminate (); + + // Returns true if the pipe is already terminating + // (say if delimiter was already read). + bool is_terminating (); private: + reader_t (class object_t *parent_, pipe_t *pipe_, uint64_t lwm_); + ~reader_t (); + + // To be called only by writer itself! + void set_writer (class writer_t *writer_); + // Command handlers. void process_revive (); void process_pipe_term_ack (); @@ -62,10 +87,10 @@ namespace zmq static bool is_delimiter (zmq_msg_t &msg_); // The underlying pipe. - class pipe_t *pipe; + pipe_t *pipe; // Pipe writer associated with the other side of the pipe. - class writer_t *peer; + class writer_t *writer; // Low watermark for in-memory storage (in bytes). uint64_t lwm; @@ -73,22 +98,32 @@ namespace zmq // Number of messages read so far. uint64_t msgs_read; - // Endpoint (either session or socket) the pipe is attached to. - i_endpoint *endpoint; + // Sink for the events (either the socket of the session). + i_reader_events *sink; + + // True is 'terminate' method was called or delimiter + // was read from the pipe. + bool terminating; reader_t (const reader_t&); void operator = (const reader_t&); }; + struct i_writer_events + { + virtual void terminated (class writer_t *pipe_) = 0; + virtual void activated (class writer_t *pipe_) = 0; + }; + class writer_t : public object_t, public yarray_item_t { - public: + friend void zmq::create_pipe (object_t*, object_t*, uint64_t, + int64_t, reader_t**, writer_t**); - writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_); - ~writer_t (); + public: - void set_pipe (class pipe_t *pipe_); - void set_endpoint (i_endpoint *endpoint_); + // Specifies the object to get events from the writer. + void set_event_sink (i_writer_events *endpoint_); // Checks whether a message can be written to the pipe. // If writing the message would cause high watermark to be @@ -106,10 +141,14 @@ namespace zmq void flush (); // Ask pipe to terminate. - void term (); + void terminate (); private: + writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, + uint64_t hwm_, int64_t swap_size_); + ~writer_t (); + void process_reader_info (uint64_t msgs_read_); // Command handlers. @@ -123,10 +162,10 @@ namespace zmq void write_delimiter (); // The underlying pipe. - class pipe_t *pipe; + pipe_t *pipe; // Pipe reader associated with the other side of the pipe. - class reader_t *peer; + reader_t *reader; // High watermark for in-memory storage (in bytes). uint64_t hwm; @@ -149,35 +188,23 @@ namespace zmq // True iff the last attempt to write a message has failed. bool stalled; - bool pending_close; + // Sink for the events (either the socket or the session). + i_writer_events *sink; - // Endpoint (either session or socket) the pipe is attached to. - i_endpoint *endpoint; + // True is 'terminate' method was called of 'pipe_term' command + // arrived from the reader. + bool terminating; + + bool pending_close; writer_t (const writer_t&); void operator = (const writer_t&); }; - // Message pipe. - class pipe_t : public ypipe_t <zmq_msg_t, message_pipe_granularity> - { - public: - - pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_, int64_t swap_size_); - ~pipe_t (); - - reader_t reader; - writer_t writer; - - private: - - uint64_t compute_lwm (uint64_t hwm_); - - pipe_t (const pipe_t&); - void operator = (const pipe_t&); - }; - + // Creates a pipe. Returns pointer to reader and writer objects. + void create_pipe (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, int64_t swap_size_, reader_t **reader_, + writer_t **writer_); } #endif diff --git a/src/pub.cpp b/src/pub.cpp index 4e73b19..d1d1c72 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -24,8 +24,8 @@ #include "msg_content.hpp" #include "pipe.hpp" -zmq::pub_t::pub_t (class app_thread_t *parent_) : - socket_base_t (parent_), +zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_), active (0) { options.requires_in = false; @@ -34,56 +34,47 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : zmq::pub_t::~pub_t () { - for (pipes_t::size_type i = 0; i != pipes.size (); i++) - pipes [i]->term (); - pipes.clear (); + zmq_assert (pipes.empty ()); } void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_); + + outpipe_->set_event_sink (this); + pipes.push_back (outpipe_); pipes.swap (active, pipes.size () - 1); active++; } -void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_) -{ - zmq_assert (false); -} - -void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_) -{ - // Remove the pipe from the list; adjust number of active pipes - // accordingly. - if (pipes.index (pipe_) < active) - active--; - pipes.erase (pipe_); -} - -void zmq::pub_t::xkill (class reader_t *pipe_) +void zmq::pub_t::xterm_pipes () { - zmq_assert (false); + // Start shutdown process for all the pipes. + for (pipes_t::size_type i = 0; i != pipes.size (); i++) + pipes [i]->terminate (); } -void zmq::pub_t::xrevive (class reader_t *pipe_) +bool zmq::pub_t::xhas_pipes () { - zmq_assert (false); + return !pipes.empty (); } -void zmq::pub_t::xrevive (class writer_t *pipe_) +void zmq::pub_t::activated (writer_t *pipe_) { // Move the pipe to the list of active pipes. pipes.swap (pipes.index (pipe_), active); active++; } -int zmq::pub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) +void zmq::pub_t::terminated (writer_t *pipe_) { - errno = EINVAL; - return -1; + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + if (pipes.index (pipe_) < active) + active--; + pipes.erase (pipe_); } int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) @@ -101,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) // For VSMs the copying is straighforward. if (content == (msg_content_t*) ZMQ_VSM) { - for (pipes_t::size_type i = 0; i != active;) + for (pipes_t::size_type i = 0; i < active;) if (write (pipes [i], msg_)) i++; int rc = zmq_msg_init (msg_); @@ -133,7 +124,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) } // Push the message to all destinations. - for (pipes_t::size_type i = 0; i != active;) { + for (pipes_t::size_type i = 0; i < active;) { if (!write (pipes [i], msg_)) content->refcnt.sub (1); else @@ -147,17 +138,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - -bool zmq::pub_t::xhas_in () -{ - return false; -} - bool zmq::pub_t::xhas_out () { return true; diff --git a/src/pub.hpp b/src/pub.hpp index ac3924a..a81edfe 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -22,31 +22,30 @@ #include "socket_base.hpp" #include "yarray.hpp" +#include "pipe.hpp" namespace zmq { - class pub_t : public socket_base_t + class pub_t : public socket_base_t, public i_writer_events { public: - pub_t (class app_thread_t *parent_); + pub_t (class ctx_t *parent_, uint32_t slot_); ~pub_t (); - // Overloads of functions from socket_base_t. + // Implementations of virtual functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); - bool xhas_in (); bool xhas_out (); + // i_writer_events interface implementation. + void activated (writer_t *pipe_); + void terminated (writer_t *pipe_); + private: // Write the message to the pipe. Make the pipe inactive if writing diff --git a/src/pull.cpp b/src/pull.cpp index b2413ee..4f4a8b3 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -22,8 +22,8 @@ #include "pull.hpp" #include "err.hpp" -zmq::pull_t::pull_t (class app_thread_t *parent_) : - socket_base_t (parent_) +zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_) { options.requires_in = true; options.requires_out = false; @@ -40,45 +40,14 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, fq.attach (inpipe_); } -void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::pull_t::xterm_pipes () { - zmq_assert (pipe_); - fq.detach (pipe_); + fq.term_pipes (); } -void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_) +bool zmq::pull_t::xhas_pipes () { - // There are no outpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::pull_t::xkill (class reader_t *pipe_) -{ - fq.kill (pipe_); -} - -void zmq::pull_t::xrevive (class reader_t *pipe_) -{ - fq.revive (pipe_); -} - -void zmq::pull_t::xrevive (class writer_t *pipe_) -{ - zmq_assert (false); -} - -int zmq::pull_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - // No special options for this socket type. - errno = EINVAL; - return -1; -} - -int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; + return fq.has_pipes (); } int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) @@ -91,8 +60,3 @@ bool zmq::pull_t::xhas_in () return fq.has_in (); } -bool zmq::pull_t::xhas_out () -{ - return false; -} - diff --git a/src/pull.hpp b/src/pull.hpp index 7f249e9..4be40dd 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_PULL_HPP_INCLUDED__ -#define __ZMQ_PULL_HPP_INCLUDED__ +#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ +#define __ZMQ_UPSTREAM_HPP_INCLUDED__ #include "socket_base.hpp" #include "fq.hpp" @@ -30,22 +30,16 @@ namespace zmq { public: - pull_t (class app_thread_t *parent_); + pull_t (class ctx_t *parent_, uint32_t slot_); ~pull_t (); // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int flags_); + void xterm_pipes (); + bool xhas_pipes (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); - bool xhas_out (); private: diff --git a/src/push.cpp b/src/push.cpp index 522101f..3a3d258 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -23,8 +23,8 @@ #include "err.hpp" #include "pipe.hpp" -zmq::push_t::push_t (class app_thread_t *parent_) : - socket_base_t (parent_) +zmq::push_t::push_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_) { options.requires_in = false; options.requires_out = true; @@ -41,41 +41,14 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_, lb.attach (outpipe_); } -void zmq::push_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::push_t::xterm_pipes () { - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); + lb.term_pipes (); } -void zmq::push_t::xdetach_outpipe (class writer_t *pipe_) +bool zmq::push_t::xhas_pipes () { - zmq_assert (pipe_); - lb.detach (pipe_); -} - -void zmq::push_t::xkill (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::push_t::xrevive (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::push_t::xrevive (class writer_t *pipe_) -{ - lb.revive (pipe_); -} - -int zmq::push_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - // No special option for this socket type. - errno = EINVAL; - return -1; + return lb.has_pipes (); } int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) @@ -83,17 +56,6 @@ int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) return lb.send (msg_, flags_); } -int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - -bool zmq::push_t::xhas_in () -{ - return false; -} - bool zmq::push_t::xhas_out () { return lb.has_out (); diff --git a/src/push.hpp b/src/push.hpp index b3c8d87..e604abc 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_PUSH_HPP_INCLUDED__ -#define __ZMQ_PUSH_HPP_INCLUDED__ +#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ #include "socket_base.hpp" #include "lb.hpp" @@ -30,21 +30,15 @@ namespace zmq { public: - push_t (class app_thread_t *parent_); + push_t (class ctx_t *parent_, uint32_t slot_); ~push_t (); // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); - bool xhas_in (); bool xhas_out (); private: diff --git a/src/rep.cpp b/src/rep.cpp index 34b77c4..7636d13 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -23,8 +23,8 @@ #include "err.hpp" #include "pipe.hpp" -zmq::rep_t::rep_t (class app_thread_t *parent_) : - socket_base_t (parent_), +zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_), active (0), current (0), sending_reply (false), @@ -42,6 +42,8 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : zmq::rep_t::~rep_t () { + zmq_assert (in_pipes.empty ()); + zmq_assert (out_pipes.empty ()); } void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, @@ -50,15 +52,28 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, zmq_assert (inpipe_ && outpipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); + inpipe_->set_event_sink (this); in_pipes.push_back (inpipe_); in_pipes.swap (active, in_pipes.size () - 1); + + outpipe_->set_event_sink (this); out_pipes.push_back (outpipe_); out_pipes.swap (active, out_pipes.size () - 1); + active++; } -void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::rep_t::xterm_pipes () +{ + for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) + in_pipes [i]->terminate (); + for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++) + out_pipes [i]->terminate (); +} + +void zmq::rep_t::terminated (reader_t *pipe_) { + // ??? zmq_assert (sending_reply || !more || in_pipes [current] != pipe_); zmq_assert (pipe_); @@ -71,14 +86,17 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) if (current == active) current = 0; } - - if (out_pipes [index]) - out_pipes [index]->term (); in_pipes.erase (index); - out_pipes.erase (index); + + // ??? + if (!zombie) { + if (out_pipes [index]) + out_pipes [index]->terminate (); + out_pipes.erase (index); + } } -void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::rep_t::terminated (writer_t *pipe_) { zmq_assert (pipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); @@ -97,22 +115,22 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_) current = 0; } - if (in_pipes [index]) - in_pipes [index]->term (); - in_pipes.erase (index); out_pipes.erase (index); + + // ??? + if (!zombie) { + if (in_pipes [index]) + in_pipes [index]->terminate (); + in_pipes.erase (index); + } } -void zmq::rep_t::xkill (class reader_t *pipe_) +bool zmq::rep_t::xhas_pipes () { - // Move the pipe to the list of inactive pipes. - in_pipes_t::size_type index = in_pipes.index (pipe_); - active--; - in_pipes.swap (index, active); - out_pipes.swap (index, active); + return !in_pipes.empty () || !out_pipes.empty (); } -void zmq::rep_t::xrevive (class reader_t *pipe_) +void zmq::rep_t::activated (reader_t *pipe_) { // Move the pipe to the list of active pipes. in_pipes_t::size_type index = in_pipes.index (pipe_); @@ -121,15 +139,10 @@ void zmq::rep_t::xrevive (class reader_t *pipe_) active++; } -void zmq::rep_t::xrevive (class writer_t *pipe_) -{ -} - -int zmq::rep_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) +void zmq::rep_t::activated (writer_t *pipe_) { - errno = EINVAL; - return -1; + // TODO: What here? + zmq_assert (false); } int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) @@ -151,6 +164,8 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) // misbehaving requesters stop collecting replies. // TODO: Tear down the underlying connection (?) if (!written) { + + // TODO: The reply socket becomes deactivated here... errno = EAGAIN; return -1; } @@ -198,6 +213,13 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) for (count = active; count != 0; count--) { if (in_pipes [current]->read (msg_)) break; + + // Move the pipe to the list of inactive pipes. + active--; + in_pipes.swap (current, active); + out_pipes.swap (current, active); + + // Move to next pipe. current++; if (current >= active) current = 0; @@ -258,6 +280,13 @@ bool zmq::rep_t::xhas_in () for (int count = active; count != 0; count--) { if (in_pipes [current]->check_read ()) return !sending_reply; + + // Move the pipe to the list of inactive pipes. + active--; + in_pipes.swap (current, active); + out_pipes.swap (current, active); + + // Move to the next pipe. current++; if (current >= active) current = 0; diff --git a/src/rep.hpp b/src/rep.hpp index aef4318..7d82a28 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -22,39 +22,47 @@ #include "socket_base.hpp" #include "yarray.hpp" +#include "pipe.hpp" namespace zmq { - class rep_t : public socket_base_t + class rep_t : + public socket_base_t, + public i_reader_events, + public i_writer_events { public: - rep_t (class app_thread_t *parent_); + rep_t (class ctx_t *parent_, uint32_t slot_); ~rep_t (); // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + // i_reader_events interface implementation. + void activated (reader_t *pipe_); + void terminated (reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (writer_t *pipe_); + void terminated (writer_t *pipe_); + private: // List in outbound and inbound pipes. Note that the two lists are // always in sync. I.e. outpipe with index N communicates with the // same session as inpipe with index N. - typedef yarray_t <class writer_t> out_pipes_t; + typedef yarray_t <writer_t> out_pipes_t; out_pipes_t out_pipes; - typedef yarray_t <class reader_t> in_pipes_t; + typedef yarray_t <reader_t> in_pipes_t; in_pipes_t in_pipes; // Number of active inpipes. All the active inpipes are located at the @@ -73,7 +81,7 @@ namespace zmq bool more; // Pipe we are going to send reply to. - class writer_t *reply_pipe; + writer_t *reply_pipe; rep_t (const rep_t&); void operator = (const rep_t&); diff --git a/src/req.cpp b/src/req.cpp index f3695a2..b900961 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -23,8 +23,8 @@ #include "err.hpp" #include "pipe.hpp" -zmq::req_t::req_t (class app_thread_t *parent_) : - socket_base_t (parent_), +zmq::req_t::req_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_), active (0), current (0), receiving_reply (false), @@ -38,24 +38,36 @@ zmq::req_t::req_t (class app_thread_t *parent_) : zmq::req_t::~req_t () { + zmq_assert (in_pipes.empty ()); + zmq_assert (out_pipes.empty ()); } -void zmq::req_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::req_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, + const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); + inpipe_->set_event_sink (this); in_pipes.push_back (inpipe_); in_pipes.swap (active, in_pipes.size () - 1); + outpipe_->set_event_sink (this); out_pipes.push_back (outpipe_); out_pipes.swap (active, out_pipes.size () - 1); active++; } -void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::req_t::xterm_pipes () +{ + for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) + in_pipes [i]->terminate (); + for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++) + out_pipes [i]->terminate (); +} + +void zmq::req_t::terminated (reader_t *pipe_) { zmq_assert (!receiving_reply || !more || reply_pipe != pipe_); @@ -63,17 +75,21 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) zmq_assert (in_pipes.size () == out_pipes.size ()); // TODO: The pipe we are awaiting the reply from is detached. What now? - // Return ECONNRESET from subsequent recv? if (receiving_reply && pipe_ == reply_pipe) { zmq_assert (false); } in_pipes_t::size_type index = in_pipes.index (pipe_); - if (out_pipes [index]) - out_pipes [index]->term (); + // ??? + if (!zombie) { + if (out_pipes [index]) + out_pipes [index]->terminate (); + out_pipes.erase (index); + } + in_pipes.erase (index); - out_pipes.erase (index); + if (index < active) { active--; if (current == active) @@ -81,7 +97,7 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_) } } -void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::req_t::terminated (writer_t *pipe_) { zmq_assert (receiving_reply || !more || out_pipes [current] != pipe_); @@ -90,9 +106,13 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) out_pipes_t::size_type index = out_pipes.index (pipe_); - if (in_pipes [index]) - in_pipes [index]->term (); - in_pipes.erase (index); + // ??? + if (!zombie) { + if (in_pipes [index]) + in_pipes [index]->terminate (); + in_pipes.erase (index); + } + out_pipes.erase (index); if (index < active) { active--; @@ -101,15 +121,12 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_) } } -void zmq::req_t::xkill (class reader_t *pipe_) +bool zmq::req_t::xhas_pipes () { - zmq_assert (receiving_reply); - zmq_assert (pipe_ == reply_pipe); - - reply_pipe_active = false; + return !in_pipes.empty () || !out_pipes.empty (); } -void zmq::req_t::xrevive (class reader_t *pipe_) +void zmq::req_t::activated (reader_t *pipe_) { // TODO: Actually, misbehaving peer can cause this kind of thing. // Handle it decently, presumably kill the offending connection. @@ -117,7 +134,7 @@ void zmq::req_t::xrevive (class reader_t *pipe_) reply_pipe_active = true; } -void zmq::req_t::xrevive (class writer_t *pipe_) +void zmq::req_t::activated (writer_t *pipe_) { out_pipes_t::size_type index = out_pipes.index (pipe_); zmq_assert (index >= active); @@ -129,13 +146,6 @@ void zmq::req_t::xrevive (class writer_t *pipe_) } } -int zmq::req_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - errno = EINVAL; - return -1; -} - int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) { // If we've sent a request and we still haven't got the reply, @@ -214,6 +224,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) // Get the reply from the reply pipe. if (!reply_pipe_active || !reply_pipe->read (msg_)) { + reply_pipe_active = false; zmq_msg_init (msg_); errno = EAGAIN; return -1; diff --git a/src/req.hpp b/src/req.hpp index 5ab7bca..5fd5642 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -22,31 +22,39 @@ #include "socket_base.hpp" #include "yarray.hpp" +#include "pipe.hpp" namespace zmq { - class req_t : public socket_base_t + class req_t : + public socket_base_t, + public i_reader_events, + public i_writer_events { public: - req_t (class app_thread_t *parent_); + req_t (class ctx_t *parent_, uint32_t slot_); ~req_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + // i_reader_events interface implementation. + void activated (reader_t *pipe_); + void terminated (reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (writer_t *pipe_); + void terminated (writer_t *pipe_); + private: // List in outbound and inbound pipes. Note that the two lists are @@ -58,9 +66,9 @@ namespace zmq // the beginning of the array). We don't have to do the same thing for // inpipes, because we know which pipe we want to read the // reply from. - typedef yarray_t <class writer_t> out_pipes_t; + typedef yarray_t <writer_t> out_pipes_t; out_pipes_t out_pipes; - typedef yarray_t <class reader_t> in_pipes_t; + typedef yarray_t <reader_t> in_pipes_t; in_pipes_t in_pipes; // Number of active pipes. @@ -82,7 +90,7 @@ namespace zmq bool more; // Pipe we are awaiting the reply from. - class reader_t *reply_pipe; + reader_t *reply_pipe; req_t (const req_t&); void operator = (const req_t&); diff --git a/src/semaphore.hpp b/src/semaphore.hpp new file mode 100644 index 0000000..1c4d2a0 --- /dev/null +++ b/src/semaphore.hpp @@ -0,0 +1,135 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_SEMAPHORE_HPP_INCLUDED__ +#define __ZMQ_SEMAPHORE_HPP_INCLUDED__ + +#include "platform.hpp" +#include "err.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <semaphore.h> +#endif + +namespace zmq +{ + // Simple semaphore. Only single thread may be waiting at any given time. + // Also, the semaphore may not be posted before the previous post + // was matched by corresponding wait and the waiting thread was + // released. + +#if defined ZMQ_HAVE_WINDOWS + + // On Windows platform simple semaphore is implemeted using event object. + + class semaphore_t + { + public: + + // Initialise the semaphore. + inline semaphore_t () + { + ev = CreateEvent (NULL, FALSE, FALSE, NULL); + win_assert (ev != NULL); + } + + // Destroy the semaphore. + inline ~semaphore_t () + { + int rc = CloseHandle (ev); + win_assert (rc != 0); + } + + // Wait for the semaphore. + inline void wait () + { + DWORD rc = WaitForSingleObject (ev, INFINITE); + win_assert (rc != WAIT_FAILED); + } + + // Post the semaphore. + inline void post () + { + int rc = SetEvent (ev); + win_assert (rc != 0); + } + + private: + + HANDLE ev; + + // Disable copying of the object. + semaphore_t (const semaphore_t&); + void operator = (const semaphore_t&); + }; + +#else + + // Default implementation maps simple semaphore to POSIX semaphore. + + class semaphore_t + { + public: + + // Initialise the semaphore. + inline semaphore_t () + { + int rc = sem_init (&sem, 0, 0); + errno_assert (rc != -1); + } + + // Destroy the semaphore. + inline ~semaphore_t () + { + int rc = sem_destroy (&sem); + errno_assert (rc != -1); + } + + // Wait for the semaphore. + inline void wait () + { + int rc = sem_wait (&sem); + errno_assert (rc != -1); + } + + // Post the semaphore. + inline void post () + { + int rc = sem_post (&sem); + errno_assert (rc != -1); + } + + private: + + // Underlying system semaphore object. + sem_t sem; + + // Disable copying of the object. + semaphore_t (const semaphore_t&); + void operator = (const semaphore_t&); + }; + +#endif + +} + +#endif + diff --git a/src/session.cpp b/src/session.cpp index f798877..86086fb 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -69,13 +69,22 @@ zmq::session_t::~session_t () zmq_assert (!out_pipe); } +bool zmq::session_t::is_terminable () +{ + return in_pipe->is_terminating (); +} + bool zmq::session_t::read (::zmq_msg_t *msg_) { if (!in_pipe || !active) return false; - if (!in_pipe->read (msg_)) + if (!in_pipe->read (msg_)) { + active = false; + if (in_pipe->is_terminating ()) + finalise (); return false; + } incomplete_in = msg_->flags & ZMQ_MSG_MORE; return true; @@ -156,33 +165,28 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_, zmq_assert (!in_pipe); in_pipe = inpipe_; active = true; - in_pipe->set_endpoint (this); + in_pipe->set_event_sink (this); } if (outpipe_) { zmq_assert (!out_pipe); out_pipe = outpipe_; - out_pipe->set_endpoint (this); + out_pipe->set_event_sink (this); } } -void zmq::session_t::detach_inpipe (reader_t *pipe_) +void zmq::session_t::terminated (reader_t *pipe_) { active = false; in_pipe = NULL; } -void zmq::session_t::detach_outpipe (writer_t *pipe_) +void zmq::session_t::terminated (writer_t *pipe_) { out_pipe = NULL; } -void zmq::session_t::kill (reader_t *pipe_) -{ - active = false; -} - -void zmq::session_t::revive (reader_t *pipe_) +void zmq::session_t::activated (reader_t *pipe_) { zmq_assert (in_pipe == pipe_); active = true; @@ -190,7 +194,7 @@ void zmq::session_t::revive (reader_t *pipe_) engine->revive (); } -void zmq::session_t::revive (writer_t *pipe_) +void zmq::session_t::activated (writer_t *pipe_) { zmq_assert (out_pipe == pipe_); if (engine) @@ -203,6 +207,11 @@ void zmq::session_t::process_plug () void zmq::session_t::process_unplug () { + // TODO: There may be a problem here. The called ensures that all the + // commands on the fly have been delivered. However, given that the + // session is unregistered from the global repository only at this point + // there may be some commands being sent to the session right now. + // Unregister the session from the socket. if (ordinal) owner->unregister_session (ordinal); @@ -210,14 +219,10 @@ void zmq::session_t::process_unplug () owner->unregister_session (peer_identity); // Ask associated pipes to terminate. - if (in_pipe) { - in_pipe->term (); - in_pipe = NULL; - } - if (out_pipe) { - out_pipe->term (); - out_pipe = NULL; - } + if (in_pipe) + in_pipe->terminate (); + if (out_pipe) + out_pipe->terminate (); if (engine) { engine->unplug (); @@ -265,19 +270,15 @@ void zmq::session_t::process_attach (i_engine *engine_, writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap); - zmq_assert (pipe); - out_pipe = &pipe->writer; - out_pipe->set_endpoint (this); - socket_reader = &pipe->reader; + create_pipe (owner, this, options.hwm, options.swap, &socket_reader, + &out_pipe); + out_pipe->set_event_sink (this); } if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap); - zmq_assert (pipe); - in_pipe = &pipe->reader; - in_pipe->set_endpoint (this); - socket_writer = &pipe->writer; + create_pipe (this, owner, options.hwm, options.swap, &in_pipe, + &socket_writer); + in_pipe->set_event_sink (this); } if (socket_reader || socket_writer) @@ -289,3 +290,4 @@ void zmq::session_t::process_attach (i_engine *engine_, engine = engine_; engine->plug (this); } + diff --git a/src/session.hpp b/src/session.hpp index 9bda1ad..603b50c 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -21,15 +21,19 @@ #define __ZMQ_SESSION_HPP_INCLUDED__ #include "i_inout.hpp" -#include "i_endpoint.hpp" #include "owned.hpp" #include "options.hpp" #include "blob.hpp" +#include "pipe.hpp" namespace zmq { - class session_t : public owned_t, public i_inout, public i_endpoint + class session_t : + public owned_t, + public i_inout, + public i_reader_events, + public i_writer_events { public: @@ -50,19 +54,25 @@ namespace zmq class socket_base_t *get_owner (); uint64_t get_ordinal (); - // i_endpoint interface implementation. void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void detach_inpipe (class reader_t *pipe_); - void detach_outpipe (class writer_t *pipe_); - void kill (class reader_t *pipe_); - void revive (class reader_t *pipe_); - void revive (class writer_t *pipe_); + + // i_reader_events interface implementation. + void activated (class reader_t *pipe_); + void terminated (class reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (class writer_t *pipe_); + void terminated (class writer_t *pipe_); private: ~session_t (); + // Define the delayed termination. (I.e. termination is postponed + // till all the data is flushed to the kernel.) + bool is_terminable (); + // Handlers for incoming commands. void process_plug (); void process_unplug (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c933954..5d3175a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -23,9 +23,18 @@ #include "../include/zmq.h" -#include "socket_base.hpp" -#include "app_thread.hpp" +#include "platform.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#if defined _MSC_VER +#include <intrin.h> +#endif +#else +#include <unistd.h> +#endif +#include "socket_base.hpp" #include "zmq_listener.hpp" #include "zmq_connecter.hpp" #include "io_thread.hpp" @@ -39,15 +48,73 @@ #include "pgm_sender.hpp" #include "pgm_receiver.hpp" #include "likely.hpp" +#include "pair.hpp" +#include "pub.hpp" +#include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" +#include "pull.hpp" +#include "push.hpp" +#include "xreq.hpp" +#include "xrep.hpp" #include "uuid.hpp" -zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : - object_t (parent_), +// If the RDTSC is available we use it to prevent excessive +// polling for commands. The nice thing here is that it will work on any +// system with x86 architecture and gcc or MSVC compiler. +#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ + (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) +#define ZMQ_DELAY_COMMANDS +#endif + +zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, + uint32_t slot_) +{ + socket_base_t *s = NULL; + switch (type_) { + + case ZMQ_PAIR: + s = new (std::nothrow) pair_t (parent_, slot_); + break; + case ZMQ_PUB: + s = new (std::nothrow) pub_t (parent_, slot_); + break; + case ZMQ_SUB: + s = new (std::nothrow) sub_t (parent_, slot_); + break; + case ZMQ_REQ: + s = new (std::nothrow) req_t (parent_, slot_); + break; + case ZMQ_REP: + s = new (std::nothrow) rep_t (parent_, slot_); + break; + case ZMQ_XREQ: + s = new (std::nothrow) xreq_t (parent_, slot_); + break; + case ZMQ_XREP: + s = new (std::nothrow) xrep_t (parent_, slot_); + break; + case ZMQ_PULL: + s = new (std::nothrow) pull_t (parent_, slot_); + break; + case ZMQ_PUSH: + s = new (std::nothrow) push_t (parent_, slot_); + break; + default: + errno = EINVAL; + return NULL; + } + zmq_assert (s); + return s; +} + +zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : + object_t (parent_, slot_), + zombie (false), + last_processing_time (0), pending_term_acks (0), ticks (0), rcvmore (false), - app_thread (parent_), - shutting_down (false), sent_seqnum (0), processed_seqnum (0), next_ordinal (1) @@ -58,10 +125,38 @@ zmq::socket_base_t::~socket_base_t () { } +zmq::signaler_t *zmq::socket_base_t::get_signaler () +{ + return &signaler; +} + +void zmq::socket_base_t::stop () +{ + // Called by ctx when it is terminated (zmq_term). + // 'stop' command is sent from the threads that called zmq_term to + // the thread owning the socket. This way, blocking call in the + // owner thread can be interrupted. + send_stop (); +} + +void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + // If the peer haven't specified it's identity, let's generate one. + if (peer_identity_.size ()) { + xattach_pipes (inpipe_, outpipe_, peer_identity_); + } + else { + blob_t identity (1, 0); + identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); + xattach_pipes (inpipe_, outpipe_, identity); + } +} + int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -79,7 +174,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -94,12 +189,37 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return 0; } + if (option_ == ZMQ_FD) { + if (*optvallen_ < sizeof (fd_t)) { + errno = EINVAL; + return -1; + } + *((fd_t*) optval_) = signaler.get_fd (); + *optvallen_ = sizeof (fd_t); + return 0; + } + + if (option_ == ZMQ_EVENTS) { + if (*optvallen_ < sizeof (uint32_t)) { + errno = EINVAL; + return -1; + } + process_commands(false, false); + *((uint32_t*) optval_) = 0; + if (has_out ()) + *((uint32_t*) optval_) |= ZMQ_POLLOUT; + if (has_in ()) + *((uint32_t*) optval_) |= ZMQ_POLLIN; + *optvallen_ = sizeof (uint32_t); + return 0; + } + return options.getsockopt (option_, optval_, optvallen_); } int zmq::socket_base_t::bind (const char *addr_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -159,7 +279,7 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { - if (unlikely (app_thread->is_terminated ())) { + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -190,30 +310,29 @@ int zmq::socket_base_t::connect (const char *addr_) if (!peer) return -1; - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; - + reader_t *inpipe_reader = NULL; + writer_t *inpipe_writer = NULL; + reader_t *outpipe_reader = NULL; + writer_t *outpipe_writer = NULL; + // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap); - zmq_assert (in_pipe); - } + if (options.requires_in) + create_pipe (this, peer, options.hwm, options.swap, + &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap); - zmq_assert (out_pipe); - } + if (options.requires_out) + create_pipe (peer, this, options.hwm, options.swap, + &outpipe_reader, &outpipe_writer); // Attach the pipes to this socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL, blob_t ()); + attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); // Attach the pipes to the peer socket. Note that peer's seqnum // was incremented in find_endpoint function. The callee is notified // about the fact via the last parameter. - send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, options.identity, false); + send_bind (peer, outpipe_reader, inpipe_writer, + options.identity, false); return 0; } @@ -224,34 +343,31 @@ int zmq::socket_base_t::connect (const char *addr_) this, options); zmq_assert (session); - // If 'immediate connect' feature is required, we'll created the pipes + // If 'immediate connect' feature is required, we'll create the pipes // to the session straight away. Otherwise, they'll be created by the // session once the connection is established. if (options.immediate_connect) { - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + reader_t *inpipe_reader = NULL; + writer_t *inpipe_writer = NULL; + reader_t *outpipe_reader = NULL; + writer_t *outpipe_writer = NULL; // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap); - zmq_assert (in_pipe); - - } + if (options.requires_in) + create_pipe (this, session, options.hwm, options.swap, + &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap); - zmq_assert (out_pipe); - } + if (options.requires_out) + create_pipe (session, this, options.hwm, options.swap, + &outpipe_reader, &outpipe_writer); // Attach the pipes to the socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL, blob_t ()); + attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); // Attach the pipes to the session object. - session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, blob_t ()); + session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); } // Activate the session. @@ -347,8 +463,14 @@ int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { + if (unlikely (zombie)) { + errno = ETERM; + return -1; + } + // Process pending commands, if any. - if (unlikely (!app_thread->process_commands (false, true))) { + process_commands (false, true); + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -372,7 +494,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - if (unlikely (!app_thread->process_commands (true, false))) { + process_commands (true, false); + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -383,6 +506,11 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { + if (unlikely (zombie)) { + errno = ETERM; + return -1; + } + // Get the message. int rc = xrecv (msg_, flags_); int err = errno; @@ -396,7 +524,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing rdtsc all the time. if (++ticks == inbound_poll_rate) { - if (unlikely (!app_thread->process_commands (false, false))) { + process_commands (false, false); + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -420,7 +549,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1; - if (unlikely (!app_thread->process_commands (false, false))) { + process_commands (false, false); + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -440,7 +570,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - if (unlikely (!app_thread->process_commands (true, false))) { + process_commands (true, false); + if (unlikely (zombie)) { errno = ETERM; return -1; } @@ -456,74 +587,72 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { - shutting_down = true; - - // Let the thread know that the socket is no longer available. - app_thread->remove_socket (this); - - // Pointer to the context must be retrieved before the socket is - // deallocated. Afterwards it is not available. - ctx_t *ctx = get_ctx (); + // Socket becomes a zombie. From now on all new arrived pipes (bind + // command) and I/O objects (own command) are immediately terminated. + // Also, any further requests form I/O object termination are ignored + // (we are going to shut them down anyway -- this way we assure that + // we do so once only). + zombie = true; // Unregister all inproc endpoints associated with this socket. - // From this point we are sure that inc_seqnum won't be called again - // on this object. - ctx->unregister_endpoints (this); - - // Wait till all undelivered commands are delivered. This should happen - // very quickly. There's no way to wait here for extensive period of time. + // Doing this we make sure that no new pipes from other sockets (inproc) + // will be initiated. However, there may be some inproc pipes already + // on the fly, but not yet received by this socket. To get finished + // with them we'll do the subsequent waiting from on-the-fly commands. + // This should happen very quickly. There's no way to block here for + // extensive period of time. + unregister_endpoints (this); while (processed_seqnum != sent_seqnum.get ()) - app_thread->process_commands (true, false); - - while (true) { - - // On third pass of the loop there should be no more I/O objects - // because all connecters and listerners were destroyed during - // the first pass and all engines delivered by delayed 'own' commands - // are destroyed during the second pass. - if (io_objects.empty () && !pending_term_acks) - break; - - // Send termination request to all associated I/O objects. - for (io_objects_t::iterator it = io_objects.begin (); - it != io_objects.end (); it++) - send_term (*it); - - // Move the objects to the list of pending term acks. - pending_term_acks += io_objects.size (); - io_objects.clear (); - - // Process commands till we get all the termination acknowledgements. - while (pending_term_acks) - app_thread->process_commands (true, false); - } - - // Check whether there are no session leaks. - sessions_sync.lock (); - zmq_assert (named_sessions.empty ()); - zmq_assert (unnamed_sessions.empty ()); - sessions_sync.unlock (); - - delete this; - - // This function must be called after the socket is completely deallocated - // as it may cause termination of the whole 0MQ infrastructure. - ctx->destroy_socket (); + process_commands (true, false); + // TODO: My feeling is that the above has to be done in the dezombification + // loop, otherwise we may end up with number of i/o object dropping to zero + // even though there are more i/o objects on the way. + + // The above process ensures that only pipes that will arrive from now on + // are those initiated by sessions. These in turn have a nice property of + // not arriving totally asynchronously. When a session -- being an I/O + // object -- acknowledges its termination we are 100% sure that we'll get + // no new pipe from it. + + // Start termination of all the pipes presently associated with the socket. + xterm_pipes (); + + // Send termination request to all associated I/O objects. + // Start waiting for the acks. Note that the actual waiting is not done + // in this function. Rather it is done in delayed manner as socket is + // being dezombified. The reason is that I/O object shutdown can take + // considerable amount of time in case there's still a lot of data to + // push to the network. + for (io_objects_t::iterator it = io_objects.begin (); + it != io_objects.end (); it++) + send_term (*it); + pending_term_acks += io_objects.size (); + io_objects.clear (); + + // Note that new I/O objects may arrive even in zombie state (say new + // session initiated by a listener object), however, in such case number + // of pending acks never drops to zero. Here's the scenario: We have an + // pending ack for the listener object. Then 'own' commands arrives from + // the listener notifying the socket about new session. It immediately + // triggers termination request and number of of pending acks if + // incremented. Then term_acks arrives from the listener. Number of pending + // acks is decremented. Later on, the session itself will ack its + // termination. During the process, number of pending acks never dropped + // to zero and thus the socket remains safely in the zombie state. + + // Transfer the ownership of the socket from this application thread + // to the context which will take care of the rest of shutdown process. + zombify (this); return 0; } void zmq::socket_base_t::inc_seqnum () { - // NB: This function may be called from a different thread! + // Be aware: This function may be called from a different thread! sent_seqnum.add (1); } -zmq::app_thread_t *zmq::socket_base_t::get_thread () -{ - return app_thread; -} - bool zmq::socket_base_t::has_in () { return xhas_in (); @@ -607,68 +736,133 @@ zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_) return session; } -void zmq::socket_base_t::kill (reader_t *pipe_) +bool zmq::socket_base_t::dezombify () { - xkill (pipe_); -} + zmq_assert (zombie); -void zmq::socket_base_t::revive (reader_t *pipe_) -{ - xrevive (pipe_); -} + // Process any commands from other threads/sockets that may be available + // at the moment. + process_commands (false, false); -void zmq::socket_base_t::revive (writer_t *pipe_) -{ - xrevive (pipe_); + // If there are no more pipes attached and there are no more I/O objects + // owned by the socket, we can kill the zombie. + if (!pending_term_acks && !xhas_pipes ()) { + + // If all objects have acknowledged their termination there should + // definitely be no I/O object remaining in the list. + zmq_assert (io_objects.empty ()); + + // Check whether there are no session leaks. + sessions_sync.lock (); + zmq_assert (named_sessions.empty ()); + zmq_assert (unnamed_sessions.empty ()); + sessions_sync.unlock (); + + // Deallocate all the resources tied to this socket. + delete this; + + // Notify the caller about the fact that the zombie is finally dead. + return true; + } + + // The zombie remains undead. + return false; } -void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::socket_base_t::process_commands (bool block_, bool throttle_) { - if (inpipe_) - inpipe_->set_endpoint (this); - if (outpipe_) - outpipe_->set_endpoint (this); - - // If the peer haven't specified it's identity, let's generate one. - if (peer_identity_.size ()) { - xattach_pipes (inpipe_, outpipe_, peer_identity_); + bool received; + command_t cmd; + if (block_) { + received = signaler.recv (&cmd, true); + zmq_assert (received); } else { - blob_t identity (1, 0); - identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); - xattach_pipes (inpipe_, outpipe_, identity); + +#if defined ZMQ_DELAY_COMMANDS + // Optimised version of command processing - it doesn't have to check + // for incoming commands each time. It does so only if certain time + // elapsed since last command processing. Command delay varies + // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU + // etc. The optimisation makes sense only on platforms where getting + // a timestamp is a very cheap operation (tens of nanoseconds). + if (throttle_) { + + // Get timestamp counter. +#if defined __GNUC__ + uint32_t low; + uint32_t high; + __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); + uint64_t current_time = (uint64_t) high << 32 | low; +#elif defined _MSC_VER + uint64_t current_time = __rdtsc (); +#else +#error +#endif + + // Check whether certain time have elapsed since last command + // processing. + if (current_time - last_processing_time <= max_command_delay) + return; + last_processing_time = current_time; + } +#endif + + // Check whether there are any commands pending for this thread. + received = signaler.recv (&cmd, false); } -} -void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) -{ - xdetach_inpipe (pipe_); - pipe_->set_endpoint (NULL); // ? + // Process all the commands available at the moment. + while (received) { + cmd.destination->process_command (cmd); + received = signaler.recv (&cmd, false); + } } -void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) +void zmq::socket_base_t::process_stop () { - xdetach_outpipe (pipe_); - pipe_->set_endpoint (NULL); // ? + // Here, someone have called zmq_term while the socket was still alive. + // We'll zombify it so that any blocking call is interrupted and any + // further attempt to use the socket will return ETERM. The user is still + // responsible for calling zmq_close on the socket though! + zombie = true; } void zmq::socket_base_t::process_own (owned_t *object_) { + // If the socket is already being shut down, new owned objects are + // immediately asked to terminate. + if (zombie) { + send_term (object_); + pending_term_acks++; + return; + } + io_objects.insert (object_); } void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_) { + // If the socket is already being shut down, the termination process on + // the new pipes is started immediately. However, they are still attached + // as to let the process finish in a decent manner. + if (unlikely (zombie)) { + if (in_pipe_) + in_pipe_->terminate (); + if (out_pipe_) + out_pipe_->terminate (); + } + attach_pipes (in_pipe_, out_pipe_, peer_identity_); } void zmq::socket_base_t::process_term_req (owned_t *object_) { // When shutting down we can ignore termination requests from owned - // objects. They are going to be terminated anyway. - if (shutting_down) + // objects. It means the termination request was already sent to + // the object. + if (zombie) return; // If I/O object is well and alive ask it to terminate. @@ -676,7 +870,7 @@ void zmq::socket_base_t::process_term_req (owned_t *object_) io_objects.end (), object_); // If not found, we assume that termination request was already sent to - // the object so we can sagely ignore the request. + // the object so we can safely ignore the request. if (it == io_objects.end ()) return; @@ -696,3 +890,32 @@ void zmq::socket_base_t::process_seqnum () processed_seqnum++; } +int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +bool zmq::socket_base_t::xhas_out () +{ + return false; +} + +int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::socket_base_t::xhas_in () +{ + return false; +} + +int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_) +{ + errno = ENOTSUP; + return -1; +} + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 3d95cec..386fdbb 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -26,13 +26,13 @@ #include "../include/zmq.h" -#include "i_endpoint.hpp" #include "object.hpp" #include "yarray_item.hpp" #include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" +#include "signaler.hpp" #include "stdint.hpp" #include "blob.hpp" @@ -40,11 +40,21 @@ namespace zmq { class socket_base_t : - public object_t, public i_endpoint, public yarray_item_t + public object_t, + public yarray_item_t { public: - socket_base_t (class app_thread_t *parent_); + // Create a socket of a specified type. + static socket_base_t *create (int type_, class ctx_t *parent_, + uint32_t slot_); + + // Returns the signaler associated with this socket. + signaler_t *get_signaler (); + + // Interrupt blocking call if the socket is stuck in one. + // This function can be called from a different thread! + void stop (); // Interface for communication with the API layer. int setsockopt (int option_, const void *optval_, size_t optvallen_); @@ -60,11 +70,6 @@ namespace zmq // before the command is delivered. void inc_seqnum (); - // This function is used by the polling mechanism to determine - // whether the socket belongs to the application thread the poll - // is called from. - class app_thread_t *get_thread (); - // These functions are used by the polling mechanism to determine // which events are to be reported from this socket. bool has_in (); @@ -85,43 +90,67 @@ namespace zmq void unregister_session (uint64_t ordinal_); class session_t *find_session (uint64_t ordinal_); - // i_endpoint interface implementation. - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - void detach_inpipe (class reader_t *pipe_); - void detach_outpipe (class writer_t *pipe_); - void kill (class reader_t *pipe_); - void revive (class reader_t *pipe_); - void revive (class writer_t *pipe_); + // i_reader_events interface implementation. + void activated (class reader_t *pipe_); + void terminated (class reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (class writer_t *pipe_); + void terminated (class writer_t *pipe_); + + // This function should be called only on zombie sockets. It tries + // to deallocate the zombie and returns true is successful. + bool dezombify (); protected: - // Destructor is protected. Socket is closed using 'close' function. + socket_base_t (class ctx_t *parent_, uint32_t slot_); virtual ~socket_base_t (); - // Pipe management is done by individual socket types. + // Concrete algorithms for the x- methods are to be defined by + // individual socket types. + virtual void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) = 0; - virtual void xdetach_inpipe (class reader_t *pipe_) = 0; - virtual void xdetach_outpipe (class writer_t *pipe_) = 0; - virtual void xkill (class reader_t *pipe_) = 0; - virtual void xrevive (class reader_t *pipe_) = 0; - virtual void xrevive (class writer_t *pipe_) = 0; + virtual void xterm_pipes () = 0; + virtual bool xhas_pipes () = 0; - // Actual algorithms are to be defined by individual socket types. + // The default implementation assumes there are no specific socket + // options for the particular socket type. If not so, overload this + // method. virtual int xsetsockopt (int option_, const void *optval_, - size_t optvallen_) = 0; - virtual int xsend (zmq_msg_t *msg_, int options_) = 0; - virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; - virtual bool xhas_in () = 0; - virtual bool xhas_out () = 0; + size_t optvallen_); + + // The default implementation assumes that send is not supported. + virtual bool xhas_out (); + virtual int xsend (zmq_msg_t *msg_, int options_); + + // The default implementation assumes that recv in not supported. + virtual bool xhas_in (); + virtual int xrecv (zmq_msg_t *msg_, int options_); // Socket options. options_t options; + // If true, socket was already closed but not yet deallocated + // because either shutdown is in process or there are still pipes + // attached to the socket. + bool zombie; + private: + // If no identity set generate one and call xattach_pipes (). + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + + // Processes commands sent to this socket (if any). If 'block' is + // set to true, returns only after at least one command was processed. + // If throttle argument is true, commands are processed at most once + // in a predefined time period. + void process_commands (bool block_, bool throttle_); + // Handlers for incoming commands. + void process_stop (); void process_own (class owned_t *object_); void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_); @@ -129,6 +158,12 @@ namespace zmq void process_term_ack (); void process_seqnum (); + // App thread's signaler object. + signaler_t signaler; + + // Timestamp of when commands were processed the last time. + uint64_t last_processing_time; + // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. typedef std::set <class owned_t*> io_objects_t; @@ -144,13 +179,6 @@ namespace zmq // If true there's a half-read message in the socket. bool rcvmore; - // Application thread the socket lives in. - class app_thread_t *app_thread; - - // If true, socket is already shutting down. No new work should be - // started. - bool shutting_down; - // Sequence number of the last command sent to this object. atomic_counter_t sent_seqnum; diff --git a/src/sub.cpp b/src/sub.cpp index eeb50cd..a1e8fb7 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -24,8 +24,8 @@ #include "sub.hpp" #include "err.hpp" -zmq::sub_t::sub_t (class app_thread_t *parent_) : - socket_base_t (parent_), +zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_), has_message (false), more (false) { @@ -46,31 +46,14 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, fq.attach (inpipe_); } -void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::sub_t::xterm_pipes () { - zmq_assert (pipe_); - fq.detach (pipe_); + fq.term_pipes (); } -void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_) +bool zmq::sub_t::xhas_pipes () { - // SUB socket is read-only thus there should be no outpipes. - zmq_assert (false); -} - -void zmq::sub_t::xkill (class reader_t *pipe_) -{ - fq.kill (pipe_); -} - -void zmq::sub_t::xrevive (class reader_t *pipe_) -{ - fq.revive (pipe_); -} - -void zmq::sub_t::xrevive (class writer_t *pipe_) -{ - zmq_assert (false); + return fq.has_pipes (); } int zmq::sub_t::xsetsockopt (int option_, const void *optval_, @@ -93,12 +76,6 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, return -1; } -int zmq::sub_t::xsend (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) { // If there's already a message prepared by a previous call to zmq_poll, @@ -179,11 +156,6 @@ bool zmq::sub_t::xhas_in () } } -bool zmq::sub_t::xhas_out () -{ - return false; -} - bool zmq::sub_t::match (zmq_msg_t *msg_) { return subscriptions.check ((unsigned char*) zmq_msg_data (msg_), diff --git a/src/sub.hpp b/src/sub.hpp index 7b997c9..da69892 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -33,7 +33,7 @@ namespace zmq { public: - sub_t (class app_thread_t *parent_); + sub_t (class ctx_t *parent_, uint32_t slot_); ~sub_t (); protected: @@ -41,16 +41,11 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); + void xterm_pipes (); + bool xhas_pipes (); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); - bool xhas_out (); private: diff --git a/src/thread.cpp b/src/thread.cpp index 602ca8b..4e86531 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -38,16 +38,6 @@ void zmq::thread_t::stop () win_assert (rc != WAIT_FAILED); } -zmq::thread_t::id_t zmq::thread_t::id () -{ - return GetCurrentThreadId (); -} - -bool zmq::thread_t::equal (id_t id1_, id_t id2_) -{ - return id1_ == id2_; -} - unsigned int __stdcall zmq::thread_t::thread_routine (void *arg_) { thread_t *self = (thread_t*) arg_; @@ -73,16 +63,6 @@ void zmq::thread_t::stop () errno_assert (rc == 0); } -zmq::thread_t::id_t zmq::thread_t::id () -{ - return pthread_self (); -} - -bool zmq::thread_t::equal (id_t id1_, id_t id2_) -{ - return pthread_equal (id1_, id2_) != 0; -} - void *zmq::thread_t::thread_routine (void *arg_) { #if !defined ZMQ_HAVE_OPENVMS diff --git a/src/thread.hpp b/src/thread.hpp index 432770c..8af6ea5 100644 --- a/src/thread.hpp +++ b/src/thread.hpp @@ -54,15 +54,6 @@ namespace zmq // Waits for thread termination. void stop (); - -#ifdef ZMQ_HAVE_WINDOWS - typedef DWORD id_t; -#else - typedef pthread_t id_t; -#endif - - static id_t id (); - static bool equal (id_t id1_, id_t id2_); private: diff --git a/src/xrep.cpp b/src/xrep.cpp index 5fd6cbb..73d7856 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -23,8 +23,8 @@ #include "err.hpp" #include "pipe.hpp" -zmq::xrep_t::xrep_t (class app_thread_t *parent_) : - socket_base_t (parent_), +zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_), current_in (0), prefetched (false), more_in (false), @@ -41,31 +41,41 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : zmq::xrep_t::~xrep_t () { - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++) - it->reader->term (); - for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); - it++) - it->second.writer->term (); + zmq_assert (inpipes.empty ()); + zmq_assert (outpipes.empty ()); } -void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) +void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, + const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); + outpipe_->set_event_sink (this); + // TODO: What if new connection has same peer identity as the old one? outpipe_t outpipe = {outpipe_, true}; bool ok = outpipes.insert (std::make_pair ( peer_identity_, outpipe)).second; zmq_assert (ok); + inpipe_->set_event_sink (this); + inpipe_t inpipe = {inpipe_, peer_identity_, true}; inpipes.push_back (inpipe); } -void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::xrep_t::xterm_pipes () +{ + for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); + it++) + it->reader->terminate (); + for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); + it++) + it->second.writer->terminate (); +} + +void zmq::xrep_t::terminated (reader_t *pipe_) { -// TODO:! for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++) { if (it->reader == pipe_) { @@ -76,7 +86,7 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::xrep_t::terminated (writer_t *pipe_) { for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { @@ -90,20 +100,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::xkill (class reader_t *pipe_) +bool zmq::xrep_t::xhas_pipes () { - for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); - it++) { - if (it->reader == pipe_) { - zmq_assert (it->active); - it->active = false; - return; - } - } - zmq_assert (false); + return !inpipes.empty () || !outpipes.empty (); } -void zmq::xrep_t::xrevive (class reader_t *pipe_) +void zmq::xrep_t::activated (reader_t *pipe_) { for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++) { @@ -116,7 +118,7 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_) zmq_assert (false); } -void zmq::xrep_t::xrevive (class writer_t *pipe_) +void zmq::xrep_t::activated (writer_t *pipe_) { for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); ++it) { @@ -129,13 +131,6 @@ void zmq::xrep_t::xrevive (class writer_t *pipe_) zmq_assert (false); } -int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - errno = EINVAL; - return -1; -} - int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) { // If this is the first part of the message it's the identity of the @@ -232,7 +227,9 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) return 0; } - // If me don't have a message, move to next pipe. + // If me don't have a message, mark the pipe as passive and + // move to next pipe. + inpipes [current_in].active = false; current_in++; if (current_in >= inpipes.size ()) current_in = 0; @@ -259,6 +256,10 @@ bool zmq::xrep_t::xhas_in () if (inpipes [current_in].active && inpipes [current_in].reader->check_read ()) return true; + + // If me don't have a message, mark the pipe as passive and + // move to next pipe. + inpipes [current_in].active = false; current_in++; if (current_in >= inpipes.size ()) current_in = 0; diff --git a/src/xrep.hpp b/src/xrep.hpp index da1b3d8..1c240ff 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -25,32 +25,40 @@ #include "socket_base.hpp" #include "blob.hpp" +#include "pipe.hpp" namespace zmq { // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. - class xrep_t : public socket_base_t + class xrep_t : + public socket_base_t, + public i_reader_events, + public i_writer_events { public: - xrep_t (class app_thread_t *parent_); + xrep_t (class ctx_t *parent_, uint32_t slot_); ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); + // i_reader_events interface implementation. + void activated (reader_t *pipe_); + void terminated (reader_t *pipe_); + + // i_writer_events interface implementation. + void activated (writer_t *pipe_); + void terminated (writer_t *pipe_); + private: struct inpipe_t diff --git a/src/xreq.cpp b/src/xreq.cpp index 66e5cc3..893c18e 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -22,8 +22,8 @@ #include "xreq.hpp" #include "err.hpp" -zmq::xreq_t::xreq_t (class app_thread_t *parent_) : - socket_base_t (parent_) +zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t slot_) : + socket_base_t (parent_, slot_) { options.requires_in = true; options.requires_out = true; @@ -41,38 +41,15 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, lb.attach (outpipe_); } -void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::xreq_t::xterm_pipes () { - zmq_assert (pipe_); - fq.detach (pipe_); + fq.term_pipes (); + lb.term_pipes (); } -void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_) +bool zmq::xreq_t::xhas_pipes () { - zmq_assert (pipe_); - lb.detach (pipe_); -} - -void zmq::xreq_t::xkill (class reader_t *pipe_) -{ - fq.kill (pipe_); -} - -void zmq::xreq_t::xrevive (class reader_t *pipe_) -{ - fq.revive (pipe_); -} - -void zmq::xreq_t::xrevive (class writer_t *pipe_) -{ - lb.revive (pipe_); -} - -int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - errno = EINVAL; - return -1; + return fq.has_pipes () || lb.has_pipes (); } int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) diff --git a/src/xreq.hpp b/src/xreq.hpp index 8ee0bb9..b8b9a0b 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -31,18 +31,14 @@ namespace zmq { public: - xreq_t (class app_thread_t *parent_); + xreq_t (class ctx_t *parent_, uint32_t slot_); ~xreq_t (); // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + void xterm_pipes (); + bool xhas_pipes (); int xsend (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/zmq.cpp b/src/zmq.cpp index 342a8a6..6dc577e 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -29,7 +29,6 @@ #include "queue.hpp" #include "streamer.hpp" #include "socket_base.hpp" -#include "app_thread.hpp" #include "msg_content.hpp" #include "platform.hpp" #include "stdint.hpp" @@ -83,8 +82,6 @@ const char *zmq_strerror (int errnum_) case EINPROGRESS: return "Operation in progress"; #endif - case EMTHREAD: - return "Number of preallocated application threads exceeded"; case EFSM: return "Operation cannot be accomplished in current state"; case ENOCOMPATPROTO: @@ -367,6 +364,7 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { +/* #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ @@ -679,6 +677,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) errno = ENOTSUP; return -1; #endif +*/ +zmq_assert (false); +return -1; } int zmq_errno () diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 077286f..d552c61 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -52,7 +52,7 @@ bool zmq::zmq_encoder_t::size_ready () bool zmq::zmq_encoder_t::message_ready () { // Destroy content of the old message. - zmq_msg_close(&in_progress); + zmq_msg_close (&in_progress); // Read new message. If there is none, return false. // Note that new state is set only if write is successful. That way |