summaryrefslogtreecommitdiff
path: root/src/app_thread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/app_thread.cpp')
-rw-r--r--src/app_thread.cpp70
1 files changed, 35 insertions, 35 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index ca08976..2406dbd 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -17,9 +17,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zs.h"
+#include "../include/zmq.h"
-#if defined ZS_HAVE_WINDOWS
+#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
@@ -48,17 +48,17 @@
// system with x86 architecture and gcc or MSVC compiler.
#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
(defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
-#define ZS_DELAY_COMMANDS
+#define ZMQ_DELAY_COMMANDS
#endif
-zs::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (dispatcher_, thread_slot_),
tid (0),
last_processing_time (0)
{
}
-void zs::app_thread_t::shutdown ()
+void zmq::app_thread_t::shutdown ()
{
// Deallocate all the sessions associated with the thread.
while (!sessions.empty ())
@@ -67,17 +67,17 @@ void zs::app_thread_t::shutdown ()
delete this;
}
-zs::app_thread_t::~app_thread_t ()
+zmq::app_thread_t::~app_thread_t ()
{
}
-void zs::app_thread_t::attach_session (session_t *session_)
+void zmq::app_thread_t::attach_session (session_t *session_)
{
session_->set_index (sessions.size ());
sessions.push_back (session_);
}
-void zs::app_thread_t::detach_session (session_t *session_)
+void zmq::app_thread_t::detach_session (session_t *session_)
{
// O(1) removal of the session from the list.
sessions_t::size_type i = session_->get_index ();
@@ -86,22 +86,22 @@ void zs::app_thread_t::detach_session (session_t *session_)
sessions.pop_back ();
}
-zs::i_poller *zs::app_thread_t::get_poller ()
+zmq::i_poller *zmq::app_thread_t::get_poller ()
{
- zs_assert (false);
+ zmq_assert (false);
}
-zs::i_signaler *zs::app_thread_t::get_signaler ()
+zmq::i_signaler *zmq::app_thread_t::get_signaler ()
{
return &pollset;
}
-bool zs::app_thread_t::is_current ()
+bool zmq::app_thread_t::is_current ()
{
return !sessions.empty () && tid == getpid ();
}
-bool zs::app_thread_t::make_current ()
+bool zmq::app_thread_t::make_current ()
{
// If there are object managed by this slot we cannot assign the slot
// to a different thread.
@@ -112,7 +112,7 @@ bool zs::app_thread_t::make_current ()
return true;
}
-zs::i_api *zs::app_thread_t::create_socket (int type_)
+zmq::i_api *zmq::app_thread_t::create_socket (int type_)
{
i_mux *mux = NULL;
i_demux *demux = NULL;
@@ -120,43 +120,43 @@ zs::i_api *zs::app_thread_t::create_socket (int type_)
i_api *api = NULL;
switch (type_) {
- case ZS_P2P:
+ case ZMQ_P2P:
mux = new dummy_aggregator_t;
- zs_assert (mux);
+ zmq_assert (mux);
demux = new dummy_distributor_t;
- zs_assert (demux);
+ zmq_assert (demux);
session = new session_t (this, this, mux, demux, true, false);
- zs_assert (session);
+ zmq_assert (session);
api = new p2p_t (this, session);
- zs_assert (api);
+ zmq_assert (api);
break;
- case ZS_PUB:
+ case ZMQ_PUB:
demux = new data_distributor_t;
- zs_assert (demux);
+ zmq_assert (demux);
session = new session_t (this, this, mux, demux, true, false);
- zs_assert (session);
+ zmq_assert (session);
api = new pub_t (this, session);
- zs_assert (api);
+ zmq_assert (api);
break;
- case ZS_SUB:
+ case ZMQ_SUB:
mux = new fair_aggregator_t;
- zs_assert (mux);
+ zmq_assert (mux);
session = new session_t (this, this, mux, demux, true, false);
- zs_assert (session);
+ zmq_assert (session);
api = new sub_t (this, session);
- zs_assert (api);
+ zmq_assert (api);
break;
- case ZS_REQ:
+ case ZMQ_REQ:
// TODO
- zs_assert (false);
+ zmq_assert (false);
api = new req_t (this, session);
- zs_assert (api);
+ zmq_assert (api);
break;
- case ZS_REP:
+ case ZMQ_REP:
// TODO
- zs_assert (false);
+ zmq_assert (false);
api = new rep_t (this, session);
- zs_assert (api);
+ zmq_assert (api);
break;
default:
errno = EINVAL;
@@ -168,14 +168,14 @@ zs::i_api *zs::app_thread_t::create_socket (int type_)
return api;
}
-void zs::app_thread_t::process_commands (bool block_)
+void zmq::app_thread_t::process_commands (bool block_)
{
ypollset_t::signals_t signals;
if (block_)
signals = pollset.poll ();
else {
-#if defined ZS_DELAY_COMMANDS
+#if defined ZMQ_DELAY_COMMANDS
// Optimised version of command processing - it doesn't have to check
// for incoming commands each time. It does so only if certain time
// elapsed since last command processing. Command delay varies