From cc3755a16f00026af882ed14d122cc8aa6d50e82 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 3 Aug 2009 11:30:13 +0200 Subject: renamed from zs to zmq --- src/Makefile.am | 10 +-- src/app_thread.cpp | 70 +++++++-------- src/app_thread.hpp | 8 +- src/atomic.hpp | 62 ++++++------- src/atomic_bitmap.hpp | 82 ++++++++--------- src/atomic_counter.hpp | 72 +++++++-------- src/atomic_ptr.hpp | 72 +++++++-------- src/command.hpp | 6 +- src/config.hpp | 6 +- src/connecter.cpp | 48 +++++----- src/connecter.hpp | 12 +-- src/data_distributor.cpp | 42 ++++----- src/data_distributor.hpp | 10 +-- src/decoder.hpp | 6 +- src/devpoll.cpp | 38 ++++---- src/devpoll.hpp | 8 +- src/dispatcher.cpp | 52 +++++------ src/dispatcher.hpp | 8 +- src/dummy_aggregator.cpp | 34 +++---- src/dummy_aggregator.hpp | 8 +- src/dummy_distributor.cpp | 28 +++--- src/dummy_distributor.hpp | 8 +- src/encoder.hpp | 6 +- src/epoll.cpp | 36 ++++---- src/epoll.hpp | 8 +- src/err.cpp | 8 +- src/err.hpp | 16 ++-- src/fair_aggregator.cpp | 30 +++---- src/fair_aggregator.hpp | 8 +- src/fd.hpp | 8 +- src/fd_signaler.cpp | 54 +++++------ src/fd_signaler.hpp | 8 +- src/i_api.hpp | 14 +-- src/i_demux.hpp | 11 +-- src/i_engine.hpp | 8 +- src/i_mux.hpp | 11 +-- src/i_poll_events.hpp | 6 +- src/i_poller.hpp | 6 +- src/i_session.hpp | 10 +-- src/i_signaler.hpp | 6 +- src/i_thread.hpp | 6 +- src/io_object.cpp | 6 +- src/io_object.hpp | 6 +- src/io_thread.cpp | 72 +++++++-------- src/io_thread.hpp | 8 +- src/ip.cpp | 36 ++++---- src/ip.hpp | 8 +- src/kqueue.cpp | 40 ++++----- src/kqueue.hpp | 8 +- src/listener.cpp | 42 ++++----- src/listener.hpp | 6 +- src/load_balancer.cpp | 32 +++---- src/load_balancer.hpp | 10 +-- src/msg.hpp | 16 ++-- src/mutex.hpp | 10 +-- src/object.cpp | 88 +++++++++--------- src/object.hpp | 6 +- src/p2p.cpp | 4 +- src/p2p.hpp | 6 +- src/pipe.cpp | 14 +-- src/pipe.hpp | 10 +-- src/pipe_reader.cpp | 26 +++--- src/pipe_reader.hpp | 8 +- src/pipe_writer.cpp | 30 +++---- src/pipe_writer.hpp | 8 +- src/platform.hpp.in | 26 +++--- src/poll.cpp | 40 ++++----- src/poll.hpp | 14 +-- src/pub.cpp | 6 +- src/pub.hpp | 8 +- src/rep.cpp | 4 +- src/rep.hpp | 6 +- src/req.cpp | 4 +- src/req.hpp | 6 +- src/safe_object.cpp | 14 +-- src/safe_object.hpp | 6 +- src/select.cpp | 40 ++++----- src/select.hpp | 10 +-- src/session.cpp | 46 +++++----- src/session.hpp | 10 +-- src/session_stub.cpp | 30 +++---- src/session_stub.hpp | 10 +-- src/simple_semaphore.hpp | 12 +-- src/socket_base.cpp | 56 ++++++------ src/socket_base.hpp | 14 +-- src/stdint.hpp | 6 +- src/sub.cpp | 8 +- src/sub.hpp | 8 +- src/tcp_connecter.cpp | 20 ++--- src/tcp_connecter.hpp | 6 +- src/tcp_listener.cpp | 20 ++--- src/tcp_listener.hpp | 6 +- src/tcp_socket.cpp | 18 ++-- src/tcp_socket.hpp | 6 +- src/thread.cpp | 14 +-- src/thread.hpp | 10 +-- src/uuid.cpp | 40 ++++----- src/uuid.hpp | 18 ++-- src/windows.hpp | 4 +- src/wire.hpp | 6 +- src/ypipe.hpp | 8 +- src/ypollset.cpp | 10 +-- src/ypollset.hpp | 6 +- src/yqueue.hpp | 10 +-- src/zmq.cpp | 223 ++++++++++++++++++++++++++++++++++++++++++++++ src/zmq_decoder.cpp | 25 +++--- src/zmq_decoder.hpp | 10 +-- src/zmq_encoder.cpp | 18 ++-- src/zmq_encoder.hpp | 10 +-- src/zmq_tcp_engine.cpp | 44 ++++----- src/zmq_tcp_engine.hpp | 6 +- src/zs.cpp | 222 --------------------------------------------- 112 files changed, 1299 insertions(+), 1295 deletions(-) create mode 100644 src/zmq.cpp delete mode 100644 src/zs.cpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index bb648ec..e6d09ca 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,6 +1,6 @@ -lib_LTLIBRARIES = libzs.la +lib_LTLIBRARIES = libzmq.la -libzs_la_SOURCES = \ +libzmq_la_SOURCES = \ app_thread.hpp \ atomic_bitmap.hpp \ atomic_counter.hpp \ @@ -109,10 +109,10 @@ libzs_la_SOURCES = \ zmq_decoder.cpp \ zmq_encoder.cpp \ zmq_tcp_engine.cpp \ - zs.cpp + zmq.cpp -libzs_la_LDFLAGS = -version-info 0:0:0 -libzs_la_CXXFLAGS = -Wall -pedantic -Werror @ZS_EXTRA_CXXFLAGS@ +libzmq_la_LDFLAGS = -version-info 0:0:0 +libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ dist-hook: -rm $(distdir)/src/platform.hpp diff --git a/src/app_thread.cpp b/src/app_thread.cpp index ca08976..2406dbd 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,9 +17,9 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include @@ -48,17 +48,17 @@ // system with x86 architecture and gcc or MSVC compiler. #if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) -#define ZS_DELAY_COMMANDS +#define ZMQ_DELAY_COMMANDS #endif -zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : object_t (dispatcher_, thread_slot_), tid (0), last_processing_time (0) { } -void zs::app_thread_t::shutdown () +void zmq::app_thread_t::shutdown () { // Deallocate all the sessions associated with the thread. while (!sessions.empty ()) @@ -67,17 +67,17 @@ void zs::app_thread_t::shutdown () delete this; } -zs::app_thread_t::~app_thread_t () +zmq::app_thread_t::~app_thread_t () { } -void zs::app_thread_t::attach_session (session_t *session_) +void zmq::app_thread_t::attach_session (session_t *session_) { session_->set_index (sessions.size ()); sessions.push_back (session_); } -void zs::app_thread_t::detach_session (session_t *session_) +void zmq::app_thread_t::detach_session (session_t *session_) { // O(1) removal of the session from the list. sessions_t::size_type i = session_->get_index (); @@ -86,22 +86,22 @@ void zs::app_thread_t::detach_session (session_t *session_) sessions.pop_back (); } -zs::i_poller *zs::app_thread_t::get_poller () +zmq::i_poller *zmq::app_thread_t::get_poller () { - zs_assert (false); + zmq_assert (false); } -zs::i_signaler *zs::app_thread_t::get_signaler () +zmq::i_signaler *zmq::app_thread_t::get_signaler () { return &pollset; } -bool zs::app_thread_t::is_current () +bool zmq::app_thread_t::is_current () { return !sessions.empty () && tid == getpid (); } -bool zs::app_thread_t::make_current () +bool zmq::app_thread_t::make_current () { // If there are object managed by this slot we cannot assign the slot // to a different thread. @@ -112,7 +112,7 @@ bool zs::app_thread_t::make_current () return true; } -zs::i_api *zs::app_thread_t::create_socket (int type_) +zmq::i_api *zmq::app_thread_t::create_socket (int type_) { i_mux *mux = NULL; i_demux *demux = NULL; @@ -120,43 +120,43 @@ zs::i_api *zs::app_thread_t::create_socket (int type_) i_api *api = NULL; switch (type_) { - case ZS_P2P: + case ZMQ_P2P: mux = new dummy_aggregator_t; - zs_assert (mux); + zmq_assert (mux); demux = new dummy_distributor_t; - zs_assert (demux); + zmq_assert (demux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new p2p_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_PUB: + case ZMQ_PUB: demux = new data_distributor_t; - zs_assert (demux); + zmq_assert (demux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new pub_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_SUB: + case ZMQ_SUB: mux = new fair_aggregator_t; - zs_assert (mux); + zmq_assert (mux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new sub_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_REQ: + case ZMQ_REQ: // TODO - zs_assert (false); + zmq_assert (false); api = new req_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_REP: + case ZMQ_REP: // TODO - zs_assert (false); + zmq_assert (false); api = new rep_t (this, session); - zs_assert (api); + zmq_assert (api); break; default: errno = EINVAL; @@ -168,14 +168,14 @@ zs::i_api *zs::app_thread_t::create_socket (int type_) return api; } -void zs::app_thread_t::process_commands (bool block_) +void zmq::app_thread_t::process_commands (bool block_) { ypollset_t::signals_t signals; if (block_) signals = pollset.poll (); else { -#if defined ZS_DELAY_COMMANDS +#if defined ZMQ_DELAY_COMMANDS // Optimised version of command processing - it doesn't have to check // for incoming commands each time. It does so only if certain time // elapsed since last command processing. Command delay varies diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 61e7ff1..ffe5596 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_APP_THREAD_HPP_INCLUDED__ -#define __ZS_APP_THREAD_HPP_INCLUDED__ +#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__ +#define __ZMQ_APP_THREAD_HPP_INCLUDED__ #include @@ -27,7 +27,7 @@ #include "object.hpp" #include "ypollset.hpp" -namespace zs +namespace zmq { class app_thread_t : public object_t, public i_thread @@ -36,7 +36,7 @@ namespace zs app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); - // To be called when the whole infrastrucure is being closed (zs_term). + // To be called when the whole infrastrucure is being closed. void shutdown (); // Returns signaler associated with this application thread. diff --git a/src/atomic.hpp b/src/atomic.hpp index e24b719..e581593 100644 --- a/src/atomic.hpp +++ b/src/atomic.hpp @@ -17,24 +17,24 @@ along with this program. If not, see . */ -#ifndef __ZS_ATOMIC_HPP_INCLUDED__ -#define __ZS_ATOMIC_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_HPP_INCLUDED__ #include "stdint.hpp" -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_X86 +#define ZMQ_ATOMIC_X86 #elif defined ZMQ_HAVE_WINDOWS -#define ZS_ATOMIC_WINDOWS +#define ZMQ_ATOMIC_WINDOWS #elif defined ZMQ_HAVE_SOLARIS -#define ZS_ATOMIC_SOLARIS +#define ZMQ_ATOMIC_SOLARIS #else -#define ZS_ATOMIC_MUTEX +#define ZMQ_ATOMIC_MUTEX #endif -namespace zs +namespace zmq { // Atomic assignement. @@ -56,11 +56,11 @@ namespace zs // Atomic addition. Returns the old value. inline uint32_t atomic_uint32_add (volatile uint32_t *p_, uint32_t delta_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedExchangeAdd ((LONG*) &value, increment_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_add_32_nv (&value, increment_) - delta_; -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 uint32_t old; __asm__ volatile ( "lock; xadd %0, %1\n\t" @@ -80,13 +80,13 @@ namespace zs // Atomic subtraction. Returns the old value. inline uint32_t atomic_uint32_sub (volatile uint32_t *p_, uint32_t delta_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS LONG delta = - ((LONG) delta_); return InterlockedExchangeAdd ((LONG*) &value, delta); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS int32_t delta = - ((int32_t) delta_); return atomic_add_32_nv (&value, delta) + delta_; -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 uint32_t old = -delta_; __asm__ volatile ("lock; xaddl %0,%1" : "=r" (old), "=m" (*p_) @@ -116,11 +116,11 @@ namespace zs template inline void *atomic_ptr_xchg (volatile T **p_, T *value_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedExchangePointer (p_, value_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_swap_ptr (p_, value_); -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 void *old; __asm__ volatile ( "lock; xchg %0, %2" @@ -144,11 +144,11 @@ namespace zs template inline void *atomic_ptr_cas (volatile T **p_, T *cmp_, T *value_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedCompareExchangePointer (p_, value_, cmp_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_cas_ptr (p_, cmp_, value_); -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 void *old; __asm__ volatile ( "lock; cmpxchg %2, %3" @@ -167,7 +167,7 @@ namespace zs #endif } -#if defined ZS_ATOMIC_X86 && defined __x86_64__ +#if defined ZMQ_ATOMIC_X86 && defined __x86_64__ typedef uint64_t atomic_bitmap_t; #else typedef uint32_t atomic_bitmap_t; @@ -187,7 +187,7 @@ namespace zs inline bool atomic_bitmap_btsr (volatile atomic_bitmap_t *p_, int set_index_, int reset_index_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << @@ -197,7 +197,7 @@ namespace zs return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval | (atomic_bitmap_t (1) << @@ -206,7 +206,7 @@ namespace zs return (oldval & (atomic_bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 atomic_bitmap_t oldval, dummy; __asm__ volatile ( "mov %0, %1\n\t" @@ -236,11 +236,11 @@ namespace zs inline atomic_bitmap_t atomic_bitmap_xchg (volatile atomic_bitmap_t *p_, atomic_bitmap_t newval_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS return InterlockedExchange ((volatile LONG*) p_, newval_); -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS return atomic_swap_32 (p_, newval_); -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 atomic_bitmap_t oldval = newval_; __asm__ volatile ( "lock; xchg %0, %1" @@ -263,7 +263,7 @@ namespace zs inline atomic_bitmap_t atomic_bitmap_izte (volatile atomic_bitmap_t *p_, atomic_bitmap_t thenval_, atomic_bitmap_t elseval_) { -#if defined ZS_ATOMIC_WINDOWS +#if defined ZMQ_ATOMIC_WINDOWS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); @@ -271,14 +271,14 @@ namespace zs oldval) == (LONG) oldval) return oldval; } -#elif defined ZS_ATOMIC_SOLARIS +#elif defined ZMQ_ATOMIC_SOLARIS while (true) { atomic_bitmap_t oldval = *p_; atomic_bitmap_t newval = (oldval ? elseval_ : thenval_); if (atomic_cas_32 (p_, oldval, newval) == oldval) return oldval; } -#elif defined ZS_ATOMIC_X86 +#elif defined ZMQ_ATOMIC_X86 atomic_bitmap_t oldval; atomic_bitmap_t dummy; __asm__ volatile ( diff --git a/src/atomic_bitmap.hpp b/src/atomic_bitmap.hpp index a5440de..6b7218e 100644 --- a/src/atomic_bitmap.hpp +++ b/src/atomic_bitmap.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ -#define __ZS_ATOMIC_BITMAP_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_BITMAP_HPP_INCLUDED__ #include "stdint.hpp" #include "platform.hpp" @@ -26,29 +26,29 @@ // These are the conditions to choose between different implementations // of atomic_bitmap. -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_BITMAP_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_BITMAP_X86 +#define ZMQ_ATOMIC_BITMAP_X86 #elif 0 && defined __sparc__ && defined __GNUC__ -#define ZS_ATOMIC_BITMAP_SPARC -#elif defined ZS_HAVE_WINDOWS -#define ZS_ATOMIC_BITMAP_WINDOWS -#elif defined ZS_HAVE_SOLARIS -#define ZS_ATOMIC_BITMAP_SOLARIS +#define ZMQ_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_HAVE_WINDOWS +#define ZMQ_ATOMIC_BITMAP_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZMQ_ATOMIC_BITMAP_SOLARIS #else -#define ZS_ATOMIC_BITMAP_MUTEX +#define ZMQ_ATOMIC_BITMAP_MUTEX #endif -#if defined ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_ATOMIC_BITMAP_MUTEX #include "mutex.hpp" -#elif defined ZS_ATOMIC_BITMAP_WINDOWS +#elif defined ZMQ_ATOMIC_BITMAP_WINDOWS #include "windows.hpp" -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS #include #endif -namespace zs +namespace zmq { // This class encapuslates several bitwise atomic operations on unsigned @@ -79,7 +79,7 @@ namespace zs // another one. Returns the original value of the reset bit. inline bool btsr (int set_index_, int reset_index_) { -#if defined ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS while (true) { bitmap_t oldval = value; bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & @@ -89,7 +89,7 @@ namespace zs return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS while (true) { bitmap_t oldval = value; bitmap_t newval = (oldval | (bitmap_t (1) << set_index_)) & @@ -98,7 +98,7 @@ namespace zs return (oldval & (bitmap_t (1) << reset_index_)) ? true : false; } -#elif defined ZS_ATOMIC_BITMAP_X86 +#elif defined ZMQ_ATOMIC_BITMAP_X86 bitmap_t oldval, dummy; __asm__ volatile ( "mov %0, %1\n\t" @@ -112,7 +112,7 @@ namespace zs : "r" (bitmap_t(set_index_)), "r" (bitmap_t(reset_index_)) : "cc"); return (bool) (oldval & (bitmap_t(1) << reset_index_)); -#elif defined ZS_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_ATOMIC_BITMAP_SPARC volatile bitmap_t* valptr = &value; bitmap_t set_val = bitmap_t(1) << set_index_; bitmap_t reset_val = ~(bitmap_t(1) << reset_index_); @@ -132,7 +132,7 @@ namespace zs : "r" (valptr) : "cc"); return oldval; -#elif defined ZS_ATOMIC_BITMAP_MUTEX +#elif defined ZMQ_ATOMIC_BITMAP_MUTEX sync.lock (); bitmap_t oldval = value; value = (oldval | (bitmap_t (1) << set_index_)) & @@ -148,18 +148,18 @@ namespace zs inline bitmap_t xchg (bitmap_t newval_) { bitmap_t oldval; -#if defined ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS oldval = InterlockedExchange ((volatile LONG*) &value, newval_); -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS oldval = atomic_swap_32 (&value, newval_); -#elif defined ZS_ATOMIC_BITMAP_X86 +#elif defined ZMQ_ATOMIC_BITMAP_X86 oldval = newval_; __asm__ volatile ( "lock; xchg %0, %1" : "=r" (oldval) : "m" (value), "0" (oldval) : "memory"); -#elif defined ZS_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_ATOMIC_BITMAP_SPARC oldval = value; volatile bitmap_t* ptrin = &value; bitmap_t tmp; @@ -176,7 +176,7 @@ namespace zs : "r" (ptrin) : "cc"); return prev; -#elif defined ZS_ATOMIC_BITMAP_MUTEX +#elif defined ZMQ_ATOMIC_BITMAP_MUTEX sync.lock (); oldval = value; value = newval_; @@ -193,7 +193,7 @@ namespace zs inline bitmap_t izte (bitmap_t thenval_, bitmap_t elseval_) { -#if defined ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS while (true) { bitmap_t oldval = value; bitmap_t newval = oldval == 0 ? thenval_ : elseval_; @@ -201,14 +201,14 @@ namespace zs newval, oldval) == (LONG) oldval) return oldval; } -#elif defined ZS_ATOMIC_BITMAP_SOLARIS +#elif defined ZMQ_ATOMIC_BITMAP_SOLARIS while (true) { bitmap_t oldval = value; bitmap_t newval = oldval == 0 ? thenval_ : elseval_; if (atomic_cas_32 (&value, oldval, newval) == oldval) return oldval; } -#elif defined ZS_ATOMIC_BITMAP_X86 +#elif defined ZMQ_ATOMIC_BITMAP_X86 bitmap_t oldval; bitmap_t dummy; __asm__ volatile ( @@ -225,7 +225,7 @@ namespace zs : "r" (thenval_), "r" (elseval_) : "cc"); return oldval; -#elif defined ZS_ATOMIC_BITMAP_SPARC +#elif defined ZMQ_ATOMIC_BITMAP_SPARC volatile bitmap_t* ptrin = &value; bitmap_t tmp; bitmap_t prev; @@ -242,7 +242,7 @@ namespace zs : "r" (ptrin), "r" (thenval_), "r" (elseval_) : "cc"); return prev; -#elif defined ZS_ATOMIC_BITMAP_MUTEX +#elif defined ZMQ_ATOMIC_BITMAP_MUTEX sync.lock (); bitmap_t oldval = value; value = oldval ? elseval_ : thenval_; @@ -256,7 +256,7 @@ namespace zs private: volatile bitmap_t value; -#if defined ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_ATOMIC_BITMAP_MUTEX mutex_t sync; #endif @@ -267,20 +267,20 @@ namespace zs } // Remove macros local to this file. -#if defined ZS_ATOMIC_BITMAP_WINDOWS -#undef ZS_ATOMIC_BITMAP_WINDOWS +#if defined ZMQ_ATOMIC_BITMAP_WINDOWS +#undef ZMQ_ATOMIC_BITMAP_WINDOWS #endif -#if defined ZS_ATOMIC_BITMAP_SOLARIS -#undef ZS_ATOMIC_BITMAP_SOLARIS +#if defined ZMQ_ATOMIC_BITMAP_SOLARIS +#undef ZMQ_ATOMIC_BITMAP_SOLARIS #endif -#if defined ZS_ATOMIC_BITMAP_X86 -#undef ZS_ATOMIC_BITMAP_X86 +#if defined ZMQ_ATOMIC_BITMAP_X86 +#undef ZMQ_ATOMIC_BITMAP_X86 #endif -#if defined ZS_ATOMIC_BITMAP_SPARC -#undef ZS_ATOMIC_BITMAP_SPARC +#if defined ZMQ_ATOMIC_BITMAP_SPARC +#undef ZMQ_ATOMIC_BITMAP_SPARC #endif -#if defined ZS_ATOMIC_BITMAP_MUTEX -#undef ZS_ATOMIC_BITMAP_MUTEX +#if defined ZMQ_ATOMIC_BITMAP_MUTEX +#undef ZMQ_ATOMIC_BITMAP_MUTEX #endif #endif diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp index 0873fdd..305aa59 100644 --- a/src/atomic_counter.hpp +++ b/src/atomic_counter.hpp @@ -18,35 +18,35 @@ */ -#ifndef __ZS_ATOMIC_COUNTER_HPP_INCLUDED__ -#define __ZS_ATOMIC_COUNTER_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_COUNTER_HPP_INCLUDED__ #include "stdint.hpp" #include "platform.hpp" -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_COUNTER_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_COUNTER_X86 +#define ZMQ_ATOMIC_COUNTER_X86 #elif 0 && defined __sparc__ && defined __GNUC__ -#define ZS_ATOMIC_COUNTER_SPARC -#elif defined ZS_HAVE_WINDOWS -#define ZS_ATOMIC_COUNTER_WINDOWS -#elif defined ZS_HAVE_SOLARIS -#define ZS_ATOMIC_COUNTER_SOLARIS +#define ZMQ_ATOMIC_COUNTER_SPARC +#elif defined ZMQ_HAVE_WINDOWS +#define ZMQ_ATOMIC_COUNTER_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZMQ_ATOMIC_COUNTER_SOLARIS #else -#define ZS_ATOMIC_COUNTER_MUTEX +#define ZMQ_ATOMIC_COUNTER_MUTEX #endif -#if defined ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_ATOMIC_COUNTER_MUTEX #include "mutex.hpp" -#elif defined ZS_ATOMIC_COUNTER_WINDOWS +#elif defined ZMQ_ATOMIC_COUNTER_WINDOWS #include "windows.hpp" -#elif defined ZS_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS #include #endif -namespace zs +namespace zmq { // This class represents an integer that can be incremented/decremented @@ -78,18 +78,18 @@ namespace zs { integer_t old_value; -#if defined ZS_ATOMIC_COUNTER_WINDOWS +#if defined ZMQ_ATOMIC_COUNTER_WINDOWS old_value = InterlockedExchangeAdd ((LONG*) &value, increment_); -#elif defined ZS_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS integer_t new_value = atomic_add_32_nv (&value, increment_); old_value = new_value - increment_; -#elif defined ZS_ATOMIC_COUNTER_X86 +#elif defined ZMQ_ATOMIC_COUNTER_X86 __asm__ volatile ( "lock; xadd %0, %1 \n\t" : "=r" (old_value), "=m" (value) : "0" (increment_), "m" (value) : "cc", "memory"); -#elif defined ZS_ATOMIC_COUNTER_SPARC +#elif defined ZMQ_ATOMIC_COUNTER_SPARC integer_t tmp; __asm__ volatile ( "ld [%4], %0 \n\t" @@ -102,7 +102,7 @@ namespace zs : "=&r" (old_value), "=&r" (tmp), "=m" (value) : "r" (increment_), "r" (&value) : "cc", "memory"); -#elif defined ZS_ATOMIC_COUNTER_MUTEX +#elif defined ZMQ_ATOMIC_COUNTER_MUTEX sync.lock (); old_value = value; value += increment_; @@ -116,15 +116,15 @@ namespace zs // Atomic subtraction. Returns false if the counter drops to zero. inline bool sub (integer_t decrement) { -#if defined ZS_ATOMIC_COUNTER_WINDOWS +#if defined ZMQ_ATOMIC_COUNTER_WINDOWS LONG delta = - ((LONG) decrement); integer_t old = InterlockedExchangeAdd ((LONG*) &value, delta); return old - decrement != 0; -#elif defined ZS_ATOMIC_COUNTER_SOLARIS +#elif defined ZMQ_ATOMIC_COUNTER_SOLARIS int32_t delta = - ((int32_t) decrement); integer_t nv = atomic_add_32_nv (&value, delta); return nv != 0; -#elif defined ZS_ATOMIC_COUNTER_X86 +#elif defined ZMQ_ATOMIC_COUNTER_X86 integer_t oldval = -decrement; volatile integer_t *val = &value; __asm__ volatile ("lock; xaddl %0,%1" @@ -132,7 +132,7 @@ namespace zs : "0" (oldval), "m" (*val) : "cc"); return oldval != decrement; -#elif defined ZS_ATOMIC_COUNTER_SPARC +#elif defined ZMQ_ATOMIC_COUNTER_SPARC volatile integer_t *val = &value; integer_t tmp; integer_t result; @@ -148,7 +148,7 @@ namespace zs : "r" (val) : "cc"); return result <= decrement; -#elif defined ZS_ATOMIC_COUNTER_MUTEX +#elif defined ZMQ_ATOMIC_COUNTER_MUTEX sync.lock (); value -= decrement; bool result = value ? true : false; @@ -167,7 +167,7 @@ namespace zs private: volatile integer_t value; -#if defined ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_ATOMIC_COUNTER_MUTEX mutex_t sync; #endif @@ -178,20 +178,20 @@ namespace zs } // Remove macros local to this file. -#if defined ZS_ATOMIC_COUNTER_WINDOWS -#undef ZS_ATOMIC_COUNTER_WINDOWS +#if defined ZMQ_ATOMIC_COUNTER_WINDOWS +#undef ZMQ_ATOMIC_COUNTER_WINDOWS #endif -#if defined ZS_ATOMIC_COUNTER_SOLARIS -#undef ZS_ATOMIC_COUNTER_SOLARIS +#if defined ZMQ_ATOMIC_COUNTER_SOLARIS +#undef ZMQ_ATOMIC_COUNTER_SOLARIS #endif -#if defined ZS_ATOMIC_COUNTER_X86 -#undef ZS_ATOMIC_COUNTER_X86 +#if defined ZMQ_ATOMIC_COUNTER_X86 +#undef ZMQ_ATOMIC_COUNTER_X86 #endif -#if defined ZS_ATOMIC_COUNTER_SPARC -#undef ZS_ATOMIC_COUNTER_SPARC +#if defined ZMQ_ATOMIC_COUNTER_SPARC +#undef ZMQ_ATOMIC_COUNTER_SPARC #endif -#if defined ZS_ATOMIC_COUNTER_MUTEX -#undef ZS_ATOMIC_COUNTER_MUTEX +#if defined ZMQ_ATOMIC_COUNTER_MUTEX +#undef ZMQ_ATOMIC_COUNTER_MUTEX #endif #endif diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp index fcc4e73..f96782f 100644 --- a/src/atomic_ptr.hpp +++ b/src/atomic_ptr.hpp @@ -18,34 +18,34 @@ */ -#ifndef __ZS_ATOMIC_PTR_HPP_INCLUDED__ -#define __ZS_ATOMIC_PTR_HPP_INCLUDED__ +#ifndef __ZMQ_ATOMIC_PTR_HPP_INCLUDED__ +#define __ZMQ_ATOMIC_PTR_HPP_INCLUDED__ #include "platform.hpp" -#if defined ZS_FORCE_MUTEXES -#define ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_FORCE_MUTEXES +#define ZMQ_ATOMIC_PTR_MUTEX #elif (defined __i386__ || defined __x86_64__) && defined __GNUC__ -#define ZS_ATOMIC_PTR_X86 +#define ZMQ_ATOMIC_PTR_X86 #elif 0 && defined __sparc__ && defined __GNUC__ -#define ZS_ATOMIC_PTR_SPARC -#elif defined ZS_HAVE_WINDOWS -#define ZS_ATOMIC_PTR_WINDOWS -#elif defined ZS_HAVE_SOLARIS -#define ZS_ATOMIC_PTR_SOLARIS +#define ZMQ_ATOMIC_PTR_SPARC +#elif defined ZMQ_HAVE_WINDOWS +#define ZMQ_ATOMIC_PTR_WINDOWS +#elif defined ZMQ_HAVE_SOLARIS +#define ZMQ_ATOMIC_PTR_SOLARIS #else -#define ZS_ATOMIC_PTR_MUTEX +#define ZMQ_ATOMIC_PTR_MUTEX #endif -#if defined ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_ATOMIC_PTR_MUTEX #include "mutex.hpp" -#elif defined ZS_ATOMIC_PTR_WINDOWS +#elif defined ZMQ_ATOMIC_PTR_WINDOWS #include "windows.hpp" -#elif defined ZS_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SOLARIS #include #endif -namespace zs +namespace zmq { // This class encapsulates several atomic operations on pointers. @@ -77,18 +77,18 @@ namespace zs // to the 'val' value. Old value is returned. inline T *xchg (T *val_) { -#if defined ZS_ATOMIC_PTR_WINDOWS +#if defined ZMQ_ATOMIC_PTR_WINDOWS return (T*) InterlockedExchangePointer (&ptr, val_); -#elif defined ZS_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SOLARIS return (T*) atomic_swap_ptr (&ptr, val_); -#elif defined ZS_ATOMIC_PTR_X86 +#elif defined ZMQ_ATOMIC_PTR_X86 T *old; __asm__ volatile ( "lock; xchg %0, %2" : "=r" (old), "=m" (ptr) : "m" (ptr), "0" (val_)); return old; -#elif defined ZS_ATOMIC_PTR_SPARC +#elif defined ZMQ_ATOMIC_PTR_SPARC T* newptr = val_; volatile T** ptrin = &ptr; T* tmp; @@ -105,7 +105,7 @@ namespace zs : "r" (ptrin) : "cc"); return prev; -#elif defined ZS_ATOMIC_PTR_MUTEX +#elif defined ZMQ_ATOMIC_PTR_MUTEX sync.lock (); T *old = (T*) ptr; ptr = val_; @@ -122,12 +122,12 @@ namespace zs // is returned. inline T *cas (T *cmp_, T *val_) { -#if defined ZS_ATOMIC_PTR_WINDOWS +#if defined ZMQ_ATOMIC_PTR_WINDOWS return (T*) InterlockedCompareExchangePointer ( (volatile PVOID*) &ptr, val_, cmp_); -#elif defined ZS_ATOMIC_PTR_SOLARIS +#elif defined ZMQ_ATOMIC_PTR_SOLARIS return (T*) atomic_cas_ptr (&ptr, cmp_, val_); -#elif defined ZS_ATOMIC_PTR_X86 +#elif defined ZMQ_ATOMIC_PTR_X86 T *old; __asm__ volatile ( "lock; cmpxchg %2, %3" @@ -135,7 +135,7 @@ namespace zs : "r" (val_), "m" (ptr), "0" (cmp_) : "cc"); return old; -#elif defined ZS_ATOMIC_PTR_SPARC +#elif defined ZMQ_ATOMIC_PTR_SPARC volatile T** ptrin = &ptr; volatile T* prev = ptr; __asm__ __volatile__( @@ -144,7 +144,7 @@ namespace zs : "r" (cmp_), "r" (val_), "r" (ptrin) : "cc"); return prev; -#elif defined ZS_ATOMIC_PTR_MUTEX +#elif defined ZMQ_ATOMIC_PTR_MUTEX sync.lock (); T *old = (T*) ptr; if (ptr == cmp_) @@ -159,7 +159,7 @@ namespace zs private: volatile T *ptr; -#if defined ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_ATOMIC_PTR_MUTEX mutex_t sync; #endif @@ -170,20 +170,20 @@ namespace zs } // Remove macros local to this file. -#if defined ZS_ATOMIC_PTR_WINDOWS -#undef ZS_ATOMIC_PTR_WINDOWS +#if defined ZMQ_ATOMIC_PTR_WINDOWS +#undef ZMQ_ATOMIC_PTR_WINDOWS #endif -#if defined ZS_ATOMIC_PTR_SOLARIS -#undef ZS_ATOMIC_PTR_SOLARIS +#if defined ZMQ_ATOMIC_PTR_SOLARIS +#undef ZMQ_ATOMIC_PTR_SOLARIS #endif -#if defined ZS_ATOMIC_PTR_X86 -#undef ZS_ATOMIC_PTR_X86 +#if defined ZMQ_ATOMIC_PTR_X86 +#undef ZMQ_ATOMIC_PTR_X86 #endif -#if defined ZS_ATOMIC_PTR_SPARC -#undef ZS_ATOMIC_PTR_SPARC +#if defined ZMQ_ATOMIC_PTR_SPARC +#undef ZMQ_ATOMIC_PTR_SPARC #endif -#if defined ZS_ATOMIC_PTR_MUTEX -#undef ZS_ATOMIC_PTR_MUTEX +#if defined ZMQ_ATOMIC_PTR_MUTEX +#undef ZMQ_ATOMIC_PTR_MUTEX #endif #endif diff --git a/src/command.hpp b/src/command.hpp index 0553137..69c4e57 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_COMMAND_HPP_INCLUDED__ -#define __ZS_COMMAND_HPP_INCLUDED__ +#ifndef __ZMQ_COMMAND_HPP_INCLUDED__ +#define __ZMQ_COMMAND_HPP_INCLUDED__ #include "stdint.hpp" -namespace zs +namespace zmq { // This structure defines the commands that can be sent between threads. diff --git a/src/config.hpp b/src/config.hpp index a0569ea..88b93d7 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_CONFIG_HPP_INCLUDED__ -#define __ZS_CONFIG_HPP_INCLUDED__ +#ifndef __ZMQ_CONFIG_HPP_INCLUDED__ +#define __ZMQ_CONFIG_HPP_INCLUDED__ -namespace zs +namespace zmq { // Compile-time settings. diff --git a/src/connecter.cpp b/src/connecter.cpp index a21dde3..970dcf7 100644 --- a/src/connecter.cpp +++ b/src/connecter.cpp @@ -24,7 +24,7 @@ #include "simple_semaphore.hpp" #include "zmq_tcp_engine.hpp" -zs::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, +zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, session_t *session_) : io_object_t (thread_), state (idle), @@ -36,24 +36,24 @@ zs::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_, { } -void zs::connecter_t::terminate () +void zmq::connecter_t::terminate () { delete this; } -void zs::connecter_t::shutdown () +void zmq::connecter_t::shutdown () { delete this; } -zs::connecter_t::~connecter_t () +zmq::connecter_t::~connecter_t () { } -void zs::connecter_t::process_reg (simple_semaphore_t *smph_) +void zmq::connecter_t::process_reg (simple_semaphore_t *smph_) { // Fet poller pointer for further use. - zs_assert (!poller); + zmq_assert (!poller); poller = get_poller (); // Ask the session to register itself with the I/O thread. Note that @@ -71,10 +71,10 @@ void zs::connecter_t::process_reg (simple_semaphore_t *smph_) timer_event (); } -void zs::connecter_t::process_unreg (simple_semaphore_t *smph_) +void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_) { // Unregister connecter/engine from the poller. - zs_assert (poller); + zmq_assert (poller); if (state == connecting) poller->rm_fd (handle); else if (state == waiting) @@ -87,22 +87,22 @@ void zs::connecter_t::process_unreg (simple_semaphore_t *smph_) smph_->post (); } -void zs::connecter_t::in_event () +void zmq::connecter_t::in_event () { // Error occured in asynchronous connect. Retry to connect after a while. if (state == connecting) { fd_t fd = tcp_connecter.connect (); - zs_assert (fd == retired_fd); + zmq_assert (fd == retired_fd); poller->rm_fd (handle); poller->add_timer (this); state = waiting; return; } - zs_assert (false); + zmq_assert (false); } -void zs::connecter_t::out_event () +void zmq::connecter_t::out_event () { if (state == connecting) { @@ -116,18 +116,18 @@ void zs::connecter_t::out_event () poller->rm_fd (handle); engine = new zmq_tcp_engine_t (fd); - zs_assert (engine); + zmq_assert (engine); engine->attach (poller, this); state = sending; return; } - zs_assert (false); + zmq_assert (false); } -void zs::connecter_t::timer_event () +void zmq::connecter_t::timer_event () { - zs_assert (state == waiting); + zmq_assert (state == waiting); // Initiate async connect and start polling for its completion. If async // connect fails instantly, try to reconnect after a while. @@ -147,21 +147,21 @@ void zs::connecter_t::timer_event () } } -void zs::connecter_t::set_engine (struct i_engine *engine_) +void zmq::connecter_t::set_engine (struct i_engine *engine_) { engine = engine_; } -bool zs::connecter_t::read (zs_msg *msg_) +bool zmq::connecter_t::read (zmq_msg *msg_) { - zs_assert (state == sending); + zmq_assert (state == sending); // Deallocate old content of the message just in case. - zs_msg_close (msg_); + zmq_msg_close (msg_); // Send the identity. - zs_msg_init_size (msg_, identity.size ()); - memcpy (zs_msg_data (msg_), identity.c_str (), identity.size ()); + zmq_msg_init_size (msg_, identity.size ()); + memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ()); // Ask engine to unregister from the poller. i_engine *e = engine; @@ -177,13 +177,13 @@ bool zs::connecter_t::read (zs_msg *msg_) return true; } -bool zs::connecter_t::write (struct zs_msg *msg_) +bool zmq::connecter_t::write (struct zmq_msg *msg_) { // No incoming messages are accepted till identity is sent. return false; } -void zs::connecter_t::flush () +void zmq::connecter_t::flush () { // No incoming messages are accepted till identity is sent. } diff --git a/src/connecter.hpp b/src/connecter.hpp index 91dbf17..1f11c63 100644 --- a/src/connecter.hpp +++ b/src/connecter.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_CONNECTER_HPP_INCLUDED__ -#define __ZS_CONNECTER_HPP_INCLUDED__ +#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__ +#define __ZMQ_CONNECTER_HPP_INCLUDED__ #include -#include "../include/zs.h" +#include "../include/zmq.h" #include "i_poller.hpp" #include "io_object.hpp" @@ -30,7 +30,7 @@ #include "i_session.hpp" #include "tcp_connecter.hpp" -namespace zs +namespace zmq { class connecter_t : public io_object_t, public i_poll_events, @@ -55,8 +55,8 @@ namespace zs // i_session implementation void set_engine (struct i_engine *engine_); // void shutdown (); - bool read (struct zs_msg *msg_); - bool write (struct zs_msg *msg_); + bool read (struct zmq_msg *msg_); + bool write (struct zmq_msg *msg_); void flush (); private: diff --git a/src/data_distributor.cpp b/src/data_distributor.cpp index 8f89c46..971edce 100644 --- a/src/data_distributor.cpp +++ b/src/data_distributor.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "data_distributor.hpp" #include "pipe_writer.hpp" @@ -25,25 +25,25 @@ #include "session.hpp" #include "msg.hpp" -zs::data_distributor_t::data_distributor_t () : +zmq::data_distributor_t::data_distributor_t () : session (NULL) { } -void zs::data_distributor_t::set_session (session_t *session_) +void zmq::data_distributor_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::data_distributor_t::shutdown () +void zmq::data_distributor_t::shutdown () { // No need to deallocate pipes here. They'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::data_distributor_t::terminate () +void zmq::data_distributor_t::terminate () { // Pipe unregisters itself during the call to terminate, so the pipes // list shinks by one in each iteration. @@ -53,11 +53,11 @@ void zs::data_distributor_t::terminate () delete this; } -zs::data_distributor_t::~data_distributor_t () +zmq::data_distributor_t::~data_distributor_t () { } -void zs::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) +void zmq::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) { // Associate demux with a new pipe. pipe_->set_demux (this); @@ -65,7 +65,7 @@ void zs::data_distributor_t::attach_pipe (pipe_writer_t *pipe_) pipes.push_back (pipe_); } -void zs::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) +void zmq::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) { // Release the reference to the pipe. int index = pipe_->get_index (); @@ -75,19 +75,19 @@ void zs::data_distributor_t::detach_pipe (pipe_writer_t *pipe_) pipes.pop_back (); } -bool zs::data_distributor_t::empty () +bool zmq::data_distributor_t::empty () { return pipes.empty (); } -bool zs::data_distributor_t::send (zs_msg *msg_) +bool zmq::data_distributor_t::send (zmq_msg *msg_) { int pipes_count = pipes.size (); // If there are no pipes available, simply drop the message. if (pipes_count == 0) { - zs_msg_close (msg_); - zs_msg_init (msg_); + zmq_msg_close (msg_); + zmq_msg_init (msg_); return true; } @@ -98,10 +98,10 @@ bool zs::data_distributor_t::send (zs_msg *msg_) // return false; // For VSMs the copying is straighforward. - if (msg_->content == (zs_msg_content*) ZS_VSM) { + if (msg_->content == (zmq_msg_content*) ZMQ_VSM) { for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++) write_to_pipe (*it, msg_); - zs_msg_init (msg_); + zmq_msg_init (msg_); return true; } @@ -110,7 +110,7 @@ bool zs::data_distributor_t::send (zs_msg *msg_) // operations) needed. if (pipes_count == 1) { write_to_pipe (*pipes.begin (), msg_); - zs_msg_init (msg_); + zmq_msg_init (msg_); return true; } @@ -130,12 +130,12 @@ bool zs::data_distributor_t::send (zs_msg *msg_) write_to_pipe (*it, msg_); // Detach the original message from the data buffer. - zs_msg_init (msg_); + zmq_msg_init (msg_); return true; } -void zs::data_distributor_t::flush () +void zmq::data_distributor_t::flush () { // Flush all pipes. If there's large number of pipes, it can be pretty // inefficient (especially if there's new message only in a single pipe). @@ -144,12 +144,12 @@ void zs::data_distributor_t::flush () (*it)->flush (); } -void zs::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_, - struct zs_msg *msg_) +void zmq::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_, + struct zmq_msg *msg_) { if (!pipe_->write (msg_)) { // TODO: Push gap notification to the pipe. - zs_assert (false); + zmq_assert (false); } } diff --git a/src/data_distributor.hpp b/src/data_distributor.hpp index 239de31..5bde2e8 100644 --- a/src/data_distributor.hpp +++ b/src/data_distributor.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_DATA_DISTRIBUTOR_HPP_INCLUDED__ -#define __ZS_DATA_DISTRIBUTOR_HPP_INCLUDED__ +#ifndef __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ +#define __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__ #include #include -namespace zs +namespace zmq { // Object to distribute messages to outbound pipes. @@ -42,7 +42,7 @@ namespace zs void attach_pipe (class pipe_writer_t *pipe_); void detach_pipe (class pipe_writer_t *pipe_); bool empty (); - bool send (struct zs_msg *msg_); + bool send (struct zmq_msg *msg_); void flush (); private: @@ -55,7 +55,7 @@ namespace zs // Writes the message to the pipe if possible. If it isn't, writes // a gap notification to the pipe. - void write_to_pipe (class pipe_writer_t *pipe_, struct zs_msg *msg_); + void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_); // The list of outbound pipes. typedef std::vector pipes_t; diff --git a/src/decoder.hpp b/src/decoder.hpp index c643df8..897f410 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_DECODER_HPP_INCLUDED__ -#define __ZS_DECODER_HPP_INCLUDED__ +#ifndef __ZMQ_DECODER_HPP_INCLUDED__ +#define __ZMQ_DECODER_HPP_INCLUDED__ #include #include #include -namespace zs +namespace zmq { // Helper base class for decoders that know the amount of data to read diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 6e3a8c1..8fb0877 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -19,7 +19,7 @@ #include "platform.hpp" -#if defined ZS_HAVE_SOLARIS || defined ZS_HAVE_HPUX +#if defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_HPUX #include #include @@ -35,7 +35,7 @@ #include "err.hpp" #include "config.hpp" -zs::devpoll_t::devpoll_t () +zmq::devpoll_t::devpoll_t () { // Get limit on open files struct rlimit rl; @@ -50,19 +50,19 @@ zs::devpoll_t::devpoll_t () errno_assert (devpoll_fd != -1); } -zs::devpoll_t::~devpoll_t () +zmq::devpoll_t::~devpoll_t () { close (devpoll_fd); } -void zs::devpoll_t::devpoll_ctl (fd_t fd_, short events_) +void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) { struct pollfd pfd = {fd_, events_, 0}; ssize_t rc = write (devpoll_fd, &pfd, sizeof pfd); - zs_assert (rc == sizeof pfd); + zmq_assert (rc == sizeof pfd); } -zs::handle_t zs::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::handle_t zmq::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) { assert (!fd_table [fd_].valid); @@ -82,7 +82,7 @@ zs::handle_t zs::devpoll_t::add_fd (fd_t fd_, i_poll_events *reactor_) return handle; } -void zs::devpoll_t::rm_fd (handle_t handle_) +void zmq::devpoll_t::rm_fd (handle_t handle_) { assert (fd_table [handle_.fd].valid); @@ -93,7 +93,7 @@ void zs::devpoll_t::rm_fd (handle_t handle_) load.sub (1); } -void zs::devpoll_t::set_pollin (handle_t handle_) +void zmq::devpoll_t::set_pollin (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -101,7 +101,7 @@ void zs::devpoll_t::set_pollin (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::reset_pollin (handle_t handle_) +void zmq::devpoll_t::reset_pollin (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -109,7 +109,7 @@ void zs::devpoll_t::reset_pollin (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::set_pollout (handle_t handle_) +void zmq::devpoll_t::set_pollout (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -117,7 +117,7 @@ void zs::devpoll_t::set_pollout (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::reset_pollout (handle_t handle_) +void zmq::devpoll_t::reset_pollout (handle_t handle_) { fd_t fd = handle_.fd; devpoll_ctl (fd, POLLREMOVE); @@ -125,39 +125,39 @@ void zs::devpoll_t::reset_pollout (handle_t handle_) devpoll_ctl (fd, fd_table [fd].events); } -void zs::devpoll_t::add_timer (i_poll_events *events_) +void zmq::devpoll_t::add_timer (i_poll_events *events_) { timers.push_back (events_); } -void zs::devpoll_t::cancel_timer (i_poll_events *events_) +void zmq::devpoll_t::cancel_timer (i_poll_events *events_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) timers.erase (it); } -int zs::devpoll_t::get_load () +int zmq::devpoll_t::get_load () { return load.get (); } -void zs::devpoll_t::start () +void zmq::devpoll_t::start () { worker.start (worker_routine, this); } -void zs::devpoll_t::stop () +void zmq::devpoll_t::stop () { stopping = true; } -void zs::devpoll_t::join () +void zmq::devpoll_t::join () { worker.stop (); } -bool zs::devpoll_t::loop () +bool zmq::devpoll_t::loop () { // According to the poll(7d) man page, we can retrieve // no more then (OPEN_MAX - 1) events. @@ -216,7 +216,7 @@ bool zs::devpoll_t::loop () } } -void zs::devpoll_t::worker_routine (void *arg_) +void zmq::devpoll_t::worker_routine (void *arg_) { ((devpoll_t*) arg_)->loop (); } diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 56b3b25..28274c0 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_DEVPOLL_HPP_INCLUDED__ -#define __ZS_DEVPOLL_HPP_INCLUDED__ +#ifndef __ZMQ_DEVPOLL_HPP_INCLUDED__ +#define __ZMQ_DEVPOLL_HPP_INCLUDED__ #include "platform.hpp" -#if defined ZS_HAVE_SOLARIS || ZS_HAVE_HPUX +#if defined ZMQ_HAVE_SOLARIS || ZMQ_HAVE_HPUX #include @@ -31,7 +31,7 @@ #include "thread.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // Implements socket polling mechanism using the Solaris-specific diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index c468857..56a5e0b 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "dispatcher.hpp" #include "app_thread.hpp" @@ -30,27 +30,27 @@ #include "session.hpp" #include "i_api.hpp" -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include "windows.h" #endif -zs::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) { -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS // Intialise Windows sockets. Note that WSAStartup can be called multiple // times given that WSACleanup will be called for each WSAStartup. WORD version_requested = MAKEWORD (2, 2); WSADATA wsa_data; int rc = WSAStartup (version_requested, &wsa_data); - zs_assert (rc == 0); - zs_assert (LOBYTE (wsa_data.wVersion) == 2 && + zmq_assert (rc == 0); + zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && HIBYTE (wsa_data.wVersion) == 2); #endif // Create application thread proxies. for (int i = 0; i != app_threads_; i++) { app_thread_t *app_thread = new app_thread_t (this, i); - zs_assert (app_thread); + zmq_assert (app_thread); app_threads.push_back (app_thread); signalers.push_back (app_thread->get_signaler ()); } @@ -58,26 +58,26 @@ zs::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) // Create I/O thread objects. for (int i = 0; i != io_threads_; i++) { io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); - zs_assert (io_thread); + zmq_assert (io_thread); io_threads.push_back (io_thread); signalers.push_back (io_thread->get_signaler ()); } // Create command pipe matrix. command_pipes = new command_pipe_t [signalers.size () * signalers.size ()]; - zs_assert (command_pipes); + zmq_assert (command_pipes); // Launch I/O threads. for (int i = 0; i != io_threads_; i++) io_threads [i]->start (); } -void zs::dispatcher_t::shutdown () +void zmq::dispatcher_t::shutdown () { delete this; } -zs::dispatcher_t::~dispatcher_t () +zmq::dispatcher_t::~dispatcher_t () { // Ask I/O threads to terminate. for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) @@ -103,19 +103,19 @@ zs::dispatcher_t::~dispatcher_t () delete it->writer; } -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS // On Windows, uninitialise socket layer. int rc = WSACleanup (); wsa_assert (rc != SOCKET_ERROR); #endif } -int zs::dispatcher_t::thread_slot_count () +int zmq::dispatcher_t::thread_slot_count () { return signalers.size (); } -zs::i_api *zs::dispatcher_t::create_socket (int type_) +zmq::i_api *zmq::dispatcher_t::create_socket (int type_) { threads_sync.lock (); app_thread_t *thread = choose_app_thread (); @@ -128,7 +128,7 @@ zs::i_api *zs::dispatcher_t::create_socket (int type_) return s; } -zs::app_thread_t *zs::dispatcher_t::choose_app_thread () +zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () { // Check whether thread ID is already assigned. If so, return it. for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) @@ -145,9 +145,9 @@ zs::app_thread_t *zs::dispatcher_t::choose_app_thread () return NULL; } -zs::io_thread_t *zs::dispatcher_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) { - zs_assert (io_threads.size () > 0); + zmq_assert (io_threads.size () > 0); // Find the I/O thread with minimum load. int min_load = io_threads [0]->get_load (); @@ -165,19 +165,19 @@ zs::io_thread_t *zs::dispatcher_t::choose_io_thread (uint64_t taskset_) return io_threads [result]; } -void zs::dispatcher_t::create_pipe (object_t *reader_parent_, +void zmq::dispatcher_t::create_pipe (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, pipe_reader_t **reader_, pipe_writer_t **writer_) { // Create the pipe, reader & writer triple. pipe_t *pipe = new pipe_t; - zs_assert (pipe); + zmq_assert (pipe); pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe, hwm_, lwm_); - zs_assert (reader); + zmq_assert (reader); pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader, hwm_, lwm_); - zs_assert (writer); + zmq_assert (writer); reader->set_peer (writer); // Store the pipe in the repository. @@ -191,7 +191,7 @@ void zs::dispatcher_t::create_pipe (object_t *reader_parent_, *writer_ = writer; } -void zs::dispatcher_t::destroy_pipe (pipe_t *pipe_) +void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_) { // Remove the pipe from the repository. pipe_info_t info; @@ -203,13 +203,13 @@ void zs::dispatcher_t::destroy_pipe (pipe_t *pipe_) pipes_sync.unlock (); // Deallocate the pipe and associated pipe reader & pipe writer. - zs_assert (info.pipe == pipe_); + zmq_assert (info.pipe == pipe_); delete info.pipe; delete info.reader; delete info.writer; } -int zs::dispatcher_t::register_inproc_endpoint (const char *endpoint_, +int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_, session_t *session_) { inproc_endpoint_sync.lock (); @@ -227,7 +227,7 @@ int zs::dispatcher_t::register_inproc_endpoint (const char *endpoint_, return 0; } -zs::object_t *zs::dispatcher_t::get_inproc_endpoint (const char *endpoint_) +zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_) { inproc_endpoint_sync.lock (); inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); @@ -245,7 +245,7 @@ zs::object_t *zs::dispatcher_t::get_inproc_endpoint (const char *endpoint_) return session; } -void zs::dispatcher_t::unregister_inproc_endpoints (session_t *session_) +void zmq::dispatcher_t::unregister_inproc_endpoints (session_t *session_) { inproc_endpoint_sync.lock (); diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 05d2c49..07c35cd 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_DISPATCHER_HPP_INCLUDED__ -#define __ZS_DISPATCHER_HPP_INCLUDED__ +#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ +#define __ZMQ_DISPATCHER_HPP_INCLUDED__ #include #include @@ -31,7 +31,7 @@ #include "mutex.hpp" #include "stdint.hpp" -namespace zs +namespace zmq { // Dispatcher implements bidirectional thread-safe passing of commands @@ -51,7 +51,7 @@ namespace zs // signalers. dispatcher_t (int app_threads_, int io_threads_); - // To be called to terminate the whole infrastructure (zs_term). + // To be called to terminate the whole infrastructure (zmq_term). void shutdown (); // Create a socket engine. diff --git a/src/dummy_aggregator.cpp b/src/dummy_aggregator.cpp index ef0cea6..0b27fab 100644 --- a/src/dummy_aggregator.cpp +++ b/src/dummy_aggregator.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "dummy_aggregator.hpp" #include "err.hpp" @@ -30,27 +30,27 @@ pipes [i1_]->set_index (i1_);\ pipes [i2_]->set_index (i2_); -zs::dummy_aggregator_t::dummy_aggregator_t () : +zmq::dummy_aggregator_t::dummy_aggregator_t () : session (NULL), pipe (NULL), active (false) { } -void zs::dummy_aggregator_t::set_session (session_t *session_) +void zmq::dummy_aggregator_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::dummy_aggregator_t::shutdown () +void zmq::dummy_aggregator_t::shutdown () { // No need to deallocate the pipe here. It'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::dummy_aggregator_t::terminate () +void zmq::dummy_aggregator_t::terminate () { if (pipe) pipe->terminate (); @@ -58,13 +58,13 @@ void zs::dummy_aggregator_t::terminate () delete this; } -zs::dummy_aggregator_t::~dummy_aggregator_t () +zmq::dummy_aggregator_t::~dummy_aggregator_t () { } -void zs::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) { - zs_assert (!pipe); + zmq_assert (!pipe); pipe = pipe_; active = true; @@ -73,22 +73,22 @@ void zs::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_) session->revive (); } -void zs::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_) { - zs_assert (pipe == pipe_); + zmq_assert (pipe == pipe_); deactivate (pipe_); pipe = NULL; } -bool zs::dummy_aggregator_t::empty () +bool zmq::dummy_aggregator_t::empty () { return pipe == NULL; } -bool zs::dummy_aggregator_t::recv (zs_msg *msg_) +bool zmq::dummy_aggregator_t::recv (zmq_msg *msg_) { // Deallocate old content of the message. - zs_msg_close (msg_); + zmq_msg_close (msg_); // Try to read from the pipe. if (pipe && pipe->read (msg_)) @@ -96,16 +96,16 @@ bool zs::dummy_aggregator_t::recv (zs_msg *msg_) // No message is available. Initialise the output parameter // to be a 0-byte message. - zs_msg_init (msg_); + zmq_msg_init (msg_); return false; } -void zs::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_) { active = false; } -void zs::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_) +void zmq::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_) { active = true; } diff --git a/src/dummy_aggregator.hpp b/src/dummy_aggregator.hpp index ab5bcb9..6a9e9db 100644 --- a/src/dummy_aggregator.hpp +++ b/src/dummy_aggregator.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_DUMMY_AGGREGATOR_HPP_INCLUDED__ -#define __ZS_DUMMY_AGGREGATOR_HPP_INCLUDED__ +#ifndef __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ +#define __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__ #include #include "i_mux.hpp" -namespace zs +namespace zmq { // Fake message aggregator. There can be at most one pipe bound to it, @@ -47,7 +47,7 @@ namespace zs bool empty (); void deactivate (class pipe_reader_t *pipe_); void reactivate (class pipe_reader_t *pipe_); - bool recv (struct zs_msg *msg_); + bool recv (struct zmq_msg *msg_); private: diff --git a/src/dummy_distributor.cpp b/src/dummy_distributor.cpp index 58cadfe..62e2b88 100644 --- a/src/dummy_distributor.cpp +++ b/src/dummy_distributor.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "dummy_distributor.hpp" #include "pipe_writer.hpp" @@ -25,25 +25,25 @@ #include "session.hpp" #include "msg.hpp" -zs::dummy_distributor_t::dummy_distributor_t () : +zmq::dummy_distributor_t::dummy_distributor_t () : session (NULL) { } -void zs::dummy_distributor_t::set_session (session_t *session_) +void zmq::dummy_distributor_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::dummy_distributor_t::shutdown () +void zmq::dummy_distributor_t::shutdown () { // No need to deallocate pipe here. It'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::dummy_distributor_t::terminate () +void zmq::dummy_distributor_t::terminate () { if (pipe) pipe->terminate (); @@ -51,33 +51,33 @@ void zs::dummy_distributor_t::terminate () delete this; } -zs::dummy_distributor_t::~dummy_distributor_t () +zmq::dummy_distributor_t::~dummy_distributor_t () { } -void zs::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_) +void zmq::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_) { - zs_assert (!pipe); + zmq_assert (!pipe); pipe = pipe_; } -void zs::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_) +void zmq::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_) { - zs_assert (pipe == pipe_); + zmq_assert (pipe == pipe_); pipe = NULL; } -bool zs::dummy_distributor_t::empty () +bool zmq::dummy_distributor_t::empty () { return pipe == NULL; } -bool zs::dummy_distributor_t::send (zs_msg *msg_) +bool zmq::dummy_distributor_t::send (zmq_msg *msg_) { return pipe && pipe->write (msg_); } -void zs::dummy_distributor_t::flush () +void zmq::dummy_distributor_t::flush () { if (pipe) pipe->flush (); diff --git a/src/dummy_distributor.hpp b/src/dummy_distributor.hpp index c200ad6..a71cc49 100644 --- a/src/dummy_distributor.hpp +++ b/src/dummy_distributor.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ -#define __ZS_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ +#ifndef __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ +#define __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__ #include #include -namespace zs +namespace zmq { // Fake message distributor. There can be only one pipe bound to it @@ -45,7 +45,7 @@ namespace zs void attach_pipe (class pipe_writer_t *pipe_); void detach_pipe (class pipe_writer_t *pipe_); bool empty (); - bool send (struct zs_msg *msg_); + bool send (struct zmq_msg *msg_); void flush (); private: diff --git a/src/encoder.hpp b/src/encoder.hpp index 1241873..653fbfb 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_ENCODER_HPP_INCLUDED__ -#define __ZS_ENCODER_HPP_INCLUDED__ +#ifndef __ZMQ_ENCODER_HPP_INCLUDED__ +#define __ZMQ_ENCODER_HPP_INCLUDED__ #include #include #include -namespace zs +namespace zmq { // Helper base class for encoders. It implements the state machine that diff --git a/src/epoll.cpp b/src/epoll.cpp index a9780d2..c4c8fdb 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -19,7 +19,7 @@ #include "platform.hpp" -#ifdef ZS_HAVE_LINUX +#ifdef ZMQ_HAVE_LINUX #include #include @@ -32,14 +32,14 @@ #include "config.hpp" #include "i_poll_events.hpp" -zs::epoll_t::epoll_t () : +zmq::epoll_t::epoll_t () : stopping (false) { epoll_fd = epoll_create (1); errno_assert (epoll_fd != -1); } -zs::epoll_t::~epoll_t () +zmq::epoll_t::~epoll_t () { close (epoll_fd); @@ -47,10 +47,10 @@ zs::epoll_t::~epoll_t () delete *it; } -zs::handle_t zs::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) { poll_entry_t *pe = new poll_entry_t; - zs_assert (pe != NULL); + zmq_assert (pe != NULL); // The memset is not actually needed. It's here to prevent debugging // tools to complain about using uninitialised memory. @@ -72,7 +72,7 @@ zs::handle_t zs::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) return handle; } -void zs::epoll_t::rm_fd (handle_t handle_) +void zmq::epoll_t::rm_fd (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); @@ -84,7 +84,7 @@ void zs::epoll_t::rm_fd (handle_t handle_) load.sub (1); } -void zs::epoll_t::set_pollin (handle_t handle_) +void zmq::epoll_t::set_pollin (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->ev.events |= EPOLLIN; @@ -92,7 +92,7 @@ void zs::epoll_t::set_pollin (handle_t handle_) errno_assert (rc != -1); } -void zs::epoll_t::reset_pollin (handle_t handle_) +void zmq::epoll_t::reset_pollin (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->ev.events &= ~((short) EPOLLIN); @@ -100,7 +100,7 @@ void zs::epoll_t::reset_pollin (handle_t handle_) errno_assert (rc != -1); } -void zs::epoll_t::set_pollout (handle_t handle_) +void zmq::epoll_t::set_pollout (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->ev.events |= EPOLLOUT; @@ -108,7 +108,7 @@ void zs::epoll_t::set_pollout (handle_t handle_) errno_assert (rc != -1); } -void zs::epoll_t::reset_pollout (handle_t handle_) +void zmq::epoll_t::reset_pollout (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->ev.events &= ~((short) EPOLLOUT); @@ -116,12 +116,12 @@ void zs::epoll_t::reset_pollout (handle_t handle_) errno_assert (rc != -1); } -void zs::epoll_t::add_timer (i_poll_events *events_) +void zmq::epoll_t::add_timer (i_poll_events *events_) { timers.push_back (events_); } -void zs::epoll_t::cancel_timer (i_poll_events *events_) +void zmq::epoll_t::cancel_timer (i_poll_events *events_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it == timers.end ()) @@ -129,27 +129,27 @@ void zs::epoll_t::cancel_timer (i_poll_events *events_) timers.erase (it); } -int zs::epoll_t::get_load () +int zmq::epoll_t::get_load () { return load.get (); } -void zs::epoll_t::start () +void zmq::epoll_t::start () { worker.start (worker_routine, this); } -void zs::epoll_t::stop () +void zmq::epoll_t::stop () { stopping = true; } -void zs::epoll_t::join () +void zmq::epoll_t::join () { worker.stop (); } -void zs::epoll_t::loop () +void zmq::epoll_t::loop () { epoll_event ev_buf [max_io_events]; @@ -206,7 +206,7 @@ void zs::epoll_t::loop () } } -void zs::epoll_t::worker_routine (void *arg_) +void zmq::epoll_t::worker_routine (void *arg_) { ((epoll_t*) arg_)->loop (); } diff --git a/src/epoll.hpp b/src/epoll.hpp index a3f5168..aa363ee 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_EPOLL_HPP_INCLUDED__ -#define __ZS_EPOLL_HPP_INCLUDED__ +#ifndef __ZMQ_EPOLL_HPP_INCLUDED__ +#define __ZMQ_EPOLL_HPP_INCLUDED__ #include "platform.hpp" -#ifdef ZS_HAVE_LINUX +#ifdef ZMQ_HAVE_LINUX #include #include @@ -33,7 +33,7 @@ #include "thread.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // This class implements socket polling mechanism using the Linux-specific diff --git a/src/err.cpp b/src/err.cpp index 92a03ba..bca0c03 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -20,9 +20,9 @@ #include "err.hpp" #include "platform.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS -const char *zs::wsa_error() +const char *zmq::wsa_error() { int errcode = WSAGetLastError (); // TODO: This is not a generic way to handle this... @@ -134,13 +134,13 @@ const char *zs::wsa_error() "Valid name no data record of requested" : "error not defined"; } -void zs::win_error (char *buffer_, size_t buffer_size_) +void zmq::win_error (char *buffer_, size_t buffer_size_) { DWORD errcode = GetLastError (); DWORD rc = FormatMessageA (FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, errcode, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), buffer_, buffer_size_, NULL ); - zs_assert (rc); + zmq_assert (rc); } #endif diff --git a/src/err.hpp b/src/err.hpp index 657eb3d..fdfce01 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_ERR_HPP_INCLUDED__ -#define __ZS_ERR_HPP_INCLUDED__ +#ifndef __ZMQ_ERR_HPP_INCLUDED__ +#define __ZMQ_ERR_HPP_INCLUDED__ #include #include @@ -27,15 +27,15 @@ #include "platform.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include #endif -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS -namespace zs +namespace zmq { const char *wsa_error (); @@ -45,7 +45,7 @@ namespace zs // Provides convenient way to check WSA-style errors on Windows. #define wsa_assert(x) do { if (!(x)){\ - const char *errstr = zs::wsa_error ();\ + const char *errstr = zmq::wsa_error ();\ if (errstr != NULL) {\ fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \ __FILE__, __LINE__);\ @@ -56,7 +56,7 @@ namespace zs // Provides convenient way to check GetLastError-style errors on Windows. #define win_assert(x) do { if (!(x)) {\ char errstr [256];\ - zs::win_error (errstr, 256);\ + zmq::win_error (errstr, 256);\ fprintf (stderr, "Assertion failed: %s (%s:%d)\n", errstr, \ __FILE__, __LINE__);\ abort ();\ @@ -67,7 +67,7 @@ namespace zs // This macro works in exactly the same way as the normal assert. It is used // in its stead because standard assert on Win32 in broken - it prints nothing // when used within the scope of JNI library. -#define zs_assert(x) do { if (!(x)){\ +#define zmq_assert(x) do { if (!(x)){\ fprintf (stderr, "Assertion failed: %s (%s:%d)\n", #x, \ __FILE__, __LINE__);\ abort ();\ diff --git a/src/fair_aggregator.cpp b/src/fair_aggregator.cpp index 65bfac0..1e6937f 100644 --- a/src/fair_aggregator.cpp +++ b/src/fair_aggregator.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "fair_aggregator.hpp" #include "err.hpp" @@ -30,27 +30,27 @@ pipes [i1_]->set_index (i1_);\ pipes [i2_]->set_index (i2_); -zs::fair_aggregator_t::fair_aggregator_t () : +zmq::fair_aggregator_t::fair_aggregator_t () : session (NULL), active (0), current (0) { } -void zs::fair_aggregator_t::set_session (session_t *session_) +void zmq::fair_aggregator_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::fair_aggregator_t::shutdown () +void zmq::fair_aggregator_t::shutdown () { // No need to deallocate pipes here. They'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::fair_aggregator_t::terminate () +void zmq::fair_aggregator_t::terminate () { // Pipe unregisters itself during the call to terminate, so the pipes // list shinks by one in each iteration. @@ -60,11 +60,11 @@ void zs::fair_aggregator_t::terminate () delete this; } -zs::fair_aggregator_t::~fair_aggregator_t () +zmq::fair_aggregator_t::~fair_aggregator_t () { } -void zs::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_) +void zmq::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_) { // Associate new pipe with the mux object. pipe_->set_mux (this); @@ -76,7 +76,7 @@ void zs::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_) session->revive (); } -void zs::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_) +void zmq::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_) { // Move the pipe from the list of active pipes to the list of idle pipes. deactivate (pipe_); @@ -86,15 +86,15 @@ void zs::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_) pipes.pop_back (); } -bool zs::fair_aggregator_t::empty () +bool zmq::fair_aggregator_t::empty () { return pipes.empty (); } -bool zs::fair_aggregator_t::recv (zs_msg *msg_) +bool zmq::fair_aggregator_t::recv (zmq_msg *msg_) { // Deallocate old content of the message. - zs_msg_close (msg_); + zmq_msg_close (msg_); // O(1) fair queueing. Round-robin over the active pipes to get // next message. @@ -110,11 +110,11 @@ bool zs::fair_aggregator_t::recv (zs_msg *msg_) // No message is available. Initialise the output parameter // to be a 0-byte message. - zs_msg_init (msg_); + zmq_msg_init (msg_); return false; } -void zs::fair_aggregator_t::deactivate (pipe_reader_t *pipe_) +void zmq::fair_aggregator_t::deactivate (pipe_reader_t *pipe_) { int index = pipe_->get_index (); @@ -133,7 +133,7 @@ void zs::fair_aggregator_t::deactivate (pipe_reader_t *pipe_) } } -void zs::fair_aggregator_t::reactivate (pipe_reader_t *pipe_) +void zmq::fair_aggregator_t::reactivate (pipe_reader_t *pipe_) { // Revive an idle pipe. swap_pipes (pipe_->get_index (), active); diff --git a/src/fair_aggregator.hpp b/src/fair_aggregator.hpp index 9e6c3bb..6ae1fc5 100644 --- a/src/fair_aggregator.hpp +++ b/src/fair_aggregator.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_FAIR_AGGREGATOR_HPP_INCLUDED__ -#define __ZS_FAIR_AGGREGATOR_HPP_INCLUDED__ +#ifndef __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__ +#define __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__ #include #include "i_mux.hpp" -namespace zs +namespace zmq { // Object to aggregate messages from inbound pipes. @@ -44,7 +44,7 @@ namespace zs bool empty (); void deactivate (class pipe_reader_t *pipe_); void reactivate (class pipe_reader_t *pipe_); - bool recv (struct zs_msg *msg_); + bool recv (struct zmq_msg *msg_); private: diff --git a/src/fd.hpp b/src/fd.hpp index 4d45ed2..b7b2391 100644 --- a/src/fd.hpp +++ b/src/fd.hpp @@ -17,16 +17,16 @@ along with this program. If not, see . */ -#ifndef __ZS_FD_HPP_INCLUDED__ -#define __ZS_FD_HPP_INCLUDED__ +#ifndef __ZMQ_FD_HPP_INCLUDED__ +#define __ZMQ_FD_HPP_INCLUDED__ #include "platform.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #endif -namespace zs +namespace zmq { #ifdef _MSC_VER #if (_MSC_VER <= 1400) diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 005dd86..771094b 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -22,20 +22,20 @@ #include "err.hpp" #include "fd.hpp" -#if defined ZS_HAVE_OPENVMS +#if defined ZMQ_HAVE_OPENVMS #include -#elif defined ZS_HAVE_WINDOWS +#elif defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include #include #endif -#if defined ZS_HAVE_EVENTFD +#if defined ZMQ_HAVE_EVENTFD #include -zs::fd_signaler_t::fd_signaler_t () +zmq::fd_signaler_t::fd_signaler_t () { // Create eventfd object. fd = eventfd (0, 0); @@ -49,22 +49,22 @@ zs::fd_signaler_t::fd_signaler_t () errno_assert (rc != -1); } -zs::fd_signaler_t::~fd_signaler_t () +zmq::fd_signaler_t::~fd_signaler_t () { int rc = close (fd); errno_assert (rc != -1); } -void zs::fd_signaler_t::signal (int signal_) +void zmq::fd_signaler_t::signal (int signal_) { - zs_assert (signal_ >= 0 && signal_ < 64); + zmq_assert (signal_ >= 0 && signal_ < 64); signals_t inc = 1; inc <<= signal_; ssize_t sz = write (fd, &inc, sizeof (signals_t)); errno_assert (sz == sizeof (signals_t)); } -zs::fd_signaler_t::signals_t zs::fd_signaler_t::check () +zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () { signals_t val; ssize_t sz = read (fd, &val, sizeof (signals_t)); @@ -75,14 +75,14 @@ zs::fd_signaler_t::signals_t zs::fd_signaler_t::check () return val; } -zs::fd_t zs::fd_signaler_t::get_fd () +zmq::fd_t zmq::fd_signaler_t::get_fd () { return fd; } -#elif defined ZS_HAVE_WINDOWS +#elif defined ZMQ_HAVE_WINDOWS -zs::fd_signaler_t::fd_signaler_t () +zmq::fd_signaler_t::fd_signaler_t () { struct sockaddr_in addr; SOCKET listener; @@ -124,7 +124,7 @@ zs::fd_signaler_t::fd_signaler_t () wsa_assert (rc != SOCKET_ERROR); } -zs::fd_signaler_t::~fd_signaler_t () +zmq::fd_signaler_t::~fd_signaler_t () { int rc = closesocket (w); wsa_assert (rc != SOCKET_ERROR); @@ -133,15 +133,15 @@ zs::fd_signaler_t::~fd_signaler_t () wsa_assert (rc != SOCKET_ERROR); } -void zs::fd_signaler_t::signal (int signal_) +void zmq::fd_signaler_t::signal (int signal_) { - zs_assert (signal_ >= 0 && signal_ < 64); + zmq_assert (signal_ >= 0 && signal_ < 64); char c = (char) signal_; int rc = send (w, &c, 1, 0); win_assert (rc != SOCKET_ERROR); } -zs::fd_signaler_t::signals_t zs::fd_signaler_t::check () +zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () { char buffer [32]; int nbytes = recv (r, buffer, 32, 0); @@ -149,13 +149,13 @@ zs::fd_signaler_t::signals_t zs::fd_signaler_t::check () signals_t signals = 0; for (int pos = 0; pos != nbytes; pos++) { - zs_assert (buffer [pos] < 64); + zmq_assert (buffer [pos] < 64); signals |= (1 << (buffer [pos])); } return signals; } -zs::fd_t zs::fd_signaler_t::get_fd () +zmq::fd_t zmq::fd_signaler_t::get_fd () { return r; } @@ -165,7 +165,7 @@ zs::fd_t zs::fd_signaler_t::get_fd () #include #include -zs::fd_signaler_t::fd_signaler_t () +zmq::fd_signaler_t::fd_signaler_t () { int sv [2]; int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); @@ -181,43 +181,43 @@ zs::fd_signaler_t::fd_signaler_t () errno_assert (rc != -1); } -zs::fd_signaler_t::~fd_signaler_t () +zmq::fd_signaler_t::~fd_signaler_t () { close (w); close (r); } -void zs::fd_signaler_t::signal (int signal_) +void zmq::fd_signaler_t::signal (int signal_) { - zs_assert (signal_ >= 0 && signal_ < 64); + zmq_assert (signal_ >= 0 && signal_ < 64); unsigned char c = (unsigned char) signal_; ssize_t nbytes = send (w, &c, 1, 0); errno_assert (nbytes == 1); } -zs::fd_signaler_t::signals_t zs::fd_signaler_t::check () +zmq::fd_signaler_t::signals_t zmq::fd_signaler_t::check () { unsigned char buffer [32]; ssize_t nbytes = recv (r, buffer, 32, 0); errno_assert (nbytes != -1); signals_t signals = 0; for (int pos = 0; pos != nbytes; pos ++) { - zs_assert (buffer [pos] < 64); + zmq_assert (buffer [pos] < 64); signals |= (1 << (buffer [pos])); } return signals; } -zs::fd_t zs::fd_signaler_t::get_fd () +zmq::fd_t zmq::fd_signaler_t::get_fd () { return r; } #endif -#if defined ZS_HAVE_OPENVMS +#if defined ZMQ_HAVE_OPENVMS -int zs::fd_signaler_t::socketpair (int domain_, int type_, int protocol_, +int zmq::fd_signaler_t::socketpair (int domain_, int type_, int protocol_, int sv_ [2]) { int listener; @@ -226,7 +226,7 @@ int zs::fd_signaler_t::socketpair (int domain_, int type_, int protocol_, int rc; int on = 1; - zs_assert (type_ == SOCK_STREAM); + zmq_assert (type_ == SOCK_STREAM); // Fill in the localhost address (127.0.0.1). memset (&lcladdr, 0, sizeof (lcladdr)); diff --git a/src/fd_signaler.hpp b/src/fd_signaler.hpp index 34c5e8c..11baa95 100644 --- a/src/fd_signaler.hpp +++ b/src/fd_signaler.hpp @@ -17,15 +17,15 @@ along with this program. If not, see . */ -#ifndef __ZS_FD_SIGNALER_HPP_INCLUDED__ -#define __ZS_FD_SIGNALER_HPP_INCLUDED__ +#ifndef __ZMQ_FD_SIGNALER_HPP_INCLUDED__ +#define __ZMQ_FD_SIGNALER_HPP_INCLUDED__ #include "platform.hpp" #include "i_signaler.hpp" #include "fd.hpp" #include "stdint.hpp" -namespace zs +namespace zmq { // This object can be used to send individual signals from one thread to @@ -73,7 +73,7 @@ namespace zs #endif -#if defined ZS_HAVE_EVENTFD +#if defined ZMQ_HAVE_EVENTFD // Eventfd descriptor. fd_t fd; #else diff --git a/src/i_api.hpp b/src/i_api.hpp index 4dccd9e..fc7275b 100644 --- a/src/i_api.hpp +++ b/src/i_api.hpp @@ -17,20 +17,20 @@ along with this program. If not, see . */ -#ifndef __ZS_I_API_HPP_INCLUDED__ -#define __ZS_I_API_HPP_INCLUDED__ +#ifndef __ZMQ_I_API_HPP_INCLUDED__ +#define __ZMQ_I_API_HPP_INCLUDED__ -namespace zs +namespace zmq { struct i_api { - virtual int bind (const char *addr_, struct zs_opts *opts_) = 0; - virtual int connect (const char *addr_, struct zs_opts *opts_) = 0; + virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0; + virtual int connect (const char *addr_, struct zmq_opts *opts_) = 0; virtual int subscribe (const char *criteria_) = 0; - virtual int send (struct zs_msg *msg_, int flags_) = 0; + virtual int send (struct zmq_msg *msg_, int flags_) = 0; virtual int flush () = 0; - virtual int recv (struct zs_msg *msg_, int flags_) = 0; + virtual int recv (struct zmq_msg *msg_, int flags_) = 0; virtual int close () = 0; }; diff --git a/src/i_demux.hpp b/src/i_demux.hpp index edded1e..c4755b5 100644 --- a/src/i_demux.hpp +++ b/src/i_demux.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_I_DEMUX_HPP_INCLUDED__ -#define __ZS_I_DEMUX_HPP_INCLUDED__ +#ifndef __ZMQ_I_DEMUX_HPP_INCLUDED__ +#define __ZMQ_I_DEMUX_HPP_INCLUDED__ -namespace zs +namespace zmq { struct i_demux @@ -28,7 +28,8 @@ namespace zs // Attaches mux to a particular session. virtual void set_session (class session_t *session_) = 0; - // To be called when the whole infrastrucure is being closed (zs_term). + // To be called when the whole infrastrucure + // is being closed (zmq_term). virtual void shutdown () = 0; // To be called when session is being closed. @@ -45,7 +46,7 @@ namespace zs // Sends the message. Returns false if the message cannot be sent // because the pipes are full. - virtual bool send (struct zs_msg *msg_) = 0; + virtual bool send (struct zmq_msg *msg_) = 0; // Flushes messages downstream. virtual void flush () = 0; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bade705..8ca2007 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_I_ENGINE_HPP_INCLUDED__ -#define __ZS_I_ENGINE_HPP_INCLUDED__ +#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ +#define __ZMQ_I_ENGINE_HPP_INCLUDED__ -namespace zs +namespace zmq { // Generic interface to access engines from MD objects. @@ -44,7 +44,7 @@ namespace zs // Called by normal object termination process. virtual void terminate () = 0; - // To be called by MD when terminal shutdown (zs_term) is in progress. + // To be called by MD when terminal shutdown (zmq_term) is in progress. virtual void shutdown () = 0; }; diff --git a/src/i_mux.hpp b/src/i_mux.hpp index 4c8ef72..22e0a26 100644 --- a/src/i_mux.hpp +++ b/src/i_mux.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_I_MUX_HPP_INCLUDED__ -#define __ZS_I_MUX_HPP_INCLUDED__ +#ifndef __ZMQ_I_MUX_HPP_INCLUDED__ +#define __ZMQ_I_MUX_HPP_INCLUDED__ -namespace zs +namespace zmq { struct i_mux @@ -28,7 +28,8 @@ namespace zs // Attaches mux to a particular session. virtual void set_session (class session_t *session_) = 0; - // To be called when the whole infrastrucure is being closed (zs_term). + // To be called when the whole infrastrucure + // is being closed (zmq_term). virtual void shutdown () = 0; // To be called when session is being closed. @@ -51,7 +52,7 @@ namespace zs // Receives a message. Returns false when there is no message // to receive. - virtual bool recv (struct zs_msg *msg_) = 0; + virtual bool recv (struct zmq_msg *msg_) = 0; }; } diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index 5189dad..c065884 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_I_POLL_EVENTS_HPP_INCLUDED__ -#define __ZS_I_POLL_EVENTS_HPP_INCLUDED__ +#ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__ +#define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__ -namespace zs +namespace zmq { // Virtual interface to be exposed by object that want to be notified diff --git a/src/i_poller.hpp b/src/i_poller.hpp index c226dfa..52ca095 100644 --- a/src/i_poller.hpp +++ b/src/i_poller.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_I_POLLER_HPP_INCLUDED__ -#define __ZS_I_POLLER_HPP_INCLUDED__ +#ifndef __ZMQ_I_POLLER_HPP_INCLUDED__ +#define __ZMQ_I_POLLER_HPP_INCLUDED__ #include "fd.hpp" -namespace zs +namespace zmq { union handle_t diff --git a/src/i_session.hpp b/src/i_session.hpp index 8a8c40f..21cdc0d 100644 --- a/src/i_session.hpp +++ b/src/i_session.hpp @@ -17,18 +17,18 @@ along with this program. If not, see . */ -#ifndef __ZS_I_SESSION_HPP_INCLUDED__ -#define __ZS_I_SESSION_HPP_INCLUDED__ +#ifndef __ZMQ_I_SESSION_HPP_INCLUDED__ +#define __ZMQ_I_SESSION_HPP_INCLUDED__ -namespace zs +namespace zmq { struct i_session { virtual void set_engine (struct i_engine *engine_) = 0; virtual void shutdown () = 0; - virtual bool read (struct zs_msg *msg_) = 0; - virtual bool write (struct zs_msg *msg_) = 0; + virtual bool read (struct zmq_msg *msg_) = 0; + virtual bool write (struct zmq_msg *msg_) = 0; virtual void flush () = 0; }; diff --git a/src/i_signaler.hpp b/src/i_signaler.hpp index f6f0398..adf54e5 100644 --- a/src/i_signaler.hpp +++ b/src/i_signaler.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_I_SIGNALER_HPP_INCLUDED__ -#define __ZS_I_SIGNALER_HPP_INCLUDED__ +#ifndef __ZMQ_I_SIGNALER_HPP_INCLUDED__ +#define __ZMQ_I_SIGNALER_HPP_INCLUDED__ -namespace zs +namespace zmq { // Virtual interface used to send signals. Individual implementations // may restrict the number of possible signal types to send. diff --git a/src/i_thread.hpp b/src/i_thread.hpp index fdb60c5..9f31592 100644 --- a/src/i_thread.hpp +++ b/src/i_thread.hpp @@ -17,10 +17,10 @@ along with this program. If not, see . */ -#ifndef __ZS_I_THREAD_HPP_INCLUDED__ -#define __ZS_I_THREAD_HPP_INCLUDED__ +#ifndef __ZMQ_I_THREAD_HPP_INCLUDED__ +#define __ZMQ_I_THREAD_HPP_INCLUDED__ -namespace zs +namespace zmq { // Interface used by session object to communicate with the thread diff --git a/src/io_object.cpp b/src/io_object.cpp index 01388eb..ad379cf 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -21,17 +21,17 @@ #include "io_thread.hpp" #include "i_poller.hpp" -zs::io_object_t::io_object_t (io_thread_t *thread_) : +zmq::io_object_t::io_object_t (io_thread_t *thread_) : object_t (thread_), thread (thread_) { } -zs::io_object_t::~io_object_t () +zmq::io_object_t::~io_object_t () { } -zs::i_poller *zs::io_object_t::get_poller () +zmq::i_poller *zmq::io_object_t::get_poller () { return thread->get_poller (); } diff --git a/src/io_object.hpp b/src/io_object.hpp index 766c008..d3fa809 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_IO_OBJECT_HPP_INCLUDED__ -#define __ZS_IO_OBJECT_HPP_INCLUDED__ +#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__ +#define __ZMQ_IO_OBJECT_HPP_INCLUDED__ #include "object.hpp" -namespace zs +namespace zmq { // All objects running within the context of an I/O thread should be diff --git a/src/io_thread.cpp b/src/io_thread.cpp index 7994874..045627c 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "io_thread.hpp" #include "command.hpp" @@ -34,49 +34,49 @@ #include "simple_semaphore.hpp" #include "session.hpp" -zs::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : object_t (dispatcher_, thread_slot_) { -#if defined ZS_FORCE_SELECT +#if defined ZMQ_FORCE_SELECT poller = new select_t; -#elif defined ZS_FORCE_POLL +#elif defined ZMQ_FORCE_POLL poller = new poll_t; -#elif defined ZS_FORCE_EPOLL +#elif defined ZMQ_FORCE_EPOLL poller = new epoll_t; -#elif defined ZS_FORCE_DEVPOLL +#elif defined ZMQ_FORCE_DEVPOLL poller = new devpoll_t; -#elif defined ZS_FORCE_KQUEUE +#elif defined ZMQ_FORCE_KQUEUE poller = new kqueue_t; -#elif defined ZS_HAVE_LINUX +#elif defined ZMQ_HAVE_LINUX poller = new epoll_t; -#elif defined ZS_HAVE_WINDOWS +#elif defined ZMQ_HAVE_WINDOWS poller = new select_t; -#elif defined ZS_HAVE_FREEBSD +#elif defined ZMQ_HAVE_FREEBSD poller = new kqueue_t; -#elif defined ZS_HAVE_OPENBSD +#elif defined ZMQ_HAVE_OPENBSD poller = new kqueue_t; -#elif defined ZS_HAVE_SOLARIS +#elif defined ZMQ_HAVE_SOLARIS poller = new devpoll_t; -#elif defined ZS_HAVE_OSX +#elif defined ZMQ_HAVE_OSX poller = new kqueue_t; -#elif defined ZS_HAVE_QNXNTO +#elif defined ZMQ_HAVE_QNXNTO poller = new poll_t; -#elif defined ZS_HAVE_AIX +#elif defined ZMQ_HAVE_AIX poller = new poll_t; -#elif defined ZS_HAVE_HPUX +#elif defined ZMQ_HAVE_HPUX poller = new devpoll_t; -#elif defined ZS_HAVE_OPENVMS +#elif defined ZMQ_HAVE_OPENVMS poller = new select_t; #else #error Unsupported platform #endif - zs_assert (poller); + zmq_assert (poller); signaler_handle = poller->add_fd (signaler.get_fd (), this); poller->set_pollin (signaler_handle); } -void zs::io_thread_t::shutdown () +void zmq::io_thread_t::shutdown () { // Deallocate all the sessions associated with the thread. while (!sessions.empty ()) @@ -85,42 +85,42 @@ void zs::io_thread_t::shutdown () delete this; } -zs::io_thread_t::~io_thread_t () +zmq::io_thread_t::~io_thread_t () { delete poller; } -void zs::io_thread_t::start () +void zmq::io_thread_t::start () { // Start the underlying I/O thread. poller->start (); } -void zs::io_thread_t::stop () +void zmq::io_thread_t::stop () { send_stop (); } -void zs::io_thread_t::join () +void zmq::io_thread_t::join () { poller->join (); } -zs::i_signaler *zs::io_thread_t::get_signaler () +zmq::i_signaler *zmq::io_thread_t::get_signaler () { return &signaler; } -int zs::io_thread_t::get_load () +int zmq::io_thread_t::get_load () { return poller->get_load (); } -void zs::io_thread_t::in_event () +void zmq::io_thread_t::in_event () { // Find out which threads are sending us commands. fd_signaler_t::signals_t signals = signaler.check (); - zs_assert (signals); + zmq_assert (signals); // Iterate through all the threads in the process and find out // which of them sent us commands. @@ -137,25 +137,25 @@ void zs::io_thread_t::in_event () } } -void zs::io_thread_t::out_event () +void zmq::io_thread_t::out_event () { // We are never polling for POLLOUT here. This function is never called. - zs_assert (false); + zmq_assert (false); } -void zs::io_thread_t::timer_event () +void zmq::io_thread_t::timer_event () { // No timers here. This function is never called. - zs_assert (false); + zmq_assert (false); } -void zs::io_thread_t::attach_session (session_t *session_) +void zmq::io_thread_t::attach_session (session_t *session_) { session_->set_index (sessions.size ()); sessions.push_back (session_); } -void zs::io_thread_t::detach_session (session_t *session_) +void zmq::io_thread_t::detach_session (session_t *session_) { // O(1) removal of the session from the list. sessions_t::size_type i = session_->get_index (); @@ -164,13 +164,13 @@ void zs::io_thread_t::detach_session (session_t *session_) sessions.pop_back (); } -zs::i_poller *zs::io_thread_t::get_poller () +zmq::i_poller *zmq::io_thread_t::get_poller () { - zs_assert (poller); + zmq_assert (poller); return poller; } -void zs::io_thread_t::process_stop () +void zmq::io_thread_t::process_stop () { poller->rm_fd (signaler_handle); poller->stop (); diff --git a/src/io_thread.hpp b/src/io_thread.hpp index a57aa34..afb8110 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_IO_THREAD_HPP_INCLUDED__ -#define __ZS_IO_THREAD_HPP_INCLUDED__ +#ifndef __ZMQ_IO_THREAD_HPP_INCLUDED__ +#define __ZMQ_IO_THREAD_HPP_INCLUDED__ #include @@ -28,7 +28,7 @@ #include "i_poll_events.hpp" #include "fd_signaler.hpp" -namespace zs +namespace zmq { // Generic part of the I/O thread. Polling-mechanism-specific features @@ -49,7 +49,7 @@ namespace zs // Wait till undelying thread terminates. void join (); - // To be called when the whole infrastrucure is being closed (zs_term). + // To be called when the whole infrastrucure is being closed (zmq_term). // It's vital to call the individual commands in this sequence: // stop, join, shutdown. void shutdown (); diff --git a/src/ip.cpp b/src/ip.cpp index f435bef..05a267e 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -27,7 +27,7 @@ #include "err.hpp" #include "stdint.hpp" -#if defined ZS_HAVE_SOLARIS +#if defined ZMQ_HAVE_SOLARIS #include #include @@ -44,14 +44,14 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) // Create a socket. int fd = socket (AF_INET, SOCK_DGRAM, 0); - zs_assert (fd != -1); + zmq_assert (fd != -1); // Retrieve number of interfaces. lifnum ifn; ifn.lifn_family = AF_UNSPEC; ifn.lifn_flags = 0; int rc = ioctl (fd, SIOCGLIFNUM, (char*) &ifn); - zs_assert (rc != -1); + zmq_assert (rc != -1); // Allocate memory to get interface names. size_t ifr_size = sizeof (struct lifreq) * ifn.lifn_count; @@ -65,7 +65,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) ifc.lifc_len = ifr_size; ifc.lifc_buf = ifr; rc = ioctl (fd, SIOCGLIFCONF, (char*) &ifc); - zs_assert (rc != -1); + zmq_assert (rc != -1); // Find the interface with the specified name and AF_INET family. bool found = false; @@ -74,7 +74,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) n ++, ifrp ++) { if (!strcmp (interface_, ifrp->lifr_name)) { rc = ioctl (fd, SIOCGLIFADDR, (char*) ifrp); - zs_assert (rc != -1); + zmq_assert (rc != -1); if (ifrp->lifr_addr.ss_family == AF_INET) { *addr_ = ((sockaddr_in*) &ifrp->lifr_addr)->sin_addr; found = true; @@ -100,7 +100,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) return 0; } -#elif defined ZS_HAVE_AIX || ZS_HAVE_HPUX +#elif defined ZMQ_HAVE_AIX || ZMQ_HAVE_HPUX #include #include @@ -119,12 +119,12 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) // Create a socket. int sd = socket (AF_INET, SOCK_DGRAM, 0); - zs_assert (sd != -1); + zmq_assert (sd != -1); struct ifreq ifr; // Copy interface name for ioctl get. - zs_strncpy (ifr.ifr_name, interface_, sizeof (ifr.ifr_name)); + zmq_strncpy (ifr.ifr_name, interface_, sizeof (ifr.ifr_name)); // Fetch interface address. int rc = ioctl (sd, SIOCGIFADDR, (caddr_t) &ifr, sizeof (struct ifreq)); @@ -149,7 +149,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) return 0; } -#elif defined ZS_HAVE_WINDOWS +#elif defined ZMQ_HAVE_WINDOWS static int resolve_nic_name (in_addr* addr_, char const *interface_) { @@ -173,9 +173,9 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) return 0; } -#elif ((defined ZS_HAVE_LINUX || defined ZS_HAVE_FREEBSD ||\ - defined ZS_HAVE_OSX || defined ZS_HAVE_OPENBSD ||\ - defined ZS_HAVE_QNXNTO) && defined ZS_HAVE_IFADDRS) +#elif ((defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENBSD ||\ + defined ZMQ_HAVE_QNXNTO) && defined ZMQ_HAVE_IFADDRS) #include @@ -195,8 +195,8 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) // Get the addresses. ifaddrs* ifa = NULL; int rc = getifaddrs (&ifa); - zs_assert (rc == 0); - zs_assert (ifa != NULL); + zmq_assert (rc == 0); + zmq_assert (ifa != NULL); // Find the corresponding network interface. bool found = false; @@ -237,7 +237,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) // Convert IP address into sockaddr_in structure. int rc = inet_pton (AF_INET, interface_, addr_); - zs_assert (rc != 0); + zmq_assert (rc != 0); errno_assert (rc == 1); return 0; @@ -245,7 +245,7 @@ static int resolve_nic_name (in_addr* addr_, char const *interface_) #endif -int zs::resolve_ip_interface (sockaddr_in* addr_, char const *interface_) +int zmq::resolve_ip_interface (sockaddr_in* addr_, char const *interface_) { // Find the ':' that separates NIC name from port. const char *delimiter = strchr (interface_, ':'); @@ -273,7 +273,7 @@ int zs::resolve_ip_interface (sockaddr_in* addr_, char const *interface_) return 0; } -int zs::resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_) +int zmq::resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_) { // Find the ':' that separates hostname name from port. const char *delimiter = strchr (hostname_, ':'); @@ -295,7 +295,7 @@ int zs::resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_) errno = EINVAL; return -1; } - zs_assert (res->ai_addr->sa_family == AF_INET); + zmq_assert (res->ai_addr->sa_family == AF_INET); memcpy (addr_, res->ai_addr, sizeof (sockaddr_in)); freeaddrinfo (res); diff --git a/src/ip.hpp b/src/ip.hpp index 90f02e3..2552aa2 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_IP_HPP_INCLUDED__ -#define __ZS_IP_HPP_INCLUDED__ +#ifndef __ZMQ_IP_HPP_INCLUDED__ +#define __ZMQ_IP_HPP_INCLUDED__ #include "platform.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include @@ -32,7 +32,7 @@ #include #endif -namespace zs +namespace zmq { // Resolves network interface name in : format. Symbol "*" diff --git a/src/kqueue.cpp b/src/kqueue.cpp index b0c23ee..28c15de 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -19,7 +19,7 @@ #include "platform.hpp" -#if defined ZS_HAVE_FREEBSD || defined ZS_HAVE_OPENBSD || defined ZS_HAVE_OSX +#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX #include #include @@ -33,19 +33,19 @@ #include "config.hpp" #include "i_poll_events.hpp" -zs::kqueue_t::kqueue_t () +zmq::kqueue_t::kqueue_t () { // Create event queue kqueue_fd = kqueue (); errno_assert (kqueue_fd != -1); } -zs::kqueue_t::~kqueue_t () +zmq::kqueue_t::~kqueue_t () { close (kqueue_fd); } -void zs::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) +void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) { struct kevent ev; @@ -54,7 +54,7 @@ void zs::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) errno_assert (rc != -1); } -void zs::kqueue_t::kevent_delete (fd_t fd_, short filter_) +void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) { struct kevent ev; @@ -63,10 +63,10 @@ void zs::kqueue_t::kevent_delete (fd_t fd_, short filter_) errno_assert (rc != -1); } -zs::handle_t zs::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) +zmq::handle_t zmq::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) { poll_entry_t *pe = new poll_entry_t; - zs_assert (pe != NULL); + zmq_assert (pe != NULL); pe->fd = fd_; pe->flag_pollin = 0; @@ -78,7 +78,7 @@ zs::handle_t zs::kqueue_t::add_fd (fd_t fd_, i_poll_events *reactor_) return handle; } -void zs::kqueue_t::rm_fd (handle_t handle_) +void zmq::kqueue_t::rm_fd (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; if (pe->flag_pollin) @@ -89,67 +89,67 @@ void zs::kqueue_t::rm_fd (handle_t handle_) retired.push_back (pe); } -void zs::kqueue_t::set_pollin (handle_t handle_) +void zmq::kqueue_t::set_pollin (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->flag_pollin = true; kevent_add (pe->fd, EVFILT_READ, pe); } -void zs::kqueue_t::reset_pollin (handle_t handle_) +void zmq::kqueue_t::reset_pollin (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->flag_pollin = false; kevent_delete (pe->fd, EVFILT_READ); } -void zs::kqueue_t::set_pollout (handle_t handle_) +void zmq::kqueue_t::set_pollout (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->flag_pollout = true; kevent_add (pe->fd, EVFILT_WRITE, pe); } -void zs::kqueue_t::reset_pollout (handle_t handle_) +void zmq::kqueue_t::reset_pollout (handle_t handle_) { poll_entry_t *pe = (poll_entry_t*) handle_.ptr; pe->flag_pollout = false; kevent_delete (pe->fd, EVFILT_WRITE); } -void zs::kqueue_t::add_timer (i_poll_events *events_) +void zmq::kqueue_t::add_timer (i_poll_events *events_) { timers.push_back (events_); } -void zs::kqueue_t::cancel_timer (i_poll_events *events_) +void zmq::kqueue_t::cancel_timer (i_poll_events *events_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) timers.erase (it); } -int zs::kqueue_t::get_load () +int zmq::kqueue_t::get_load () { return load.get (); } -void zs::kqueue_t::start () +void zmq::kqueue_t::start () { worker.start (worker_routine, this); } -void zs::kqueue_t::stop () +void zmq::kqueue_t::stop () { stopping = true; } -void zs::kqueue_t::join () +void zmq::kqueue_t::join () { worker.stop (); } -void zs::kqueue_t::loop () +void zmq::kqueue_t::loop () { while (!stopping) { @@ -206,7 +206,7 @@ void zs::kqueue_t::loop () } } -void zs::kqueue_t::worker_routine (void *arg_) +void zmq::kqueue_t::worker_routine (void *arg_) { ((kqueue_t*) arg_)->loop (); } diff --git a/src/kqueue.hpp b/src/kqueue.hpp index f060b28..2fd6819 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_KQUEUE_HPP_INCLUDED__ -#define __ZS_KQUEUE_HPP_INCLUDED__ +#ifndef __ZMQ_KQUEUE_HPP_INCLUDED__ +#define __ZMQ_KQUEUE_HPP_INCLUDED__ #include "platform.hpp" -#if defined ZS_HAVE_FREEBSD || defined ZS_HAVE_OPENBSD || defined ZS_HAVE_OSX +#if defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_OSX #include @@ -31,7 +31,7 @@ #include "thread.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // Implements socket polling mechanism using the BSD-specific diff --git a/src/listener.cpp b/src/listener.cpp index ae4a80f..823b21b 100644 --- a/src/listener.cpp +++ b/src/listener.cpp @@ -27,7 +27,7 @@ #include "dummy_aggregator.hpp" #include "dummy_distributor.hpp" -zs::listener_t::listener_t (io_thread_t *thread_, const char *addr_, +zmq::listener_t::listener_t (io_thread_t *thread_, const char *addr_, session_t *peer_, bool has_in_, bool has_out_, uint64_t taskset_) : io_object_t (thread_), poller (NULL), @@ -39,30 +39,30 @@ zs::listener_t::listener_t (io_thread_t *thread_, const char *addr_, { } -void zs::listener_t::terminate () +void zmq::listener_t::terminate () { for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) session_stubs [i]->terminate (); delete this; } -void zs::listener_t::shutdown () +void zmq::listener_t::shutdown () { for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++) session_stubs [i]->shutdown (); delete this; } -zs::listener_t::~listener_t () +zmq::listener_t::~listener_t () { } -void zs::listener_t::got_identity (session_stub_t *session_stub_, +void zmq::listener_t::got_identity (session_stub_t *session_stub_, const char *identity_) { // Get the engine allready disconnected from the stub and poller. i_engine *engine = session_stub_->detach_engine (); - zs_assert (engine); + zmq_assert (engine); // Find the corresponding session. session_t *session; @@ -89,11 +89,11 @@ void zs::listener_t::got_identity (session_stub_t *session_stub_, // sure that the peer session won't get deallocated till it processes // the subsequent bind command. i_mux *mux = new dummy_aggregator_t; - zs_assert (mux); + zmq_assert (mux); i_demux *demux = new dummy_distributor_t; - zs_assert (demux); + zmq_assert (demux); session = new session_t (io_thread, io_thread, mux, demux, false, true); - zs_assert (session); + zmq_assert (session); session->inc_seqnum (); session->inc_seqnum (); peer->inc_seqnum (); @@ -104,14 +104,14 @@ void zs::listener_t::got_identity (session_stub_t *session_stub_, send_engine (session, engine); } -void zs::listener_t::process_reg (simple_semaphore_t *smph_) +void zmq::listener_t::process_reg (simple_semaphore_t *smph_) { - zs_assert (!poller); + zmq_assert (!poller); poller = get_poller (); // Open the listening socket. int rc = tcp_listener.open (addr.c_str ()); - zs_assert (rc == 0); + zmq_assert (rc == 0); // Unlock the application thread that created the listener. if (smph_) @@ -122,10 +122,10 @@ void zs::listener_t::process_reg (simple_semaphore_t *smph_) poller->set_pollin (handle); } -void zs::listener_t::process_unreg (simple_semaphore_t *smph_) +void zmq::listener_t::process_unreg (simple_semaphore_t *smph_) { // Disassociate listener from the poller. - zs_assert (poller); + zmq_assert (poller); poller->rm_fd (handle); poller = NULL; @@ -134,7 +134,7 @@ void zs::listener_t::process_unreg (simple_semaphore_t *smph_) smph_->post (); } -void zs::listener_t::in_event () +void zmq::listener_t::in_event () { fd_t fd = tcp_listener.accept (); @@ -146,25 +146,25 @@ void zs::listener_t::in_event () // Create an session stub for the engine to take care for it till its // identity is retreived. session_stub_t *session_stub = new session_stub_t (this); - zs_assert (session_stub); + zmq_assert (session_stub); session_stub->set_index (session_stubs.size ()); session_stubs.push_back (session_stub); // Create an engine to encaspulate the socket. Engine will register itself // with the stub so the stub will be able to free it in case of shutdown. zmq_tcp_engine_t *engine = new zmq_tcp_engine_t (fd); - zs_assert (engine); + zmq_assert (engine); engine->attach (poller, session_stub); } -void zs::listener_t::out_event () +void zmq::listener_t::out_event () { - zs_assert (false); + zmq_assert (false); } -void zs::listener_t::timer_event () +void zmq::listener_t::timer_event () { - zs_assert (false); + zmq_assert (false); } diff --git a/src/listener.hpp b/src/listener.hpp index f3c745a..2fe93db 100644 --- a/src/listener.hpp +++ b/src/listener.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_LISTENER_HPP_INCLUDED__ -#define __ZS_LISTENER_HPP_INCLUDED__ +#ifndef __ZMQ_LISTENER_HPP_INCLUDED__ +#define __ZMQ_LISTENER_HPP_INCLUDED__ #include #include @@ -30,7 +30,7 @@ #include "i_poll_events.hpp" #include "stdint.hpp" -namespace zs +namespace zmq { class listener_t : public io_object_t, public i_poll_events diff --git a/src/load_balancer.cpp b/src/load_balancer.cpp index 63dc15c..0d382a1 100644 --- a/src/load_balancer.cpp +++ b/src/load_balancer.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "load_balancer.hpp" #include "pipe_writer.hpp" @@ -25,26 +25,26 @@ #include "session.hpp" #include "msg.hpp" -zs::load_balancer_t::load_balancer_t () : +zmq::load_balancer_t::load_balancer_t () : session (NULL), current (0) { } -void zs::load_balancer_t::set_session (session_t *session_) +void zmq::load_balancer_t::set_session (session_t *session_) { - zs_assert (!session); + zmq_assert (!session); session = session_; } -void zs::load_balancer_t::shutdown () +void zmq::load_balancer_t::shutdown () { // No need to deallocate pipes here. They'll be deallocated during the // shutdown of the dispatcher. delete this; } -void zs::load_balancer_t::terminate () +void zmq::load_balancer_t::terminate () { // Pipe unregisters itself during the call to terminate, so the pipes // list shinks by one in each iteration. @@ -54,11 +54,11 @@ void zs::load_balancer_t::terminate () delete this; } -zs::load_balancer_t::~load_balancer_t () +zmq::load_balancer_t::~load_balancer_t () { } -void zs::load_balancer_t::attach_pipe (pipe_writer_t *pipe_) +void zmq::load_balancer_t::attach_pipe (pipe_writer_t *pipe_) { // Associate demux with a new pipe. pipe_->set_demux (this); @@ -66,7 +66,7 @@ void zs::load_balancer_t::attach_pipe (pipe_writer_t *pipe_) pipes.push_back (pipe_); } -void zs::load_balancer_t::detach_pipe (pipe_writer_t *pipe_) +void zmq::load_balancer_t::detach_pipe (pipe_writer_t *pipe_) { // Release the reference to the pipe. int index = pipe_->get_index (); @@ -76,12 +76,12 @@ void zs::load_balancer_t::detach_pipe (pipe_writer_t *pipe_) pipes.pop_back (); } -bool zs::load_balancer_t::empty () +bool zmq::load_balancer_t::empty () { return pipes.empty (); } -bool zs::load_balancer_t::send (zs_msg *msg_) +bool zmq::load_balancer_t::send (zmq_msg *msg_) { // If there are no pipes, message cannot be sent. if (pipes.size () == 0) @@ -105,12 +105,12 @@ bool zs::load_balancer_t::send (zs_msg *msg_) current = (current + 1) % pipes.size (); // Detach the original message from the data buffer. - zs_msg_init (msg_); + zmq_msg_init (msg_); return true; } -void zs::load_balancer_t::flush () +void zmq::load_balancer_t::flush () { // Flush all pipes. If there's large number of pipes, it can be pretty // inefficient (especially if there's new message only in a single pipe). @@ -119,12 +119,12 @@ void zs::load_balancer_t::flush () (*it)->flush (); } -void zs::load_balancer_t::write_to_pipe (class pipe_writer_t *pipe_, - struct zs_msg *msg_) +void zmq::load_balancer_t::write_to_pipe (class pipe_writer_t *pipe_, + struct zmq_msg *msg_) { if (!pipe_->write (msg_)) { // TODO: Push gap notification to the pipe. - zs_assert (false); + zmq_assert (false); } } diff --git a/src/load_balancer.hpp b/src/load_balancer.hpp index 9cdc68f..953ed3b 100644 --- a/src/load_balancer.hpp +++ b/src/load_balancer.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_LOAD_BALANCER_HPP_INCLUDED__ -#define __ZS_LOAD_BALANCER_HPP_INCLUDED__ +#ifndef __ZMQ_LOAD_BALANCER_HPP_INCLUDED__ +#define __ZMQ_LOAD_BALANCER_HPP_INCLUDED__ #include #include -namespace zs +namespace zmq { // Object to distribute messages to outbound pipes. @@ -42,7 +42,7 @@ namespace zs void attach_pipe (class pipe_writer_t *pipe_); void detach_pipe (class pipe_writer_t *pipe_); bool empty (); - bool send (struct zs_msg *msg_); + bool send (struct zmq_msg *msg_); void flush (); private: @@ -55,7 +55,7 @@ namespace zs // Writes the message to the pipe if possible. If it isn't, writes // a gap notification to the pipe. - void write_to_pipe (class pipe_writer_t *pipe_, struct zs_msg *msg_); + void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_); // The list of outbound pipes. typedef std::vector pipes_t; diff --git a/src/msg.hpp b/src/msg.hpp index f4f4d26..4f35961 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -17,31 +17,31 @@ along with this program. If not, see . */ -#ifndef __ZS_MSG_HPP_INCLUDE__ -#define __ZS_MSG_HPP_INCLUDE__ +#ifndef __ZMQ_MSG_HPP_INCLUDE__ +#define __ZMQ_MSG_HPP_INCLUDE__ #include -#include "../include/zs.h" +#include "../include/zmq.h" #include "atomic_counter.hpp" -//namespace zs +//namespace zmq //{ // Shared message buffer. Message data are either allocated in one - // continguous block along with this structure - thus avoiding one + // continuous block along with this structure - thus avoiding one // malloc/free pair or they are stored in used-supplied memory. // In the latter case, ffn member stores pointer to the function to be // used to deallocate the data. If the buffer is actually shared (there // are at least 2 references to it) refcount member contains number of // references. - struct zs_msg_content + struct zmq_msg_content { void *data; size_t size; - zs_free_fn *ffn; - zs::atomic_counter_t refcnt; + zmq_free_fn *ffn; + zmq::atomic_counter_t refcnt; }; //} diff --git a/src/mutex.hpp b/src/mutex.hpp index a7f95da..9b51955 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -17,19 +17,19 @@ along with this program. If not, see . */ -#ifndef __ZS_MUTEX_HPP_INCLUDED__ -#define __ZS_MUTEX_HPP_INCLUDED__ +#ifndef __ZMQ_MUTEX_HPP_INCLUDED__ +#define __ZMQ_MUTEX_HPP_INCLUDED__ #include "platform.hpp" #include "err.hpp" // Mutex class encapsulates OS mutex in a platform-independent way. -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" -namespace zs +namespace zmq { class mutex_t @@ -70,7 +70,7 @@ namespace zs #include -namespace zs +namespace zmq { class mutex_t diff --git a/src/object.cpp b/src/object.cpp index 8a154ae..a9370ab 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -27,33 +27,33 @@ #include "simple_semaphore.hpp" #include "i_engine.hpp" -zs::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : dispatcher (dispatcher_), thread_slot (thread_slot_) { } -zs::object_t::object_t (object_t *parent_) : +zmq::object_t::object_t (object_t *parent_) : dispatcher (parent_->dispatcher), thread_slot (parent_->thread_slot) { } -zs::object_t::~object_t () +zmq::object_t::~object_t () { } -int zs::object_t::thread_slot_count () +int zmq::object_t::thread_slot_count () { return dispatcher->thread_slot_count (); } -int zs::object_t::get_thread_slot () +int zmq::object_t::get_thread_slot () { return thread_slot; } -void zs::object_t::process_command (command_t &cmd_) +void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { @@ -99,11 +99,11 @@ void zs::object_t::process_command (command_t &cmd_) return; default: - zs_assert (false); + zmq_assert (false); } } -void zs::object_t::create_pipe (object_t *reader_parent_, +void zmq::object_t::create_pipe (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, pipe_reader_t **reader_, pipe_writer_t **writer_) { @@ -111,33 +111,33 @@ void zs::object_t::create_pipe (object_t *reader_parent_, reader_, writer_); } -void zs::object_t::destroy_pipe (pipe_t *pipe_) +void zmq::object_t::destroy_pipe (pipe_t *pipe_) { dispatcher->destroy_pipe (pipe_); } -int zs::object_t::register_inproc_endpoint (const char *endpoint_, +int zmq::object_t::register_inproc_endpoint (const char *endpoint_, session_t *session_) { return dispatcher->register_inproc_endpoint (endpoint_, session_); } -zs::object_t *zs::object_t::get_inproc_endpoint (const char *endpoint_) +zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_) { return dispatcher->get_inproc_endpoint (endpoint_); } -void zs::object_t::unregister_inproc_endpoints (session_t *session_) +void zmq::object_t::unregister_inproc_endpoints (session_t *session_) { dispatcher->unregister_inproc_endpoints (session_); } -zs::io_thread_t *zs::object_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return dispatcher->choose_io_thread (taskset_); } -void zs::object_t::send_stop () +void zmq::object_t::send_stop () { // Send command goes always to the current object. To-self pipe is // used exclusively for sending this command. @@ -147,7 +147,7 @@ void zs::object_t::send_stop () dispatcher->write (thread_slot, thread_slot, cmd); } -void zs::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, +void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, session_t *peer_) { command_t cmd; @@ -158,7 +158,7 @@ void zs::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, send_command (cmd); } -void zs::object_t::send_head (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_) { command_t cmd; cmd.destination = destination_; @@ -167,7 +167,7 @@ void zs::object_t::send_head (object_t *destination_, uint64_t bytes_) send_command (cmd); } -void zs::object_t::send_tail (object_t *destination_, uint64_t bytes_) +void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_) { command_t cmd; cmd.destination = destination_; @@ -176,7 +176,7 @@ void zs::object_t::send_tail (object_t *destination_, uint64_t bytes_) send_command (cmd); } -void zs::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_) +void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_) { command_t cmd; cmd.destination = destination_; @@ -185,7 +185,7 @@ void zs::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_) send_command (cmd); } -void zs::object_t::send_reg_and_bind (object_t *destination_, +void zmq::object_t::send_reg_and_bind (object_t *destination_, session_t *peer_, bool flow_in_, bool flow_out_) { command_t cmd; @@ -197,7 +197,7 @@ void zs::object_t::send_reg_and_bind (object_t *destination_, send_command (cmd); } -void zs::object_t::send_unreg (object_t *destination_, +void zmq::object_t::send_unreg (object_t *destination_, simple_semaphore_t *smph_) { command_t cmd; @@ -207,7 +207,7 @@ void zs::object_t::send_unreg (object_t *destination_, send_command (cmd); } -void zs::object_t::send_engine (object_t *destination_, i_engine *engine_) +void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_) { command_t cmd; cmd.destination = destination_; @@ -216,7 +216,7 @@ void zs::object_t::send_engine (object_t *destination_, i_engine *engine_) send_command (cmd); } -void zs::object_t::send_terminate (object_t *destination_) +void zmq::object_t::send_terminate (object_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -224,7 +224,7 @@ void zs::object_t::send_terminate (object_t *destination_) send_command (cmd); } -void zs::object_t::send_terminate_ack (object_t *destination_) +void zmq::object_t::send_terminate_ack (object_t *destination_) { command_t cmd; cmd.destination = destination_; @@ -232,58 +232,58 @@ void zs::object_t::send_terminate_ack (object_t *destination_) send_command (cmd); } -void zs::object_t::process_stop () +void zmq::object_t::process_stop () { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_) +void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_) { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_head (uint64_t bytes_) +void zmq::object_t::process_head (uint64_t bytes_) { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_tail (uint64_t bytes_) +void zmq::object_t::process_tail (uint64_t bytes_) { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_reg (simple_semaphore_t *smph_) +void zmq::object_t::process_reg (simple_semaphore_t *smph_) { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_reg_and_bind (session_t *session_, +void zmq::object_t::process_reg_and_bind (session_t *session_, bool flow_in_, bool flow_out_) { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_unreg (simple_semaphore_t *smph_) +void zmq::object_t::process_unreg (simple_semaphore_t *smph_) { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_engine (i_engine *engine_) +void zmq::object_t::process_engine (i_engine *engine_) { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_terminate () +void zmq::object_t::process_terminate () { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::process_terminate_ack () +void zmq::object_t::process_terminate_ack () { - zs_assert (false); + zmq_assert (false); } -void zs::object_t::send_command (command_t &cmd_) +void zmq::object_t::send_command (command_t &cmd_) { int destination_thread_slot = cmd_.destination->get_thread_slot (); if (destination_thread_slot == thread_slot) diff --git a/src/object.hpp b/src/object.hpp index a4f93ae..b2ae334 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_OBJECT_HPP_INCLUDED__ -#define __ZS_OBJECT_HPP_INCLUDED__ +#ifndef __ZMQ_OBJECT_HPP_INCLUDED__ +#define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" -namespace zs +namespace zmq { // Base class for all objects that participate in inter-thread diff --git a/src/p2p.cpp b/src/p2p.cpp index f15b663..c83d8b1 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -17,13 +17,13 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "p2p.hpp" #include "app_thread.hpp" #include "session.hpp" -zs::p2p_t::p2p_t (app_thread_t *thread_, session_t *session_) : +zmq::p2p_t::p2p_t (app_thread_t *thread_, session_t *session_) : socket_base_t (thread_, session_) { } diff --git a/src/p2p.hpp b/src/p2p.hpp index 1032a61..d3d9dc3 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_P2P_HPP_INCLUDED__ -#define __ZS_P2P_HPP_INCLUDED__ +#ifndef __ZMQ_P2P_HPP_INCLUDED__ +#define __ZMQ_P2P_HPP_INCLUDED__ #include "socket_base.hpp" -namespace zs +namespace zmq { class p2p_t : public socket_base_t diff --git a/src/pipe.cpp b/src/pipe.cpp index 26042ae..bf761b4 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -19,29 +19,29 @@ #include "pipe.hpp" -zs::pipe_t::pipe_t () : - ypipe_t (false), +zmq::pipe_t::pipe_t () : + ypipe_t (false), index (-1) { } -zs::pipe_t::~pipe_t () +zmq::pipe_t::~pipe_t () { // Flush any outstanding messages to the pipe. flush (); // Deallocate all the messages in the pipe. - zs_msg msg; + zmq_msg msg; while (read (&msg)) - zs_msg_close (&msg); + zmq_msg_close (&msg); } -void zs::pipe_t::set_index (int index_) +void zmq::pipe_t::set_index (int index_) { index = index_; } -int zs::pipe_t::get_index () +int zmq::pipe_t::get_index () { return index; } diff --git a/src/pipe.hpp b/src/pipe.hpp index c0e722d..16ac837 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -17,20 +17,20 @@ along with this program. If not, see . */ -#ifndef __ZS_PIPE_HPP_INCLUDED__ -#define __ZS_PIPE_HPP_INCLUDED__ +#ifndef __ZMQ_PIPE_HPP_INCLUDED__ +#define __ZMQ_PIPE_HPP_INCLUDED__ -#include "../include/zs.h" +#include "../include/zmq.h" #include "ypipe.hpp" #include "config.hpp" -namespace zs +namespace zmq { // Message pipe. A simple wrapper on top of ypipe. - class pipe_t : public ypipe_t + class pipe_t : public ypipe_t { // Dispatcher is a friend so that it can create & destroy the pipes. // By making constructor & destructor private we are sure that nobody diff --git a/src/pipe_reader.cpp b/src/pipe_reader.cpp index 5585b92..eea1371 100644 --- a/src/pipe_reader.cpp +++ b/src/pipe_reader.cpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "pipe_reader.hpp" #include "pipe.hpp" #include "err.hpp" #include "i_mux.hpp" -zs::pipe_reader_t::pipe_reader_t (object_t *parent_, pipe_t *pipe_, +zmq::pipe_reader_t::pipe_reader_t (object_t *parent_, pipe_t *pipe_, uint64_t hwm_, uint64_t lwm_) : object_t (parent_), pipe (pipe_), @@ -39,37 +39,37 @@ zs::pipe_reader_t::pipe_reader_t (object_t *parent_, pipe_t *pipe_, { } -void zs::pipe_reader_t::set_peer (object_t *peer_) +void zmq::pipe_reader_t::set_peer (object_t *peer_) { peer = peer_; } -zs::pipe_reader_t::~pipe_reader_t () +zmq::pipe_reader_t::~pipe_reader_t () { } -void zs::pipe_reader_t::set_mux (i_mux *mux_) +void zmq::pipe_reader_t::set_mux (i_mux *mux_) { mux = mux_; } -void zs::pipe_reader_t::set_index (int index_) +void zmq::pipe_reader_t::set_index (int index_) { index = index_; } -int zs::pipe_reader_t::get_index () +int zmq::pipe_reader_t::get_index () { return index; } -void zs::pipe_reader_t::process_tail (uint64_t bytes_) +void zmq::pipe_reader_t::process_tail (uint64_t bytes_) { tail = bytes_; mux->reactivate (this); } -bool zs::pipe_reader_t::read (struct zs_msg *msg_) +bool zmq::pipe_reader_t::read (struct zmq_msg *msg_) { // Read a message. if (!pipe->read (msg_)) { @@ -78,7 +78,7 @@ bool zs::pipe_reader_t::read (struct zs_msg *msg_) } // If successfull, adjust the head of the pipe. - head += zs_msg_size (msg_); + head += zmq_msg_size (msg_); // If pipe writer wasn't notified about the head position for long enough, // notify it. @@ -87,7 +87,7 @@ bool zs::pipe_reader_t::read (struct zs_msg *msg_) last_sent_head = head; } - if (zs_msg_type (msg_) == ZS_DELIMITER) { + if (zmq_msg_type (msg_) == ZMQ_DELIMITER) { // Detach the pipe from the mux and send termination request to // the pipe writer. @@ -100,7 +100,7 @@ bool zs::pipe_reader_t::read (struct zs_msg *msg_) return true; } -void zs::pipe_reader_t::terminate () +void zmq::pipe_reader_t::terminate () { // Detach the pipe from the mux and send termination request to // the pipe writer. @@ -111,7 +111,7 @@ void zs::pipe_reader_t::terminate () send_terminate (peer); } -void zs::pipe_reader_t::process_terminate_ack () +void zmq::pipe_reader_t::process_terminate_ack () { // Ask dispatcher to deallocate the pipe. destroy_pipe (pipe); diff --git a/src/pipe_reader.hpp b/src/pipe_reader.hpp index bc3fd2e..4f85988 100644 --- a/src/pipe_reader.hpp +++ b/src/pipe_reader.hpp @@ -17,13 +17,13 @@ along with this program. If not, see . */ -#ifndef __ZS_PIPE_READER_HPP_INCLUDED__ -#define __ZS_PIPE_READER_HPP_INCLUDED__ +#ifndef __ZMQ_PIPE_READER_HPP_INCLUDED__ +#define __ZMQ_PIPE_READER_HPP_INCLUDED__ #include "object.hpp" #include "stdint.hpp" -namespace zs +namespace zmq { class pipe_reader_t : public object_t @@ -41,7 +41,7 @@ namespace zs int get_index (); // Reads a message to the underlying pipe. - bool read (struct zs_msg *msg_); + bool read (struct zmq_msg *msg_); // Asks pipe to destroy itself. void terminate (); diff --git a/src/pipe_writer.cpp b/src/pipe_writer.cpp index 173cf97..a54034b 100644 --- a/src/pipe_writer.cpp +++ b/src/pipe_writer.cpp @@ -17,13 +17,13 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "pipe_writer.hpp" #include "pipe.hpp" #include "i_demux.hpp" -zs::pipe_writer_t::pipe_writer_t (object_t *parent_, pipe_t *pipe_, +zmq::pipe_writer_t::pipe_writer_t (object_t *parent_, pipe_t *pipe_, object_t *peer_, uint64_t hwm_, uint64_t lwm_) : object_t (parent_), pipe (pipe_), @@ -37,34 +37,34 @@ zs::pipe_writer_t::pipe_writer_t (object_t *parent_, pipe_t *pipe_, { } -zs::pipe_writer_t::~pipe_writer_t () +zmq::pipe_writer_t::~pipe_writer_t () { } -void zs::pipe_writer_t::set_demux (i_demux *demux_) +void zmq::pipe_writer_t::set_demux (i_demux *demux_) { demux = demux_; } -void zs::pipe_writer_t::set_index (int index_) +void zmq::pipe_writer_t::set_index (int index_) { index = index_; } -int zs::pipe_writer_t::get_index () +int zmq::pipe_writer_t::get_index () { return index; } -bool zs::pipe_writer_t::write (zs_msg *msg_) +bool zmq::pipe_writer_t::write (zmq_msg *msg_) { - size_t msg_size = zs_msg_size (msg_); + size_t msg_size = zmq_msg_size (msg_); // If message won't fit into the in-memory pipe, there's no way // to pass it further. // TODO: It should be discarded and 'oversized' notification should be // placed into the pipe. - zs_assert (!hwm || msg_size <= hwm); + zmq_assert (!hwm || msg_size <= hwm); // If there's not enough space in the pipe at the moment, return false. if (hwm && tail + msg_size - head > hwm) @@ -78,18 +78,18 @@ bool zs::pipe_writer_t::write (zs_msg *msg_) return true; } -void zs::pipe_writer_t::flush () +void zmq::pipe_writer_t::flush () { if (!pipe->flush ()) send_tail (peer, tail); } -void zs::pipe_writer_t::process_head (uint64_t bytes_) +void zmq::pipe_writer_t::process_head (uint64_t bytes_) { head = bytes_; } -void zs::pipe_writer_t::terminate () +void zmq::pipe_writer_t::terminate () { // Disconnect from the associated demux. if (demux) { @@ -99,15 +99,15 @@ void zs::pipe_writer_t::terminate () // Push the delimiter to the pipe. Delimiter is a notification for pipe // reader that there will be no more messages in the pipe. - zs_msg delimiter; - delimiter.content = (zs_msg_content*) ZS_DELIMITER; + zmq_msg delimiter; + delimiter.content = (zmq_msg_content*) ZMQ_DELIMITER; delimiter.shared = false; delimiter.vsm_size = 0; pipe->write (delimiter); flush (); } -void zs::pipe_writer_t::process_terminate () +void zmq::pipe_writer_t::process_terminate () { // Disconnect from the associated demux. if (demux) { diff --git a/src/pipe_writer.hpp b/src/pipe_writer.hpp index 3b4b4cd..2c5132e 100644 --- a/src/pipe_writer.hpp +++ b/src/pipe_writer.hpp @@ -17,13 +17,13 @@ along with this program. If not, see . */ -#ifndef __ZS_PIPE_WRITER_HPP_INCLUDED__ -#define __ZS_PIPE_WRITER_HPP_INCLUDED__ +#ifndef __ZMQ_PIPE_WRITER_HPP_INCLUDED__ +#define __ZMQ_PIPE_WRITER_HPP_INCLUDED__ #include "object.hpp" #include "stdint.hpp" -namespace zs +namespace zmq { class pipe_writer_t : public object_t @@ -42,7 +42,7 @@ namespace zs // Writes a message to the underlying pipe. Returns false if the // message cannot be written to the pipe at the moment. - bool write (struct zs_msg *msg_); + bool write (struct zmq_msg *msg_); // Flush the messages downsteam. void flush (); diff --git a/src/platform.hpp.in b/src/platform.hpp.in index 108d50d..97be183 100644 --- a/src/platform.hpp.in +++ b/src/platform.hpp.in @@ -140,43 +140,43 @@ #undef VERSION /* Force to use mutexes */ -#undef ZS_FORCE_MUTEXES +#undef ZMQ_FORCE_MUTEXES /* Have AIX OS */ -#undef ZS_HAVE_AIX +#undef ZMQ_HAVE_AIX /* Have eventfd extension. */ -#undef ZS_HAVE_EVENTFD +#undef ZMQ_HAVE_EVENTFD /* Have FreeBSD OS */ -#undef ZS_HAVE_FREEBSD +#undef ZMQ_HAVE_FREEBSD /* Have HPUX OS */ -#undef ZS_HAVE_HPUX +#undef ZMQ_HAVE_HPUX /* Have ifaddrs.h header. */ -#undef ZS_HAVE_IFADDRS +#undef ZMQ_HAVE_IFADDRS /* Have Linux OS */ -#undef ZS_HAVE_LINUX +#undef ZMQ_HAVE_LINUX /* Have MinGW32 */ -#undef ZS_HAVE_MINGW32 +#undef ZMQ_HAVE_MINGW32 /* Have OpenBSD OS */ -#undef ZS_HAVE_OPENBSD +#undef ZMQ_HAVE_OPENBSD /* Have DarwinOSX OS */ -#undef ZS_HAVE_OSX +#undef ZMQ_HAVE_OSX /* Have QNX Neutrino OS */ -#undef ZS_HAVE_QNXNTO +#undef ZMQ_HAVE_QNXNTO /* Have Solaris OS */ -#undef ZS_HAVE_SOLARIS +#undef ZMQ_HAVE_SOLARIS /* Have Windows OS */ -#undef ZS_HAVE_WINDOWS +#undef ZMQ_HAVE_WINDOWS /* Define for Solaris 2.5.1 so the uint32_t typedef from , , or is not used. If the typedef was allowed, the diff --git a/src/poll.cpp b/src/poll.cpp index 59a0cd7..864cfad 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -19,10 +19,10 @@ #include "platform.hpp" -#if defined ZS_HAVE_LINUX || defined ZS_HAVE_FREEBSD ||\ - defined ZS_HAVE_OPENBSD || defined ZS_HAVE_SOLARIS ||\ - defined ZS_HAVE_OSX || defined ZS_HAVE_QNXNTO ||\ - defined ZS_HAVE_HPUX || defined ZS_HAVE_AIX +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX #include #include @@ -35,7 +35,7 @@ #include "config.hpp" #include "i_poll_events.hpp" -zs::poll_t::poll_t () : +zmq::poll_t::poll_t () : retired (false), stopping (false) { @@ -50,7 +50,7 @@ zs::poll_t::poll_t () : fd_table [i].index = retired_fd; } -zs::handle_t zs::poll_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) { pollfd pfd = {fd_, 0, 0}; pollset.push_back (pfd); @@ -67,7 +67,7 @@ zs::handle_t zs::poll_t::add_fd (fd_t fd_, i_poll_events *events_) return handle; } -void zs::poll_t::rm_fd (handle_t handle_) +void zmq::poll_t::rm_fd (handle_t handle_) { fd_t index = fd_table [handle_.fd].index; assert (index != retired_fd); @@ -81,63 +81,63 @@ void zs::poll_t::rm_fd (handle_t handle_) load.sub (1); } -void zs::poll_t::set_pollin (handle_t handle_) +void zmq::poll_t::set_pollin (handle_t handle_) { int index = fd_table [handle_.fd].index; pollset [index].events |= POLLIN; } -void zs::poll_t::reset_pollin (handle_t handle_) +void zmq::poll_t::reset_pollin (handle_t handle_) { int index = fd_table [handle_.fd].index; pollset [index].events &= ~((short) POLLIN); } -void zs::poll_t::set_pollout (handle_t handle_) +void zmq::poll_t::set_pollout (handle_t handle_) { int index = fd_table [handle_.fd].index; pollset [index].events |= POLLOUT; } -void zs::poll_t::reset_pollout (handle_t handle_) +void zmq::poll_t::reset_pollout (handle_t handle_) { int index = fd_table [handle_.fd].index; pollset [index].events &= ~((short) POLLOUT); } -void zs::poll_t::add_timer (i_poll_events *events_) +void zmq::poll_t::add_timer (i_poll_events *events_) { timers.push_back (events_); } -void zs::poll_t::cancel_timer (i_poll_events *events_) +void zmq::poll_t::cancel_timer (i_poll_events *events_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) timers.erase (it); } -int zs::poll_t::get_load () +int zmq::poll_t::get_load () { return load.get (); } -void zs::poll_t::start () +void zmq::poll_t::start () { worker.start (worker_routine, this); } -void zs::poll_t::stop () +void zmq::poll_t::stop () { stopping = true; } -void zs::poll_t::join () +void zmq::poll_t::join () { worker.stop (); } -void zs::poll_t::loop () +void zmq::poll_t::loop () { while (!stopping) { @@ -166,7 +166,7 @@ void zs::poll_t::loop () for (pollset_t::iterator it = pollset.begin (); it != pollset.end (); it ++) { - zs_assert (!(it->revents & POLLNVAL)); + zmq_assert (!(it->revents & POLLNVAL)); if (it->fd == retired_fd) continue; if (it->revents & (POLLERR | POLLHUP)) @@ -197,7 +197,7 @@ void zs::poll_t::loop () } } -void zs::poll_t::worker_routine (void *arg_) +void zmq::poll_t::worker_routine (void *arg_) { ((poll_t*) arg_)->loop (); } diff --git a/src/poll.hpp b/src/poll.hpp index 65095f7..dbfa776 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -17,15 +17,15 @@ along with this program. If not, see . */ -#ifndef __ZS_POLL_HPP_INCLUDED__ -#define __ZS_POLL_HPP_INCLUDED__ +#ifndef __ZMQ_POLL_HPP_INCLUDED__ +#define __ZMQ_POLL_HPP_INCLUDED__ #include "platform.hpp" -#if defined ZS_HAVE_LINUX || defined ZS_HAVE_FREEBSD ||\ - defined ZS_HAVE_OPENBSD || defined ZS_HAVE_SOLARIS ||\ - defined ZS_HAVE_OSX || defined ZS_HAVE_QNXNTO ||\ - defined ZS_HAVE_HPUX || defined ZS_HAVE_AIX +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ + defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ + defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ + defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX #include #include @@ -36,7 +36,7 @@ #include "thread.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // Implements socket polling mechanism using the POSIX.1-2001 diff --git a/src/pub.cpp b/src/pub.cpp index 70add18..5dca0b8 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -17,20 +17,20 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "pub.hpp" #include "app_thread.hpp" #include "session.hpp" #include "err.hpp" -zs::pub_t::pub_t (app_thread_t *thread_, session_t *session_) : +zmq::pub_t::pub_t (app_thread_t *thread_, session_t *session_) : socket_base_t (thread_, session_) { disable_in (); } -int zs::pub_t::recv (struct zs_msg *msg_, int flags_) +int zmq::pub_t::recv (struct zmq_msg *msg_, int flags_) { // Publisher socket has no recv function. errno = ENOTSUP; diff --git a/src/pub.hpp b/src/pub.hpp index b071318..909e731 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_PUB_HPP_INCLUDED__ -#define __ZS_PUB_HPP_INCLUDED__ +#ifndef __ZMQ_PUB_HPP_INCLUDED__ +#define __ZMQ_PUB_HPP_INCLUDED__ #include "socket_base.hpp" -namespace zs +namespace zmq { class pub_t : public socket_base_t @@ -32,7 +32,7 @@ namespace zs pub_t (class app_thread_t *thread_, class session_t *session_); // i_api overloads. - int recv (struct zs_msg *msg_, int flags_); + int recv (struct zmq_msg *msg_, int flags_); private: diff --git a/src/rep.cpp b/src/rep.cpp index 586c7ed..60767e1 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -17,13 +17,13 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "rep.hpp" #include "app_thread.hpp" #include "session.hpp" -zs::rep_t::rep_t (app_thread_t *thread_, session_t *session_) : +zmq::rep_t::rep_t (app_thread_t *thread_, session_t *session_) : socket_base_t (thread_, session_) { } diff --git a/src/rep.hpp b/src/rep.hpp index e11eaa3..92d2758 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_REP_HPP_INCLUDED__ -#define __ZS_REP_HPP_INCLUDED__ +#ifndef __ZMQ_REP_HPP_INCLUDED__ +#define __ZMQ_REP_HPP_INCLUDED__ #include "socket_base.hpp" -namespace zs +namespace zmq { class rep_t : public socket_base_t diff --git a/src/req.cpp b/src/req.cpp index d7a9ad7..01018f5 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -17,13 +17,13 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "req.hpp" #include "app_thread.hpp" #include "session.hpp" -zs::req_t::req_t (app_thread_t *thread_, session_t *session_) : +zmq::req_t::req_t (app_thread_t *thread_, session_t *session_) : socket_base_t (thread_, session_) { } diff --git a/src/req.hpp b/src/req.hpp index 678897d..c279f0e 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_REQ_HPP_INCLUDED__ -#define __ZS_REQ_HPP_INCLUDED__ +#ifndef __ZMQ_REQ_HPP_INCLUDED__ +#define __ZMQ_REQ_HPP_INCLUDED__ #include "socket_base.hpp" -namespace zs +namespace zmq { class req_t : public socket_base_t diff --git a/src/safe_object.cpp b/src/safe_object.cpp index a7c72e6..5a5ab8b 100644 --- a/src/safe_object.cpp +++ b/src/safe_object.cpp @@ -19,7 +19,7 @@ #include "safe_object.hpp" -zs::safe_object_t::safe_object_t (class dispatcher_t *dispatcher_, +zmq::safe_object_t::safe_object_t (class dispatcher_t *dispatcher_, int thread_slot_) : object_t (dispatcher_, thread_slot_), processed_seqnum (0), @@ -27,14 +27,14 @@ zs::safe_object_t::safe_object_t (class dispatcher_t *dispatcher_, { } -zs::safe_object_t::safe_object_t (object_t *parent_) : +zmq::safe_object_t::safe_object_t (object_t *parent_) : object_t (parent_), processed_seqnum (0), terminating (false) { } -void zs::safe_object_t::inc_seqnum () +void zmq::safe_object_t::inc_seqnum () { // This function is called from the sender thread to ensure that this // object will still exist when the command sent to it arrives in the @@ -42,7 +42,7 @@ void zs::safe_object_t::inc_seqnum () sent_seqnum.add (1); } -void zs::safe_object_t::process_command (struct command_t &cmd_) +void zmq::safe_object_t::process_command (struct command_t &cmd_) { object_t::process_command (cmd_); @@ -55,7 +55,7 @@ void zs::safe_object_t::process_command (struct command_t &cmd_) delete this; } -void zs::safe_object_t::terminate () +void zmq::safe_object_t::terminate () { // Wait till all commands sent to this session are processed. terminating = true; @@ -66,11 +66,11 @@ void zs::safe_object_t::terminate () delete this; } -bool zs::safe_object_t::is_terminating () +bool zmq::safe_object_t::is_terminating () { return terminating; } -zs::safe_object_t::~safe_object_t () +zmq::safe_object_t::~safe_object_t () { } diff --git a/src/safe_object.hpp b/src/safe_object.hpp index efa0d2f..8bdd41c 100644 --- a/src/safe_object.hpp +++ b/src/safe_object.hpp @@ -17,13 +17,13 @@ along with this program. If not, see . */ -#ifndef __ZS_SAFE_OBJECT_HPP_INCLUDED__ -#define __ZS_SAFE_OBJECT_HPP_INCLUDED__ +#ifndef __ZMQ_SAFE_OBJECT_HPP_INCLUDED__ +#define __ZMQ_SAFE_OBJECT_HPP_INCLUDED__ #include "object.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // Same as object_t with the exception of termination mechanism. While diff --git a/src/select.cpp b/src/select.cpp index 9776db3..68ec9a0 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -22,13 +22,13 @@ #include #include -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "winsock2.h" -#elif defined ZS_HAVE_HPUX +#elif defined ZMQ_HAVE_HPUX #include #include #include -#elif defined ZS_HAVE_OPENVMS +#elif defined ZMQ_HAVE_OPENVMS #include #include #else @@ -40,7 +40,7 @@ #include "config.hpp" #include "i_poll_events.hpp" -zs::select_t::select_t () : +zmq::select_t::select_t () : maxfd (retired_fd), retired (false), stopping (false) @@ -51,7 +51,7 @@ zs::select_t::select_t () : FD_ZERO (&source_set_err); } -zs::handle_t zs::select_t::add_fd (fd_t fd_, i_poll_events *events_) +zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) { // Store the file descriptor. fd_entry_t entry = {fd_, events_}; @@ -72,7 +72,7 @@ zs::handle_t zs::select_t::add_fd (fd_t fd_, i_poll_events *events_) return handle; } -void zs::select_t::rm_fd (handle_t handle_) +void zmq::select_t::rm_fd (handle_t handle_) { // Get file descriptor. fd_t fd = handle_.fd; @@ -82,7 +82,7 @@ void zs::select_t::rm_fd (handle_t handle_) for (it = fds.begin (); it != fds.end (); it ++) if (it->fd == fd) break; - zs_assert (it != fds.end ()); + zmq_assert (it != fds.end ()); it->fd = retired_fd; retired = true; @@ -109,59 +109,59 @@ void zs::select_t::rm_fd (handle_t handle_) load.sub (1); } -void zs::select_t::set_pollin (handle_t handle_) +void zmq::select_t::set_pollin (handle_t handle_) { FD_SET (handle_.fd, &source_set_in); } -void zs::select_t::reset_pollin (handle_t handle_) +void zmq::select_t::reset_pollin (handle_t handle_) { FD_CLR (handle_.fd, &source_set_in); } -void zs::select_t::set_pollout (handle_t handle_) +void zmq::select_t::set_pollout (handle_t handle_) { FD_SET (handle_.fd, &source_set_out); } -void zs::select_t::reset_pollout (handle_t handle_) +void zmq::select_t::reset_pollout (handle_t handle_) { FD_CLR (handle_.fd, &source_set_out); } -void zs::select_t::add_timer (i_poll_events *events_) +void zmq::select_t::add_timer (i_poll_events *events_) { timers.push_back (events_); } -void zs::select_t::cancel_timer (i_poll_events *events_) +void zmq::select_t::cancel_timer (i_poll_events *events_) { timers_t::iterator it = std::find (timers.begin (), timers.end (), events_); if (it != timers.end ()) timers.erase (it); } -int zs::select_t::get_load () +int zmq::select_t::get_load () { return load.get (); } -void zs::select_t::start () +void zmq::select_t::start () { worker.start (worker_routine, this); } -void zs::select_t::stop () +void zmq::select_t::stop () { stopping = true; } -void zs::select_t::join () +void zmq::select_t::join () { worker.stop (); } -void zs::select_t::loop () +void zmq::select_t::loop () { while (!stopping) { @@ -179,7 +179,7 @@ void zs::select_t::loop () int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, timers.empty () ? NULL : &timeout); -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else if (rc == -1 && errno == EINTR) @@ -230,7 +230,7 @@ void zs::select_t::loop () } } -void zs::select_t::worker_routine (void *arg_) +void zmq::select_t::worker_routine (void *arg_) { ((select_t*) arg_)->loop (); } diff --git a/src/select.hpp b/src/select.hpp index 7151f84..c1e72a7 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -17,17 +17,17 @@ along with this program. If not, see . */ -#ifndef __ZS_SELECT_HPP_INCLUDED__ -#define __ZS_SELECT_HPP_INCLUDED__ +#ifndef __ZMQ_SELECT_HPP_INCLUDED__ +#define __ZMQ_SELECT_HPP_INCLUDED__ #include "platform.hpp" #include #include -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "winsock2.h" -#elif defined ZS_HAVE_OPENVMS +#elif defined ZMQ_HAVE_OPENVMS #include #include #else @@ -39,7 +39,7 @@ #include "thread.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // Implements socket polling mechanism using POSIX.1-2001 select() diff --git a/src/session.cpp b/src/session.cpp index 63868b2..b9a450d 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -17,7 +17,7 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "session.hpp" #include "i_engine.hpp" @@ -30,7 +30,7 @@ #include "pipe_writer.hpp" #include "simple_semaphore.hpp" -zs::session_t::session_t (object_t *parent_, i_thread *thread_, +zmq::session_t::session_t (object_t *parent_, i_thread *thread_, i_mux *mux_, i_demux *demux_, bool terminate_on_disconnect_, bool terminate_on_no_pipes_) : safe_object_t (parent_), @@ -45,7 +45,7 @@ zs::session_t::session_t (object_t *parent_, i_thread *thread_, { // At least one way to terminate the session should be allowed. Otherwise // the session can be orphaned forever. - zs_assert (terminate_on_disconnect || terminate_on_no_pipes_delayed); + zmq_assert (terminate_on_disconnect || terminate_on_no_pipes_delayed); // Give the mux and the demux callback pointer to ourselves. if (mux) @@ -54,7 +54,7 @@ zs::session_t::session_t (object_t *parent_, i_thread *thread_, demux->set_session (this); } -void zs::session_t::shutdown () +void zmq::session_t::shutdown () { // Session may live even without an associated engine, thus we have // to check if for NULL value. @@ -70,7 +70,7 @@ void zs::session_t::shutdown () delete this; } -void zs::session_t::disconnected () +void zmq::session_t::disconnected () { // It's engine who calls this function so there's no need to deallocate // the engine. Just drop the reference. @@ -84,7 +84,7 @@ void zs::session_t::disconnected () terminate (); } -void zs::session_t::bind (object_t *peer_, bool in_, bool out_) +void zmq::session_t::bind (object_t *peer_, bool in_, bool out_) { // Create the out pipe (if required). pipe_reader_t *pipe_reader = NULL; @@ -107,13 +107,13 @@ void zs::session_t::bind (object_t *peer_, bool in_, bool out_) send_bind (peer_, pipe_reader, in_ ? this : NULL); } -void zs::session_t::revive () +void zmq::session_t::revive () { if (engine) engine->revive (); } -void zs::session_t::terminate () +void zmq::session_t::terminate () { // Terminate is always called by engine, thus it'll terminate itself, // we just have to drop the pointer. @@ -138,7 +138,7 @@ void zs::session_t::terminate () safe_object_t::terminate (); } -zs::session_t::~session_t () +zmq::session_t::~session_t () { // When session is actually deallocated it unregisters from its thread. // Unregistration cannot be done earlier as it would result in memory @@ -146,44 +146,44 @@ zs::session_t::~session_t () thread->detach_session (this); } -void zs::session_t::set_engine (i_engine *engine_) +void zmq::session_t::set_engine (i_engine *engine_) { - zs_assert (!engine || !engine_); + zmq_assert (!engine || !engine_); engine = engine_; } -void zs::session_t::set_index (int index_) +void zmq::session_t::set_index (int index_) { index = index_; } -int zs::session_t::get_index () +int zmq::session_t::get_index () { return index; } -bool zs::session_t::write (zs_msg *msg_) +bool zmq::session_t::write (zmq_msg *msg_) { return demux->send (msg_); } -void zs::session_t::flush () +void zmq::session_t::flush () { demux->flush (); } -bool zs::session_t::read (zs_msg *msg_) +bool zmq::session_t::read (zmq_msg *msg_) { bool retrieved = mux->recv (msg_); if (terminate_on_no_pipes && mux->empty () && demux->empty ()) { - zs_assert (engine); + zmq_assert (engine); engine->schedule_terminate (); terminate (); } return retrieved; } -void zs::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_) +void zmq::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_) { if (is_terminating ()) { @@ -223,9 +223,9 @@ void zs::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_) } } -void zs::session_t::process_reg (simple_semaphore_t *smph_) +void zmq::session_t::process_reg (simple_semaphore_t *smph_) { - zs_assert (!is_terminating ()); + zmq_assert (!is_terminating ()); // Add the session to the list of sessions associated with this I/O thread. // This way the session will be deallocated on the terminal shutdown. @@ -236,10 +236,10 @@ void zs::session_t::process_reg (simple_semaphore_t *smph_) smph_->post (); } -void zs::session_t::process_reg_and_bind (session_t *peer_, +void zmq::session_t::process_reg_and_bind (session_t *peer_, bool flow_in_, bool flow_out_) { - zs_assert (!is_terminating ()); + zmq_assert (!is_terminating ()); // Add the session to the list of sessions associated with this I/O thread. // This way the session will be deallocated on the terminal shutdown. @@ -260,7 +260,7 @@ void zs::session_t::process_reg_and_bind (session_t *peer_, send_bind (peer_, pipe_reader, flow_in_ ? this : NULL); } -void zs::session_t::process_engine (i_engine *engine_) +void zmq::session_t::process_engine (i_engine *engine_) { if (is_terminating ()) { diff --git a/src/session.hpp b/src/session.hpp index 3cdace2..855dd1d 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -17,15 +17,15 @@ along with this program. If not, see . */ -#ifndef __ZS_SESSION_HPP_INCLUDED__ -#define __ZS_SESSION_HPP_INCLUDED__ +#ifndef __ZMQ_SESSION_HPP_INCLUDED__ +#define __ZMQ_SESSION_HPP_INCLUDED__ #include "i_session.hpp" #include "safe_object.hpp" #include "stdint.hpp" #include "atomic_counter.hpp" -namespace zs +namespace zmq { // Object that encapsulates both mux and demux. @@ -42,8 +42,8 @@ namespace zs // i_session implementation void set_engine (struct i_engine *engine_); void shutdown (); - bool read (struct zs_msg *msg_); - bool write (struct zs_msg *msg_); + bool read (struct zmq_msg *msg_); + bool write (struct zmq_msg *msg_); void flush (); // Called by the engine when it is being closed. diff --git a/src/session_stub.cpp b/src/session_stub.cpp index 3bebcb0..152b9fb 100644 --- a/src/session_stub.cpp +++ b/src/session_stub.cpp @@ -19,14 +19,14 @@ #include -#include "../include/zs.h" +#include "../include/zmq.h" #include "session_stub.hpp" #include "i_engine.hpp" #include "listener.hpp" #include "err.hpp" -zs::session_stub_t::session_stub_t (listener_t *listener_) : +zmq::session_stub_t::session_stub_t (listener_t *listener_) : state (reading_identity), engine (NULL), listener (listener_), @@ -34,42 +34,42 @@ zs::session_stub_t::session_stub_t (listener_t *listener_) : { } -void zs::session_stub_t::terminate () +void zmq::session_stub_t::terminate () { if (engine) engine->terminate (); delete this; } -void zs::session_stub_t::shutdown () +void zmq::session_stub_t::shutdown () { if (engine) engine->shutdown (); delete this; } -zs::session_stub_t::~session_stub_t () +zmq::session_stub_t::~session_stub_t () { } -void zs::session_stub_t::set_engine (i_engine *engine_) +void zmq::session_stub_t::set_engine (i_engine *engine_) { - zs_assert (!engine_ || !engine); + zmq_assert (!engine_ || !engine); engine = engine_; } -bool zs::session_stub_t::read (struct zs_msg *msg_) +bool zmq::session_stub_t::read (struct zmq_msg *msg_) { // No messages are sent to the connecting peer. return false; } -bool zs::session_stub_t::write (struct zs_msg *msg_) +bool zmq::session_stub_t::write (struct zmq_msg *msg_) { // The first message arrived is the connection identity. if (state == reading_identity) { - identity = std::string ((const char*) zs_msg_data (msg_), - zs_msg_size (msg_)); + identity = std::string ((const char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); state = has_identity; return true; } @@ -78,7 +78,7 @@ bool zs::session_stub_t::write (struct zs_msg *msg_) return false; } -void zs::session_stub_t::flush () +void zmq::session_stub_t::flush () { // We have the identity. At this point we can find the correct session and // attach it to the connection. @@ -91,7 +91,7 @@ void zs::session_stub_t::flush () } } -zs::i_engine *zs::session_stub_t::detach_engine () +zmq::i_engine *zmq::session_stub_t::detach_engine () { // Ask engine to unregister from the poller. i_engine *e = engine; @@ -99,12 +99,12 @@ zs::i_engine *zs::session_stub_t::detach_engine () return e; } -void zs::session_stub_t::set_index (int index_) +void zmq::session_stub_t::set_index (int index_) { index = index_; } -int zs::session_stub_t::get_index () +int zmq::session_stub_t::get_index () { return index; } diff --git a/src/session_stub.hpp b/src/session_stub.hpp index 2e882f8..4499e45 100644 --- a/src/session_stub.hpp +++ b/src/session_stub.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_SESSION_STUB_HPP_INCLUDED__ -#define __ZS_SESSION_STUB_HPP_INCLUDED__ +#ifndef __ZMQ_SESSION_STUB_HPP_INCLUDED__ +#define __ZMQ_SESSION_STUB_HPP_INCLUDED__ #include #include "i_session.hpp" -namespace zs +namespace zmq { // This class is used instead of regular session till the identity of @@ -41,8 +41,8 @@ namespace zs void set_engine (struct i_engine *engine_); void terminate (); void shutdown (); - bool read (struct zs_msg *msg_); - bool write (struct zs_msg *msg_); + bool read (struct zmq_msg *msg_); + bool write (struct zmq_msg *msg_); void flush (); // Detaches engine from the stub. Returns it to the caller. diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp index 1bd114f..b48a7f5 100644 --- a/src/simple_semaphore.hpp +++ b/src/simple_semaphore.hpp @@ -17,21 +17,21 @@ along with this program. If not, see . */ -#ifndef __ZS_SIMPLE_SEMAPHORE_HPP_INCLUDED__ -#define __ZS_SIMPLE_SEMAPHORE_HPP_INCLUDED__ +#ifndef __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__ +#define __ZMQ_SIMPLE_SEMAPHORE_HPP_INCLUDED__ #include "platform.hpp" #include "err.hpp" -#if defined ZS_HAVE_LINUX || defined ZS_HAVE_OSX || defined ZS_HAVE_OPENVMS +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS #include -#elif defined ZS_HAVE_WINDOWS +#elif defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include #endif -namespace zs +namespace zmq { // Simple semaphore. Only single thread may be waiting at any given time. @@ -39,7 +39,7 @@ namespace zs // was matched by corresponding wait and the waiting thread was // released. -#if defined ZS_HAVE_LINUX || defined ZS_HAVE_OSX || defined ZS_HAVE_OPENVMS +#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS // On platforms that allow for double locking of a mutex from the same // thread, simple semaphore is implemented using mutex, as it is more diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 07606ad..6718244 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -19,7 +19,7 @@ #include -#include "../include/zs.h" +#include "../include/zmq.h" #include "socket_base.hpp" #include "app_thread.hpp" @@ -33,7 +33,7 @@ #include "dummy_aggregator.hpp" #include "dummy_distributor.hpp" -zs::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) : +zmq::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) : object_t (thread_), thread (thread_), session (session_), @@ -43,7 +43,7 @@ zs::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) : session->set_engine (this); } -void zs::socket_base_t::shutdown () +void zmq::socket_base_t::shutdown () { // Destroy all the I/O objects created from this socket. for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) @@ -52,13 +52,13 @@ void zs::socket_base_t::shutdown () delete this; } -void zs::socket_base_t::schedule_terminate () +void zmq::socket_base_t::schedule_terminate () { // Terminate is never scheduled on socket engines. - zs_assert (false); + zmq_assert (false); } -void zs::socket_base_t::terminate () +void zmq::socket_base_t::terminate () { // Destroy all the I/O objects created from this socket. // First unregister the object from I/O thread, then terminate it in @@ -70,27 +70,27 @@ void zs::socket_base_t::terminate () io_objects [i]->terminate (); } - zs_assert (session); + zmq_assert (session); session->disconnected (); delete this; } -zs::socket_base_t::~socket_base_t () +zmq::socket_base_t::~socket_base_t () { } -void zs::socket_base_t::disable_in () +void zmq::socket_base_t::disable_in () { has_in = false; } -void zs::socket_base_t::disable_out () +void zmq::socket_base_t::disable_out () { has_out = false; } -int zs::socket_base_t::bind (const char *addr_, zs_opts *opts_) +int zmq::socket_base_t::bind (const char *addr_, zmq_opts *opts_) { thread->process_commands (false); @@ -136,7 +136,7 @@ int zs::socket_base_t::bind (const char *addr_, zs_opts *opts_) } } -int zs::socket_base_t::connect (const char *addr_, zs_opts *opts_) +int zmq::socket_base_t::connect (const char *addr_, zmq_opts *opts_) { thread->process_commands (false); @@ -148,14 +148,14 @@ int zs::socket_base_t::connect (const char *addr_, zs_opts *opts_) // session. io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0); i_mux *mux = new dummy_aggregator_t; - zs_assert (mux); + zmq_assert (mux); i_demux *demux = new dummy_distributor_t; - zs_assert (demux); + zmq_assert (demux); session_t *peer = new session_t (io_thread, io_thread, mux, demux, false, true); - zs_assert (peer); + zmq_assert (peer); connecter_t *connecter = new connecter_t (io_thread, addr_, peer); - zs_assert (connecter); + zmq_assert (connecter); // Increment session's command sequence number so that it won't get // deallocated till the subsequent bind command arrives. @@ -202,20 +202,20 @@ int zs::socket_base_t::connect (const char *addr_, zs_opts *opts_) } } -int zs::socket_base_t::subscribe (const char *criteria_) +int zmq::socket_base_t::subscribe (const char *criteria_) { // No implementation at the moment... errno = ENOTSUP; return -1; } -int zs::socket_base_t::send (zs_msg *msg_, int flags_) +int zmq::socket_base_t::send (zmq_msg *msg_, int flags_) { thread->process_commands (false); while (true) { if (session->write (msg_)) return 0; - if (flags_ & ZS_NOBLOCK) { + if (flags_ & ZMQ_NOBLOCK) { errno = EAGAIN; return -1; } @@ -223,20 +223,20 @@ int zs::socket_base_t::send (zs_msg *msg_, int flags_) } } -int zs::socket_base_t::flush () +int zmq::socket_base_t::flush () { thread->process_commands (false); session->flush (); return 0; } -int zs::socket_base_t::recv (zs_msg *msg_, int flags_) +int zmq::socket_base_t::recv (zmq_msg *msg_, int flags_) { thread->process_commands (false); while (true) { if (session->read (msg_)) return 0; - if (flags_ & ZS_NOBLOCK) { + if (flags_ & ZMQ_NOBLOCK) { errno = EAGAIN; return -1; } @@ -244,23 +244,23 @@ int zs::socket_base_t::recv (zs_msg *msg_, int flags_) } } -int zs::socket_base_t::close () +int zmq::socket_base_t::close () { terminate (); return 0; } -void zs::socket_base_t::attach (struct i_poller *poller_, i_session *session_) +void zmq::socket_base_t::attach (struct i_poller *poller_, i_session *session_) { - zs_assert (false); + zmq_assert (false); } -void zs::socket_base_t::detach () +void zmq::socket_base_t::detach () { - zs_assert (false); + zmq_assert (false); } -void zs::socket_base_t::revive () +void zmq::socket_base_t::revive () { // We can ignore the event safely here. } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index ed0272a..c1de8e6 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_SOCKET_BASE_HPP_INCLUDED__ -#define __ZS_SOCKET_BASE_HPP_INCLUDED__ +#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ +#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ #include @@ -26,7 +26,7 @@ #include "i_api.hpp" #include "object.hpp" -namespace zs +namespace zmq { class socket_base_t : public object_t, public i_engine, public i_api @@ -46,12 +46,12 @@ namespace zs void shutdown (); // i_api interface implementation. - int bind (const char *addr_, struct zs_opts *opts_); - int connect (const char *addr_, struct zs_opts *opts_); + int bind (const char *addr_, struct zmq_opts *opts_); + int connect (const char *addr_, struct zmq_opts *opts_); int subscribe (const char *criteria_); - int send (struct zs_msg *msg_, int flags_); + int send (struct zmq_msg *msg_, int flags_); int flush (); - int recv (struct zs_msg *msg_, int flags_); + int recv (struct zmq_msg *msg_, int flags_); int close (); protected: diff --git a/src/stdint.hpp b/src/stdint.hpp index b4c6125..1be8491 100644 --- a/src/stdint.hpp +++ b/src/stdint.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_STDINT_HPP_INCLUDED__ -#define __ZS_STDINT_HPP_INCLUDED__ +#ifndef __ZMQ_STDINT_HPP_INCLUDED__ +#define __ZMQ_STDINT_HPP_INCLUDED__ #include "platform.hpp" -#ifdef ZS_HAVE_SOLARIS +#ifdef ZMQ_HAVE_SOLARIS #include diff --git a/src/sub.cpp b/src/sub.cpp index 59e838a..3d1d578 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -17,27 +17,27 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" #include "sub.hpp" #include "app_thread.hpp" #include "session.hpp" #include "err.hpp" -zs::sub_t::sub_t (app_thread_t *thread_, session_t *session_) : +zmq::sub_t::sub_t (app_thread_t *thread_, session_t *session_) : socket_base_t (thread_, session_) { disable_out (); } -int zs::sub_t::send (struct zs_msg *msg_, int flags_) +int zmq::sub_t::send (struct zmq_msg *msg_, int flags_) { // Subscriber socket has no send function. errno = ENOTSUP; return -1; } -int zs::sub_t::flush () +int zmq::sub_t::flush () { // Subscriber socket has no flush function. errno = ENOTSUP; diff --git a/src/sub.hpp b/src/sub.hpp index b7c2bd2..f3e23c1 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_SUB_HPP_INCLUDED__ -#define __ZS_SUB_HPP_INCLUDED__ +#ifndef __ZMQ_SUB_HPP_INCLUDED__ +#define __ZMQ_SUB_HPP_INCLUDED__ #include "socket_base.hpp" -namespace zs +namespace zmq { class sub_t : public socket_base_t @@ -32,7 +32,7 @@ namespace zs sub_t (class app_thread_t *thread_, class session_t *session_); // i_api overloads. - int send (struct zs_msg *msg_, int flags_); + int send (struct zmq_msg *msg_, int flags_); int flush (); private: diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 6b59290..7d29019 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -22,7 +22,7 @@ #include "ip.hpp" #include "err.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #error @@ -37,20 +37,20 @@ #include #include -zs::tcp_connecter_t::tcp_connecter_t () : +zmq::tcp_connecter_t::tcp_connecter_t () : s (retired_fd) { } -zs::tcp_connecter_t::~tcp_connecter_t () +zmq::tcp_connecter_t::~tcp_connecter_t () { if (s != retired_fd) close (); } -int zs::tcp_connecter_t::open (const char *addr_) +int zmq::tcp_connecter_t::open (const char *addr_) { - zs_assert (s == retired_fd); + zmq_assert (s == retired_fd); // Convert the hostname into sockaddr_in structure. sockaddr_in address; @@ -75,7 +75,7 @@ int zs::tcp_connecter_t::open (const char *addr_) rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof (int)); errno_assert (rc == 0); -#ifdef ZS_HAVE_OPENVMS +#ifdef ZMQ_HAVE_OPENVMS // Disable delayed acknowledgements. flag = 1; rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof (int)); @@ -100,9 +100,9 @@ int zs::tcp_connecter_t::open (const char *addr_) return -1; } -int zs::tcp_connecter_t::close () +int zmq::tcp_connecter_t::close () { - zs_assert (s != retired_fd); + zmq_assert (s != retired_fd); int rc = ::close (s); if (rc != 0) return -1; @@ -110,12 +110,12 @@ int zs::tcp_connecter_t::close () return 0; } -zs::fd_t zs::tcp_connecter_t::get_fd () +zmq::fd_t zmq::tcp_connecter_t::get_fd () { return s; } -zs::fd_t zs::tcp_connecter_t::connect () +zmq::fd_t zmq::tcp_connecter_t::connect () { // Following code should handle both Berkeley-derived socket // implementations and Solaris. diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index ef11242..aa1ef05 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_TCP_CONNECTER_HPP_INCLUDED__ -#define __ZS_TCP_CONNECTER_HPP_INCLUDED__ +#ifndef __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ +#define __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ #include "fd.hpp" -namespace zs +namespace zmq { // The class encapsulating simple TCP listening socket. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 3703950..6aae88a 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -23,7 +23,7 @@ #include "config.hpp" #include "err.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #error @@ -38,18 +38,18 @@ #include #include -zs::tcp_listener_t::tcp_listener_t () : +zmq::tcp_listener_t::tcp_listener_t () : s (retired_fd) { } -zs::tcp_listener_t::~tcp_listener_t () +zmq::tcp_listener_t::~tcp_listener_t () { if (s != retired_fd) close (); } -int zs::tcp_listener_t::open (const char *addr_) +int zmq::tcp_listener_t::open (const char *addr_) { // Convert the interface into sockaddr_in structure. sockaddr_in ip_address; @@ -91,9 +91,9 @@ int zs::tcp_listener_t::open (const char *addr_) return 0; } -int zs::tcp_listener_t::close () +int zmq::tcp_listener_t::close () { - zs_assert (s != retired_fd); + zmq_assert (s != retired_fd); int rc = ::close (s); if (rc != 0) return -1; @@ -101,14 +101,14 @@ int zs::tcp_listener_t::close () return 0; } -zs::fd_t zs::tcp_listener_t::get_fd () +zmq::fd_t zmq::tcp_listener_t::get_fd () { return s; } -zs::fd_t zs::tcp_listener_t::accept () +zmq::fd_t zmq::tcp_listener_t::accept () { - zs_assert (s != retired_fd); + zmq_assert (s != retired_fd); // Accept one incoming connection. fd_t sock = ::accept (s, NULL, NULL); @@ -151,7 +151,7 @@ zs::fd_t zs::tcp_listener_t::accept () sizeof (int)); errno_assert (rc == 0); -#ifdef ZS_HAVE_OPENVMS +#ifdef ZMQ_HAVE_OPENVMS // Disable delayed acknowledgements. flag = 1; rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 156195b..43a4aa8 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_TCP_LISTENER_HPP_INCLUDED__ -#define __ZS_TCP_LISTENER_HPP_INCLUDED__ +#ifndef __ZMQ_TCP_LISTENER_HPP_INCLUDED__ +#define __ZMQ_TCP_LISTENER_HPP_INCLUDED__ #include "fd.hpp" -namespace zs +namespace zmq { // The class encapsulating simple TCP listening socket. diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index 70fde96..1eb338f 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -21,7 +21,7 @@ #include "platform.hpp" #include "err.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #error @@ -36,27 +36,27 @@ #include #include -zs::tcp_socket_t::tcp_socket_t () : +zmq::tcp_socket_t::tcp_socket_t () : s (retired_fd) { } -zs::tcp_socket_t::~tcp_socket_t () +zmq::tcp_socket_t::~tcp_socket_t () { if (s != retired_fd) close (); } -int zs::tcp_socket_t::open (fd_t fd_) +int zmq::tcp_socket_t::open (fd_t fd_) { assert (s == retired_fd); s = fd_; return 0; } -int zs::tcp_socket_t::close () +int zmq::tcp_socket_t::close () { - zs_assert (s != retired_fd); + zmq_assert (s != retired_fd); int rc = ::close (s); if (rc != 0) return -1; @@ -64,12 +64,12 @@ int zs::tcp_socket_t::close () return 0; } -zs::fd_t zs::tcp_socket_t::get_fd () +zmq::fd_t zmq::tcp_socket_t::get_fd () { return s; } -int zs::tcp_socket_t::write (const void *data, int size) +int zmq::tcp_socket_t::write (const void *data, int size) { ssize_t nbytes = send (s, data, size, 0); @@ -88,7 +88,7 @@ int zs::tcp_socket_t::write (const void *data, int size) return (size_t) nbytes; } -int zs::tcp_socket_t::read (void *data, int size) +int zmq::tcp_socket_t::read (void *data, int size) { ssize_t nbytes = recv (s, data, size, 0); diff --git a/src/tcp_socket.hpp b/src/tcp_socket.hpp index a6c61ac..406e4c0 100644 --- a/src/tcp_socket.hpp +++ b/src/tcp_socket.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_TCP_SOCKET_HPP_INCLUDED__ -#define __ZS_TCP_SOCKET_HPP_INCLUDED__ +#ifndef __ZMQ_TCP_SOCKET_HPP_INCLUDED__ +#define __ZMQ_TCP_SOCKET_HPP_INCLUDED__ #include "fd.hpp" -namespace zs +namespace zmq { // The class encapsulating simple TCP read/write socket. diff --git a/src/thread.cpp b/src/thread.cpp index 7cf54f2..77993e2 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -21,9 +21,9 @@ #include "err.hpp" #include "platform.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS -void zs::thread_t::start (thread_fn *tfn_, void *arg_) +void zmq::thread_t::start (thread_fn *tfn_, void *arg_) { tfn = tfn_; arg =arg_; @@ -32,13 +32,13 @@ void zs::thread_t::start (thread_fn *tfn_, void *arg_) win_assert (descriptor != NULL); } -void zs::thread_t::stop () +void zmq::thread_t::stop () { DWORD rc = WaitForSingleObject (descriptor, INFINITE); win_assert (rc != WAIT_FAILED); } -unsigned int __stdcall zs::thread_t::thread_routine (void *arg_) +unsigned int __stdcall zmq::thread_t::thread_routine (void *arg_) { thread_t *self = (thread_t*) arg_; self->tfn (self->arg); @@ -49,7 +49,7 @@ unsigned int __stdcall zs::thread_t::thread_routine (void *arg_) #include -void zs::thread_t::start (thread_fn *tfn_, void *arg_) +void zmq::thread_t::start (thread_fn *tfn_, void *arg_) { tfn = tfn_; arg =arg_; @@ -57,13 +57,13 @@ void zs::thread_t::start (thread_fn *tfn_, void *arg_) errno_assert (rc == 0); } -void zs::thread_t::stop () +void zmq::thread_t::stop () { int rc = pthread_join (descriptor, NULL); errno_assert (rc == 0); } -void *zs::thread_t::thread_routine (void *arg_) +void *zmq::thread_t::thread_routine (void *arg_) { #if !defined ZMQ_HAVE_OPENVMS // Following code will guarantee more predictable latecnies as it'll diff --git a/src/thread.hpp b/src/thread.hpp index 6ee9194..01f1f78 100644 --- a/src/thread.hpp +++ b/src/thread.hpp @@ -17,18 +17,18 @@ along with this program. If not, see . */ -#ifndef __ZS_THREAD_HPP_INCLUDED__ -#define __ZS_THREAD_HPP_INCLUDED__ +#ifndef __ZMQ_THREAD_HPP_INCLUDED__ +#define __ZMQ_THREAD_HPP_INCLUDED__ #include "platform.hpp" -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include #endif -namespace zs +namespace zmq { typedef void (thread_fn) (void*); @@ -57,7 +57,7 @@ namespace zs private: -#ifdef ZS_HAVE_WINDOWS +#ifdef ZMQ_HAVE_WINDOWS static unsigned int __stdcall thread_routine (void *arg_); HANDLE descriptor; #else diff --git a/src/uuid.cpp b/src/uuid.cpp index d9883cd..10db3bc 100644 --- a/src/uuid.cpp +++ b/src/uuid.cpp @@ -21,68 +21,68 @@ #include "uuid.hpp" #include "err.hpp" -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include -zs::uuid_t::uuid_t () +zmq::uuid_t::uuid_t () { RPC_STATUS ret = UuidCreate (&uuid); - zs_assert (ret == RPC_S_OK); + zmq_assert (ret == RPC_S_OK); ret = UuidToString (&uuid, &uuid_str); - zs_assert (ret == RPC_S_OK); + zmq_assert (ret == RPC_S_OK); } -zs::uuid_t::~uuid_t () +zmq::uuid_t::~uuid_t () { RPC_STATUS ret = RpcStringFree(&uuid_str); assert (ret == RPC_S_OK); } -const char *zs::uuid_t::to_string () +const char *zmq::uuid_t::to_string () { return uuid_str; } -#elif defined ZS_HAVE_FREEBSD +#elif defined ZMQ_HAVE_FREEBSD #include #include -zs::uuid_t::uuid_t () +zmq::uuid_t::uuid_t () { uint32_t status; uuid_create (&uuid, &status); - zs_assert (status == uuid_s_ok); + zmq_assert (status == uuid_s_ok); uuid_to_string (&uuid, &uuid_str, &status); - zs_assert (status == uuid_s_ok); + zmq_assert (status == uuid_s_ok); } -zs::uuid_t::~uuid_t () +zmq::uuid_t::~uuid_t () { free (uuid_str); } -const char *zs::uuid_t::to_string () +const char *zmq::uuid_t::to_string () { return uuid_str; } -#elif defined ZS_HAVE_LINUX || defined ZS_HAVE_SOLARIS || defined ZS_HAVE_OSX +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_OSX #include -zs::uuid_t::uuid_t () +zmq::uuid_t::uuid_t () { uuid_generate (uuid); uuid_unparse (uuid, uuid_buf); } -zs::uuid_t::~uuid_t () +zmq::uuid_t::~uuid_t () { } -const char *zs::uuid_t::to_string () +const char *zmq::uuid_t::to_string () { return uuid_buf; } @@ -93,11 +93,11 @@ const char *zs::uuid_t::to_string () #include #include -zs::uuid_t::uuid_t () +zmq::uuid_t::uuid_t () { unsigned char rand_buf [16]; int ret = RAND_bytes (rand_buf, sizeof rand_buf); - zs_assert (ret == 1); + zmq_assert (ret == 1); // Read in UUID fields. memcpy (&time_low, rand_buf, sizeof time_low); @@ -124,11 +124,11 @@ zs::uuid_t::uuid_t () node [0], node [1], node [2], node [3], node [4], node [5]); } -zs::uuid_t::~uuid_t () +zmq::uuid_t::~uuid_t () { } -const char *zs::uuid_t::to_string () +const char *zmq::uuid_t::to_string () { return uuid_buf; } diff --git a/src/uuid.hpp b/src/uuid.hpp index 94c3e8a..79a9620 100644 --- a/src/uuid.hpp +++ b/src/uuid.hpp @@ -17,22 +17,22 @@ along with this program. If not, see . */ -#ifndef __ZS_UUID_HPP_INCLUDED__ -#define __ZS_UUID_HPP_INCLUDED__ +#ifndef __ZMQ_UUID_HPP_INCLUDED__ +#define __ZMQ_UUID_HPP_INCLUDED__ #include "platform.hpp" -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include -#elif defined ZS_HAVE_FREEBSD +#elif defined ZMQ_HAVE_FREEBSD #include -#elif defined ZS_HAVE_LINUX || defined ZS_HAVE_SOLARIS || defined ZS_HAVE_OSX +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_OSX #include #else #include #endif -namespace zs +namespace zmq { // This class provides RFC 4122 (a Universally Unique IDentifier) @@ -55,13 +55,13 @@ namespace zs // The length of textual representation of UUID. enum { uuid_string_len = 36 }; -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS ::UUID uuid; char *uuid_str; -#elif defined ZS_HAVE_FREEBSD +#elif defined ZMQ_HAVE_FREEBSD ::uuid_t uuid; char *uuid_str; -#elif defined ZS_HAVE_LINUX || defined ZS_HAVE_SOLARIS || defined ZS_HAVE_OSX +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS || defined ZMQ_HAVE_OSX ::uuid_t uuid; char uuid_buf [uuid_string_len + 1]; #else diff --git a/src/windows.hpp b/src/windows.hpp index a4a89ba..e9dc30d 100644 --- a/src/windows.hpp +++ b/src/windows.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_WINDOWS_HPP_INCLUDED__ -#define __ZS_WINDOWS_HPP_INCLUDED__ +#ifndef __ZMQ_WINDOWS_HPP_INCLUDED__ +#define __ZMQ_WINDOWS_HPP_INCLUDED__ // The purpose of this header file is to turn on only the items actually needed // on the windows platform. diff --git a/src/wire.hpp b/src/wire.hpp index 31ebef1..4dbb063 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -17,12 +17,12 @@ along with this program. If not, see . */ -#ifndef __ZS_WIRE_HPP_INCLUDED__ -#define __ZS_WIRE_HPP_INCLUDED__ +#ifndef __ZMQ_WIRE_HPP_INCLUDED__ +#define __ZMQ_WIRE_HPP_INCLUDED__ #include "stdint.hpp" -namespace zs +namespace zmq { // Helper functions to convert different integer types to/from network diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 5ffd5c9..01b4137 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_YPIPE_HPP_INCLUDED__ -#define __ZS_YPIPE_HPP_INCLUDED__ +#ifndef __ZMQ_YPIPE_HPP_INCLUDED__ +#define __ZMQ_YPIPE_HPP_INCLUDED__ #include "atomic_ptr.hpp" #include "yqueue.hpp" #include "platform.hpp" -namespace zs +namespace zmq { // Lock-free queue implementation. @@ -58,7 +58,7 @@ namespace zs } // Following function (write) deliberately copies uninitialised data - // when used with zs_msg. Initialising the VSM body for + // when used with zmq_msg. Initialising the VSM body for // non-VSM messages won't be good for performance. #ifdef ZMQ_HAVE_OPENVMS diff --git a/src/ypollset.cpp b/src/ypollset.cpp index 0be6791..a90d042 100644 --- a/src/ypollset.cpp +++ b/src/ypollset.cpp @@ -19,18 +19,18 @@ #include "ypollset.hpp" -zs::ypollset_t::ypollset_t () +zmq::ypollset_t::ypollset_t () { } -void zs::ypollset_t::signal (int signal_) +void zmq::ypollset_t::signal (int signal_) { - zs_assert (signal_ >= 0 && signal_ < wait_signal); + zmq_assert (signal_ >= 0 && signal_ < wait_signal); if (bits.btsr (signal_, wait_signal)) sem.post (); } -zs::ypollset_t::signals_t zs::ypollset_t::poll () +zmq::ypollset_t::signals_t zmq::ypollset_t::poll () { signals_t result = 0; while (!result) { @@ -50,7 +50,7 @@ zs::ypollset_t::signals_t zs::ypollset_t::poll () return result; } -zs::ypollset_t::signals_t zs::ypollset_t::check () +zmq::ypollset_t::signals_t zmq::ypollset_t::check () { return bits.xchg (0); } diff --git a/src/ypollset.hpp b/src/ypollset.hpp index 7c71152..b49581a 100644 --- a/src/ypollset.hpp +++ b/src/ypollset.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_YPOLLSET_HPP_INCLUDED__ -#define __ZS_YPOLLSET_HPP_INCLUDED__ +#ifndef __ZMQ_YPOLLSET_HPP_INCLUDED__ +#define __ZMQ_YPOLLSET_HPP_INCLUDED__ #include "i_signaler.hpp" #include "simple_semaphore.hpp" #include "atomic_bitmap.hpp" -namespace zs +namespace zmq { // ypollset allows for rapid polling for up to constant number of diff --git a/src/yqueue.hpp b/src/yqueue.hpp index 78be17c..0686f07 100644 --- a/src/yqueue.hpp +++ b/src/yqueue.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_YQUEUE_HPP_INCLUDED__ -#define __ZS_YQUEUE_HPP_INCLUDED__ +#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__ +#define __ZMQ_YQUEUE_HPP_INCLUDED__ #include #include "err.hpp" -namespace zs +namespace zmq { // yqueue is an efficient queue implementation. The main goal is @@ -48,7 +48,7 @@ namespace zs inline yqueue_t () { begin_chunk = new chunk_t; - zs_assert (begin_chunk); + zmq_assert (begin_chunk); begin_pos = 0; back_chunk = NULL; back_pos = 0; @@ -92,7 +92,7 @@ namespace zs return; end_chunk->next = new chunk_t; - zs_assert (end_chunk->next); + zmq_assert (end_chunk->next); end_chunk = end_chunk->next; end_pos = 0; } diff --git a/src/zmq.cpp b/src/zmq.cpp new file mode 100644 index 0000000..a7fd486 --- /dev/null +++ b/src/zmq.cpp @@ -0,0 +1,223 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include +#include +#include + +#include "i_api.hpp" +#include "err.hpp" +#include "dispatcher.hpp" +#include "msg.hpp" + +int zmq_msg_init (zmq_msg *msg_) +{ + msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->vsm_size = 0; + return 0; +} + +int zmq_msg_init_size (zmq_msg *msg_, size_t size_) +{ + if (size_ <= ZMQ_MAX_VSM_SIZE) { + msg_->content = (zmq_msg_content*) ZMQ_VSM; + msg_->vsm_size = (uint16_t) size_; + } + else { + msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) + + size_); + if (!msg_->content) { + errno = ENOMEM; + return -1; + } + msg_->shared = 0; + + msg_->content->data = (void*) (msg_->content + 1); + msg_->content->size = size_; + msg_->content->ffn = NULL; + new (&msg_->content->refcnt) zmq::atomic_counter_t (); + } + return 0; +} + +int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_, + zmq_free_fn *ffn_) +{ + msg_->shared = 0; + msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content)); + zmq_assert (msg_->content); + msg_->content->data = data_; + msg_->content->size = size_; + msg_->content->ffn = ffn_; + new (&msg_->content->refcnt) zmq::atomic_counter_t (); + return 0; +} + +int zmq_msg_close (zmq_msg *msg_) +{ + // For VSMs and delimiters there are no resources to free + if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER || + msg_->content == (zmq_msg_content*) ZMQ_VSM || + msg_->content == (zmq_msg_content*) ZMQ_GAP) + return 0; + + // If the content is not shared, or if it is shared and the reference + // count has dropped to zero, deallocate it. + if (!msg_->shared || !msg_->content->refcnt.sub (1)) { + + // We used "placement new" operator to initialize the reference + // counter so we call its destructor now. + msg_->content->refcnt.~atomic_counter_t (); + + if (msg_->content->ffn) + msg_->content->ffn (msg_->content->data); + free (msg_->content); + } + + return 0; +} + +int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_) +{ + zmq_msg_close (dest_); + *dest_ = *src_; + zmq_msg_init (src_); + return 0; +} + +int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_) +{ + zmq_msg_close (dest_); + + // VSMs and delimiters require no special handling. + if (src_->content != + (zmq_msg_content*) ZMQ_DELIMITER && + src_->content != (zmq_msg_content*) ZMQ_VSM && + src_->content != (zmq_msg_content*) ZMQ_GAP) { + + // One reference is added to shared messages. Non-shared messages + // are turned into shared messages and reference count is set to 2. + if (src_->shared) + src_->content->refcnt.add (1); + else { + src_->shared = true; + src_->content->refcnt.set (2); + } + } + + *dest_ = *src_; + return 0; +} + +void *zmq_msg_data (zmq_msg *msg_) +{ + if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + return msg_->vsm_data; + if (msg_->content == + (zmq_msg_content*) ZMQ_DELIMITER || + msg_->content == (zmq_msg_content*) ZMQ_GAP) + return NULL; + return msg_->content->data; +} + +size_t zmq_msg_size (zmq_msg *msg_) +{ + if (msg_->content == (zmq_msg_content*) ZMQ_VSM) + return msg_->vsm_size; + if (msg_->content == + (zmq_msg_content*) ZMQ_DELIMITER || + msg_->content == (zmq_msg_content*) ZMQ_GAP) + return 0; + return msg_->content->size; +} + +int zmq_msg_type (zmq_msg *msg_) +{ + // If it's a genuine message, return 0. + if (msg_->content >= (zmq_msg_content*) ZMQ_VSM) + return 0; + + // Trick the compiler to believe that content is an integer. + unsigned char *offset = 0; + return (((const unsigned char*) msg_->content) - offset); +} + +void *zmq_init (int app_threads_, int io_threads_) +{ + // There should be at least a single thread managed by the dispatcher. + if (app_threads_ < 0 || io_threads_ < 0 || + app_threads_ + io_threads_ == 0) { + errno = EINVAL; + return NULL; + } + + zmq::dispatcher_t *dispatcher = + new zmq::dispatcher_t (app_threads_, io_threads_); + zmq_assert (dispatcher); + return (void*) dispatcher; +} + +int zmq_term (void *context_) +{ + ((zmq::dispatcher_t*) context_)->shutdown (); + return 0; +} + +void *zmq_socket (void *context_, int type_) +{ + return (void*) (((zmq::dispatcher_t*) context_)->create_socket (type_)); +} + +int zmq_close (void *s_) +{ + ((zmq::i_api*) s_)->close (); + return 0; +} + +int zmq_bind (void *s_, const char *addr_, zmq_opts *opts_) +{ + return (((zmq::i_api*) s_)->bind (addr_, opts_)); +} + +int zmq_connect (void *s_, const char *addr_, zmq_opts *opts_) +{ + return (((zmq::i_api*) s_)->connect (addr_, opts_)); +} + +int zmq_subscribe (void *s_, const char *criteria_) +{ + return (((zmq::i_api*) s_)->subscribe (criteria_)); +} + +int zmq_send (void *s_, zmq_msg *msg_, int flags_) +{ + return (((zmq::i_api*) s_)->send (msg_, flags_)); +} + +int zmq_flush (void *s_) +{ + return (((zmq::i_api*) s_)->flush ()); +} + +int zmq_recv (void *s_, zmq_msg *msg_, int flags_) +{ + return (((zmq::i_api*) s_)->recv (msg_, flags_)); +} diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 46e4752..0c491ea 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -21,26 +21,26 @@ #include "i_session.hpp" #include "wire.hpp" -zs::zmq_decoder_t::zmq_decoder_t () : +zmq::zmq_decoder_t::zmq_decoder_t () : destination (NULL) { - zs_msg_init (&in_progress); + zmq_msg_init (&in_progress); // At the beginning, read one byte and go to one_byte_size_ready state. next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); } -zs::zmq_decoder_t::~zmq_decoder_t () +zmq::zmq_decoder_t::~zmq_decoder_t () { - zs_msg_close (&in_progress); + zmq_msg_close (&in_progress); } -void zs::zmq_decoder_t::set_session (i_session *destination_) +void zmq::zmq_decoder_t::set_session (i_session *destination_) { destination = destination_; } -bool zs::zmq_decoder_t::one_byte_size_ready () +bool zmq::zmq_decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. // Otherwise allocate the buffer for message data and read the @@ -48,24 +48,25 @@ bool zs::zmq_decoder_t::one_byte_size_ready () if (*tmpbuf == 0xff) next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); else { - zs_msg_init_size (&in_progress, *tmpbuf); - next_step (zs_msg_data (&in_progress), *tmpbuf, + zmq_msg_init_size (&in_progress, *tmpbuf); + next_step (zmq_msg_data (&in_progress), *tmpbuf, &zmq_decoder_t::message_ready); } return true; } -bool zs::zmq_decoder_t::eight_byte_size_ready () +bool zmq::zmq_decoder_t::eight_byte_size_ready () { // 8-byte size is read. Allocate the buffer for message body and // read the message data into it. size_t size = (size_t) get_uint64 (tmpbuf); - zs_msg_init_size (&in_progress, size); - next_step (zs_msg_data (&in_progress), size, &zmq_decoder_t::message_ready); + zmq_msg_init_size (&in_progress, size); + next_step (zmq_msg_data (&in_progress), size, + &zmq_decoder_t::message_ready); return true; } -bool zs::zmq_decoder_t::message_ready () +bool zmq::zmq_decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 3ab8d6b..f648819 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -18,14 +18,14 @@ */ -#ifndef __ZS_ZMQ_DECODER_HPP_INCLUDED__ -#define __ZS_ZMQ_DECODER_HPP_INCLUDED__ +#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ +#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__ -#include "../include/zs.h" +#include "../include/zmq.h" #include "decoder.hpp" -namespace zs +namespace zmq { // Decoder for 0MQ backend protocol. Converts data batches into messages. @@ -46,7 +46,7 @@ namespace zs struct i_session *destination; unsigned char tmpbuf [8]; - ::zs_msg in_progress; + ::zmq_msg in_progress; zmq_decoder_t (const zmq_decoder_t&); void operator = (const zmq_decoder_t&); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 9624b69..8a713cf 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -21,34 +21,34 @@ #include "i_session.hpp" #include "wire.hpp" -zs::zmq_encoder_t::zmq_encoder_t () : +zmq::zmq_encoder_t::zmq_encoder_t () : source (NULL) { - zs_msg_init (&in_progress); + zmq_msg_init (&in_progress); // Write 0 bytes to the batch and go to message_ready state. next_step (NULL, 0, &zmq_encoder_t::message_ready, true); } -zs::zmq_encoder_t::~zmq_encoder_t () +zmq::zmq_encoder_t::~zmq_encoder_t () { - zs_msg_close (&in_progress); + zmq_msg_close (&in_progress); } -void zs::zmq_encoder_t::set_session (i_session *source_) +void zmq::zmq_encoder_t::set_session (i_session *source_) { source = source_; } -bool zs::zmq_encoder_t::size_ready () +bool zmq::zmq_encoder_t::size_ready () { // Write message body into the buffer. - next_step (zs_msg_data (&in_progress), zs_msg_size (&in_progress), + next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), &zmq_encoder_t::message_ready, false); return true; } -bool zs::zmq_encoder_t::message_ready () +bool zmq::zmq_encoder_t::message_ready () { // Read new message from the dispatcher. If there is none, return false. // Note that new state is set only if write is successful. That way @@ -57,7 +57,7 @@ bool zs::zmq_encoder_t::message_ready () if (!source->read (&in_progress)) { return false; } - size_t size = zs_msg_size (&in_progress); + size_t size = zmq_msg_size (&in_progress); // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 37c4aee..829fd4b 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -17,14 +17,14 @@ along with this program. If not, see . */ -#ifndef __ZS_ZMQ_ENCODER_HPP_INCLUDED__ -#define __ZS_ZMQ_ENCODER_HPP_INCLUDED__ +#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ +#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__ -#include "../include/zs.h" +#include "../include/zmq.h" #include "encoder.hpp" -namespace zs +namespace zmq { // Encoder for 0MQ backend protocol. Converts messages into data batches. @@ -43,7 +43,7 @@ namespace zs bool message_ready (); struct i_session *source; - ::zs_msg in_progress; + ::zmq_msg in_progress; unsigned char tmpbuf [9]; zmq_encoder_t (const zmq_encoder_t&); diff --git a/src/zmq_tcp_engine.cpp b/src/zmq_tcp_engine.cpp index 0f55808..6091d80 100644 --- a/src/zmq_tcp_engine.cpp +++ b/src/zmq_tcp_engine.cpp @@ -22,7 +22,7 @@ #include "i_session.hpp" #include "err.hpp" -zs::zmq_tcp_engine_t::zmq_tcp_engine_t (fd_t fd_) : +zmq::zmq_tcp_engine_t::zmq_tcp_engine_t (fd_t fd_) : poller (NULL), session (NULL), terminating (false), @@ -33,20 +33,20 @@ zs::zmq_tcp_engine_t::zmq_tcp_engine_t (fd_t fd_) : { // Allocate read & write buffer. inbuf = new unsigned char [in_batch_size]; - zs_assert (inbuf); + zmq_assert (inbuf); outbuf = new unsigned char [out_batch_size]; - zs_assert (outbuf); + zmq_assert (outbuf); // Attach the socket. int rc = socket.open (fd_); - zs_assert (rc == 0); + zmq_assert (rc == 0); } -void zs::zmq_tcp_engine_t::attach (i_poller *poller_, i_session *session_) +void zmq::zmq_tcp_engine_t::attach (i_poller *poller_, i_session *session_) { - zs_assert (!poller); + zmq_assert (!poller); poller = poller_; - zs_assert (!session); + zmq_assert (!session); session = session_; encoder.set_session (session); decoder.set_session (session); @@ -63,47 +63,47 @@ void zs::zmq_tcp_engine_t::attach (i_poller *poller_, i_session *session_) in_event (); } -void zs::zmq_tcp_engine_t::detach () +void zmq::zmq_tcp_engine_t::detach () { - zs_assert (poller); + zmq_assert (poller); poller->rm_fd (handle); poller = NULL; - zs_assert (session); + zmq_assert (session); session->set_engine (NULL); session = NULL; encoder.set_session (NULL); decoder.set_session (NULL); } -void zs::zmq_tcp_engine_t::revive () +void zmq::zmq_tcp_engine_t::revive () { - zs_assert (poller); + zmq_assert (poller); poller->set_pollout (handle); } -void zs::zmq_tcp_engine_t::schedule_terminate () +void zmq::zmq_tcp_engine_t::schedule_terminate () { terminating = true; } -void zs::zmq_tcp_engine_t::terminate () +void zmq::zmq_tcp_engine_t::terminate () { delete this; } -void zs::zmq_tcp_engine_t::shutdown () +void zmq::zmq_tcp_engine_t::shutdown () { delete this; } -zs::zmq_tcp_engine_t::~zmq_tcp_engine_t () +zmq::zmq_tcp_engine_t::~zmq_tcp_engine_t () { detach (); delete [] outbuf; delete [] inbuf; } -void zs::zmq_tcp_engine_t::in_event () +void zmq::zmq_tcp_engine_t::in_event () { // If there's no data to process in the buffer, read new data. if (inpos == insize) { @@ -139,7 +139,7 @@ void zs::zmq_tcp_engine_t::in_event () session->flush (); } -void zs::zmq_tcp_engine_t::out_event () +void zmq::zmq_tcp_engine_t::out_event () { // If write buffer is empty, try to read new data from the encoder. if (outpos == outsize) { @@ -173,13 +173,13 @@ void zs::zmq_tcp_engine_t::out_event () } } -void zs::zmq_tcp_engine_t::timer_event () +void zmq::zmq_tcp_engine_t::timer_event () { - zs_assert (false); + zmq_assert (false); } -void zs::zmq_tcp_engine_t::error () +void zmq::zmq_tcp_engine_t::error () { - zs_assert (false); + zmq_assert (false); } diff --git a/src/zmq_tcp_engine.hpp b/src/zmq_tcp_engine.hpp index fb1413c..6a83cec 100644 --- a/src/zmq_tcp_engine.hpp +++ b/src/zmq_tcp_engine.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZS_ZMQ_TCP_ENGINE_HPP_INCLUDED__ -#define __ZS_ZMQ_TCP_ENGINE_HPP_INCLUDED__ +#ifndef __ZMQ_ZMQ_TCP_ENGINE_HPP_INCLUDED__ +#define __ZMQ_ZMQ_TCP_ENGINE_HPP_INCLUDED__ #include "i_engine.hpp" #include "i_poller.hpp" @@ -28,7 +28,7 @@ #include "zmq_encoder.hpp" #include "zmq_decoder.hpp" -namespace zs +namespace zmq { class zmq_tcp_engine_t : public i_engine, public i_poll_events diff --git a/src/zs.cpp b/src/zs.cpp deleted file mode 100644 index ca05db3..0000000 --- a/src/zs.cpp +++ /dev/null @@ -1,222 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zs.h" - -#include -#include -#include - -#include "i_api.hpp" -#include "err.hpp" -#include "dispatcher.hpp" -#include "msg.hpp" - -int zs_msg_init (zs_msg *msg_) -{ - msg_->content = (zs_msg_content*) ZS_VSM; - msg_->vsm_size = 0; - return 0; -} - -int zs_msg_init_size (zs_msg *msg_, size_t size_) -{ - if (size_ <= ZS_MAX_VSM_SIZE) { - msg_->content = (zs_msg_content*) ZS_VSM; - msg_->vsm_size = (uint16_t) size_; - } - else { - msg_->content = (zs_msg_content*) malloc (sizeof (zs_msg_content) + - size_); - if (!msg_->content) { - errno = ENOMEM; - return -1; - } - msg_->shared = 0; - - msg_->content->data = (void*) (msg_->content + 1); - msg_->content->size = size_; - msg_->content->ffn = NULL; - new (&msg_->content->refcnt) zs::atomic_counter_t (); - } - return 0; -} - -int zs_msg_init_data (zs_msg *msg_, void *data_, size_t size_, zs_free_fn *ffn_) -{ - msg_->shared = 0; - msg_->content = (zs_msg_content*) malloc (sizeof (zs_msg_content)); - zs_assert (msg_->content); - msg_->content->data = data_; - msg_->content->size = size_; - msg_->content->ffn = ffn_; - new (&msg_->content->refcnt) zs::atomic_counter_t (); - return 0; -} - -int zs_msg_close (zs_msg *msg_) -{ - // For VSMs and delimiters there are no resources to free - if (msg_->content == (zs_msg_content*) ZS_DELIMITER || - msg_->content == (zs_msg_content*) ZS_VSM || - msg_->content == (zs_msg_content*) ZS_GAP) - return 0; - - // If the content is not shared, or if it is shared and the reference - // count has dropped to zero, deallocate it. - if (!msg_->shared || !msg_->content->refcnt.sub (1)) { - - // We used "placement new" operator to initialize the reference - // counter so we call its destructor now. - msg_->content->refcnt.~atomic_counter_t (); - - if (msg_->content->ffn) - msg_->content->ffn (msg_->content->data); - free (msg_->content); - } - - return 0; -} - -int zs_msg_move (zs_msg *dest_, zs_msg *src_) -{ - zs_msg_close (dest_); - *dest_ = *src_; - zs_msg_init (src_); - return 0; -} - -int zs_msg_copy (zs_msg *dest_, zs_msg *src_) -{ - zs_msg_close (dest_); - - // VSMs and delimiters require no special handling. - if (src_->content != - (zs_msg_content*) ZS_DELIMITER && - src_->content != (zs_msg_content*) ZS_VSM && - src_->content != (zs_msg_content*) ZS_GAP) { - - // One reference is added to shared messages. Non-shared messages - // are turned into shared messages and reference count is set to 2. - if (src_->shared) - src_->content->refcnt.add (1); - else { - src_->shared = true; - src_->content->refcnt.set (2); - } - } - - *dest_ = *src_; - return 0; -} - -void *zs_msg_data (zs_msg *msg_) -{ - if (msg_->content == (zs_msg_content*) ZS_VSM) - return msg_->vsm_data; - if (msg_->content == - (zs_msg_content*) ZS_DELIMITER || - msg_->content == (zs_msg_content*) ZS_GAP) - return NULL; - return msg_->content->data; -} - -size_t zs_msg_size (zs_msg *msg_) -{ - if (msg_->content == (zs_msg_content*) ZS_VSM) - return msg_->vsm_size; - if (msg_->content == - (zs_msg_content*) ZS_DELIMITER || - msg_->content == (zs_msg_content*) ZS_GAP) - return 0; - return msg_->content->size; -} - -int zs_msg_type (zs_msg *msg_) -{ - // If it's a genuine message, return 0. - if (msg_->content >= (zs_msg_content*) ZS_VSM) - return 0; - - // Trick the compiler to believe that content is an integer. - unsigned char *offset = 0; - return (((const unsigned char*) msg_->content) - offset); -} - -void *zs_init (int app_threads_, int io_threads_) -{ - // There should be at least a single thread managed by the dispatcher. - if (app_threads_ < 0 || io_threads_ < 0 || - app_threads_ + io_threads_ == 0) { - errno = EINVAL; - return NULL; - } - - zs::dispatcher_t *dispatcher = - new zs::dispatcher_t (app_threads_, io_threads_); - zs_assert (dispatcher); - return (void*) dispatcher; -} - -int zs_term (void *context_) -{ - ((zs::dispatcher_t*) context_)->shutdown (); - return 0; -} - -void *zs_socket (void *context_, int type_) -{ - return (void*) (((zs::dispatcher_t*) context_)->create_socket (type_)); -} - -int zs_close (void *s_) -{ - ((zs::i_api*) s_)->close (); - return 0; -} - -int zs_bind (void *s_, const char *addr_, zs_opts *opts_) -{ - return (((zs::i_api*) s_)->bind (addr_, opts_)); -} - -int zs_connect (void *s_, const char *addr_, zs_opts *opts_) -{ - return (((zs::i_api*) s_)->connect (addr_, opts_)); -} - -int zs_subscribe (void *s_, const char *criteria_) -{ - return (((zs::i_api*) s_)->subscribe (criteria_)); -} - -int zs_send (void *s_, zs_msg *msg_, int flags_) -{ - return (((zs::i_api*) s_)->send (msg_, flags_)); -} - -int zs_flush (void *s_) -{ - return (((zs::i_api*) s_)->flush ()); -} - -int zs_recv (void *s_, zs_msg *msg_, int flags_) -{ - return (((zs::i_api*) s_)->recv (msg_, flags_)); -} -- cgit v1.2.3