From cc3755a16f00026af882ed14d122cc8aa6d50e82 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 3 Aug 2009 11:30:13 +0200 Subject: renamed from zs to zmq --- src/app_thread.cpp | 70 +++++++++++++++++++++++++++--------------------------- 1 file changed, 35 insertions(+), 35 deletions(-) (limited to 'src/app_thread.cpp') diff --git a/src/app_thread.cpp b/src/app_thread.cpp index ca08976..2406dbd 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -17,9 +17,9 @@ along with this program. If not, see . */ -#include "../include/zs.h" +#include "../include/zmq.h" -#if defined ZS_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include @@ -48,17 +48,17 @@ // system with x86 architecture and gcc or MSVC compiler. #if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\ (defined _MSC_VER && (defined _M_IX86 || defined _M_X64)) -#define ZS_DELAY_COMMANDS +#define ZMQ_DELAY_COMMANDS #endif -zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : object_t (dispatcher_, thread_slot_), tid (0), last_processing_time (0) { } -void zs::app_thread_t::shutdown () +void zmq::app_thread_t::shutdown () { // Deallocate all the sessions associated with the thread. while (!sessions.empty ()) @@ -67,17 +67,17 @@ void zs::app_thread_t::shutdown () delete this; } -zs::app_thread_t::~app_thread_t () +zmq::app_thread_t::~app_thread_t () { } -void zs::app_thread_t::attach_session (session_t *session_) +void zmq::app_thread_t::attach_session (session_t *session_) { session_->set_index (sessions.size ()); sessions.push_back (session_); } -void zs::app_thread_t::detach_session (session_t *session_) +void zmq::app_thread_t::detach_session (session_t *session_) { // O(1) removal of the session from the list. sessions_t::size_type i = session_->get_index (); @@ -86,22 +86,22 @@ void zs::app_thread_t::detach_session (session_t *session_) sessions.pop_back (); } -zs::i_poller *zs::app_thread_t::get_poller () +zmq::i_poller *zmq::app_thread_t::get_poller () { - zs_assert (false); + zmq_assert (false); } -zs::i_signaler *zs::app_thread_t::get_signaler () +zmq::i_signaler *zmq::app_thread_t::get_signaler () { return &pollset; } -bool zs::app_thread_t::is_current () +bool zmq::app_thread_t::is_current () { return !sessions.empty () && tid == getpid (); } -bool zs::app_thread_t::make_current () +bool zmq::app_thread_t::make_current () { // If there are object managed by this slot we cannot assign the slot // to a different thread. @@ -112,7 +112,7 @@ bool zs::app_thread_t::make_current () return true; } -zs::i_api *zs::app_thread_t::create_socket (int type_) +zmq::i_api *zmq::app_thread_t::create_socket (int type_) { i_mux *mux = NULL; i_demux *demux = NULL; @@ -120,43 +120,43 @@ zs::i_api *zs::app_thread_t::create_socket (int type_) i_api *api = NULL; switch (type_) { - case ZS_P2P: + case ZMQ_P2P: mux = new dummy_aggregator_t; - zs_assert (mux); + zmq_assert (mux); demux = new dummy_distributor_t; - zs_assert (demux); + zmq_assert (demux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new p2p_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_PUB: + case ZMQ_PUB: demux = new data_distributor_t; - zs_assert (demux); + zmq_assert (demux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new pub_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_SUB: + case ZMQ_SUB: mux = new fair_aggregator_t; - zs_assert (mux); + zmq_assert (mux); session = new session_t (this, this, mux, demux, true, false); - zs_assert (session); + zmq_assert (session); api = new sub_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_REQ: + case ZMQ_REQ: // TODO - zs_assert (false); + zmq_assert (false); api = new req_t (this, session); - zs_assert (api); + zmq_assert (api); break; - case ZS_REP: + case ZMQ_REP: // TODO - zs_assert (false); + zmq_assert (false); api = new rep_t (this, session); - zs_assert (api); + zmq_assert (api); break; default: errno = EINVAL; @@ -168,14 +168,14 @@ zs::i_api *zs::app_thread_t::create_socket (int type_) return api; } -void zs::app_thread_t::process_commands (bool block_) +void zmq::app_thread_t::process_commands (bool block_) { ypollset_t::signals_t signals; if (block_) signals = pollset.poll (); else { -#if defined ZS_DELAY_COMMANDS +#if defined ZMQ_DELAY_COMMANDS // Optimised version of command processing - it doesn't have to check // for incoming commands each time. It does so only if certain time // elapsed since last command processing. Command delay varies -- cgit v1.2.3