summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp174
1 files changed, 87 insertions, 87 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 49c6e0a..e932990 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -4,14 +4,14 @@
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
- This file is part of 0MQ.
+ This file is part of Crossroads project.
- 0MQ is free software; you can redistribute it and/or modify it under
+ Crossroads is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
- 0MQ is distributed in the hope that it will be useful,
+ Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
@@ -26,7 +26,7 @@
#include "platform.hpp"
-#if defined ZMQ_HAVE_WINDOWS
+#if defined XS_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
#include <intrin.h>
@@ -62,48 +62,48 @@
#include "xpub.hpp"
#include "xsub.hpp"
-bool zmq::socket_base_t::check_tag ()
+bool xs::socket_base_t::check_tag ()
{
return tag == 0xbaddecaf;
}
-zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
+xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
uint32_t tid_, int sid_)
{
socket_base_t *s = NULL;
switch (type_) {
- case ZMQ_PAIR:
+ case XS_PAIR:
s = new (std::nothrow) pair_t (parent_, tid_, sid_);
break;
- case ZMQ_PUB:
+ case XS_PUB:
s = new (std::nothrow) pub_t (parent_, tid_, sid_);
break;
- case ZMQ_SUB:
+ case XS_SUB:
s = new (std::nothrow) sub_t (parent_, tid_, sid_);
break;
- case ZMQ_REQ:
+ case XS_REQ:
s = new (std::nothrow) req_t (parent_, tid_, sid_);
break;
- case ZMQ_REP:
+ case XS_REP:
s = new (std::nothrow) rep_t (parent_, tid_, sid_);
break;
- case ZMQ_XREQ:
+ case XS_XREQ:
s = new (std::nothrow) xreq_t (parent_, tid_, sid_);
break;
- case ZMQ_XREP:
+ case XS_XREP:
s = new (std::nothrow) xrep_t (parent_, tid_, sid_);
break;
- case ZMQ_PULL:
+ case XS_PULL:
s = new (std::nothrow) pull_t (parent_, tid_, sid_);
break;
- case ZMQ_PUSH:
+ case XS_PUSH:
s = new (std::nothrow) push_t (parent_, tid_, sid_);
break;
- case ZMQ_XPUB:
+ case XS_XPUB:
s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
break;
- case ZMQ_XSUB:
+ case XS_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
break;
default:
@@ -114,7 +114,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
return s;
}
-zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
+xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
own_t (parent_, tid_),
tag (0xbaddecaf),
ctx_terminated (false),
@@ -126,32 +126,32 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
options.socket_id = sid_;
}
-zmq::socket_base_t::~socket_base_t ()
+xs::socket_base_t::~socket_base_t ()
{
- zmq_assert (destroyed);
+ xs_assert (destroyed);
// Mark the socket as dead.
tag = 0xdeadbeef;
}
-zmq::mailbox_t *zmq::socket_base_t::get_mailbox ()
+xs::mailbox_t *xs::socket_base_t::get_mailbox ()
{
return &mailbox;
}
-void zmq::socket_base_t::stop ()
+void xs::socket_base_t::stop ()
{
- // Called by ctx when it is terminated (zmq_term).
- // 'stop' command is sent from the threads that called zmq_term to
+ // Called by ctx when it is terminated (xs_term).
+ // 'stop' command is sent from the threads that called xs_term to
// the thread owning the socket. This way, blocking call in the
// owner thread can be interrupted.
send_stop ();
}
-int zmq::socket_base_t::parse_uri (const char *uri_,
+int xs::socket_base_t::parse_uri (const char *uri_,
std::string &protocol_, std::string &address_)
{
- zmq_assert (uri_ != NULL);
+ xs_assert (uri_ != NULL);
std::string uri (uri_);
std::string::size_type pos = uri.find ("://");
@@ -168,7 +168,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
return 0;
}
-int zmq::socket_base_t::check_protocol (const std::string &protocol_)
+int xs::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
@@ -177,9 +177,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return -1;
}
- // If 0MQ is not compiled with OpenPGM, pgm and epgm transports
+ // If Crossroads is not compiled with OpenPGM, pgm and epgm transports
// are not avaialble.
-#if !defined ZMQ_HAVE_OPENPGM
+#if !defined XS_HAVE_OPENPGM
if (protocol_ == "pgm" || protocol_ == "epgm") {
errno = EPROTONOSUPPORT;
return -1;
@@ -187,7 +187,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
#endif
// IPC transport is not available on Windows and OpenVMS.
-#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+#if defined XS_HAVE_WINDOWS || defined XS_HAVE_OPENVMS
if (protocol_ == "ipc") {
// Unknown protocol.
errno = EPROTONOSUPPORT;
@@ -199,8 +199,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
if ((protocol_ == "pgm" || protocol_ == "epgm") &&
- options.type != ZMQ_PUB && options.type != ZMQ_SUB &&
- options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
+ options.type != XS_PUB && options.type != XS_SUB &&
+ options.type != XS_XPUB && options.type != XS_XSUB) {
errno = ENOCOMPATPROTO;
return -1;
}
@@ -209,7 +209,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
-void zmq::socket_base_t::attach_pipe (pipe_t *pipe_)
+void xs::socket_base_t::attach_pipe (pipe_t *pipe_)
{
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
@@ -226,7 +226,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_)
}
}
-int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
+int xs::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (unlikely (ctx_terminated)) {
@@ -244,7 +244,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
return options.setsockopt (option_, optval_, optvallen_);
}
-int zmq::socket_base_t::getsockopt (int option_, void *optval_,
+int xs::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
if (unlikely (ctx_terminated)) {
@@ -252,7 +252,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1;
}
- if (option_ == ZMQ_RCVMORE) {
+ if (option_ == XS_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
@@ -262,7 +262,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
- if (option_ == ZMQ_FD) {
+ if (option_ == XS_FD) {
if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL;
return -1;
@@ -272,7 +272,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
- if (option_ == ZMQ_EVENTS) {
+ if (option_ == XS_EVENTS) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
@@ -283,9 +283,9 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno_assert (rc == 0);
*((int*) optval_) = 0;
if (has_out ())
- *((int*) optval_) |= ZMQ_POLLOUT;
+ *((int*) optval_) |= XS_POLLOUT;
if (has_in ())
- *((int*) optval_) |= ZMQ_POLLIN;
+ *((int*) optval_) |= XS_POLLIN;
*optvallen_ = sizeof (int);
return 0;
}
@@ -293,7 +293,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return options.getsockopt (option_, optval_, optvallen_);
}
-int zmq::socket_base_t::bind (const char *addr_)
+int xs::socket_base_t::bind (const char *addr_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
@@ -344,7 +344,7 @@ int zmq::socket_base_t::bind (const char *addr_)
return 0;
}
-#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
+#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS
if (protocol == "ipc") {
ipc_listener_t *listener = new (std::nothrow) ipc_listener_t (
io_thread, this, options);
@@ -359,11 +359,11 @@ int zmq::socket_base_t::bind (const char *addr_)
}
#endif
- zmq_assert (false);
+ xs_assert (false);
return -1;
}
-int zmq::socket_base_t::connect (const char *addr_)
+int xs::socket_base_t::connect (const char *addr_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
@@ -420,11 +420,11 @@ int zmq::socket_base_t::connect (const char *addr_)
if (options.send_identity) {
msg_t id;
rc = id.init_size (options.identity_size);
- zmq_assert (rc == 0);
+ xs_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = pipes [0]->write (&id);
- zmq_assert (written);
+ xs_assert (written);
}
// Attach remote end of the pipe to the peer socket. Note that peer's
@@ -467,7 +467,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0;
}
-int zmq::socket_base_t::send (msg_t *msg_, int flags_)
+int xs::socket_base_t::send (msg_t *msg_, int flags_)
{
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
@@ -490,7 +490,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
msg_->reset_flags (msg_t::more);
// At this point we impose the flags on the message.
- if (flags_ & ZMQ_SNDMORE)
+ if (flags_ & XS_SNDMORE)
msg_->set_flags (msg_t::more);
// Try to send the message.
@@ -502,7 +502,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack.
- if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0)
+ if (flags_ & XS_DONTWAIT || options.sndtimeo == 0)
return -1;
// Compute the time when the timeout should occur.
@@ -533,7 +533,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return 0;
}
-int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
+int xs::socket_base_t::recv (msg_t *msg_, int flags_)
{
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
@@ -576,7 +576,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// For non-blocking recv, commands are processed in case there's an
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
- if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
+ if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) {
if (unlikely (process_commands (0, false) != 0))
return -1;
ticks = 0;
@@ -621,7 +621,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
return 0;
}
-int zmq::socket_base_t::close ()
+int xs::socket_base_t::close ()
{
// Transfer the ownership of the socket from this application thread
// to the reaper thread which will take care of the rest of shutdown
@@ -631,17 +631,17 @@ int zmq::socket_base_t::close ()
return 0;
}
-bool zmq::socket_base_t::has_in ()
+bool xs::socket_base_t::has_in ()
{
return xhas_in ();
}
-bool zmq::socket_base_t::has_out ()
+bool xs::socket_base_t::has_out ()
{
return xhas_out ();
}
-void zmq::socket_base_t::start_reaping (poller_t *poller_)
+void xs::socket_base_t::start_reaping (poller_t *poller_)
{
// Plug the socket to the reaper thread.
poller = poller_;
@@ -654,7 +654,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
check_destroy ();
}
-int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
+int xs::socket_base_t::process_commands (int timeout_, bool throttle_)
{
int rc;
command_t cmd;
@@ -669,7 +669,7 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
// commands recently, so that we can throttle the new commands.
// Get the CPU's tick counter. If 0, the counter is not available.
- uint64_t tsc = zmq::clock_t::rdtsc ();
+ uint64_t tsc = xs::clock_t::rdtsc ();
// Optimised version of command processing - it doesn't have to check
// for incoming commands each time. It does so only if certain time
@@ -710,25 +710,25 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
return 0;
}
-void zmq::socket_base_t::process_stop ()
+void xs::socket_base_t::process_stop ()
{
- // Here, someone have called zmq_term while the socket was still alive.
+ // Here, someone have called xs_term while the socket was still alive.
// We'll remember the fact so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still
- // responsible for calling zmq_close on the socket though!
+ // responsible for calling xs_close on the socket though!
ctx_terminated = true;
}
-void zmq::socket_base_t::process_bind (pipe_t *pipe_)
+void xs::socket_base_t::process_bind (pipe_t *pipe_)
{
attach_pipe (pipe_);
}
-void zmq::socket_base_t::process_unplug ()
+void xs::socket_base_t::process_unplug ()
{
}
-void zmq::socket_base_t::process_term (int linger_)
+void xs::socket_base_t::process_term (int linger_)
{
// Unregister all inproc endpoints associated with this socket.
// Doing this we make sure that no new pipes from other sockets (inproc)
@@ -744,55 +744,55 @@ void zmq::socket_base_t::process_term (int linger_)
own_t::process_term (linger_);
}
-void zmq::socket_base_t::process_destroy ()
+void xs::socket_base_t::process_destroy ()
{
destroyed = true;
}
-int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
+int xs::socket_base_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
-bool zmq::socket_base_t::xhas_out ()
+bool xs::socket_base_t::xhas_out ()
{
return false;
}
-int zmq::socket_base_t::xsend (msg_t *msg_, int flags_)
+int xs::socket_base_t::xsend (msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
-bool zmq::socket_base_t::xhas_in ()
+bool xs::socket_base_t::xhas_in ()
{
return false;
}
-int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_)
+int xs::socket_base_t::xrecv (msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
-void zmq::socket_base_t::xread_activated (pipe_t *pipe_)
+void xs::socket_base_t::xread_activated (pipe_t *pipe_)
{
- zmq_assert (false);
+ xs_assert (false);
}
-void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_)
+void xs::socket_base_t::xwrite_activated (pipe_t *pipe_)
{
- zmq_assert (false);
+ xs_assert (false);
}
-void zmq::socket_base_t::xhiccuped (pipe_t *pipe_)
+void xs::socket_base_t::xhiccuped (pipe_t *pipe_)
{
- zmq_assert (false);
+ xs_assert (false);
}
-void zmq::socket_base_t::in_event ()
+void xs::socket_base_t::in_event ()
{
// This function is invoked only once the socket is running in the context
// of the reaper thread. Process any commands from other threads/sockets
@@ -802,17 +802,17 @@ void zmq::socket_base_t::in_event ()
check_destroy ();
}
-void zmq::socket_base_t::out_event ()
+void xs::socket_base_t::out_event ()
{
- zmq_assert (false);
+ xs_assert (false);
}
-void zmq::socket_base_t::timer_event (int id_)
+void xs::socket_base_t::timer_event (int id_)
{
- zmq_assert (false);
+ xs_assert (false);
}
-void zmq::socket_base_t::check_destroy ()
+void xs::socket_base_t::check_destroy ()
{
// If the object was already marked as destroyed, finish the deallocation.
if (destroyed) {
@@ -831,22 +831,22 @@ void zmq::socket_base_t::check_destroy ()
}
}
-void zmq::socket_base_t::read_activated (pipe_t *pipe_)
+void xs::socket_base_t::read_activated (pipe_t *pipe_)
{
xread_activated (pipe_);
}
-void zmq::socket_base_t::write_activated (pipe_t *pipe_)
+void xs::socket_base_t::write_activated (pipe_t *pipe_)
{
xwrite_activated (pipe_);
}
-void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
+void xs::socket_base_t::hiccuped (pipe_t *pipe_)
{
xhiccuped (pipe_);
}
-void zmq::socket_base_t::terminated (pipe_t *pipe_)
+void xs::socket_base_t::terminated (pipe_t *pipe_)
{
// Notify the specific socket type about the pipe termination.
xterminated (pipe_);
@@ -858,11 +858,11 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
unregister_term_ack ();
}
-void zmq::socket_base_t::extract_flags (msg_t *msg_)
+void xs::socket_base_t::extract_flags (msg_t *msg_)
{
// Test whether IDENTITY flag is valid for this socket type.
if (unlikely (msg_->flags () & msg_t::identity))
- zmq_assert (options.recv_identity);
+ xs_assert (options.recv_identity);
// Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;