diff options
47 files changed, 1427 insertions, 1406 deletions
diff --git a/include/zmq.h b/include/zmq.h index bce1215..075ff3c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -82,7 +82,6 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); #endif /* Native 0MQ error codes. */ -#define EMTHREAD (ZMQ_HAUSNUMERO + 50) #define EFSM (ZMQ_HAUSNUMERO + 51) #define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52) #define ETERM (ZMQ_HAUSNUMERO + 53) @@ -152,36 +151,39 @@ ZMQ_EXPORT int zmq_term (void *context); /******************************************************************************/ /* Socket types. */ -#define ZMQ_PAIR 0 -#define ZMQ_PUB 1 -#define ZMQ_SUB 2 -#define ZMQ_REQ 3 -#define ZMQ_REP 4 -#define ZMQ_XREQ 5 -#define ZMQ_XREP 6 -#define ZMQ_PULL 7 -#define ZMQ_PUSH 8 -#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */ -#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */ +#define ZMQ_PAIR 0 +#define ZMQ_PUB 1 +#define ZMQ_SUB 2 +#define ZMQ_REQ 3 +#define ZMQ_REP 4 +#define ZMQ_XREQ 5 +#define ZMQ_XREP 6 +#define ZMQ_PULL 7 +#define ZMQ_PUSH 8 + +/* Deprecated aliases, to be removed in release 3.x */ +#define ZMQ_UPSTREAM ZMQ_PULL +#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Socket options. */ -#define ZMQ_HWM 1 -/* ZMQ_LWM 2 no longer supported */ -#define ZMQ_SWAP 3 -#define ZMQ_AFFINITY 4 -#define ZMQ_IDENTITY 5 -#define ZMQ_SUBSCRIBE 6 -#define ZMQ_UNSUBSCRIBE 7 -#define ZMQ_RATE 8 +#define ZMQ_HWM 1 +#define ZMQ_SWAP 3 +#define ZMQ_AFFINITY 4 +#define ZMQ_IDENTITY 5 +#define ZMQ_SUBSCRIBE 6 +#define ZMQ_UNSUBSCRIBE 7 +#define ZMQ_RATE 8 #define ZMQ_RECOVERY_IVL 9 -#define ZMQ_MCAST_LOOP 10 -#define ZMQ_SNDBUF 11 -#define ZMQ_RCVBUF 12 -#define ZMQ_RCVMORE 13 +#define ZMQ_MCAST_LOOP 10 +#define ZMQ_SNDBUF 11 +#define ZMQ_RCVBUF 12 +#define ZMQ_RCVMORE 13 +#define ZMQ_FD 14 +#define ZMQ_EVENTS 15 /* Send/recv options. */ -#define ZMQ_NOBLOCK 1 -#define ZMQ_SNDMORE 2 +#define ZMQ_NOBLOCK 1 +#define ZMQ_SNDMORE 2 ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT int zmq_close (void *s); @@ -198,9 +200,9 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); /* I/O multiplexing. */ /******************************************************************************/ -#define ZMQ_POLLIN 1 -#define ZMQ_POLLOUT 2 -#define ZMQ_POLLERR 4 +#define ZMQ_POLLIN 1 +#define ZMQ_POLLOUT 2 +#define ZMQ_POLLERR 4 typedef struct { @@ -217,17 +219,15 @@ typedef struct ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /******************************************************************************/ -/* Devices */ +/* Devices - Experimental. */ /******************************************************************************/ -#define ZMQ_QUEUE 1 -#define ZMQ_FORWARDER 2 -#define ZMQ_STREAMER 3 +#define ZMQ_STREAMER 1 +#define ZMQ_FORWARDER 2 +#define ZMQ_QUEUE 3 ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); -#undef ZMQ_EXPORT - #ifdef __cplusplus } #endif diff --git a/src/Makefile.am b/src/Makefile.am index 19a80d0..937372f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -49,7 +49,7 @@ endif nodist_libzmq_la_SOURCES = $(pgm_sources) -libzmq_la_SOURCES = app_thread.hpp \ +libzmq_la_SOURCES = \ atomic_counter.hpp \ atomic_ptr.hpp \ blob.hpp \ @@ -58,7 +58,6 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -69,7 +68,6 @@ libzmq_la_SOURCES = app_thread.hpp \ io_object.hpp \ io_thread.hpp \ ip.hpp \ - i_endpoint.hpp \ i_engine.hpp \ i_poll_events.hpp \ kqueue.hpp \ @@ -91,10 +89,13 @@ libzmq_la_SOURCES = app_thread.hpp \ pair.hpp \ prefix_tree.hpp \ pub.hpp \ + pull.hpp \ + push.hpp \ queue.hpp \ rep.hpp \ req.hpp \ select.hpp \ + semaphore.hpp \ session.hpp \ signaler.hpp \ socket_base.hpp \ @@ -105,7 +106,6 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -121,11 +121,9 @@ libzmq_la_SOURCES = app_thread.hpp \ zmq_engine.hpp \ zmq_init.hpp \ zmq_listener.hpp \ - app_thread.cpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -139,13 +137,15 @@ libzmq_la_SOURCES = app_thread.hpp \ object.cpp \ options.cpp \ owned.cpp \ + pair.cpp \ pgm_receiver.cpp \ pgm_sender.cpp \ pgm_socket.cpp \ - pair.cpp \ prefix_tree.cpp \ pipe.cpp \ poll.cpp \ + pull.cpp \ + push.cpp \ pub.cpp \ queue.cpp \ rep.cpp \ @@ -160,7 +160,6 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp deleted file mode 100644 index ac59464..0000000 --- a/src/app_thread.cpp +++ /dev/null @@ -1,195 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <new> -#include <algorithm> - -#include "../include/zmq.h" - -#include "platform.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#if defined _MSC_VER -#include <intrin.h> -#endif -#else -#include <unistd.h> -#endif - -#include "app_thread.hpp" -#include "ctx.hpp" -#include "err.hpp" -#include "pipe.hpp" -#include "config.hpp" -#include "socket_base.hpp" -#include "pair.hpp" -#include "pub.hpp" -#include "sub.hpp" -#include "req.hpp" -#include "rep.hpp" -#include "xreq.hpp" -#include "xrep.hpp" -#include "pull.hpp" -#include "push.hpp" - -// If the RDTSC is available we use it to prevent excessive -// polling for commands. The nice thing here is that it will work on any -// system with x86 architecture and gcc or MSVC compiler. -#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ - (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) -#define ZMQ_DELAY_COMMANDS -#endif - -zmq::app_thread_t::app_thread_t (ctx_t *ctx_, - uint32_t thread_slot_) : - object_t (ctx_, thread_slot_), - last_processing_time (0), - terminated (false) -{ -} - -zmq::app_thread_t::~app_thread_t () -{ - zmq_assert (sockets.empty ()); -} - -void zmq::app_thread_t::stop () -{ - send_stop (); -} - -zmq::signaler_t *zmq::app_thread_t::get_signaler () -{ - return &signaler; -} - -bool zmq::app_thread_t::process_commands (bool block_, bool throttle_) -{ - bool received; - command_t cmd; - if (block_) { - received = signaler.recv (&cmd, true); - zmq_assert (received); - } - else { - -#if defined ZMQ_DELAY_COMMANDS - // Optimised version of command processing - it doesn't have to check - // for incoming commands each time. It does so only if certain time - // elapsed since last command processing. Command delay varies - // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU - // etc. The optimisation makes sense only on platforms where getting - // a timestamp is a very cheap operation (tens of nanoseconds). - if (throttle_) { - - // Get timestamp counter. -#if defined __GNUC__ - uint32_t low; - uint32_t high; - __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); - uint64_t current_time = (uint64_t) high << 32 | low; -#elif defined _MSC_VER - uint64_t current_time = __rdtsc (); -#else -#error -#endif - - // Check whether certain time have elapsed since last command - // processing. - if (current_time - last_processing_time <= max_command_delay) - return !terminated; - last_processing_time = current_time; - } -#endif - - // Check whether there are any commands pending for this thread. - received = signaler.recv (&cmd, false); - } - - // Process all the commands available at the moment. - while (received) { - cmd.destination->process_command (cmd); - received = signaler.recv (&cmd, false); - } - - return !terminated; -} - -zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) -{ - socket_base_t *s = NULL; - switch (type_) { - case ZMQ_PAIR: - s = new (std::nothrow) pair_t (this); - break; - case ZMQ_PUB: - s = new (std::nothrow) pub_t (this); - break; - case ZMQ_SUB: - s = new (std::nothrow) sub_t (this); - break; - case ZMQ_REQ: - s = new (std::nothrow) req_t (this); - break; - case ZMQ_REP: - s = new (std::nothrow) rep_t (this); - break; - case ZMQ_XREQ: - s = new (std::nothrow) xreq_t (this); - break; - case ZMQ_XREP: - s = new (std::nothrow) xrep_t (this); - break; - case ZMQ_PULL: - s = new (std::nothrow) pull_t (this); - break; - case ZMQ_PUSH: - s = new (std::nothrow) push_t (this); - break; - default: - if (sockets.empty ()) - get_ctx ()->no_sockets (this); - errno = EINVAL; - return NULL; - } - zmq_assert (s); - - sockets.push_back (s); - - return s; -} - -void zmq::app_thread_t::remove_socket (socket_base_t *socket_) -{ - sockets.erase (socket_); - if (sockets.empty ()) - get_ctx ()->no_sockets (this); -} - -void zmq::app_thread_t::process_stop () -{ - terminated = true; -} - -bool zmq::app_thread_t::is_terminated () -{ - return terminated; -} - diff --git a/src/app_thread.hpp b/src/app_thread.hpp deleted file mode 100644 index f0deaab..0000000 --- a/src/app_thread.hpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__ -#define __ZMQ_APP_THREAD_HPP_INCLUDED__ - -#include <vector> - -#include "stdint.hpp" -#include "object.hpp" -#include "yarray.hpp" -#include "signaler.hpp" - -namespace zmq -{ - - class app_thread_t : public object_t - { - public: - - app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_); - - ~app_thread_t (); - - // Interrupt blocking call if the app thread is stuck in one. - // This function is is called from a different thread! - void stop (); - - // Returns signaler associated with this application thread. - signaler_t *get_signaler (); - - // Processes commands sent to this thread (if any). If 'block' is - // set to true, returns only after at least one command was processed. - // If throttle argument is true, commands are processed at most once - // in a predefined time period. The function returns false is the - // associated context was terminated, true otherwise. - bool process_commands (bool block_, bool throttle_); - - // Create a socket of a specified type. - class socket_base_t *create_socket (int type_); - - // Unregister the socket from the app_thread (called by socket itself). - void remove_socket (class socket_base_t *socket_); - - // Returns true is the associated context was already terminated. - bool is_terminated (); - - private: - - // Command handlers. - void process_stop (); - - // All the sockets created from this application thread. - typedef yarray_t <socket_base_t> sockets_t; - sockets_t sockets; - - // App thread's signaler object. - signaler_t signaler; - - // Timestamp of when commands were processed the last time. - uint64_t last_processing_time; - - // If true, 'stop' command was already received. - bool terminated; - - app_thread_t (const app_thread_t&); - void operator = (const app_thread_t&); - }; - -} - -#endif diff --git a/src/config.hpp b/src/config.hpp index 2c0ac2d..83e612a 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -27,9 +27,8 @@ namespace zmq enum { - // Maximal number of OS threads that can own 0MQ sockets - // at the same time. - max_app_threads = 512, + // Maximum number of sockets that can be opened at the same time. + max_sockets = 512, // Number of new messages in message pipe needed to trigger new memory // allocation. Setting this parameter to 256 decreases the impact of diff --git a/src/ctx.cpp b/src/ctx.cpp index 397f692..91157a5 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -24,7 +24,6 @@ #include "ctx.hpp" #include "socket_base.hpp" -#include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "err.hpp" @@ -32,11 +31,12 @@ #if defined ZMQ_HAVE_WINDOWS #include "windows.h" +#else +#include "unistd.h" #endif zmq::ctx_t::ctx_t (uint32_t io_threads_) : - sockets (0), - terminated (false) + no_sockets_notify (false) { #ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple @@ -50,44 +50,32 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : #endif // Initialise the array of signalers. - signalers_count = max_app_threads + io_threads_; - signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count); - zmq_assert (signalers); - memset (signalers, 0, sizeof (signaler_t*) * signalers_count); + slot_count = max_sockets + io_threads_; + slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count); + zmq_assert (slots); // Create I/O thread objects and launch them. for (uint32_t i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); zmq_assert (io_thread); io_threads.push_back (io_thread); - signalers [i] = io_thread->get_signaler (); + slots [i] = io_thread->get_signaler (); io_thread->start (); } -} - -int zmq::ctx_t::term () -{ - // First send stop command to application threads so that any - // blocking calls are interrupted. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - app_threads [i].app_thread->stop (); - - // Then mark context as terminated. - term_sync.lock (); - zmq_assert (!terminated); - terminated = true; - bool destroy = (sockets == 0); - term_sync.unlock (); - - // If there are no sockets open, destroy the context immediately. - if (destroy) - delete this; - return 0; + // In the unused part of the slot array, create a list of empty slots. + for (uint32_t i = slot_count - 1; i >= io_threads_; i--) { + empty_slots.push_back (i); + slots [i] = NULL; + } } zmq::ctx_t::~ctx_t () { + // Check that there are no remaining open or zombie sockets. + zmq_assert (sockets.empty ()); + zmq_assert (zombies.empty ()); + // Ask I/O threads to terminate. If stop signal wasn't sent to I/O // thread subsequent invocation of destructor would hang-up. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -97,18 +85,10 @@ zmq::ctx_t::~ctx_t () for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) delete io_threads [i]; - // Close all application theads, sockets, io_objects etc. - for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) - delete app_threads [i].app_thread; - - // Deallocate all the orphaned pipes. - while (!pipes.empty ()) - delete *pipes.begin (); - - // Deallocate the array of pointers to signalers. No special work is + // Deallocate the array of slot. No special work is // needed as signalers themselves were deallocated with their - // corresponding (app_/io_) thread objects. - free (signalers); + // corresponding io_thread/socket objects. + free (slots); #ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. @@ -117,110 +97,113 @@ zmq::ctx_t::~ctx_t () #endif } -zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +int zmq::ctx_t::term () { - app_threads_sync.lock (); - - // Find whether the calling thread has app_thread_t object associated - // already. At the same time find an unused app_thread_t so that it can - // be used if there's no associated object for the calling thread. - // Check whether thread ID is already assigned. If so, return it. - app_threads_t::size_type unused = app_threads.size (); - app_threads_t::size_type current; - for (current = 0; current != app_threads.size (); current++) { - if (app_threads [current].associated && - thread_t::equal (thread_t::id (), app_threads [current].tid)) - break; - if (!app_threads [current].associated) - unused = current; + // First send stop command to sockets so that any + // blocking calls are interrupted. + for (sockets_t::size_type i = 0; i != sockets.size (); i++) + sockets [i]->stop (); + + // Find out whether there are any open sockets to care about. + // If so, sleep till they are closed. Note that we can use + // no_sockets_notify safely out of the critical section as once set + // its value is never changed again. + slot_sync.lock (); + if (!sockets.empty ()) + no_sockets_notify = true; + slot_sync.unlock (); + if (no_sockets_notify) + no_sockets_sync.wait (); + + // At this point there's only one application thread (this one) remaining. + // We don't even have to synchronise access to data. + zmq_assert (sockets.empty ()); + + // Get rid of remaining zombie sockets. + while (!zombies.empty ()) { + dezombify (); + + // Sleep for 1ms not to end up busy-looping in the case the I/O threads + // are still busy sending data. We can possibly add a grand poll here + // (polling for fds associated with all the zombie sockets), but it's + // probably not worth of implementing it. +#if defined ZMQ_HAVE_WINDOWS + Sleep (1); +#else + usleep (1000); +#endif } - // If no app_thread_t is associated with the calling thread, - // associate it with one of the unused app_thread_t objects. - if (current == app_threads.size ()) { + // Deallocate the resources. + delete this; - // If all the existing app_threads are already used, create one more. - if (unused == app_threads.size ()) { - - // If max_app_threads limit was reached, return error. - if (app_threads.size () == max_app_threads) { - app_threads_sync.unlock (); - errno = EMTHREAD; - return NULL; - } + return 0; +} - // Create the new application thread proxy object. - app_thread_info_t info; - memset (&info, 0, sizeof (info)); - info.associated = false; - info.app_thread = new (std::nothrow) app_thread_t (this, - io_threads.size () + app_threads.size ()); - zmq_assert (info.app_thread); - signalers [io_threads.size () + app_threads.size ()] = - info.app_thread->get_signaler (); - app_threads.push_back (info); - } +zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) +{ + slot_sync.lock (); - // Incidentally, this works both when there is an unused app_thread - // and when a new one is created. - current = unused; + // Free the slots, if possible. + dezombify (); - // Associate the selected app_thread with the OS thread. - app_threads [current].associated = true; - app_threads [current].tid = thread_t::id (); + // If max_sockets limit was reached, return error. + if (empty_slots.empty ()) { + slot_sync.unlock (); + errno = EMFILE; + return NULL; } - app_thread_t *thread = app_threads [current].app_thread; - app_threads_sync.unlock (); + // Choose a slot for the socket. + uint32_t slot = empty_slots.back (); + empty_slots.pop_back (); - socket_base_t *s = thread->create_socket (type_); - if (!s) + // Create the socket and register its signaler. + socket_base_t *s = socket_base_t::create (type_, this, slot); + if (!s) { + empty_slots.push_back (slot); + slot_sync.unlock (); return NULL; + } + sockets.push_back (s); + slots [slot] = s->get_signaler (); - term_sync.lock (); - s |