From 4ed70a930202b103e7e80b8dc925e0aaa4622595 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 29 Jul 2009 12:07:54 +0200 Subject: initial commit --- src/app_thread.cpp | 221 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 src/app_thread.cpp (limited to 'src/app_thread.cpp') diff --git a/src/app_thread.cpp b/src/app_thread.cpp new file mode 100644 index 0000000..ca08976 --- /dev/null +++ b/src/app_thread.cpp @@ -0,0 +1,221 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zs.h" + +#if defined ZS_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#endif + +#include "app_thread.hpp" +#include "dispatcher.hpp" +#include "err.hpp" +#include "session.hpp" +#include "pipe.hpp" +#include "config.hpp" +#include "i_api.hpp" +#include "dummy_aggregator.hpp" +#include "fair_aggregator.hpp" +#include "dummy_distributor.hpp" +#include "data_distributor.hpp" +#include "load_balancer.hpp" +#include "p2p.hpp" +#include "pub.hpp" +#include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" + +// If the RDTSC is available we use it to prevent excessive +// polling for commands. The nice thing here is that it will work on any +// system with x86 architecture and gcc or MSVC compiler. +#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ + (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) +#define ZS_DELAY_COMMANDS +#endif + +zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), + tid (0), + last_processing_time (0) +{ +} + +void zs::app_thread_t::shutdown () +{ + // Deallocate all the sessions associated with the thread. + while (!sessions.empty ()) + sessions [0]->shutdown (); + + delete this; +} + +zs::app_thread_t::~app_thread_t () +{ +} + +void zs::app_thread_t::attach_session (session_t *session_) +{ + session_->set_index (sessions.size ()); + sessions.push_back (session_); +} + +void zs::app_thread_t::detach_session (session_t *session_) +{ + // O(1) removal of the session from the list. + sessions_t::size_type i = session_->get_index (); + sessions [i] = sessions [sessions.size () - 1]; + sessions [i]->set_index (i); + sessions.pop_back (); +} + +zs::i_poller *zs::app_thread_t::get_poller () +{ + zs_assert (false); +} + +zs::i_signaler *zs::app_thread_t::get_signaler () +{ + return &pollset; +} + +bool zs::app_thread_t::is_current () +{ + return !sessions.empty () && tid == getpid (); +} + +bool zs::app_thread_t::make_current () +{ + // If there are object managed by this slot we cannot assign the slot + // to a different thread. + if (!sessions.empty ()) + return false; + + tid = getpid (); + return true; +} + +zs::i_api *zs::app_thread_t::create_socket (int type_) +{ + i_mux *mux = NULL; + i_demux *demux = NULL; + session_t *session = NULL; + i_api *api = NULL; + + switch (type_) { + case ZS_P2P: + mux = new dummy_aggregator_t; + zs_assert (mux); + demux = new dummy_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new p2p_t (this, session); + zs_assert (api); + break; + case ZS_PUB: + demux = new data_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new pub_t (this, session); + zs_assert (api); + break; + case ZS_SUB: + mux = new fair_aggregator_t; + zs_assert (mux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new sub_t (this, session); + zs_assert (api); + break; + case ZS_REQ: + // TODO + zs_assert (false); + api = new req_t (this, session); + zs_assert (api); + break; + case ZS_REP: + // TODO + zs_assert (false); + api = new rep_t (this, session); + zs_assert (api); + break; + default: + errno = EINVAL; + return NULL; + } + + attach_session (session); + + return api; +} + +void zs::app_thread_t::process_commands (bool block_) +{ + ypollset_t::signals_t signals; + if (block_) + signals = pollset.poll (); + else { + +#if defined ZS_DELAY_COMMANDS + // Optimised version of command processing - it doesn't have to check + // for incoming commands each time. It does so only if certain time + // elapsed since last command processing. Command delay varies + // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU + // etc. The optimisation makes sense only on platforms where getting + // a timestamp is a very cheap operation (tens of nanoseconds). + + // Get timestamp counter. +#if defined __GNUC__ + uint32_t low; + uint32_t high; + __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); + uint64_t current_time = (uint64_t) high << 32 | low; +#elif defined _MSC_VER + uint64_t current_time = __rdtsc (); +#else +#error +#endif + + // Check whether certain time have elapsed since last command + // processing. + if (current_time - last_processing_time <= max_command_delay) + return; + last_processing_time = current_time; +#endif + + // Check whether there are any commands pending for this thread. + signals = pollset.check (); + } + + if (signals) { + + // Traverse all the possible sources of commands and process + // all the commands from all of them. + for (int i = 0; i != thread_slot_count (); i++) { + if (signals & (ypollset_t::signals_t (1) << i)) { + command_t cmd; + while (dispatcher->read (i, get_thread_slot (), &cmd)) + cmd.destination->process_command (cmd); + } + } + } +} -- cgit v1.2.3