From fba28c7c0cddd7c54fe45b38fc38ac6fe5a48438 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 11 Apr 2010 16:36:27 +0200 Subject: issue 1 - Change zmq_term semantics --- src/app_thread.cpp | 25 ++++++++++++++++++++++--- src/app_thread.hpp | 18 ++++++++++++++++-- src/dispatcher.cpp | 7 +++++++ src/socket_base.cpp | 50 +++++++++++++++++++++++++++++++++++++++++++------- src/zmq.cpp | 2 ++ 5 files changed, 90 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/app_thread.cpp b/src/app_thread.cpp index 61e49e5..6141c06 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -62,7 +62,8 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, int flags_) : object_t (dispatcher_, thread_slot_), - last_processing_time (0) + last_processing_time (0), + terminated (false) { if (flags_ & ZMQ_POLL) { signaler = new (std::nothrow) fd_signaler_t; @@ -81,12 +82,17 @@ zmq::app_thread_t::~app_thread_t () delete signaler; } +void zmq::app_thread_t::stop () +{ + send_stop (); +} + zmq::i_signaler *zmq::app_thread_t::get_signaler () { return signaler; } -void zmq::app_thread_t::process_commands (bool block_, bool throttle_) +bool zmq::app_thread_t::process_commands (bool block_, bool throttle_) { uint64_t signals; if (block_) @@ -117,7 +123,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) // Check whether certain time have elapsed since last command // processing. if (current_time - last_processing_time <= max_command_delay) - return; + return !terminated; last_processing_time = current_time; } #endif @@ -138,6 +144,8 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) } } } + + return !terminated; } zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) @@ -190,3 +198,14 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_) if (sockets.empty ()) dispatcher->no_sockets (this); } + +void zmq::app_thread_t::process_stop () +{ + terminated = true; +} + +bool zmq::app_thread_t::is_terminated () +{ + return terminated; +} + diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 9bd5641..1c2f47a 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -38,14 +38,19 @@ namespace zmq ~app_thread_t (); + // Interrupt blocking call if the app thread is stuck in one. + // This function is is called from a different thread! + void stop (); + // Returns signaler associated with this application thread. struct i_signaler *get_signaler (); // Processes commands sent to this thread (if any). If 'block' is // set to true, returns only after at least one command was processed. // If throttle argument is true, commands are processed at most once - // in a predefined time period. - void process_commands (bool block_, bool throttle_); + // in a predefined time period. The function returns false is the + // associated context was terminated, true otherwise. + bool process_commands (bool block_, bool throttle_); // Create a socket of a specified type. class socket_base_t *create_socket (int type_); @@ -53,8 +58,14 @@ namespace zmq // Unregister the socket from the app_thread (called by socket itself). void remove_socket (class socket_base_t *socket_); + // Returns true is the associated context was already terminated. + bool is_terminated (); + private: + // Command handlers. + void process_stop (); + // All the sockets created from this application thread. typedef yarray_t sockets_t; sockets_t sockets; @@ -65,6 +76,9 @@ namespace zmq // Timestamp of when commands were processed the last time. uint64_t last_processing_time; + // If true, 'stop' command was already received. + bool terminated; + app_thread_t (const app_thread_t&); void operator = (const app_thread_t&); }; diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 93c5db3..a1154de 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -86,12 +86,19 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, int zmq::dispatcher_t::term () { + // First send stop command to application threads so that any + // blocking calls are interrupted. + for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) + app_threads [i].app_thread->stop (); + + // Then mark context as terminated. term_sync.lock (); zmq_assert (!terminated); terminated = true; bool destroy = (sockets == 0); term_sync.unlock (); + // If there are no sockets open, destroy the context immediately. if (destroy) delete this; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3b74359..b186683 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -37,6 +37,7 @@ #include "platform.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" +#include "likely.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -58,6 +59,11 @@ zmq::socket_base_t::~socket_base_t () int zmq::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { + if (unlikely (app_thread->is_terminated ())) { + errno = ETERM; + return -1; + } + // First, check whether specific socket type overloads the option. int rc = xsetsockopt (option_, optval_, optvallen_); if (rc == 0 || errno != EINVAL) @@ -71,6 +77,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { + if (unlikely (app_thread->is_terminated ())) { + errno = ETERM; + return -1; + } + if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int64_t)) { errno = EINVAL; @@ -86,6 +97,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, int zmq::socket_base_t::bind (const char *addr_) { + if (unlikely (app_thread->is_terminated ())) { + errno = ETERM; + return -1; + } + // Parse addr_ string. std::string addr_type; std::string addr_args; @@ -141,6 +157,11 @@ int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::connect (const char *addr_) { + if (unlikely (app_thread->is_terminated ())) { + errno = ETERM; + return -1; + } + // Parse addr_ string. std::string addr_type; std::string addr_args; @@ -328,13 +349,16 @@ int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) { + // Process pending commands, if any. + if (unlikely (!app_thread->process_commands (false, true))) { + errno = ETERM; + return -1; + } + // At this point we impose the MORE flag on the message. if (flags_ & ZMQ_SNDMORE) msg_->flags |= ZMQ_MSG_MORE; - // Process pending commands, if any. - app_thread->process_commands (false, true); - // Try to send the message. int rc = xsend (msg_, flags_); if (rc == 0) @@ -350,7 +374,10 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - app_thread->process_commands (true, false); + if (unlikely (!app_thread->process_commands (true, false))) { + errno = ETERM; + return -1; + } rc = xsend (msg_, flags_); } return 0; @@ -371,7 +398,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) // described above) from the one used by 'send'. This is because counting // ticks is more efficient than doing rdtsc all the time. if (++ticks == inbound_poll_rate) { - app_thread->process_commands (false, false); + if (unlikely (!app_thread->process_commands (false, false))) { + errno = ETERM; + return -1; + } ticks = 0; } @@ -392,7 +422,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1; - app_thread->process_commands (false, false); + if (unlikely (!app_thread->process_commands (false, false))) { + errno = ETERM; + return -1; + } ticks = 0; return xrecv (msg_, flags_); } @@ -402,7 +435,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) while (rc != 0) { if (errno != EAGAIN) return -1; - app_thread->process_commands (true, false); + if (unlikely (!app_thread->process_commands (true, false))) { + errno = ETERM; + return -1; + } rc = xrecv (msg_, flags_); ticks = 0; } diff --git a/src/zmq.cpp b/src/zmq.cpp index b8d18f7..0681cbb 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_) return "Operation cannot be accomplished in current state"; case ENOCOMPATPROTO: return "The protocol is not compatible with the socket type"; + case ETERM: + return "Context was terminated"; default: #if defined _MSC_VER #pragma warning (push) -- cgit v1.2.3