diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-07-29 12:07:54 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-07-29 12:07:54 +0200 |
commit | 4ed70a930202b103e7e80b8dc925e0aaa4622595 (patch) | |
tree | aeed881ce17629f81b7c90f7d675aac8ecf69d4f /src |
initial commit
Diffstat (limited to 'src')
111 files changed, 12680 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am new file mode 100644 index 0000000..bb648ec --- /dev/null +++ b/src/Makefile.am @@ -0,0 +1,120 @@ +lib_LTLIBRARIES = libzs.la + +libzs_la_SOURCES = \ + app_thread.hpp \ + atomic_bitmap.hpp \ + atomic_counter.hpp \ + atomic_ptr.hpp \ + command.hpp \ + config.hpp \ + connecter.hpp \ + data_distributor.hpp \ + decoder.hpp \ + devpoll.hpp \ + dispatcher.hpp \ + dummy_aggregator.hpp \ + dummy_distributor.hpp \ + encoder.hpp \ + epoll.hpp \ + err.hpp \ + fair_aggregator.hpp \ + fd.hpp \ + fd_signaler.hpp \ + io_object.hpp \ + io_thread.hpp \ + ip.hpp \ + i_api.hpp \ + i_demux.hpp \ + i_mux.hpp \ + i_poller.hpp \ + i_poll_events.hpp \ + i_session.hpp \ + i_signaler.hpp \ + i_engine.hpp \ + i_thread.hpp \ + listener.hpp \ + kqueue.hpp \ + load_balancer.hpp \ + msg.hpp \ + mutex.hpp \ + object.hpp \ + p2p.hpp \ + pipe.hpp \ + pipe_reader.hpp \ + pipe_writer.hpp \ + platform.hpp \ + poll.hpp \ + pub.hpp \ + rep.hpp \ + req.hpp \ + safe_object.hpp \ + select.hpp \ + session.hpp \ + session_stub.hpp \ + simple_semaphore.hpp \ + socket_base.hpp \ + sub.hpp \ + stdint.hpp \ + tcp_connecter.hpp \ + tcp_listener.hpp \ + tcp_socket.hpp \ + thread.hpp \ + uuid.hpp \ + windows.hpp \ + wire.hpp \ + ypipe.hpp \ + ypollset.hpp \ + yqueue.hpp \ + zmq_decoder.hpp \ + zmq_encoder.hpp \ + zmq_tcp_engine.hpp \ + app_thread.cpp \ + connecter.cpp \ + data_distributor.cpp \ + devpoll.hpp \ + dispatcher.cpp \ + dummy_aggregator.cpp \ + dummy_distributor.cpp \ + epoll.cpp \ + err.cpp \ + fair_aggregator.cpp \ + fd_signaler.cpp \ + io_object.cpp \ + io_thread.cpp \ + ip.cpp \ + kqueue.cpp \ + listener.cpp \ + load_balancer.cpp \ + object.cpp \ + p2p.cpp \ + pipe.cpp \ + pipe_reader.cpp \ + pipe_writer.cpp \ + poll.cpp \ + pub.cpp \ + rep.cpp \ + req.cpp \ + safe_object.cpp \ + select.cpp \ + session.cpp \ + session_stub.cpp \ + socket_base.cpp \ + sub.cpp \ + tcp_connecter.cpp \ + tcp_listener.cpp \ + tcp_socket.cpp \ + thread.cpp \ + uuid.cpp \ + ypollset.cpp \ + zmq_decoder.cpp \ + zmq_encoder.cpp \ + zmq_tcp_engine.cpp \ + zs.cpp + +libzs_la_LDFLAGS = -version-info 0:0:0 +libzs_la_CXXFLAGS = -Wall -pedantic -Werror @ZS_EXTRA_CXXFLAGS@ + +dist-hook: + -rm $(distdir)/src/platform.hpp + + diff --git a/src/app_thread.cpp b/src/app_thread.cpp new file mode 100644 index 0000000..ca08976 --- /dev/null +++ b/src/app_thread.cpp @@ -0,0 +1,221 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zs.h" + +#if defined ZS_HAVE_WINDOWS +#include "windows.hpp" +#else +#include <unistd.h> +#endif + +#include "app_thread.hpp" +#include "dispatcher.hpp" +#include "err.hpp" +#include "session.hpp" +#include "pipe.hpp" +#include "config.hpp" +#include "i_api.hpp" +#include "dummy_aggregator.hpp" +#include "fair_aggregator.hpp" +#include "dummy_distributor.hpp" +#include "data_distributor.hpp" +#include "load_balancer.hpp" +#include "p2p.hpp" +#include "pub.hpp" +#include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" + +// If the RDTSC is available we use it to prevent excessive +// polling for commands. The nice thing here is that it will work on any +// system with x86 architecture and gcc or MSVC compiler. +#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ + (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) +#define ZS_DELAY_COMMANDS +#endif + +zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), + tid (0), + last_processing_time (0) +{ +} + +void zs::app_thread_t::shutdown () +{ + // Deallocate all the sessions associated with the thread. + while (!sessions.empty ()) + sessions [0]->shutdown (); + + delete this; +} + +zs::app_thread_t::~app_thread_t () +{ +} + +void zs::app_thread_t::attach_session (session_t *session_) +{ + session_->set_index (sessions.size ()); + sessions.push_back (session_); +} + +void zs::app_thread_t::detach_session (session_t *session_) +{ + // O(1) removal of the session from the list. + sessions_t::size_type i = session_->get_index (); + sessions [i] = sessions [sessions.size () - 1]; + sessions [i]->set_index (i); + sessions.pop_back (); +} + +zs::i_poller *zs::app_thread_t::get_poller () +{ + zs_assert (false); +} + +zs::i_signaler *zs::app_thread_t::get_signaler () +{ + return &pollset; +} + +bool zs::app_thread_t::is_current () +{ + return !sessions.empty () && tid == getpid (); +} + +bool zs::app_thread_t::make_current () +{ + // If there are object managed by this slot we cannot assign the slot + // to a different thread. + if (!sessions.empty ()) + return false; + + tid = getpid (); + return true; +} + +zs::i_api *zs::app_thread_t::create_socket (int type_) +{ + i_mux *mux = NULL; + i_demux *demux = NULL; + session_t *session = NULL; + i_api *api = NULL; + + switch (type_) { + case ZS_P2P: + mux = new dummy_aggregator_t; + zs_assert (mux); + demux = new dummy_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new p2p_t (this, session); + zs_assert (api); + break; + case ZS_PUB: + demux = new data_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new pub_t (this, session); + zs_assert (api); + break; + case ZS_SUB: + mux = new fair_aggregator_t; + zs_assert (mux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new sub_t (this, session); + zs_assert (api); + break; + case ZS_REQ: + // TODO + zs_assert (false); + api = new req_t (this, session); + zs_assert (api); + break; + case ZS_REP: + // TODO + zs_assert (false); + api = new rep_t (this, session); + zs_assert (api); + break; + default: + errno = EINVAL; + return NULL; + } + + attach_session (session); + + return api; +} + +void zs::app_thread_t::process_commands (bool block_) +{ + ypollset_t::signals_t signals; + if (block_) + signals = pollset.poll (); + else { + +#if defined ZS_DELAY_COMMANDS + // Optimised version of command processing - it doesn't have to check + // for incoming commands each time. It does so only if ce |