From 4ed70a930202b103e7e80b8dc925e0aaa4622595 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 29 Jul 2009 12:07:54 +0200 Subject: initial commit --- src/Makefile.am | 120 ++++++++++++++++++ src/app_thread.cpp | 221 +++++++++++++++++++++++++++++++++ src/app_thread.hpp | 95 ++++++++++++++ src/atomic.hpp | 310 ++++++++++++++++++++++++++++++++++++++++++++++ src/atomic_bitmap.hpp | 286 ++++++++++++++++++++++++++++++++++++++++++ src/atomic_counter.hpp | 197 +++++++++++++++++++++++++++++ src/atomic_ptr.hpp | 189 ++++++++++++++++++++++++++++ src/command.hpp | 98 +++++++++++++++ src/config.hpp | 71 +++++++++++ src/connecter.cpp | 189 ++++++++++++++++++++++++++++ src/connecter.hpp | 99 +++++++++++++++ src/data_distributor.cpp | 155 +++++++++++++++++++++++ src/data_distributor.hpp | 70 +++++++++++ src/decoder.hpp | 101 +++++++++++++++ src/devpoll.cpp | 224 +++++++++++++++++++++++++++++++++ src/devpoll.hpp | 110 ++++++++++++++++ src/dispatcher.cpp | 266 +++++++++++++++++++++++++++++++++++++++ src/dispatcher.hpp | 170 +++++++++++++++++++++++++ src/dummy_aggregator.cpp | 111 +++++++++++++++++ src/dummy_aggregator.hpp | 73 +++++++++++ src/dummy_distributor.cpp | 85 +++++++++++++ src/dummy_distributor.hpp | 68 ++++++++++ src/encoder.hpp | 108 ++++++++++++++++ src/epoll.cpp | 214 ++++++++++++++++++++++++++++++++ src/epoll.hpp | 107 ++++++++++++++++ src/err.cpp | 146 ++++++++++++++++++++++ src/err.hpp | 90 ++++++++++++++ src/fair_aggregator.cpp | 143 +++++++++++++++++++++ src/fair_aggregator.hpp | 77 ++++++++++++ src/fd.hpp | 44 +++++++ src/fd_signaler.cpp | 278 +++++++++++++++++++++++++++++++++++++++++ src/fd_signaler.hpp | 92 ++++++++++++++ src/i_api.hpp | 39 ++++++ src/i_demux.hpp | 56 +++++++++ src/i_engine.hpp | 53 ++++++++ src/i_mux.hpp | 59 +++++++++ src/i_poll_events.hpp | 45 +++++++ src/i_poller.hpp | 89 +++++++++++++ src/i_session.hpp | 37 ++++++ src/i_signaler.hpp | 38 ++++++ src/i_thread.hpp | 38 ++++++ src/io_object.cpp | 37 ++++++ src/io_object.hpp | 51 ++++++++ src/io_thread.cpp | 177 ++++++++++++++++++++++++++ src/io_thread.hpp | 99 +++++++++++++++ src/ip.cpp | 310 ++++++++++++++++++++++++++++++++++++++++++++++ src/ip.hpp | 47 +++++++ src/kqueue.cpp | 214 ++++++++++++++++++++++++++++++++ src/kqueue.hpp | 112 +++++++++++++++++ src/listener.cpp | 170 +++++++++++++++++++++++++ src/listener.hpp | 110 ++++++++++++++++ src/load_balancer.cpp | 130 +++++++++++++++++++ src/load_balancer.hpp | 73 +++++++++++ src/msg.hpp | 49 ++++++++ src/mutex.hpp | 116 +++++++++++++++++ src/object.cpp | 294 +++++++++++++++++++++++++++++++++++++++++++ src/object.hpp | 105 ++++++++++++++++ src/p2p.cpp | 29 +++++ src/p2p.hpp | 42 +++++++ src/pipe.cpp | 47 +++++++ src/pipe.hpp | 57 +++++++++ src/pipe_reader.cpp | 118 ++++++++++++++++++ src/pipe_reader.hpp | 89 +++++++++++++ src/pipe_writer.cpp | 120 ++++++++++++++++++ src/pipe_writer.hpp | 88 +++++++++++++ src/platform.hpp.in | 210 +++++++++++++++++++++++++++++++ src/poll.cpp | 205 ++++++++++++++++++++++++++++++ src/poll.hpp | 112 +++++++++++++++++ src/pub.cpp | 38 ++++++ src/pub.hpp | 45 +++++++ src/rep.cpp | 29 +++++ src/rep.hpp | 42 +++++++ src/req.cpp | 29 +++++ src/req.hpp | 42 +++++++ src/safe_object.cpp | 76 ++++++++++++ src/safe_object.hpp | 68 ++++++++++ src/select.cpp | 236 +++++++++++++++++++++++++++++++++++ src/select.hpp | 122 ++++++++++++++++++ src/session.cpp | 273 ++++++++++++++++++++++++++++++++++++++++ src/session.hpp | 107 ++++++++++++++++ src/session_stub.cpp | 110 ++++++++++++++++ src/session_stub.hpp | 83 +++++++++++++ src/simple_semaphore.hpp | 188 ++++++++++++++++++++++++++++ src/socket_base.cpp | 267 +++++++++++++++++++++++++++++++++++++++ src/socket_base.hpp | 96 ++++++++++++++ src/stdint.hpp | 70 +++++++++++ src/sub.cpp | 45 +++++++ src/sub.hpp | 46 +++++++ src/tcp_connecter.cpp | 138 +++++++++++++++++++++ src/tcp_connecter.hpp | 65 ++++++++++ src/tcp_listener.cpp | 165 ++++++++++++++++++++++++ src/tcp_listener.hpp | 65 ++++++++++ src/tcp_socket.cpp | 116 +++++++++++++++++ src/tcp_socket.hpp | 70 +++++++++++ src/thread.cpp | 88 +++++++++++++ src/thread.hpp | 77 ++++++++++++ src/uuid.cpp | 136 ++++++++++++++++++++ src/uuid.hpp | 82 ++++++++++++ src/windows.hpp | 56 +++++++++ src/wire.hpp | 98 +++++++++++++++ src/ypipe.hpp | 209 +++++++++++++++++++++++++++++++ src/ypollset.cpp | 56 +++++++++ src/ypollset.hpp | 74 +++++++++++ src/yqueue.hpp | 138 +++++++++++++++++++++ src/zmq_decoder.cpp | 78 ++++++++++++ src/zmq_decoder.hpp | 57 +++++++++ src/zmq_encoder.cpp | 75 +++++++++++ src/zmq_encoder.hpp | 54 ++++++++ src/zmq_tcp_engine.cpp | 185 +++++++++++++++++++++++++++ src/zmq_tcp_engine.hpp | 92 ++++++++++++++ src/zs.cpp | 222 +++++++++++++++++++++++++++++++++ 111 files changed, 12680 insertions(+) create mode 100644 src/Makefile.am create mode 100644 src/app_thread.cpp create mode 100644 src/app_thread.hpp create mode 100644 src/atomic.hpp create mode 100644 src/atomic_bitmap.hpp create mode 100644 src/atomic_counter.hpp create mode 100644 src/atomic_ptr.hpp create mode 100644 src/command.hpp create mode 100644 src/config.hpp create mode 100644 src/connecter.cpp create mode 100644 src/connecter.hpp create mode 100644 src/data_distributor.cpp create mode 100644 src/data_distributor.hpp create mode 100644 src/decoder.hpp create mode 100644 src/devpoll.cpp create mode 100644 src/devpoll.hpp create mode 100644 src/dispatcher.cpp create mode 100644 src/dispatcher.hpp create mode 100644 src/dummy_aggregator.cpp create mode 100644 src/dummy_aggregator.hpp create mode 100644 src/dummy_distributor.cpp create mode 100644 src/dummy_distributor.hpp create mode 100644 src/encoder.hpp create mode 100644 src/epoll.cpp create mode 100644 src/epoll.hpp create mode 100644 src/err.cpp create mode 100644 src/err.hpp create mode 100644 src/fair_aggregator.cpp create mode 100644 src/fair_aggregator.hpp create mode 100644 src/fd.hpp create mode 100644 src/fd_signaler.cpp create mode 100644 src/fd_signaler.hpp create mode 100644 src/i_api.hpp create mode 100644 src/i_demux.hpp create mode 100644 src/i_engine.hpp create mode 100644 src/i_mux.hpp create mode 100644 src/i_poll_events.hpp create mode 100644 src/i_poller.hpp create mode 100644 src/i_session.hpp create mode 100644 src/i_signaler.hpp create mode 100644 src/i_thread.hpp create mode 100644 src/io_object.cpp create mode 100644 src/io_object.hpp create mode 100644 src/io_thread.cpp create mode 100644 src/io_thread.hpp create mode 100644 src/ip.cpp create mode 100644 src/ip.hpp create mode 100644 src/kqueue.cpp create mode 100644 src/kqueue.hpp create mode 100644 src/listener.cpp create mode 100644 src/listener.hpp create mode 100644 src/load_balancer.cpp create mode 100644 src/load_balancer.hpp create mode 100644 src/msg.hpp create mode 100644 src/mutex.hpp create mode 100644 src/object.cpp create mode 100644 src/object.hpp create mode 100644 src/p2p.cpp create mode 100644 src/p2p.hpp create mode 100644 src/pipe.cpp create mode 100644 src/pipe.hpp create mode 100644 src/pipe_reader.cpp create mode 100644 src/pipe_reader.hpp create mode 100644 src/pipe_writer.cpp create mode 100644 src/pipe_writer.hpp create mode 100644 src/platform.hpp.in create mode 100644 src/poll.cpp create mode 100644 src/poll.hpp create mode 100644 src/pub.cpp create mode 100644 src/pub.hpp create mode 100644 src/rep.cpp create mode 100644 src/rep.hpp create mode 100644 src/req.cpp create mode 100644 src/req.hpp create mode 100644 src/safe_object.cpp create mode 100644 src/safe_object.hpp create mode 100644 src/select.cpp create mode 100644 src/select.hpp create mode 100644 src/session.cpp create mode 100644 src/session.hpp create mode 100644 src/session_stub.cpp create mode 100644 src/session_stub.hpp create mode 100644 src/simple_semaphore.hpp create mode 100644 src/socket_base.cpp create mode 100644 src/socket_base.hpp create mode 100644 src/stdint.hpp create mode 100644 src/sub.cpp create mode 100644 src/sub.hpp create mode 100644 src/tcp_connecter.cpp create mode 100644 src/tcp_connecter.hpp create mode 100644 src/tcp_listener.cpp create mode 100644 src/tcp_listener.hpp create mode 100644 src/tcp_socket.cpp create mode 100644 src/tcp_socket.hpp create mode 100644 src/thread.cpp create mode 100644 src/thread.hpp create mode 100644 src/uuid.cpp create mode 100644 src/uuid.hpp create mode 100644 src/windows.hpp create mode 100644 src/wire.hpp create mode 100644 src/ypipe.hpp create mode 100644 src/ypollset.cpp create mode 100644 src/ypollset.hpp create mode 100644 src/yqueue.hpp create mode 100644 src/zmq_decoder.cpp create mode 100644 src/zmq_decoder.hpp create mode 100644 src/zmq_encoder.cpp create mode 100644 src/zmq_encoder.hpp create mode 100644 src/zmq_tcp_engine.cpp create mode 100644 src/zmq_tcp_engine.hpp create mode 100644 src/zs.cpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am new file mode 100644 index 0000000..bb648ec --- /dev/null +++ b/src/Makefile.am @@ -0,0 +1,120 @@ +lib_LTLIBRARIES = libzs.la + +libzs_la_SOURCES = \ + app_thread.hpp \ + atomic_bitmap.hpp \ + atomic_counter.hpp \ + atomic_ptr.hpp \ + command.hpp \ + config.hpp \ + connecter.hpp \ + data_distributor.hpp \ + decoder.hpp \ + devpoll.hpp \ + dispatcher.hpp \ + dummy_aggregator.hpp \ + dummy_distributor.hpp \ + encoder.hpp \ + epoll.hpp \ + err.hpp \ + fair_aggregator.hpp \ + fd.hpp \ + fd_signaler.hpp \ + io_object.hpp \ + io_thread.hpp \ + ip.hpp \ + i_api.hpp \ + i_demux.hpp \ + i_mux.hpp \ + i_poller.hpp \ + i_poll_events.hpp \ + i_session.hpp \ + i_signaler.hpp \ + i_engine.hpp \ + i_thread.hpp \ + listener.hpp \ + kqueue.hpp \ + load_balancer.hpp \ + msg.hpp \ + mutex.hpp \ + object.hpp \ + p2p.hpp \ + pipe.hpp \ + pipe_reader.hpp \ + pipe_writer.hpp \ + platform.hpp \ + poll.hpp \ + pub.hpp \ + rep.hpp \ + req.hpp \ + safe_object.hpp \ + select.hpp \ + session.hpp \ + session_stub.hpp \ + simple_semaphore.hpp \ + socket_base.hpp \ + sub.hpp \ + stdint.hpp \ + tcp_connecter.hpp \ + tcp_listener.hpp \ + tcp_socket.hpp \ + thread.hpp \ + uuid.hpp \ + windows.hpp \ + wire.hpp \ + ypipe.hpp \ + ypollset.hpp \ + yqueue.hpp \ + zmq_decoder.hpp \ + zmq_encoder.hpp \ + zmq_tcp_engine.hpp \ + app_thread.cpp \ + connecter.cpp \ + data_distributor.cpp \ + devpoll.hpp \ + dispatcher.cpp \ + dummy_aggregator.cpp \ + dummy_distributor.cpp \ + epoll.cpp \ + err.cpp \ + fair_aggregator.cpp \ + fd_signaler.cpp \ + io_object.cpp \ + io_thread.cpp \ + ip.cpp \ + kqueue.cpp \ + listener.cpp \ + load_balancer.cpp \ + object.cpp \ + p2p.cpp \ + pipe.cpp \ + pipe_reader.cpp \ + pipe_writer.cpp \ + poll.cpp \ + pub.cpp \ + rep.cpp \ + req.cpp \ + safe_object.cpp \ + select.cpp \ + session.cpp \ + session_stub.cpp \ + socket_base.cpp \ + sub.cpp \ + tcp_connecter.cpp \ + tcp_listener.cpp \ + tcp_socket.cpp \ + thread.cpp \ + uuid.cpp \ + ypollset.cpp \ + zmq_decoder.cpp \ + zmq_encoder.cpp \ + zmq_tcp_engine.cpp \ + zs.cpp + +libzs_la_LDFLAGS = -version-info 0:0:0 +libzs_la_CXXFLAGS = -Wall -pedantic -Werror @ZS_EXTRA_CXXFLAGS@ + +dist-hook: + -rm $(distdir)/src/platform.hpp + + diff --git a/src/app_thread.cpp b/src/app_thread.cpp new file mode 100644 index 0000000..ca08976 --- /dev/null +++ b/src/app_thread.cpp @@ -0,0 +1,221 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zs.h" + +#if defined ZS_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#endif + +#include "app_thread.hpp" +#include "dispatcher.hpp" +#include "err.hpp" +#include "session.hpp" +#include "pipe.hpp" +#include "config.hpp" +#include "i_api.hpp" +#include "dummy_aggregator.hpp" +#include "fair_aggregator.hpp" +#include "dummy_distributor.hpp" +#include "data_distributor.hpp" +#include "load_balancer.hpp" +#include "p2p.hpp" +#include "pub.hpp" +#include "sub.hpp" +#include "req.hpp" +#include "rep.hpp" + +// If the RDTSC is available we use it to prevent excessive +// polling for commands. The nice thing here is that it will work on any +// system with x86 architecture and gcc or MSVC compiler. +#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ + (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) +#define ZS_DELAY_COMMANDS +#endif + +zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : + object_t (dispatcher_, thread_slot_), + tid (0), + last_processing_time (0) +{ +} + +void zs::app_thread_t::shutdown () +{ + // Deallocate all the sessions associated with the thread. + while (!sessions.empty ()) + sessions [0]->shutdown (); + + delete this; +} + +zs::app_thread_t::~app_thread_t () +{ +} + +void zs::app_thread_t::attach_session (session_t *session_) +{ + session_->set_index (sessions.size ()); + sessions.push_back (session_); +} + +void zs::app_thread_t::detach_session (session_t *session_) +{ + // O(1) removal of the session from the list. + sessions_t::size_type i = session_->get_index (); + sessions [i] = sessions [sessions.size () - 1]; + sessions [i]->set_index (i); + sessions.pop_back (); +} + +zs::i_poller *zs::app_thread_t::get_poller () +{ + zs_assert (false); +} + +zs::i_signaler *zs::app_thread_t::get_signaler () +{ + return &pollset; +} + +bool zs::app_thread_t::is_current () +{ + return !sessions.empty () && tid == getpid (); +} + +bool zs::app_thread_t::make_current () +{ + // If there are object managed by this slot we cannot assign the slot + // to a different thread. + if (!sessions.empty ()) + return false; + + tid = getpid (); + return true; +} + +zs::i_api *zs::app_thread_t::create_socket (int type_) +{ + i_mux *mux = NULL; + i_demux *demux = NULL; + session_t *session = NULL; + i_api *api = NULL; + + switch (type_) { + case ZS_P2P: + mux = new dummy_aggregator_t; + zs_assert (mux); + demux = new dummy_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new p2p_t (this, session); + zs_assert (api); + break; + case ZS_PUB: + demux = new data_distributor_t; + zs_assert (demux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new pub_t (this, session); + zs_assert (api); + break; + case ZS_SUB: + mux = new fair_aggregator_t; + zs_assert (mux); + session = new session_t (this, this, mux, demux, true, false); + zs_assert (session); + api = new sub_t (this, session); + zs_assert (api); + break; + case ZS_REQ: + // TODO + zs_assert (false); + api = new req_t (this, session); + zs_assert (api); + break; + case ZS_REP: + // TODO + zs_assert (false); + api = new rep_t (this, session); + zs_assert (api); + break; + default: + errno = EINVAL; + return NULL; + } + + attach_session (session); + + return api; +} + +void zs::app_thread_t::process_commands (bool block_) +{ + ypollset_t::signals_t signals; + if (block_) + signals = pollset.poll (); + else { + +#if defined ZS_DELAY_COMMANDS + // Optimised version of command processing - it doesn't have to check + // for incoming commands each time. It does so only if certain time + // elapsed since last command processing. Command delay varies + // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU + // etc. The optimisation makes sense only on platforms where getting + // a timestamp is a very cheap operation (tens of nanoseconds). + + // Get timestamp counter. +#if defined __GNUC__ + uint32_t low; + uint32_t high; + __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high)); + uint64_t current_time = (uint64_t) high << 32 | low; +#elif defined _MSC_VER + uint64_t current_time = __rdtsc (); +#else +#error +#endif + + // Check whether certain time have elapsed since last command + // processing. + if (current_time - last_processing_time <= max_command_delay) + return; + last_processing_time = current_time; +#endif + + // Check whether there are any commands pending for this thread. + signals = pollset.check (); + } + + if (signals) { + + // Traverse all the possible sources of commands and process + // all the commands from all of them. + for (int i = 0; i != thread_slot_count (); i++) { + if (signals & (ypollset_t::signals_t (1) << i)) { + command_t cmd; + while (dispatcher->read (i, get_thread_slot (), &cmd)) + cmd.destination->process_command (cmd); + } + } + } +} diff --git a/src/app_thread.hpp b/src/app_thread.hpp new file mode 100644 index 0000000..61e7ff1 --- /dev/null +++ b/src/app_thread.hpp @@ -0,0 +1,95 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_APP_THREAD_HPP_INCLUDED__ +#define __ZS_APP_THREAD_HPP_INCLUDED__ + +#include + +#include "i_thread.hpp" +#include "stdint.hpp" +#include "object.hpp" +#include "ypollset.hpp" + +namespace zs +{ + + class app_thread_t : public object_t, public i_thread + { + public: + + app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + + // To be called when the whole infrastrucure is being closed (zs_term). + void shutdown (); + + // Returns signaler associated with this application thread. + i_signaler *get_signaler (); + + // Create socket engine in this thread. Return false if the calling + // thread doesn't match the thread handled by this app thread object. + struct i_api *create_socket (int type_); + + // Nota bene: The following two functions are accessed from different + // threads. The caller (dispatcher) is responsible for synchronisation + // of accesses. + + // Returns true is current thread is associated with the app thread. + bool is_current (); + + // Tries to associate current thread with the app thread object. + // Returns true is successfull, false otherwise. + bool make_current (); + + // Processes commands sent to this thread (if any). If 'block' is + // set to true, returns only after at least one command was processed. + void process_commands (bool block_); + + // i_thread implementation. + void attach_session (class session_t *session_); + void detach_session (class session_t *session_); + struct i_poller *get_poller (); + + private: + + // Clean-up. + ~app_thread_t (); + + // Thread ID associated with this slot. + // TODO: Virtualise pid_t! + // TODO: Check whether getpid returns unique ID for each thread. + int tid; + + // Vector of all sessionss associated with this app thread. + typedef std::vector sessions_t; + sessions_t sessions; + + // App thread's signaler object. + ypollset_t pollset; + + // Timestamp of when commands were processed the last time. + uint64_t last_processing_time; + + app_thread_t (const app_thread_t&); + void operator = (const app_thread_t&); + }; + +} + +#endif diff --git a/src/atomic.hpp b/src/atomic.hpp new file mode 100644 index 0000000..e24b719 --- /dev/null +++ b/src/atomic.hpp @@ -0,0 +1,310 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_ATOMIC_HPP_INCLUDED__ +#define __ZS_ATOMIC_HPP_INCLUDED__ + +#include "stdint.hpp" + +#if defined ZS_FORCE_MUTEXES +#define ZS_ATOMIC_MUTEX +#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ +#define ZS_ATOMIC_X86 +#elif defined ZMQ_HAVE_WINDOWS +#define ZS_ATOMIC_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZS_ATOMIC_SOLARIS +#else +#define ZS_ATOMIC_MUTEX +#endif + +namespace zs +{ + + // Atomic assignement. + inline void atomic_uint32_set (volatile uint32_t *p_, uint32_t value_) + { + *p_ = value_; + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + } + + // Atomic retrieval of an integer. + inline uint32_t atomic_uint32_get (volatile uint32_t *p_) + { + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + return *p_; + } + + // Atomic addition. Returns the old value. + inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedExchangeAdd ((LONG*) &value, increment_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_add_32_nv (&value, increment_) - delta_; +#elif defined ZS_ATOMIC_X86 + uint32_t old; + __asm__ volatile ( + "lock; xadd %0, %1\n\t" + : "=r" (old), "=m" (*p_) + : "0" (delta_), "m" (*p_) + : "cc", "memory"); + return old; +#else +#error // TODO: + sync.lock (); + uint32_t old = *p_; + *p_ += delta_; + sync.unlock (); +#endif + } + + // Atomic subtraction. Returns the old value. + inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_) + { +#if defined ZS_ATOMIC_WINDOWS + LONG delta = - ((LONG) delta_); + return InterlockedExchangeAdd ((LONG*) &value, delta); +#elif defined ZS_ATOMIC_SOLARIS + int32_t delta = - ((int32_t) delta_); + return atomic_add_32_nv (&value, delta) + delta_; +#elif defined ZS_ATOMIC_X86 + uint32_t old = -delta_; + __asm__ volatile ("lock; xaddl %0,%1" + : "=r" (old), "=m" (*p_) + : "0" (old), "m" (*p_) + : "cc"); + return old; +#else +#error // TODO: + sync.lock (); + uint32_t old = *p_; + *p_ -= delta_; + sync.unlock (); + return old; +#endif + } + + // Atomic assignement. + template + inline void atomic_ptr_set (volatile T **p_, T *value_) + { + *p_ = value_; + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + } + + // Perform atomic 'exchange pointers' operation. Old value is returned. + template + inline void *atomic_ptr_xchg (volatile T **p_, T *value_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedExchangePointer (p_, value_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_swap_ptr (p_, value_); +#elif defined ZS_ATOMIC_X86 + void *old; + __asm__ volatile ( + "lock; xchg %0, %2" + : "=r" (old), "=m" (*p_) + : "m" (*p_), "0" (value_)); + return old; +#else +#error // TODO: + sync.lock (); + void *old = *p_; + *p_ = value_; + sync.unlock (); + return old; +#endif + } + + // Perform atomic 'compare and swap' operation on the pointer. + // The pointer is compared to 'cmp' argument and if they are + // equal, its value is set to 'value'. Old value of the pointer + // is returned. + template + inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedCompareExchangePointer (p_, value_, cmp_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_cas_ptr (p_, cmp_, value_); +#elif defined ZS_ATOMIC_X86 + void *old; + __asm__ volatile ( + "lock; cmpxchg %2, %3" + : "=a" (old), "=m" (*p_) + : "r" (value_), "m" (*p_), "0" (cmp_) + : "cc"); + return old; +#else +#error // TODO: + sync.lock (); + void *old = *p_; + if (old == cmp_) + *p_ = value_; + sync.unlock (); + return old; +#endif + } + +#if defined ZS_ATOMIC_X86 && defined __x86_64__ + typedef uint64_t atomic_bitmap_t; +#else + typedef uint32_t atomic_bitmap_t; +#endif + + // Atomic assignement. + inline void atomic_bitmap_set (volatile atomic_bitmap_t *p_, + atomic_bitmap_t value_) + { + *p_ = value_; + // StoreLoad memory barrier should go here on platforms with + // memory models that require it. + } + + // Bit-test-set-and-reset. Sets one bit of the value and resets + // another one. Returns the original value of the reset bit. + inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_, + int set_index_, int reset_index_) + { +#if defined ZS_ATOMIC_WINDOWS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << + set_index_)) & ~(integer_t (1) << reset_index_); + if (InterlockedCompareExchange ((volatile LONG*) p_, newval, + oldval) == (LONG) oldval) + return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? + true : false; + } +#elif defined ZS_ATOMIC_SOLARIS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << + set_index_)) & ~(integer_t (1) << reset_index_); + if (atomic_cas_32 (p_, oldval, newval) == oldval) + return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? + true : false; + } +#elif defined ZS_ATOMIC_X86 + atomic_bitmap_t oldval, dummy; + __asm__ volatile ( + "mov %0, %1\n\t" + "1:" + "mov %1, %2\n\t" + "bts %3, %2\n\t" + "btr %4, %2\n\t" + "lock cmpxchg %2, %0\n\t" + "jnz 1b\n\t" + : "+m" (*p_), "=&a" (oldval), "=&r" (dummy) + : "r" (atomic_bitmap_t (set_index_)), + "r" (atomic_bitmap_t (reset_index_)) + : "cc"); + return (bool) (oldval & (atomic_bitmap_t (1) << reset_index_)); +#else +#error // TODO: + sync.lock (); + atomic_bitmap_t oldval = *p_; + *p_ = (oldval | (atomic_bitmap_t (1) << set_index_)) & + ~(atomic_bitmap_t (1) << reset_index_); + sync.unlock (); + return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false; +#endif + } + + // Sets value to newval. Returns the original value. + inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_, + atomic_bitmap_t newval_) + { +#if defined ZS_ATOMIC_WINDOWS + return InterlockedExchange ((volatile LONG*) p_, newval_); +#elif defined ZS_ATOMIC_SOLARIS + return atomic_swap_32 (p_, newval_); +#elif defined ZS_ATOMIC_X86 + atomic_bitmap_t oldval = newval_; + __asm__ volatile ( + "lock; xchg %0, %1" + : "=r" (oldval) + : "m" (*p_), "0" (oldval) + : "memory"); + return oldval; +#else +#error // TODO: + sync.lock (); + atomic_bitmap_t oldval = *p_; + *p_ = newval_; + sync.unlock (); +#endif + } + + // izte is "if-zero-then-else" atomic operation - if the value is zero + // it substitutes it by 'thenval' else it rewrites it by 'elseval'. + // Original value of the integer is returned from this function. + inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_, + atomic_bitmap_t thenval_, atomic_bitmap_t elseval_) + { +#if defined ZS_ATOMIC_WINDOWS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); + if (InterlockedCompareExchange ((volatile LONG*) p_, newval, + oldval) == (LONG) oldval) + return oldval; + } +#elif defined ZS_ATOMIC_SOLARIS + while (true) { + atomic_bitmap_t oldval = *p_; + atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); + if (atomic_cas_32 (p_, oldval, newval) == oldval) + return oldval; + } +#elif defined ZS_ATOMIC_X86 + atomic_bitmap_t oldval; + atomic_bitmap_t dummy; + __asm__ volatile ( + "mov %0, %1\n\t" + "1:" + "mov %3, %2\n\t" + "test %1, %1\n\t" + "jz 2f\n\t" + "mov %4, %2\n\t" + "2:" + "lock cmpxchg %2, %0\n\t" + "jnz 1b\n\t" + : "+m" (*p_), "=&a" (oldval), "=&r" (dummy) + : "r" (thenval_), "r" (elseval_) + : "cc"); + return oldval; +#else +#error // TODO: + sync.lock (); + atomic_bitmap_t oldval = *p_; + *p_ = oldval ? elseval_ : thenval_; + sync.unlock (); + return oldval; +#endif + } + +} + +#endif diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp new file mode 100644 index 0000000..a5440de --- /dev/null +++ b/src/atomic_bitmap.hpp @@ -0,0 +1,286 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ +#define __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ + +#include "stdint.hpp" +#include "platform.hpp" + +// These are the conditions to choose between different implementations +// of atomic_bitmap. + +#if defined ZS_FORCE_MUTEXES +#define ZS_ATOMIC_BITMAP_MUTEX +#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ +#define ZS_ATOMIC_BITMAP_X86 +#elif 0 && defined __sparc__ && defined __GNUC__ +#define ZS_ATOMIC_BITMAP_SPARC +#elif defined ZS_HAVE_WINDOWS +#define ZS_ATOMIC_BITMAP_WINDOWS +#elif defined ZS_HAVE_SOLARIS +#define ZS_ATOMIC_BITMAP_SOLARIS +#else +#define ZS_ATOMIC_BITMAP_MUTEX +#endif + +#if defined ZS_ATOMIC_BITMAP_MUTEX +#include "mutex.hpp" +#elif defined ZS_ATOMIC_BITMAP_WINDOWS +#include "windows.hpp" +#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#include +#endif + +namespace zs +{ + + // This class encapuslates several bitwise atomic operations on unsigned + // integer. Selection of operations is driven specifically by the needs + // of ypollset implementation. + + class atomic_bitmap_t + { + public: + +#if (defined ZMQ_ATOMIC_BITMAP_X86 || defined ZMQ_FORCE_MUTEXES) \ + && defined __x86_64__ + typedef uint64_t bitmap_t; +#else + typedef uint32_t bitmap_t; +#endif + + inline atomic_bitmap_t (bitmap_t value_ = 0) : + value (value_) + { + } + + inline ~atomic_bitmap_t () + { + } + + // Bit-test-set-and-reset. Sets one bit of the value and resets + // another one. Returns the original value of the reset bit. + inline bool btsr (int set_index_, int reset_index_) + { +#if defined ZS_ATOMIC_BITMAP_WINDOWS + while (true) { + bitmap_t oldval = value; + bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & + ~(bitmap_t (1) << reset_index_); + if (InterlockedCompareExchange ((volatile LONG*) &value, newval, + oldval) == (LONG) oldval) + return (oldval & (bitmap_t (1) << reset_index_)) ? + true : false; + } +#elif defined ZS_ATOMIC_BITMAP_SOLARIS + while (true) { + bitmap_t oldval = value; + bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & + ~(bitmap_t (1) << reset_index_); + if (atomic_cas_32 (&value, oldval, newval) == oldval) + return (oldval & (bitmap_t (1) << reset_index_)) ? + true : false; + } +#elif defined ZS_ATOMIC_BITMAP_X86 + bitmap_t oldval, dummy; + __asm__ volatile ( + "mov %0, %1\n\t" + "1:" + "mov %1, %2\n\t" + "bts %3, %2\n\t" + "btr %4, %2\n\t" + "lock cmpxchg %2, %0\n\t" + "jnz 1b\n\t" + : "+m" (value), "=&a" (oldval), "=&r" (dummy) + : "r" (bitmap_t(set_index_)), "r" (bitmap_t(reset_index_)) + : "cc"); + return (bool) (oldval & (bitmap_t(1) << reset_index_)); +#elif defined ZS_ATOMIC_BITMAP_SPARC + volatile bitmap_t* valptr = &value; + bitmap_t set_val = bitmap_t(1) << set_index_; + bitmap_t reset_val = ~(bitmap_t(1) << reset_index_); + bitmap_t tmp; + bitmap_t oldval; + __asm__ volatile( + "ld [%5], %2 \n\t" + "1: \n\t" + "or %2, %0, %3 \n\t" + "and %3, %1, %3 \n\t" + "cas [%5], %2, %3 \n\t" + "cmp %2, %3 \n\t" + "bne,a,pn %%icc, 1b \n\t" + "mov %3, %2 \n\t" + : "+r" (set_val), "+r" (reset_val), "=&r" (tmp), + "=&r" (oldval), "+m" (*valptr) + : "r" (valptr) + : "cc"); + return oldval; +#elif defined ZS_ATOMIC_BITMAP_MUTEX + sync.lock (); + bitmap_t oldval = value; + value = (oldval | (bitmap_t (1) << set_index_)) & + ~(bitmap_t (1) << reset_index_); + sync.unlock (); + return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; +#else +#error +#endif + } + + // Sets value to newval. Returns the original value. + inline bitmap_t xchg (bitmap_t newval_) + { + bitmap_t oldval; +#if defined ZS_ATOMIC_BITMAP_WINDOWS + oldval = InterlockedExchange ((volatile LONG*) &value, newval_); +#elif defined ZS_ATOMIC_BITMAP_SOLARIS + oldval = atomic_swap_32 (&value, newval_); +#elif defined ZS_ATOMIC_BITMAP_X86 + oldval = newval_; + __asm__ volatile ( + "lock; xchg %0, %1" + : "=r" (oldval) + : "m" (value), "0" (oldval) + : "memory"); +#elif defined ZS_ATOMIC_BITMAP_SPARC + oldval = value; + volatile bitmap_t* ptrin = &value; + bitmap_t tmp; + bitmap_t prev; + __asm__ __volatile__( + "ld [%4], %1\n\t" + "1:\n\t" + "mov %0, %2\n\t" + "cas [%4], %1, %2\n\t" + "cmp %1, %2\n\t" + "bne,a,pn %%icc, 1b\n\t" + "mov %2, %1\n\t" + : "+r" (newval_), "=&r" (tmp), "=&r" (prev), "+m" (*ptrin) + : "r" (ptrin) + : "cc"); + return prev; +#elif defined ZS_ATOMIC_BITMAP_MUTEX + sync.lock (); + oldval = value; + value = newval_; + sync.unlock (); +#else +#error +#endif + return oldval; + } + + // izte is "if-zero-then-else" atomic operation - if the value is zero + // it substitutes it by 'thenval' else it rewrites it by 'elseval'. + // Original value of the integer is returned from this function. + inline bitmap_t izte (bitmap_t thenval_, + bitmap_t elseval_) + { +#if defined ZS_ATOMIC_BITMAP_WINDOWS + while (true) { + bitmap_t oldval = value; + bitmap_t newval = oldval == 0 ? thenval_ : elseval_; + if (InterlockedCompareExchange ((volatile LONG*) &value, + newval, oldval) == (LONG) oldval) + return oldval; + } +#elif defined ZS_ATOMIC_BITMAP_SOLARIS + while (true) { + bitmap_t oldval = value; + bitmap_t newval = oldval == 0 ? thenval_ : elseval_; + if (atomic_cas_32 (&value, oldval, newval) == oldval) + return oldval; + } +#elif defined ZS_ATOMIC_BITMAP_X86 + bitmap_t oldval; + bitmap_t dummy; + __asm__ volatile ( + "mov %0, %1\n\t" + "1:" + "mov %3, %2\n\t" + "test %1, %1\n\t" + "jz 2f\n\t" + "mov %4, %2\n\t" + "2:" + "lock cmpxchg %2, %0\n\t" + "jnz 1b\n\t" + : "+m" (value), "=&a" (oldval), "=&r" (dummy) + : "r" (thenval_), "r" (elseval_) + : "cc"); + return oldval; +#elif defined ZS_ATOMIC_BITMAP_SPARC + volatile bitmap_t* ptrin = &value; + bitmap_t tmp; + bitmap_t prev; + __asm__ __volatile__( + "ld [%3], %0 \n\t" + "mov 0, %1 \n\t" + "cas [%3], %1, %4 \n\t" + "cmp %0, %1 \n\t" + "be,a,pn %%icc,1f \n\t" + "ld [%3], %0 \n\t" + "cas [%3], %0, %5 \n\t" + "1: \n\t" + : "=&r" (tmp), "=&r" (prev), "+m" (*ptrin) + : "r" (ptrin), "r" (thenval_), "r" (elseval_) + : "cc"); + return prev; +#elif defined ZS_ATOMIC_BITMAP_MUTEX + sync.lock (); + bitmap_t oldval = value; + value = oldval ? elseval_ : thenval_; + sync.unlock (); + return oldval; +#else +#error +#endif + } + + private: + + volatile bitmap_t value; +#if defined ZS_ATOMIC_BITMAP_MUTEX + mutex_t sync; +#endif + + atomic_bitmap_t (const atomic_bitmap_t&); + void operator = (const atomic_bitmap_t&); + }; + +} + +// Remove macros local to this file. +#if defined ZS_ATOMIC_BITMAP_WINDOWS +#undef ZS_ATOMIC_BITMAP_WINDOWS +#endif +#if defined ZS_ATOMIC_BITMAP_SOLARIS +#undef ZS_ATOMIC_BITMAP_SOLARIS +#endif +#if defined ZS_ATOMIC_BITMAP_X86 +#undef ZS_ATOMIC_BITMAP_X86 +#endif +#if defined ZS_ATOMIC_BITMAP_SPARC +#undef ZS_ATOMIC_BITMAP_SPARC +#endif +#if defined ZS_ATOMIC_BITMAP_MUTEX +#undef ZS_ATOMIC_BITMAP_MUTEX +#endif + +#endif diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp new file mode 100644 index 0000000..0873fdd --- /dev/null +++ b/src/atomic_counter.hpp @@ -0,0 +1,197 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + + +#ifndef __ZS_ATOMIC_COUNTER_HPP_INCLUDED__ +#define __ZS_ATOMIC_COUNTER_HPP_INCLUDED__ + +#include "stdint.hpp" +#include "platform.hpp" + +#if defined ZS_FORCE_MUTEXES +#define ZS_ATOMIC_COUNTER_MUTEX +#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ +#define ZS_ATOMIC_COUNTER_X86 +#elif 0 && defined __sparc__ && defined __GNUC__ +#define ZS_ATOMIC_COUNTER_SPARC +#elif defined ZS_HAVE_WINDOWS +#define ZS_ATOMIC_COUNTER_WINDOWS +#elif defined ZS_HAVE_SOLARIS +#define ZS_ATOMIC_COUNTER_SOLARIS +#else +#define ZS_ATOMIC_COUNTER_MUTEX +#endif + +#if defined ZS_ATOMIC_COUNTER_MUTEX +#include "mutex.hpp" +#elif defined ZS_ATOMIC_COUNTER_WINDOWS +#include "windows.hpp" +#elif defined ZS_ATOMIC_COUNTER_SOLARIS +#include +#endif + +namespace zs +{ + + // This class represents an integer that can be incremented/decremented + // in atomic fashion. + + class atomic_counter_t + { + public: + + typedef uint32_t integer_t; + + inline atomic_counter_t (integer_t value_ = 0) : + value (value_) + { + } + + inline ~atomic_counter_t () + { + } + + // Set counter value (not thread-safe). + inline void set (integer_t value_) + { + value = value_; + } + + // Atomic addition. Returns the old value. + inline integer_t add (integer_t increment_) + { + integer_t old_value; + +#if defined ZS_ATOMIC_COUNTER_WINDOWS + old_value = InterlockedExchangeAdd ((LONG*) &value, increment_); +#elif defined ZS_ATOMIC_COUNTER_SOLARIS + integer_t new_value = atomic_add_32_nv (&value, increment_); + old_value = new_value - increment_; +#elif defined ZS_ATOMIC_COUNTER_X86 + __asm__ volatile ( + "lock; xadd %0, %1 \n\t" + : "=r" (old_value), "=m" (value) + : "0" (increment_), "m" (value) + : "cc", "memory"); +#elif defined ZS_ATOMIC_COUNTER_SPARC + integer_t tmp; + __asm__ volatile ( + "ld [%4], %0 \n\t" + "1: \n\t" + "add %0, %3, %1 \n\t" + "cas [%4], %0, %1 \n\t" + "cmp %0, %1 \n\t" + "bne,a,pn %%icc, 1b \n\t" + "mov %1, %0 \n\t" + : "=&r" (old_value), "=&r" (tmp), "=m" (value) + : "r" (increment_), "r" (&value) + : "cc", "memory"); +#elif defined ZS_ATOMIC_COUNTER_MUTEX + sync.lock (); + old_value = value; + value += increment_; + sync.unlock (); +#else +#error +#endif + return old_value; + } + + // Atomic subtraction. Returns false if the counter drops to zero. + inline bool sub (integer_t decrement) + { +#if defined ZS_ATOMIC_COUNTER_WINDOWS + LONG delta = - ((LONG) decrement); + integer_t old = InterlockedExchangeAdd ((LONG*) &value, delta); + return old - decrement != 0; +#elif defined ZS_ATOMIC_COUNTER_SOLARIS + int32_t delta = - ((int32_t) decrement); + integer_t nv = atomic_add_32_nv (&value, delta); + return nv != 0; +#elif defined ZS_ATOMIC_COUNTER_X86 + integer_t oldval = -decrement; + volatile integer_t *val = &value; + __asm__ volatile ("lock; xaddl %0,%1" + : "=r" (oldval), "=m" (*val) + : "0" (oldval), "m" (*val) + : "cc"); + return oldval != decrement; +#elif defined ZS_ATOMIC_COUNTER_SPARC + volatile integer_t *val = &value; + integer_t tmp; + integer_t result; + __asm__ volatile( + "ld [%4], %1\n\t" + "1:\n\t" + "add %1, %0, %2\n\t" + "cas [%4], %1, %2\n\t" + "cmp %1, %2\n\t" + "bne,a,pn %%icc, 1b\n\t" + "mov %2, %1\n\t" + : "+r" (-decrement), "=&r" (tmp), "=&r" (result), "+m" (*val) + : "r" (val) + : "cc"); + return result <= decrement; +#elif defined ZS_ATOMIC_COUNTER_MUTEX + sync.lock (); + value -= decrement; + bool result = value ? true : false; + sync.unlock (); + return result; +#else +#error +#endif + } + + inline integer_t get () + { + return value; + } + + private: + + volatile integer_t value; +#if defined ZS_ATOMIC_COUNTER_MUTEX + mutex_t sync; +#endif + + atomic_counter_t (const atomic_counter_t&); + void operator = (const atomic_counter_t&); + }; + +} + +// Remove macros local to this file. +#if defined ZS_ATOMIC_COUNTER_WINDOWS +#undef ZS_ATOMIC_COUNTER_WINDOWS +#endif +#if defined ZS_ATOMIC_COUNTER_SOLARIS +#undef ZS_ATOMIC_COUNTER_SOLARIS +#endif +#if defined ZS_ATOMIC_COUNTER_X86 +#undef ZS_ATOMIC_COUNTER_X86 +#endif +#if defined ZS_ATOMIC_COUNTER_SPARC +#undef ZS_ATOMIC_COUNTER_SPARC +#endif +#if defined ZS_ATOMIC_COUNTER_MUTEX +#undef ZS_ATOMIC_COUNTER_MUTEX +#endif + +#endif diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp new file mode 100644 index 0000000..fcc4e73 --- /dev/null +++ b/src/atomic_ptr.hpp @@ -0,0 +1,189 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + + +#ifndef __ZS_ATOMIC_PTR_HPP_INCLUDED__ +#define __ZS_ATOMIC_PTR_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZS_FORCE_MUTEXES +#define ZS_ATOMIC_PTR_MUTEX +#elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ +#define ZS_ATOMIC_PTR_X86 +#elif 0 && defined __sparc__ && defined __GNUC__ +#define ZS_ATOMIC_PTR_SPARC +#elif defined ZS_HAVE_WINDOWS +#define ZS_ATOMIC_PTR_WINDOWS +#elif defined ZS_HAVE_SOLARIS +#define ZS_ATOMIC_PTR_SOLARIS +#else +#define ZS_ATOMIC_PTR_MUTEX +#endif + +#if defined ZS_ATOMIC_PTR_MUTEX +#include "mutex.hpp" +#elif defined ZS_ATOMIC_PTR_WINDOWS +#include "windows.hpp" +#elif defined ZS_ATOMIC_PTR_SOLARIS +#include +#endif + +namespace zs +{ + + // This class encapsulates several atomic operations on pointers. + + template class atomic_ptr_t + { + public: + + // Initialise atomic pointer + inline atomic_ptr_t () + { + ptr = NULL; + } + + // Destroy atomic pointer + inline ~atomic_ptr_t () + { + } + + // Set value of atomic pointer in a non-threadsafe way + // Use this function only when you are sure that at most one + // thread is accessing the pointer at the moment. + inline void set (T *ptr_) + { + this->ptr = ptr_; + } + + // Perform atomic 'exchange pointers' operation. Pointer is set + // to the 'val' value. Old value is returned. + inline T *xchg (T *val_) + { +#if defined ZS_ATOMIC_PTR_WINDOWS + return (T*) InterlockedExchangePointer (&ptr, val_); +#elif defined ZS_ATOMIC_PTR_SOLARIS + return (T*) atomic_swap_ptr (&ptr, val_); +#elif defined ZS_ATOMIC_PTR_X86 + T *old; + __asm__ volatile ( + "lock; xchg %0, %2" + : "=r" (old), "=m" (ptr) + : "m" (ptr), "0" (val_)); + return old; +#elif defined ZS_ATOMIC_PTR_SPARC + T* newptr = val_; + volatile T** ptrin = &ptr; + T* tmp; + T* prev; + __asm__ __volatile__( + "ld [%4], %1\n\t" + "1:\n\t" + "mov %0, %2\n\t" + "cas [%4], %1, %2\n\t" + "cmp %1, %2\n\t" + "bne,a,pn %%icc, 1b\n\t" + "mov %2, %1\n\t" + : "+r" (newptr), "=&r" (tmp), "=&r" (prev), "+m" (*ptrin) + : "r" (ptrin) + : "cc"); + return prev; +#elif defined ZS_ATOMIC_PTR_MUTEX + sync.lock (); + T *old = (T*) ptr; + ptr = val_; + sync.unlock (); + return old; +#else +#error +#endif + } + + // Perform atomic 'compare and swap' operation on the pointer. + // The pointer is compared to 'cmp' argument and if they are + // equal, its value is set to 'val'. Old value of the pointer + // is returned. + inline T *cas (T *cmp_, T *val_) + { +#if defined ZS_ATOMIC_PTR_WINDOWS + return (T*) InterlockedCompareExchangePointer ( + (volatile PVOID*) &ptr, val_, cmp_); +#elif defined ZS_ATOMIC_PTR_SOLARIS + return (T*) atomic_cas_ptr (&ptr, cmp_, val_); +#elif defined ZS_ATOMIC_PTR_X86 + T *old; + __asm__ volatile ( + "lock; cmpxchg %2, %3" + : "=a" (old), "=m" (ptr) + : "r" (val_), "m" (ptr), "0" (cmp_) + : "cc"); + return old; +#elif defined ZS_ATOMIC_PTR_SPARC + volatile T** ptrin = &ptr; + volatile T* prev = ptr; + __asm__ __volatile__( + "cas [%3], %1, %2\n\t" + : "+m" (*ptrin) + : "r" (cmp_), "r" (val_), "r" (ptrin) + : "cc"); + return prev; +#elif defined ZS_ATOMIC_PTR_MUTEX + sync.lock (); + T *old = (T*) ptr; + if (ptr == cmp_) + ptr = val_; + sync.unlock (); + return old; +#else +#error +#endif + } + + private: + + volatile T *ptr; +#if defined ZS_ATOMIC_PTR_MUTEX + mutex_t sync; +#endif + + atomic_ptr_t (const atomic_ptr_t&); + void operator = (const atomic_ptr_t&); + }; + +} + +// Remove macros local to this file. +#if defined ZS_ATOMIC_PTR_WINDOWS +#undef ZS_ATOMIC_PTR_WINDOWS +#endif +#if defined ZS_ATOMIC_PTR_SOLARIS +#undef ZS_ATOMIC_PTR_SOLARIS +#endif +#if defined ZS_ATOMIC_PTR_X86 +#undef ZS_ATOMIC_PTR_X86 +#endif +#if defined ZS_ATOMIC_PTR_SPARC +#undef ZS_ATOMIC_PTR_SPARC +#endif +#if defined ZS_ATOMIC_PTR_MUTEX +#undef ZS_ATOMIC_PTR_MUTEX +#endif + +#endif diff --git a/src/command.hpp b/src/command.hpp new file mode 100644 index 0000000..0553137 --- /dev/null +++ b/src/command.hpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_COMMAND_HPP_INCLUDED__ +#define __ZS_COMMAND_HPP_INCLUDED__ + +#include "stdint.hpp" + +namespace zs +{ + + // This structure defines the commands that can be sent between threads. + + struct command_t + { + // Object to process the command. + class object_t *destination; + + enum type_t + { + stop, + bind, + head, + tail, + reg, + reg_and_bind, + unreg, + engine, + terminate, + terminate_ack + } type; + + union { + + struct { + } stop; + + struct { + class pipe_reader_t *reader; + class session_t *peer; + } bind; + + struct { + uint64_t bytes; + } tail; + + struct { + uint64_t bytes; + } head; + + struct { + class simple_semaphore_t *smph; + } reg; + + struct { + class session_t *peer; + bool flow_in; + bool flow_out; + } reg_and_bind; + + struct { + class simple_semaphore_t *smph; + } unreg; + + // TODO: Engine object won't be deallocated on terminal shutdown + // while the command is still on the fly! + struct { + class i_engine *engine; + } engine; + + struct { + } terminate; + + struct { + } terminate_ack; + + } args; + }; + +} + +#endif diff --git a/src/config.hpp b/src/config.hpp new file mode 100644 index 0000000..a0569ea --- /dev/null +++ b/src/config.hpp @@ -0,0 +1,71 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZS_CONFIG_HPP_INCLUDED__ +#define __ZS_CONFIG_HPP_INCLUDED__ + +namespace zs +{ + + // Compile-time settings. + + enum + { + + // Number of new messages in message pipe needed to trigger new memory + // allocation. Setting this parameter to 256 decreases the impact of + // memory allocation by approximately 99.6% + message_pipe_granularity = 256, + + // Number of new commands in command pipe needed to trigger new memory + // allocation. The number should be kept low to decrease the memory + // footprint of dispatcher. + command_pipe_granularity = 4, + + // Maximal batching size for engines with receiving functionality. + // So, if there are 10 messages that fit into the batch size, all of + // them may be read by a single 'recv' system call, thus avoiding + // unnecessary network stack traversals. + in_batch_size = 8192, + + // Maximal batching size for engines with sending functionality. + // So, if there are 10 messages that fit into the batch size, all of + // them may be written by a single 'send' system call, thus avoiding + // unnecessary network stack traversals. + out_batch_size = 8192, + + // Maximum number of events the I/O thread can process in one go. + max_io_events = 256, + + // Maximal wait time for a timer (milliseconds). + max_timer_period = 100, + + // Maximal delay to process command in API thread (in CPU ticks). + // 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs. + max_command_delay = 3000000, + + // Maximal number of non-accepted connections that can be held by + // TCP listener object. + tcp_connection_backlog = 10 + + }; + +} + +#endif diff --git a/src/connecter.cpp b/src/connecter.cpp new file mode 100644 index 0000000..a21dde3 --- /dev/null +++ b/src/connecter.cpp @@ -0,0 +1,189 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "connecter.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "err.hpp" +#include "simple_semaphore.hpp" +#include "zmq_tcp_engine.hpp" + +zs::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, + session_t *session_) : + io_object_t (thread_), + state (idle), + poller (NULL), + session (session_), + addr (addr_), + identity ("abcde"), + engine (NULL) +{ +} + +void zs::connecter_t::terminate () +{ + delete this; +} + +void zs::connecter_t::shutdown () +{ + delete this; +} + +zs::connecter_t::~connecter_t () +{ +} + +void zs::connecter_t::process_reg (simple_semaphore_t *smph_) +{ + // Fet poller pointer for further use. + zs_assert (!poller); + poller = get_poller (); + + // Ask the session to register itself with the I/O thread. Note that + // the session is living in the same I/O thread, thus this results + // in a synchronous call. + session->inc_seqnum (); + send_reg (session, NULL); + + // Unlock the application thread that created the connecter. + if (smph_) + smph_->post (); + + // Manually trigger timer event which will launch asynchronous connect. + state = waiting; + timer_event (); +} + +void zs::connecter_t::process_unreg (simple_semaphore_t *smph_) +{ + // Unregister connecter/engine from the poller. + zs_assert (poller); + if (state == connecting) + poller->rm_fd (handle); + else if (state == waiting) + poller->cancel_timer (this); + else if (state == sending) + engine->terminate (); + + // Unlock the application thread closing the connecter. + if (smph_) + smph_->post (); +} + +void zs::connecter_t::in_event () +{ + // Error occured in asynchronous connect. Retry to connect after a while. + if (state == connecting) { + fd_t fd = tcp_connecter.connect (); + zs_assert (fd == retired_fd); + poller->rm_fd (handle); + poller->add_timer (this); + state = waiting; + return; + } + + zs_assert (false); +} + +void zs::connecter_t::out_event () +{ + if (state == connecting) { + + fd_t fd = tcp_connecter.connect (); + if (fd == retired_fd) { + poller->rm_fd (handle); + poller->add_timer (this); + state = waiting; + return; + } + + poller->rm_fd (handle); + engine = new zmq_tcp_engine_t (fd); + zs_assert (engine); + engine->attach (poller, this); + state = sending; + return; + } + + zs_assert (false); +} + +void zs::connecter_t::timer_event () +{ + zs_assert (state == waiting); + + // Initiate async connect and start polling for its completion. If async + // connect fails instantly, try to reconnect after a while. + int rc = tcp_connecter.open (addr.c_str ()); + if (rc == 0) { + state = connecting; + in_event (); + } + else if (rc == 1) { + handle = poller->add_fd (tcp_connecter.get_fd (), this); + poller->set_pollout (handle); + state = connecting; + } + else { + poller->add_timer (this); + state = waiting; + } +} + +void zs::connecter_t::set_engine (struct i_engine *engine_) +{ + engine = engine_; +} + +bool zs::connecter_t::read (zs_msg *msg_) +{ + zs_assert (state == sending); + + // Deallocate old content of the message just in case. + zs_msg_close (msg_); + + // Send the identity. + zs_msg_init_size (msg_, identity.size ()); + memcpy (zs_msg_data (msg_), identity.c_str (), identity.size ()); + + // Ask engine to unregister from the poller. + i_engine *e = engine; + engine->detach (); + + // Attach the engine to the session. (Note that this is actually + // a synchronous call. + session->inc_seqnum (); + send_engine (session, e); + + state = idle; + + return true; +} + +bool zs::connecter_t::write (struct zs_msg *msg_) +{ + // No incoming messages are accepted till identity is sent. + return false; +} + +void zs::connecter_t::flush () +{ + // No incoming messages are accepted till identity is sent. +} diff --git a/src/connecter.hpp b/src/connecter.hpp new file mode 100644 index 0000000..91dbf17 --- /dev/null +++ b/src/connecter.hpp @@ -0,0 +1,99 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the F