From cc3755a16f00026af882ed14d122cc8aa6d50e82 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 3 Aug 2009 11:30:13 +0200 Subject: renamed from zs to zmq --- src/Makefile.am | 10 +-- src/app_thread.cpp | 70 +++++++-------- src/app_thread.hpp | 8 +- src/atomic.hpp | 62 ++++++------- src/atomic_bitmap.hpp | 82 ++++++++--------- src/atomic_counter.hpp | 72 +++++++-------- src/atomic_ptr.hpp | 72 +++++++-------- src/command.hpp | 6 +- src/config.hpp | 6 +- src/connecter.cpp | 48 +++++----- src/connecter.hpp | 12 +-- src/data_distributor.cpp | 42 ++++----- src/data_distributor.hpp | 10 +-- src/decoder.hpp | 6 +- src/devpoll.cpp | 38 ++++---- src/devpoll.hpp | 8 +- src/dispatcher.cpp | 52 +++++------ src/dispatcher.hpp | 8 +- src/dummy_aggregator.cpp | 34 +++---- src/dummy_aggregator.hpp | 8 +- src/dummy_distributor.cpp | 28 +++--- src/dummy_distributor.hpp | 8 +- src/encoder.hpp | 6 +- src/epoll.cpp | 36 ++++---- src/epoll.hpp | 8 +- src/err.cpp | 8 +- src/err.hpp | 16 ++-- src/fair_aggregator.cpp | 30 +++---- src/fair_aggregator.hpp | 8 +- src/fd.hpp | 8 +- src/fd_signaler.cpp | 54 +++++------ src/fd_signaler.hpp | 8 +- src/i_api.hpp | 14 +-- src/i_demux.hpp | 11 +-- src/i_engine.hpp | 8 +- src/i_mux.hpp | 11 +-- src/i_poll_events.hpp | 6 +- src/i_poller.hpp | 6 +- src/i_session.hpp | 10 +-- src/i_signaler.hpp | 6 +- src/i_thread.hpp | 6 +- src/io_object.cpp | 6 +- src/io_object.hpp | 6 +- src/io_thread.cpp | 72 +++++++-------- src/io_thread.hpp | 8 +- src/ip.cpp | 36 ++++---- src/ip.hpp | 8 +- src/kqueue.cpp | 40 ++++----- src/kqueue.hpp | 8 +- src/listener.cpp | 42 ++++----- src/listener.hpp | 6 +- src/load_balancer.cpp | 32 +++---- src/load_balancer.hpp | 10 +-- src/msg.hpp | 16 ++-- src/mutex.hpp | 10 +-- src/object.cpp | 88 +++++++++--------- src/object.hpp | 6 +- src/p2p.cpp | 4 +- src/p2p.hpp | 6 +- src/pipe.cpp | 14 +-- src/pipe.hpp | 10 +-- src/pipe_reader.cpp | 26 +++--- src/pipe_reader.hpp | 8 +- src/pipe_writer.cpp | 30 +++---- src/pipe_writer.hpp | 8 +- src/platform.hpp.in | 26 +++--- src/poll.cpp | 40 ++++----- src/poll.hpp | 14 +-- src/pub.cpp | 6 +- src/pub.hpp | 8 +- src/rep.cpp | 4 +- src/rep.hpp | 6 +- src/req.cpp | 4 +- src/req.hpp | 6 +- src/safe_object.cpp | 14 +-- src/safe_object.hpp | 6 +- src/select.cpp | 40 ++++----- src/select.hpp | 10 +-- src/session.cpp | 46 +++++----- src/session.hpp | 10 +-- src/session_stub.cpp | 30 +++---- src/session_stub.hpp | 10 +-- src/simple_semaphore.hpp | 12 +-- src/socket_base.cpp | 56 ++++++------ src/socket_base.hpp | 14 +-- src/stdint.hpp | 6 +- src/sub.cpp | 8 +- src/sub.hpp | 8 +- src/tcp_connecter.cpp | 20 ++--- src/tcp_connecter.hpp | 6 +- src/tcp_listener.cpp | 20 ++--- src/tcp_listener.hpp | 6 +- src/tcp_socket.cpp | 18 ++-- src/tcp_socket.hpp | 6 +- src/thread.cpp | 14 +-- src/thread.hpp | 10 +-- src/uuid.cpp | 40 ++++----- src/uuid.hpp | 18 ++-- src/windows.hpp | 4 +- src/wire.hpp | 6 +- src/ypipe.hpp | 8 +- src/ypollset.cpp | 10 +-- src/ypollset.hpp | 6 +- src/yqueue.hpp | 10 +-- src/zmq.cpp | 223 ++++++++++++++++++++++++++++++++++++++++++++++ src/zmq_decoder.cpp | 25 +++--- src/zmq_decoder.hpp | 10 +-- src/zmq_encoder.cpp | 18 ++-- src/zmq_encoder.hpp | 10 +-- src/zmq_tcp_engine.cpp | 44 ++++----- src/zmq_tcp_engine.hpp | 6 +- src/zs.cpp | 222 --------------------------------------------- 112 files changed, 1299 insertions(+), 1295 deletions(-) create mode 100644 src/zmq.cpp delete mode 100644 src/zs.cpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index bb648ec..e6d09ca 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,6 +1,6 @@ -lib_LTLIBRARIES = libzs.la +lib_LTLIBRARIES = libzmq.la -libzs_la_SOURCES = \ +libzmq_la_SOURCES = \ app_thread.hpp \ atomic_bitmap.hpp \ atomic_counter.hpp \ @@ -109,10 +109,10 @@ libzs_la_SOURCES = \ zmq_decoder.cpp \ zmq_encoder.cpp \ zmq_tcp_engine.cpp \ - zs.cpp + zmq.cpp -libzs_la_LDFLAGS = -version-info 0:0:0 -libzs_la_CXXFLAGS = -Wall -pedantic -Werror @ZS_EXTRA_CXXFLAGS@ +libzmq_la_LDFLAGS = -version-info 0:0:0 +libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ dist-hook: -rm $(distdir)/src/platform.hpp diff --git a/src/app_thread.cpp b/src/app_thread.cpp index ca08976..2406dbd 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,9 +17,9 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include @@ -48,17 +48,17 @@ // system with x86 architecture and gcc or MSVC compiler. #if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) -#define ZS_DELAY_COMMANDS +#define ZMQ_DELAY_COMMANDS #endif -zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : object_t (dispatcher_, thread_slot_), tid (0), last_processing_time (0) { } -void zs::app_thread_t::shutdown () +void zmq::app_thread_t::shutdown () { // Deallocate all the sessions associated with the thread. while (!sessions.empty ()) @@ -67,17 +67,17 @@ void zs::app_thread_t::shutdown () delete this; } -zs::app_thread_t::~app_thread_t () +zmq::app_thread_t::~app_thread_t () { } -void zs::app_thread_t::attach_session (session_t *session_) +void zmq::app_thread_t::attach_session (session_t *session_) { session_->set_index (sessions.size ()); sessions.push_back (session_); } -void zs::app_thread_t::detach_session (session_t *session_) +void zmq::app_thread_t::detach_session (session_t *session_) { // O(1) removal of the session from the list. sessions_t::size_type i = session_->get_index (); @@ -86,22 +86,22 @@ void zs::app_thread_t::detach_session (session_t *session_) sessions.pop_back (); } -zs::i_poller *zs::app_thread_t::get_poller () +zmq::i_poller *zmq::app_thread_t::get_poller () { - zs_assert (false); + zmq_assert (false); } -zs::i_signaler *zs::app_thread_t::get_signaler () +zmq::i_signaler *zmq::app_thread_t::get_signaler () { return &pollset; } -bool zs::app_thread_t::is_current () +bool zmq::app_thread_t::is_current () { return !sessions.empty () && tid == getpid (); } -bool zs::app_thread_t::make_current () +bool zmq::app_thread_t::make_current () { // If there are object managed by this slot we cannot assign the slot // to a different thread. @@ -112,7 +112,7 @@ bool zs::app_thread_t::make_current () return true; } -zs::i_api *zs::app_thread_t::create_socket (int type_) +zmq::i_api *zmq::app_thread_t::create_socket (int type_) { i_mux *mux = NULL; i_demux *demux = NULL; @@ -120,43 +120,43 @@ zs::i_api *zs::app_thread_t::create_socket (int type_) i_api *api = NULL; switch (type_) { - case ZS_P2P: + case ZMQ_P2P: mux = new dummy_aggregator_t; - zs_assert (mux); + zmq_assert (mux); demux = new dummy_distributor_t; - zs_assert (demux); + zmq_assert (demux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new p2p_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_PUB: + case ZMQ_PUB: demux = new data_distributor_t; - zs_assert (demux); + zmq_assert (demux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new pub_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_SUB: + case ZMQ_SUB: mux = new fair_aggregator_t; - zs_assert (mux); + zmq_assert (mux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new sub_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_REQ: + case ZMQ_REQ: // TODO - zs_assert (false); + zmq_assert (false); api = new req_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_REP: + case ZMQ_REP: // TODO - zs_assert (false); + zmq_assert (false); api = new rep_t (this, session); - zs_assert (api); + zmq_assert (api); break; default: errno = EINVAL; @@ -168,14 +168,14 @@ zs::i_api *zs::app_thread_t::create_socket (int type_) return api; } -void zs::app_thread_t::process_commands (bool block_) +void zmq::app_thread_t::process_commands (bool block_) { ypollset_t::signals_t signals; if (block_) signals = pollset.poll (); else { -#if defined ZS_DELAY_COMMANDS +#if defined ZMQ_DELAY_COMMANDS // Optimised version of command processing - it doesn't have to check // for incoming commands each time. It does so only if certain time // elapsed since last command processing. Command delay varies diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 61e7ff1..ffe5596 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_APP_THREAD_HPP_INCLUDED__ -#define __ZS_APP_THREAD_HPP_INCLUDED__ +#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__ +#define __ZMQ_APP_THREAD_HPP_INCLUDED__ #include @@ -27,7 +27,7 @@ #include "object.hpp" #include "ypollset.hpp" -namespace zs +namespace zmq { class app_thread_t : public object_t, public i_thread @@ -36,7 +36,7 @@ namespace zs app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); - // To be called when the whole infrastrucure is being closed (zs_term). + // To be called when the whole infrastrucure is being closed. void shutdown (); // Returns signaler associated with this application thread. diff --git a/src/atomic.hpp b/src/atomic.hpp index e24b719..e581593 100644 --- a/src/atomic.hpp +++ b/src/atomic.hpp @@ -17,24 +17,24 @@ along with this program. If not, see . */ -#ifndef __ZS_ATOMIC_HPP_INCLUDED__ -#define __ZS_ATOMIC_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_HPP_INCLUDED__ #include "stdint.hpp" -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_X86 +#define ZMQ_ATOMIC_X86 #elif defined ZMQ_HAVE_WINDOWS -#define ZS_ATOMIC_WINDOWS +#define ZMQ_ATOMIC_WINDOWS #elif defined ZMQ_HAVE_SOLARIS -#define ZS_ATOMIC_SOLARIS +#define ZMQ_ATOMIC_SOLARIS #else -#define ZS_ATOMIC_MUTEX +#define ZMQ_ATOMIC_MUTEX #endif -namespace zs +namespace zmq { // Atomic assignement. @@ -56,11 +56,11 @@ namespace zs // Atomic addition. Returns the old value. inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedExchangeAdd ((LONG*) &value, increment_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_add_32_nv (&value, increment_) - delta_; -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 uint32_t old; __asm__ volatile ( "lock; xadd %0, %1\n\t" @@ -80,13 +80,13 @@ namespace zs // Atomic subtraction. Returns the old value. inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS LONG delta = - ((LONG) delta_); return InterlockedExchangeAdd ((LONG*) &value, delta); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS int32_t delta = - ((int32_t) delta_); return atomic_add_32_nv (&value, delta) + delta_; -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 uint32_t old = -delta_; __asm__ volatile ("lock; xaddl %0,%1" : "=r" (old), "=m" (*p_) @@ -116,11 +116,11 @@ namespace zs template inline void *atomic_ptr_xchg (volatile T **p_, T *value_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedExchangePointer (p_, value_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_swap_ptr (p_, value_); -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 void *old; __asm__ volatile ( "lock; xchg %0, %2" @@ -144,11 +144,11 @@ namespace zs template inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedCompareExchangePointer (p_, value_, cmp_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_cas_ptr (p_, cmp_, value_); -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 void *old; __asm__ volatile ( "lock; cmpxchg %2, %3" @@ -167,7 +167,7 @@ namespace zs #endif } -#if defined ZS_ATOMIC_X86 && defined __x86_64__ +#if defined ZMQ_ATOMIC_X86 && defined __x86_64__ typedef uint64_t atomic_bitmap_t; #else typedef uint32_t atomic_bitmap_t; @@ -187,7 +187,7 @@ namespace zs inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_, int set_index_, int reset_index_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << @@ -197,7 +197,7 @@ namespace zs return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << @@ -206,7 +206,7 @@ namespace zs return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 atomic_bitmap_t oldval, dummy; __asm__ volatile ( "mov %0, %1\n\t" @@ -236,11 +236,11 @@ namespace zs inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_, atomic_bitmap_t newval_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedExchange ((volatile LONG*) p_, newval_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_swap_32 (p_, newval_); -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 atomic_bitmap_t oldval = newval_; __asm__ volatile ( "lock; xchg %0, %1" @@ -263,7 +263,7 @@ namespace zs inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_, atomic_bitmap_t thenval_, atomic_bitmap_t elseval_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); @@ -271,14 +271,14 @@ namespace zs oldval) == (LONG) oldval) return oldval; } -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); if (atomic_cas_32 (p_, oldval, newval) == oldval) return oldval; } -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 atomic_bitmap_t oldval; atomic_bitmap_t dummy; __asm__ volatile ( diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp index a5440de..6b7218e 100644 --- a/src/atomic_bitmap.hpp +++ b/src/atomic_bitmap.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ -#define __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ #include "stdint.hpp" #include "platform.hpp" @@ -26,29 +26,29 @@ // These are the conditions to choose between different implementations // of atomic_bitmap. -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_BITMAP_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_BITMAP_X86 +#define ZMQ_ATOMIC_BITMAP_X86 #elif 0 && defined __sparc__ && defined __GNUC__ -#define ZS_ATOMIC_BITMAP_SPARC -#elif defined ZS_HAVE_WINDOWS -#define ZS_ATOMIC_BITMAP_WINDOWS -#elif defined ZS_HAVE_SOLARIS -#define ZS_ATOMIC_BITMAP_SOLARIS +#define ZMQ_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_HAVE_WINDOWS +#define ZMQ_ATOMIC_BITMAP_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZMQ_ATOMIC_BITMAP_SOLARIS #else -#define ZS_ATOMIC_BITMAP_MUTEX +#define ZMQ_ATOMIC_BITMAP_MUTEX #endif -#if defined ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_ATOMIC_BITMAP_MUTEX #include "mutex.hpp" -#elif defined ZS_ATOMIC_BITMAP_WINDOWS +#elif defined ZMQ_ATOMIC_BITMAP_WINDOWS #include "windows.hpp" -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS #include #endif -namespace zs +namespace zmq { // This class encapuslates several bitwise atomic operations on unsigned @@ -79,7 +79,7 @@ namespace zs // another one. Returns the original value of the reset bit. inline bool btsr (int set_index_, int reset_index_) { -#if defined ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS while (true) { bitmap_t oldval = value; bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & @@ -89,7 +89,7 @@ namespace zs return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS while (true) { bitmap_t oldval = value; bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & @@ -98,7 +98,7 @@ namespace zs return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_BITMAP_X86 +#elif defined ZMQ_ATOMIC_BITMAP_X86 bitmap_t oldval, dummy; __asm__ volatile ( "mov %0, %1\n\t" @@ -112,7 +112,7 @@ namespace zs : "r" (bitmap_t(set_index_)), "r" (bitmap_t(reset_index_)) : "cc"); return (bool) (oldval & (bitmap_t(1) << reset_index_)); -#elif defined ZS_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_ATOMIC_BITMAP_SPARC volatile bitmap_t* valptr = &value; bitmap_t set_val = bitmap_t(1) << set_index_; bitmap_t reset_val = ~(bitmap_t(1) << reset_index_); @@ -132,7 +132,7 @@ namespace zs : "r" (valptr) : "cc"); return oldval; -#elif defined ZS_ATOMIC_BITMAP_MUTEX +#elif defined ZMQ_ATOMIC_BITMAP_MUTEX sync.lock (); bitmap_t oldval = value; value = (oldval | (bitmap_t (1) << set_index_)) & @@ -148,18 +148,18 @@ namespace zs inline bitmap_t xchg (bitmap_t newval_) { bitmap_t oldval; -#if defined ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS oldval = InterlockedExchange ((volatile LONG*) &value, newval_); -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS oldval = atomic_swap_32 (&value, newval_); -#elif defined ZS_ATOMIC_BITMAP_X86 +#elif defined ZMQ_ATOMIC_BITMAP_X86 oldval = newval_; __asm__ volatile ( "lock; xchg %0, %1" : "=r" (oldval) : "m" (value), "0" (oldval) : "memory"); -#elif defined ZS_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_ATOMIC_BITMAP_SPARC oldval = value; volatile bitmap_t* ptrin = &value; bitmap_t tmp; @@ -176,7 +176,7 @@ namespace zs : "r" (ptrin) : "cc"); return prev; -#elif defined ZS_ATOMIC_BITMAP_MUTEX +#elif defined ZMQ_ATOMIC_BITMAP_MUTEX sync.lock (); oldval = value; value = newval_; @@ -193,7 +193,7 @@ namespace zs inline bitmap_t izte (bitmap_t thenval_, bitmap_t elseval_) { -#if defined ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS while (true) { bitmap_t oldval = value; bitmap_t newval = oldval == 0 ? thenval_ : elseval_; @@ -201,14 +201,14 @@ namespace zs newval, oldval) == (LONG) oldval) return oldval; } -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS while (true) { bitmap_t oldval = value; bitmap_t newval = oldval == 0 ? thenval_ : elseval_; if (atomic_cas_32 (&value, oldval, newval) == oldval) return oldval; } -#elif defined ZS_ATOMIC_BITMAP_X86 +#elif defined ZMQ_ATOMIC_BITMAP_X86 bitmap_t oldval; bitmap_t dummy; __asm__ volatile ( @@ -225,7 +225,7 @@ namespace zs : "r" (thenval_), "r" (elseval_) : "cc"); return oldval; -#elif defined ZS_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_ATOMIC_BITMAP_SPARC volatile bitmap_t* ptrin = &value; bitmap_t tmp; bitmap_t prev; @@ -242,7 +242,7 @@ namespace zs : "r" (ptrin), "r" (thenval_), "r" (elseval_) : "cc"); return prev; -#elif defined ZS_ATOMIC_BITMAP_MUTEX +#elif defined ZMQ_ATOMIC_BITMAP_MUTEX sync.lock (); bitmap_t oldval = value; value = oldval ? elseval_ : thenval_; @@ -256,7 +256,7 @@ namespace zs private: volatile bitmap_t value; -#if defined ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_ATOMIC_BITMAP_MUTEX mutex_t sync; #endif @@ -267,20 +267,20 @@ namespace zs } // Remove macros local to this file. -#if defined ZS_ATOMIC_BITMAP_WINDOWS -#undef ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS +#undef ZMQ_ATOMIC_BITMAP_WINDOWS #endif -#if defined ZS_ATOMIC_BITMAP_SOLARIS -#undef ZS_ATOMIC_BITMAP_SOLARIS +#if defined ZMQ_ATOMIC_BITMAP_SOLARIS +#undef ZMQ_ATOMIC_BITMAP_SOLARIS #endif -#if defined ZS_ATOMIC_BITMAP_X86 -#undef ZS_ATOMIC_BITMAP_X86 +#if defined ZMQ_ATOMIC_BITMAP_X86 +#undef ZMQ_ATOMIC_BITMAP_X86 #endif -#if defined ZS_ATOMIC_BITMAP_SPARC -#undef ZS_ATOMIC_BITMAP_SPARC +#if defined ZMQ_ATOMIC_BITMAP_SPARC +#undef ZMQ_ATOMIC_BITMAP_SPARC #endif -#if defined ZS_ATOMIC_BITMAP_MUTEX -#undef ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_ATOMIC_BITMAP_MUTEX +#undef ZMQ_ATOMIC_BITMAP_MUTEX #endif #endif diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp index 0873fdd..305aa59 100644 --- a/src/atomic_counter.hpp +++ b/src/atomic_counter.hpp @@ -18,35 +18,35 @@ */ -#ifndef __ZS_ATOMIC_COUNTER_HPP_INCLUDED__ -#define __ZS_ATOMIC_COUNTER_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__ #include "stdint.hpp" #include "platform.hpp" -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_COUNTER_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_COUNTER_X86 +#define ZMQ_ATOMIC_COUNTER_X86 #elif 0 && defined __sparc__ && defined __GNUC__ -#define ZS_ATOMIC_COUNTER_SPARC -#elif defined ZS_HAVE_WINDOWS -#define ZS_ATOMIC_COUNTER_WINDOWS -#elif defined ZS_HAVE_SOLARIS -#define ZS_ATOMIC_COUNTER_SOLARIS +#define ZMQ_ATOMIC_COUNTER_SPARC +#elif defined ZMQ_HAVE_WINDOWS +#define ZMQ_ATOMIC_COUNTER_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZMQ_ATOMIC_COUNTER_SOLARIS #else -#define ZS_ATOMIC_COUNTER_MUTEX +#define ZMQ_ATOMIC_COUNTER_MUTEX #endif -#if defined ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_ATOMIC_COUNTER_MUTEX #include "mutex.hpp" -#elif defined ZS_ATOMIC_COUNTER_WINDOWS +#elif defined ZMQ_ATOMIC_COUNTER_WINDOWS #include "windows.hpp" -#elif defined ZS_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS #include #endif -namespace zs +namespace zmq { // This class represents an integer that can be incremented/decremented @@ -78,18 +78,18 @@ namespace zs { integer_t old_value; -#if defined ZS_ATOMIC_COUNTER_WINDOWS +#if defined ZMQ_ATOMIC_COUNTER_WINDOWS old_value = InterlockedExchangeAdd ((LONG*) &value, increment_); -#elif defined ZS_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS integer_t new_value = atomic_add_32_nv (&value, increment_); old_value = new_value - increment_; -#elif defined ZS_ATOMIC_COUNTER_X86 +#elif defined ZMQ_ATOMIC_COUNTER_X86 __asm__ volatile ( "lock; xadd %0, %1 \n\t" : "=r" (old_value), "=m" (value) : "0" (increment_), "m" (value) : "cc", "memory"); -#elif defined ZS_ATOMIC_COUNTER_SPARC +#elif defined ZMQ_ATOMIC_COUNTER_SPARC integer_t tmp; __asm__ volatile ( "ld [%4], %0 \n\t" @@ -102,7 +102,7 @@ namespace zs : "=&r" (old_value), "=&r" (tmp), "=m" (value) : "r" (increment_), "r" (&value) : "cc", "memory"); -#elif defined ZS_ATOMIC_COUNTER_MUTEX +#elif defined ZMQ_ATOMIC_COUNTER_MUTEX sync.lock (); old_value = value; value += increment_; @@ -116,15 +116,15 @@ namespace zs // Atomic subtraction. Returns false if the counter drops to zero. inline bool sub (integer_t decrement) { -#if defined ZS_ATOMIC_COUNTER_WINDOWS +#if defined ZMQ_ATOMIC_COUNTER_WINDOWS LONG delta = - ((LONG) decrement); integer_t old = InterlockedExchangeAdd ((LONG*) &value, delta); return old - decrement != 0; -#elif defined ZS_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS int32_t delta = - ((int32_t) decrement); integer_t nv = atomic_add_32_nv (&value, delta); return nv != 0; -#elif defined ZS_ATOMIC_COUNTER_X86 +#elif defined ZMQ_ATOMIC_COUNTER_X86 integer_t oldval = -decrement; volatile integer_t *val = &value; __asm__ volatile ("lock; xaddl %0,%1" @@ -132,7 +132,7 @@ namespace zs : "0" (oldval), "m" (*val) : "cc"); return oldval != decrement; -#elif defined ZS_ATOMIC_COUNTER_SPARC +#elif defined ZMQ_ATOMIC_COUNTER_SPARC volatile integer_t *val = &value; integer_t tmp; integer_t result; @@ -148,7 +148,7 @@ namespace zs : "r" (val) : "cc"); return result <= decrement; -#elif defined ZS_ATOMIC_COUNTER_MUTEX +#elif defined ZMQ_ATOMIC_COUNTER_MUTEX sync.lock (); value -= decrement; bool result = value ? true : false; @@ -167,7 +167,7 @@ namespace zs private: volatile integer_t value; -#if defined ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_ATOMIC_COUNTER_MUTEX mutex_t sync; #endif @@ -178,20 +178,20 @@ namespace zs } // Remove macros local to this file. -#if defined ZS_ATOMIC_COUNTER_WINDOWS -#undef ZS_ATOMIC_COUNTER_WINDOWS +#if defined ZMQ_ATOMIC_COUNTER_WINDOWS +#undef ZMQ_ATOMIC_COUNTER_WINDOWS #endif -#if defined ZS_ATOMIC_COUNTER_SOLARIS -#undef ZS_ATOMIC_COUNTER_SOLARIS +#if defined ZMQ_ATOMIC_COUNTER_SOLARIS +#undef ZMQ_ATOMIC_COUNTER_SOLARIS #endif -#if defined ZS_ATOMIC_COUNTER_X86 -#undef ZS_ATOMIC_COUNTER_X86 +#if defined ZMQ_ATOMIC_COUNTER_X86 +#undef ZMQ_ATOMIC_COUNTER_X86 #endif -#if defined ZS_ATOMIC_COUNTER_SPARC -#undef ZS_ATOMIC_COUNTER_SPARC +#if defined ZMQ_ATOMIC_COUNTER_SPARC +#undef ZMQ_ATOMIC_COUNTER_SPARC #endif -#if defined ZS_ATOMIC_COUNTER_MUTEX -#undef ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_ATOMIC_COUNTER_MUTEX +#undef ZMQ_ATOMIC_COUNTER_MUTEX #endif #endif diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp index fcc4e73..f96782f 100644 --- a/src/atomic_ptr.hpp +++ b/src/atomic_ptr.hpp @@ -18,34 +18,34 @@ */ -#ifndef __ZS_ATOMIC_PTR_HPP_INCLUDED__ -#define __ZS_ATOMIC_PTR_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_PTR_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_PTR_HPP_INCLUDED__ #include "platform.hpp" -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_PTR_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_PTR_X86 +#define ZMQ_ATOMIC_PTR_X86 #elif 0 && defined __sparc__ && defined __GNUC__ -#define ZS_ATOMIC_PTR_SPARC -#elif defined ZS_HAVE_WINDOWS -#define ZS_ATOMIC_PTR_WINDOWS -#elif defined ZS_HAVE_SOLARIS -#define ZS_ATOMIC_PTR_SOLARIS +#define ZMQ_ATOMIC_PTR_SPARC +#elif defined ZMQ_HAVE_WINDOWS +#define ZMQ_ATOMIC_PTR_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZMQ_ATOMIC_PTR_SOLARIS #else -#define ZS_ATOMIC_PTR_MUTEX +#define ZMQ_ATOMIC_PTR_MUTEX #endif -#if defined ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_ATOMIC_PTR_MUTEX #include "mutex.hpp" -#elif defined ZS_ATOMIC_PTR_WINDOWS +#elif defined ZMQ_ATOMIC_PTR_WINDOWS #include "windows.hpp" -#elif defined ZS_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SOLARIS #include #endif -namespace zs +namespace zmq { // This class encapsulates several atomic operations on pointers. @@ -77,18 +77,18 @@ namespace zs // to the 'val' value. Old value is returned. inline T *xchg (T *val_) { -#if defined ZS_ATOMIC_PTR_WINDOWS +#if defined ZMQ_ATOMIC_PTR_WINDOWS return (T*) InterlockedExchangePointer (&ptr, val_); -#elif defined ZS_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SOLARIS return (T*) atomic_swap_ptr (&ptr, val_); -#elif defined ZS_ATOMIC_PTR_X86 +#elif defined ZMQ_ATOMIC_PTR_X86 T *old; __asm__ volatile ( "lock; xchg %0, %2" : "=r" (old), "=m" (ptr) : "m" (ptr), "0" (val_)); return old; -#elif defined ZS_ATOMIC_PTR_SPARC +#elif defined ZMQ_ATOMIC_PTR_SPARC T* newptr = val_; volatile T** ptrin = &ptr; T* tmp; @@ -105,7 +105,7 @@ namespace zs : "r" (ptrin) : "cc"); return prev; -#elif defined ZS_ATOMIC_PTR_MUTEX +#elif defined ZMQ_ATOMIC_PTR_MUTEX sync.lock (); T *old = (T*) ptr; ptr = val_; @@ -122,12 +122,12 @@ namespace zs // is returned. inline T *cas (T *cmp_, T *val_) { -#if defined ZS_ATOMIC_PTR_WINDOWS +#if defined ZMQ_ATOMIC_PTR_WINDOWS return (T*) InterlockedCompareExchangePointer ( (volatile PVOID*) &ptr, val_, cmp_); -#elif defined ZS_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SOLARIS return (T*) atomic_cas_ptr (&ptr, cmp_, val_); -#elif defined ZS_ATOMIC_PTR_X86 +#elif defined ZMQ_ATOMIC_PTR_X86 T *old; __asm__ volatile ( "lock; cmpxchg %2, %3" @@ -135,7 +135,7 @@ namespace zs : "r" (val_), "m" (ptr), "0" (cmp_) : "cc"); return old; -#elif defined ZS_ATOMIC_PTR_SPARC +#elif defined ZMQ_ATOMIC_PTR_SPARC volatile T** ptrin = &ptr; volatile T* prev = ptr; __asm__ __volatile__( @@ -144,7 +144,7 @@ namespace zs : "r" (cmp_), "r" (val_), "r" (ptrin) : "cc"); return prev; -#elif defined ZS_ATOMIC_PTR_MUTEX +#elif defined ZMQ_ATOMIC_PTR_MUTEX sync.lock (); T *old = (T*) ptr; if (ptr == cmp_) @@ -159,7 +159,7 @@ namespace zs private: volatile T *ptr; -#if defined ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_ATOMIC_PTR_MUTEX mutex_t sync; #endif @@ -170,20 +170,20 @@ namespace zs } // Remove macros local to this file. -#if defined ZS_ATOMIC_PTR_WINDOWS -#undef ZS_ATOMIC_PTR_WINDOWS +#if defined ZMQ_ATOMIC_PTR_WINDOWS +#undef ZMQ_ATOMIC_PTR_WINDOWS #endif -#if defined ZS_ATOMIC_PTR_SOLARIS -#undef ZS_ATOMIC_PTR_SOLARIS +#if defined ZMQ_ATOMIC_PTR_SOLARIS +#undef ZMQ_ATOMIC_PTR_SOLARIS #endif -#if defined ZS_ATOMIC_PTR_X86 -#undef ZS_ATOMIC_PTR_X86 +#if defined ZMQ_ATOMIC_PTR_X86 +#undef ZMQ_ATOMIC_PTR_X86 #endif -#if defined ZS_ATOMIC_PTR_SPARC -#undef ZS_ATOMIC_PTR_SPARC +#if defined ZMQ_ATOMIC_PTR_SPARC +#undef ZMQ_ATOMIC_PTR_SPARC #endif -#if defined ZS_ATOMIC_PTR_MUTEX -#undef ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_ATOMIC_PTR_MUTEX +#undef ZMQ_ATOMIC_PTR_MUTEX #endif #endif diff --git a/src/command.hpp b/src/command.hpp index 0553137..69c4e57 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_COMMAND_HPP_INCLUDED__ -#define __ZS_COMMAND_HPP_INCLUDED__ +#ifndef __ZMQ_COMMAND_HPP_INCLUDED__ +#define __ZMQ_COMMAND_HPP_INCLUDED__ #include "stdint.hpp" -namespace zs +namespace zmq { // This structure defines the commands that can be sent between threads. diff --git a/src/config.hpp b/src/config.hpp index a0569ea..88b93d7 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_CONFIG_HPP_INCLUDED__ -#define __ZS_CONFIG_HPP_INCLUDED__ +#ifndef __ZMQ_CONFIG_HPP_INCLUDED__ +#define __ZMQ_CONFIG_HPP_INCLUDED__ -namespace zs +namespace zmq { // Compile-time settings. diff --git a/src/connecter.cpp b/src/connecter.cpp index a21dde3..970dcf7 100644 --- a/src/connecter.cpp +++ b/src/connecter.cpp @@ -24,7 +24,7 @@ #include "simple_semaphore.hpp" #include "zmq_tcp_engine.hpp" -zs::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, +zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, session_t *session_) : io_object_t (thread_), state (idle), @@ -36,24 +36,24 @@ zs::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, { } -void zs::connecter_t::terminate () +void zmq::connecter_t::terminate () { delete this; } -void zs::connecter_t::shutdown () +void zmq::connecter_t::shutdown () { delete this; } -zs::connecter_t::~connecter_t () +zmq::connecter_t::~connecter_t () { } -void zs::connecter_t::process_reg (simple_semaphore_t *smph_) +void zmq::connecter_t::process_reg (simple_semaphore_t *smph_) { // Fet poller pointer for further use. - zs_assert (!poller); + zmq_assert (!poller); poller = get_poller (); // Ask the session to register itself with the I/O thread. Note that @@ -71,10 +71,10 @@ void zs::connecter_t::process_reg (simple_semaphore_t *smph_) timer_event (); } -void zs::connecter_t::process_unreg (simple_semaphore_t *smph_) +void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_) { // Unregister connecter/engine from the poller. - zs_assert (poller); + zmq_assert (poller); if (state == connecting) poller->rm_fd (handle); else if (state == waiting) @@ -87,22 +87,22 @@ void zs::connecter_t::process_unreg (simple_semaphore_t *smph_) smph_->post (); } -void zs::connecter_t::in_event () +void zmq::connecter_t::in_event () { // Error occured in asynchronous connect. Retry to connect after a while. if (state == connecting) { fd_t fd = tcp_connecter.connect (); - zs_assert (fd == retired_fd); + zmq_assert (fd == retired_fd); poller->rm_fd (handle); poller->add_timer (this); state = waiting; return; } - zs_assert (false); + zmq_assert (false); } -void zs::connecter_t::out_event () +void zmq::connecter_t::out_event () { if (state == connecting) { @@ -116,18 +116,18 @@ void zs::connecter_t::out_event () poller->rm_fd (handle); engine = new zmq_tcp_engine_t (fd); - zs_assert (engine); + zmq_assert (engine); engine->attach (poller, this); state = sending; return; } - zs_assert (false); + zmq_assert (false); } -void zs::connecter_t::timer_event () +void zmq::connecter_t::timer_event () { - zs_assert (state == waiting); + zmq_assert (state == waiting); // Initiate async connect and start polling for its completion. If async // connect fails instantly, try to reconnect after a while. @@ -147,21 +147,21 @@ void zs::connecter_t::timer_event () } } -void zs::connecter_t::set_engine (struct i_engine *engine_) +void zmq::connecter_t::set_engine (struct i_engine *engine_) { engine = engine_; } -bool zs::connecter_t::read (zs_msg *msg_) +bool zmq::connecter_t::read (zmq_msg *msg_) { - zs_assert (state == sending); + zmq_assert (state == sending); // Deallocate old content of the message just in case. - zs_msg_close (msg_); + zmq_msg_close (msg_); // Send the identity. - zs_msg_init_size (msg_, identity.size ()); - memcpy (zs_msg_data (msg_), identity.c_str (), identity.size ()); + zmq_msg_init_size (msg_, identity.size ()); + memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ()); // Ask engine to unregister from the poller. i_engine *e = engine; @@ -177,13 +177,13 @@ bool zs::connecter_t::read (zs_msg *msg_) return true; } -bool zs::connecter_t::write (struct zs_msg *msg_) +bool zmq::connecter_t::write (struct zmq_msg *msg_) { // No incoming messages are accepted till identity is sent. return false; } -void zs::connecter_t::flush () +void zmq::connecter_t::flush () { // No incoming messages are accepted till identity is sent. } diff --git a/src/connecter.hpp b/src/connecter.hpp index 91dbf17..1f11c63 100644 --- a/src/connecter.hpp +++ b/src/connecter.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_CONNECTER_HPP_INCLUDED__ -#define __ZS_CONNECTER_HPP_INCLUDED__ +#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__ +#define __ZMQ_CONNECTER_HPP_INCLUDED__ #include -#include "../include/zs.h" +#include "../include/zmq.h" #include "i_poller.hpp" #include "io_object.hpp" @@ -30,7 +30,7 @@ #include "i_session.hpp" #include "tcp_connecter.hpp" -namespace zs +namespace zmq { class connecter_t : public io_object_t, public i_poll_events, @@ -55,8 +55,8 @@ namespace zs // i_session implementation void set_engine (struct i_engine *engine_); // void shutdown (); - bool read (struct zs_msg *msg_); - bool write (struct zs_msg *msg_); + bool read (struct zmq_msg *msg_); + bool write (struct zmq_msg *msg_); void flush (); private: diff --git a/src/data_distributor.cpp b/src/data_distributor.cpp index 8f89c46..971edce 100644 --- a/src/data_distributor.cpp +++ b/src/data_distributor.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "data_distributor.hpp" #include "pipe_writer.hpp" @@ -25,25 +25,25 @@ #include "session.hpp" #include "msg.hpp" -zs::data_distributor_t::data_distributor_t () : +zmq::data_distributor_t::data_distributor_t () : session (NULL) { } -void zs::data_distributor_t::set_session (session_t *session_) +void zmq::data_distributor_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::data_distributor_t::shutdown () +void zmq::data_distributor_t::shutdown () { // No need to deallocate pipes here. They'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::data_distributor_t::terminate () +void zmq::data_distributor_t::terminate () { // Pipe unregisters itself during the call to terminate, so the pipes // list shinks by one in each iteration. @@ -53,11 +53,11 @@ void zs::data_distributor_t::terminate () delete this; } -zs::data_distributor_t::~data_distributor_t () +zmq::data_distributor_t::~data_distributor_t () { } -void zs::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) +void zmq::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) { // Associate demux with a new pipe. pipe_->set_demux (this); @@ -65,7 +65,7 @@ void zs::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) pipes.push_back (pipe_); } -void zs::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) +void zmq::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) { // Release the reference to the pipe. int index = pipe_->get_index (); @@ -75,19 +75,19 @@ void zs::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) pipes.pop_back (); } -bool zs::data_distributor_t::empty () +bool zmq::data_distributor_t::empty () { return pipes.empty (); } -bool zs::data_distributor_t::send (zs_msg *msg_) +bool zmq::data_distributor_t::send (zmq_msg *msg_) { int pipes_count = pipes.size (); // If there are no pipes available, simply drop the message. if (pipes_count == 0) { - zs_msg_close (msg_); - zs_msg_init (msg_); + zmq_msg_close (msg_); + zmq_msg_init (msg_); return true; } @@ -98,10 +98,10 @@ bool zs::data_distributor_t::send (zs_msg *msg_) // return false; // For VSMs the copying is straighforward. - if (msg_->content == (zs_msg_content*) ZS_VSM) { + if (msg_->content == (zmq_msg_content*) ZMQ_VSM) { for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) write_to_pipe (*it, msg_); - zs_msg_init (msg_); + zmq_msg_init (msg_); return true; } @@ -110,7 +110,7 @@ bool zs::data_distributor_t::send (zs_msg *msg_) // operations) needed. if (pipes_count == 1) { write_to_pipe (*pipes.begin (), msg_); - zs_msg_init (msg_); + zmq_msg_init (msg_); return true; } @@ -130,12 +130,12 @@ bool zs::data_distributor_t::send (zs_msg *msg_) write_to_pipe (*it, msg_); // Detach the original message from the data buffer. - zs_msg_init (msg_); + zmq_msg_init (msg_); return true; } -void zs::data_distributor_t::flush () +void zmq::data_distributor_t::flush () { // Flush all pipes. If there's large number of pipes, it can be pretty // inefficient (especially if there's new message only in a single pipe). @@ -144,12 +144,12 @@ void zs::data_distributor_t::flush () (*it)->flush (); } -void zs::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_, - struct zs_msg *msg_) +void zmq::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_, + struct zmq_msg *msg_) { if (!pipe_->write (msg_)) { // TODO: Push gap notification to the pipe. - zs_assert (false); + zmq_assert (false); } } diff --git a/src/data_distributor.hpp b/src/data_distributor.hpp index 239de31..5bde2e8 100644 --- a/src/data_distributor.hpp +++ b/src/data_distributor.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_DATA_DISTRIBUTOR_HPP_INCLUDED__ -#define __ZS_DATA_DISTRIBUTOR_HPP_INCLUDED__ +#ifndef __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ +#define __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ #include #include -namespace zs +namespace zmq { // Object to distribute messages to outbound pipes. @@ -42,7 +42,7 @@ namespace zs void attach_pipe (class pipe_writer_t *pipe_); void detach_pipe (class pipe_writer_t *pipe_); bool empty (); - bool send (struct zs_msg *msg_); + bool send (struct zmq_msg *msg_); void flush (); private: @@ -55,7 +55,7 @@ namespace zs // Writes the message to the pipe if possible. If it isn't, writes // a gap notification to the pipe. - void write_to_pipe (class pipe_writer_t *pipe_, struct zs_msg *msg_); + void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_); // The list of outbound pipes. typedef std::vector pipes_t; diff --git a/src/decoder.hpp b/src/decoder.hpp index c643df8..897f410 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_DECODER_HPP_INCLUDED__ -#define __ZS_DECODER_HPP_INCLUDED__ +#ifndef __ZMQ_DECODER_HPP_INCLUDED__ +#define __ZMQ_DECODER_HPP_INCLUDED__ #include #include #include -namespace zs +namespace zmq { // Helper base class for decoders that know the amount of data to read diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 6e3a8c1..8fb0877 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -19,7 +19,7 @@ #include "platform.hpp" -#if defined ZS_HAVE_SOLARIS || defined ZS_HAVE_HPUX +#if defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_HPUX #include #include @@ -35,7 +35,7 @@ #include "err.hpp" #include "config.hpp" -zs::devpoll_t::devpoll_t () +zmq::devpoll_t::devpoll_t () { // Get limit on open files struct rlimit rl; @@ -50,19 +50,19 @@ zs::devpoll_t::devpoll_t () errno_assert (devpoll_fd != -1); } -zs::devpoll_t::~devpoll_t () +zmq::devpoll_t::~devpoll_t () { close (devpoll_fd); } -void zs::devpoll_t::devpoll_ctl (fd_t fd_, short events_) +void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) { struct pollfd pfd = {fd_, events_, 0}; ssize_t rc = write (devpoll_fd, &pfd, sizeof pfd); - zs_assert (rc == sizeof pfd); + zmq_assert (rc == sizeof pfd); } -zs::handle_t zs::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) { assert (!fd_table [fd_].valid); @@ -82,7 +82,7 @@ zs::handle_t zs::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) return handle; } -void zs::devpoll_t::rm_fd (handle_t handle_) +void zmq::devpoll_t::rm_fd (handle_t handle_) { assert (fd_table [handle_.fd].valid); @@ -93,7 +93,7 @@ void zs::devpoll_t::rm_fd (handle_t handle_) load.sub (1); } -void zs::devpoll_t::set_pollin (handle_t handle_) +void zmq::devpoll_t::set_pollin (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -101,7 +101,7 @@ void zs::devpoll_t::set_pollin (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::reset_pollin (handle_t handle_) +void zmq::devpoll_t::reset_pollin (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -109,7 +109,7 @@ void zs::devpoll_t::reset_pollin (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::set_pollout (handle_t handle_) +void zmq::devpoll_t::set_pollout (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -117,7 +117,7 @@ void zs::devpoll_t::set_pollout (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::reset_pollout (handle_t handle_) +void zmq::devpoll_t::reset_pollout (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -125,39 +125,39 @@ void zs::devpoll_t::reset_pollout (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::add_timer (i_poll_events *events_) +void zmq::devpoll_t::add_timer (i_poll_events *events_) { timers.push_back (events_); } -void zs::devpoll_t::cancel_timer (i_poll_events *events_) +void zmq::devpoll_t::cancel_timer (i_poll_events *events_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) timers.erase (it); } -int zs::devpoll_t::get_load () +int zmq::devpoll_t::get_load () { return load.get (); } -void zs::devpoll_t::start () +void zmq::devpoll_t::start () { worker.start (worker_routine, this); } -void zs::devpoll_t::stop () +void zmq::devpoll_t::stop () { stopping = true; } -void zs::devpoll_t::join () +void zmq::devpoll_t::join () { worker.stop (); } -bool zs::devpoll_t::loop () +bool zmq::devpoll_t::loop () { // According to the poll(7d) man page, we can retrieve // no more then (OPEN_MAX - 1) events. @@ -216,7 +216,7 @@ bool zs::devpoll_t::loop () } } -void zs::devpoll_t::worker_routine (void *arg_) +void zmq::devpoll_t::worker_routine (void *arg_) { ((devpoll_t*) arg_)->loop (); } diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 56b3b25..28274c0 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_DEVPOLL_HPP_INCLUDED__ -#define __ZS_DEVPOLL_HPP_INCLUDED__ +#ifndef __ZMQ_DEVPOLL_HPP_INCLUDED__ +#define __ZMQ_DEVPOLL_HPP_INCLUDED__ #include "platform.hpp" -#if defined ZS_HAVE_SOLARIS || ZS_HAVE_HPUX +#if defined ZMQ_HAVE_SOLARIS || ZMQ_HAVE_HPUX #include @@ -31,7 +31,7 @@ #include "thread.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // Implements socket polling mechanism using the Solaris-specific diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index c468857..56a5e0b 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "dispatcher.hpp" #include "app_thread.hpp" @@ -30,27 +30,27 @@ #include "session.hpp" #include "i_api.hpp" -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include "windows.h" #endif -zs::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) { -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple // times given that WSACleanup will be called for each WSAStartup. WORD version_requested = MAKEWORD (2, 2); WSADATA wsa_data; int rc = WSAStartup (version_requested, &wsa_data); - zs_assert (rc == 0); - zs_assert (LOBYTE (wsa_data.wVersion) == 2 && + zmq_assert (rc == 0); + zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && HIBYTE (wsa_data.wVersion) == 2); #endif // Create application thread proxies. for (int i = 0; i != app_threads_; i++) { app_thread_t *app_thread = new app_thread_t (this, i); - zs_assert (app_thread); + zmq_assert (app_thread); app_threads.push_back (app_thread); signalers.push_back (app_thread->get_signaler ()); } @@ -58,26 +58,26 @@ zs::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) // Create I/O thread objects. for (int i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); - zs_assert (io_thread); + zmq_assert (io_thread); io_threads.push_back (io_thread); signalers.push_back (io_thread->get_signaler ()); } // Create command pipe matrix. command_pipes = new command_pipe_t [signalers.size () * signalers.size ()]; - zs_assert (command_pipes); + zmq_assert (command_pipes); // Launch I/O threads. for (int i = 0; i != io_threads_; i++) io_threads [i]->start (); } -void zs::dispatcher_t::shutdown () +void zmq::dispatcher_t::shutdown () { delete this; } -zs::dispatcher_t::~dispatcher_t () +zmq::dispatcher_t::~dispatcher_t () { // Ask I/O threads to terminate. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -103,19 +103,19 @@ zs::dispatcher_t::~dispatcher_t () delete it->writer; } -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. int rc = WSACleanup (); wsa_assert (rc != SOCKET_ERROR); #endif } -int zs::dispatcher_t::thread_slot_count () +int zmq::dispatcher_t::thread_slot_count () { return signalers.size (); } -zs::i_api *zs::dispatcher_t::create_socket (int type_) +zmq::i_api *zmq::dispatcher_t::create_socket (int type_) { threads_sync.lock (); app_thread_t *thread = choose_app_thread (); @@ -128,7 +128,7 @@ zs::i_api *zs::dispatcher_t::create_socket (int type_) return s; } -zs::app_thread_t *zs::dispatcher_t::choose_app_thread () +zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) @@ -145,9 +145,9 @@ zs::app_thread_t *zs::dispatcher_t::choose_app_thread () return NULL; } -zs::io_thread_t *zs::dispatcher_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) { - zs_assert (io_threads.size () > 0); + zmq_assert (io_threads.size () > 0); // Find the I/O thread with minimum load. int min_load = io_threads [0]->get_load (); @@ -165,19 +165,19 @@ zs::io_thread_t *zs::dispatcher_t::choose_io_thread (uint64_t taskset_) return io_threads [result]; } -void zs::dispatcher_t::create_pipe (object_t *reader_parent_, +void zmq::dispatcher_t::create_pipe (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, pipe_reader_t **reader_, pipe_writer_t **writer_) { // Create the pipe, reader & writer triple. pipe_t *pipe = new pipe_t; - zs_assert (pipe); + zmq_assert (pipe); pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe, hwm_, lwm_); - zs_assert (reader); + zmq_assert (reader); pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader, hwm_, lwm_); - zs_assert (writer); + zmq_assert (writer); reader->set_peer (writer); // Store the pipe in the repository. @@ -191,7 +191,7 @@ void zs::dispatcher_t::create_pipe (object_t *reader_parent_, *writer_ = writer; } -void zs::dispatcher_t::destroy_pipe (pipe_t *pipe_) +void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_) { // Remove the pipe from the repository. pipe_info_t info; @@ -203,13 +203,13 @@ void zs::dispatcher_t::destroy_pipe (pipe_t *pipe_) pipes_sync.unlock (); // Deallocate the pipe and associated pipe reader & pipe writer. - zs_assert (info.pipe == pipe_); + zmq_assert (info.pipe == pipe_); delete info.pipe; delete info.reader; delete info.writer; } -int zs::dispatcher_t::register_inproc_endpoint (const char *endpoint_, +int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_, session_t *session_) { inproc_endpoint_sync.lock (); @@ -227,7 +227,7 @@ int zs::dispatcher_t::register_inproc_endpoint (const char *endpoint_, return 0; } -zs::object_t *zs::dispatcher_t::get_inproc_endpoint (const char *endpoint_) +zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_) { inproc_endpoint_sync.lock (); inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); @@ -245,7 +245,7 @@ zs::object_t *zs::dispatcher_t::get_inproc_endpoint (const char *endpoint_) return session; } -void zs::dispatcher_t::unregister_inproc_endpoints (session_t *session_) +void zmq::dispatcher_t::unregister_inproc_endpoints (session_t *session_) { inproc_endpoint_sync.lock (); diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 05d2c49..07c35cd 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_DISPATCHER_HPP_INCLUDED__ -#define __ZS_DISPATCHER_HPP_INCLUDED__ +#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ +#define __ZMQ_DISPATCHER_HPP_INCLUDED__ #include #include @@ -31,7 +31,7 @@ #include "mutex.hpp" #include "stdint.hpp" -namespace zs +namespace zmq { // Dispatcher implements bidirectional thread-safe passing of commands @@ -51,7 +51,7 @@ namespace zs // signalers. dispatcher_t (int app_threads_, int io_threads_); - // To be called to terminate the whole infrastructure (zs_term). + // To be called to terminate the whole infrastructure (zmq_term). void shutdown (); // Create a socket engine. diff --git a/src/dummy_aggregator.cpp b/src/dummy_aggregator.cpp index ef0cea6..0b27fab 100644 --- a/src/dummy_aggregator.cpp +++ b/src/dummy_aggregator.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "dummy_aggregator.hpp" #include "err.hpp" @@ -30,27 +30,27 @@ pipes [i1_]->set_index (i1_);\ pipes [i2_]->set_index (i2_); -zs::dummy_aggregator_t::dummy_aggregator_t () : +zmq::dummy_aggregator_t::dummy_aggregator_t () : session (NULL), pipe (NULL), active (false) { } -void zs::dummy_aggregator_t::set_session (session_t *session_) +void zmq::dummy_aggregator_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::dummy_aggregator_t::shutdown () +void zmq::dummy_aggregator_t::shutdown () { // No need to deallocate the pipe here. It'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::dummy_aggregator_t::terminate () +void zmq::dummy_aggregator_t::terminate () { if (pipe) pipe->terminate (); @@ -58,13 +58,13 @@ void zs::dummy_aggregator_t::terminate () delete this; } -zs::dummy_aggregator_t::~dummy_aggregator_t () +zmq::dummy_aggregator_t::~dummy_aggregator_t () { } -void zs::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) { - zs_assert (!pipe); + zmq_assert (!pipe); pipe = pipe_; active = true; @@ -73,22 +73,22 @@ void zs::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) session->revive (); } -void zs::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_) { - zs_assert (pipe == pipe_); + zmq_assert (pipe == pipe_); deactivate (pipe_); pipe = NULL; } -bool zs::dummy_aggregator_t::empty () +bool zmq::dummy_aggregator_t::empty () { return pipe == NULL; } -bool zs::dummy_aggregator_t::recv (zs_msg *msg_) +bool zmq::dummy_aggregator_t::recv (zmq_msg *msg_) { // Deallocate old content of the message. - zs_msg_close (msg_); + zmq_msg_close (msg_); // Try to read from the pipe. if (pipe && pipe->read (msg_)) @@ -96,16 +96,16 @@ bool zs::dummy_aggregator_t::recv (zs_msg *msg_) // No message is available. Initialise the output parameter // to be a 0-byte message. - zs_msg_init (msg_); + zmq_msg_init (msg_); return false; } -void zs::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_) { active = false; } -void zs::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_) { active = true; } diff --git a/src/dummy_aggregator.hpp b/src/dummy_aggregator.hpp index ab5bcb9..6a9e9db 100644 --- a/src/dummy_aggregator.hpp +++ b/src/dummy_aggregator.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_DUMMY_AGGREGATOR_HPP_INCLUDED__ -#define __ZS_DUMMY_AGGREGATOR_HPP_INCLUDED__ +#ifndef __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ +#define __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ #include #include "i_mux.hpp" -namespace zs +namespace zmq { // Fake message aggregator. There can be at most one pipe bound to it, @@ -47,7 +47,7 @@ namespace zs bool empty (); void deactivate (class pipe_reader_t *pipe_); void reactivate (class pipe_reader_t *pipe_); - bool recv (struct zs_msg *msg_); + bool recv (struct zmq_msg *msg_); private: diff --git a/src/dummy_distributor.cpp b/src/dummy_distributor.cpp index 58cadfe..62e2b88 100644 --- a/src/dummy_distributor.cpp +++ b/src/dummy_distributor.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "dummy_distributor.hpp" #include "pipe_writer.hpp" @@ -25,25 +25,25 @@ #include "session.hpp" #include "msg.hpp" -zs::dummy_distributor_t::dummy_distributor_t () : +zmq::dummy_distributor_t::dummy_distributor_t () : session (NULL) { } -void zs::dummy_distributor_t::set_session (session_t *session_) +void zmq::dummy_distributor_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::dummy_distributor_t::shutdown () +void zmq::dummy_distributor_t::shutdown () { // No need to deallocate pipe here. It'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::dummy_distributor_t::terminate () +void zmq::dummy_distributor_t::terminate () { if (pipe) pipe->terminate (); @@ -51,33 +51,33 @@ void zs::dummy_distributor_t::terminate () delete this; } -zs::dummy_distributor_t::~dummy_distributor_t () +zmq::dummy_distributor_t::~dummy_distributor_t () { } -void zs::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_) +void zmq::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_) { - zs_assert (!pipe); + zmq_assert (!pipe); pipe = pipe_; } -void zs::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_) +void zmq::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_) { - zs_assert (pipe == pipe_); + zmq_assert (pipe == pipe_); pipe = NULL; } -bool zs::dummy_distributor_t::empty () +bool zmq::dummy_distributor_t::empty () { return pipe == NULL; } -bool zs::dummy_distributor_t::send (zs_msg *msg_) +bool zmq::dummy_distributor_t::send (zmq_msg *msg_) { return pipe && pipe->write (msg_); } -void zs::dummy_distributor_t::flush () +void zmq::dummy_distributor_t::flush () { if (pipe) pipe->flush (); diff --git a/src/dummy_distributor.hpp b/src/dummy_distributor.hpp index