summaryrefslogtreecommitdiff
path: root/src/ctx.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:01:47 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:01:47 +0900
commit4a7aad06d95701cf232198093ce396dcdbb53e5b (patch)
tree8ced8929e603a179d9434099244dfd782e705d5e /src/ctx.cpp
parent1fc63e4dbcf1438eb571d720f57be68852f820f7 (diff)
ZeroMQ renamed to Crossroads
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/ctx.cpp')
-rw-r--r--src/ctx.cpp76
1 files changed, 38 insertions, 38 deletions
diff --git a/src/ctx.cpp b/src/ctx.cpp
index d771f6f..2beda82 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -3,14 +3,14 @@
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
- This file is part of 0MQ.
+ This file is part of Crossroads project.
- 0MQ is free software; you can redistribute it and/or modify it under
+ Crossroads is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
- 0MQ is distributed in the hope that it will be useful,
+ Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
@@ -20,7 +20,7 @@
*/
#include "platform.hpp"
-#if defined ZMQ_HAVE_WINDOWS
+#if defined XS_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
@@ -38,17 +38,17 @@
#include "err.hpp"
#include "msg.hpp"
-zmq::ctx_t::ctx_t (uint32_t io_threads_) :
+xs::ctx_t::ctx_t (uint32_t io_threads_) :
tag (0xbadcafe0),
terminating (false)
{
// Initialise the array of mailboxes. Additional three slots are for
- // zmq_term thread and reaper thread.
+ // xs_term thread and reaper thread.
slot_count = max_sockets + io_threads_ + 3;
slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
alloc_assert (slots);
- // Initialise the infrastructure for zmq_term thread.
+ // Initialise the infrastructure for xs_term thread.
slots [term_tid] = &term_mailbox;
// Create the reaper thread.
@@ -74,34 +74,34 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
}
// Create the socket to send logs to.
- log_socket = create_socket (ZMQ_PUB);
- zmq_assert (log_socket);
+ log_socket = create_socket (XS_PUB);
+ xs_assert (log_socket);
int linger = 0;
- int rc = log_socket->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
+ int rc = log_socket->setsockopt (XS_LINGER, &linger, sizeof (linger));
errno_assert (rc == 0);
int hwm = 1;
- rc = log_socket->setsockopt (ZMQ_SNDHWM, &hwm, sizeof (hwm));
+ rc = log_socket->setsockopt (XS_SNDHWM, &hwm, sizeof (hwm));
errno_assert (rc == 0);
- rc = log_socket->connect ("ipc:///tmp/zmqlogs.ipc");
+ rc = log_socket->connect ("ipc:///tmp/xslogs.ipc");
errno_assert (rc == 0);
// Create the monitor object.
io_thread_t *io_thread = choose_io_thread (0);
- zmq_assert (io_thread);
+ xs_assert (io_thread);
monitor = new (std::nothrow) monitor_t (io_thread);
alloc_assert (monitor);
monitor->start ();
}
-bool zmq::ctx_t::check_tag ()
+bool xs::ctx_t::check_tag ()
{
return tag == 0xbadcafe0;
}
-zmq::ctx_t::~ctx_t ()
+xs::ctx_t::~ctx_t ()
{
// Check that there are no remaining sockets.
- zmq_assert (sockets.empty ());
+ xs_assert (sockets.empty ());
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
@@ -124,7 +124,7 @@ zmq::ctx_t::~ctx_t ()
tag = 0xdeadbeef;
}
-int zmq::ctx_t::terminate ()
+int xs::ctx_t::terminate ()
{
// Check whether termination was already underway, but interrupted and now
// restarted.
@@ -139,13 +139,13 @@ int zmq::ctx_t::terminate ()
monitor->stop ();
command_t cmd;
int rc = term_mailbox.recv (&cmd, -1);
- zmq_assert (rc == 0);
- zmq_assert (cmd.type == command_t::done);
+ xs_assert (rc == 0);
+ xs_assert (cmd.type == command_t::done);
// Close the logging socket.
log_sync.lock ();
rc = log_socket->close ();
- zmq_assert (rc == 0);
+ xs_assert (rc == 0);
log_socket = NULL;
log_sync.unlock ();
@@ -166,10 +166,10 @@ int zmq::ctx_t::terminate ()
int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
- zmq_assert (rc == 0);
- zmq_assert (cmd.type == command_t::done);
+ xs_assert (rc == 0);
+ xs_assert (cmd.type == command_t::done);
slot_sync.lock ();
- zmq_assert (sockets.empty ());
+ xs_assert (sockets.empty ());
slot_sync.unlock ();
// Deallocate the resources.
@@ -178,11 +178,11 @@ int zmq::ctx_t::terminate ()
return 0;
}
-zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
+xs::socket_base_t *xs::ctx_t::create_socket (int type_)
{
slot_sync.lock ();
- // Once zmq_term() was called, we can't create new sockets.
+ // Once xs_term() was called, we can't create new sockets.
if (terminating) {
slot_sync.unlock ();
errno = ETERM;
@@ -218,7 +218,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
return s;
}
-void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
+void xs::ctx_t::destroy_socket (class socket_base_t *socket_)
{
slot_sync.lock ();
@@ -230,7 +230,7 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
// Remove the socket from the list of sockets.
sockets.erase (socket_);
- // If zmq_term() was already called and there are no more socket
+ // If xs_term() was already called and there are no more socket
// we can ask reaper thread to terminate.
if (terminating && sockets.empty ())
reaper->stop ();
@@ -238,17 +238,17 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
slot_sync.unlock ();
}
-zmq::object_t *zmq::ctx_t::get_reaper ()
+xs::object_t *xs::ctx_t::get_reaper ()
{
return reaper;
}
-void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
+void xs::ctx_t::send_command (uint32_t tid_, const command_t &command_)
{
slots [tid_]->send (command_);
}
-zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
+xs::io_thread_t *xs::ctx_t::choose_io_thread (uint64_t affinity_)
{
if (io_threads.empty ())
return NULL;
@@ -265,11 +265,11 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
}
}
}
- zmq_assert (min_load != -1);
+ xs_assert (min_load != -1);
return io_threads [result];
}
-int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
+int xs::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
{
endpoints_sync.lock ();
@@ -285,7 +285,7 @@ int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
return 0;
}
-void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
+void xs::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
endpoints_sync.lock ();
@@ -303,7 +303,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
endpoints_sync.unlock ();
}
-zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
+xs::endpoint_t xs::ctx_t::find_endpoint (const char *addr_)
{
endpoints_sync.lock ();
@@ -326,18 +326,18 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
return *endpoint;
}
-void zmq::ctx_t::log (int sid_, const char *text_)
+void xs::ctx_t::log (int sid_, const char *text_)
{
monitor->log (sid_, text_);
}
-void zmq::ctx_t::publish_logs (const char *text_)
+void xs::ctx_t::publish_logs (const char *text_)
{
log_sync.lock ();
msg_t msg;
msg.init_size (strlen (text_) + 1);
memcpy (msg.data (), text_, strlen (text_) + 1);
- int rc = log_socket->send (&msg, ZMQ_DONTWAIT);
+ int rc = log_socket->send (&msg, XS_DONTWAIT);
errno_assert (rc == 0);
msg.close ();
log_sync.unlock ();
@@ -346,5 +346,5 @@ void zmq::ctx_t::publish_logs (const char *text_)
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.
-zmq::atomic_counter_t zmq::ctx_t::max_socket_id;
+xs::atomic_counter_t xs::ctx_t::max_socket_id;