diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 174 |
1 files changed, 87 insertions, 87 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 49c6e0a..e932990 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -4,14 +4,14 @@ Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - This file is part of 0MQ. + This file is part of Crossroads project. - 0MQ is free software; you can redistribute it and/or modify it under + Crossroads is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. - 0MQ is distributed in the hope that it will be useful, + Crossroads is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. @@ -26,7 +26,7 @@ #include "platform.hpp" -#if defined ZMQ_HAVE_WINDOWS +#if defined XS_HAVE_WINDOWS #include "windows.hpp" #if defined _MSC_VER #include <intrin.h> @@ -62,48 +62,48 @@ #include "xpub.hpp" #include "xsub.hpp" -bool zmq::socket_base_t::check_tag () +bool xs::socket_base_t::check_tag () { return tag == 0xbaddecaf; } -zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, +xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, uint32_t tid_, int sid_) { socket_base_t *s = NULL; switch (type_) { - case ZMQ_PAIR: + case XS_PAIR: s = new (std::nothrow) pair_t (parent_, tid_, sid_); break; - case ZMQ_PUB: + case XS_PUB: s = new (std::nothrow) pub_t (parent_, tid_, sid_); break; - case ZMQ_SUB: + case XS_SUB: s = new (std::nothrow) sub_t (parent_, tid_, sid_); break; - case ZMQ_REQ: + case XS_REQ: s = new (std::nothrow) req_t (parent_, tid_, sid_); break; - case ZMQ_REP: + case XS_REP: s = new (std::nothrow) rep_t (parent_, tid_, sid_); break; - case ZMQ_XREQ: + case XS_XREQ: s = new (std::nothrow) xreq_t (parent_, tid_, sid_); break; - case ZMQ_XREP: + case XS_XREP: s = new (std::nothrow) xrep_t (parent_, tid_, sid_); break; - case ZMQ_PULL: + case XS_PULL: s = new (std::nothrow) pull_t (parent_, tid_, sid_); break; - case ZMQ_PUSH: + case XS_PUSH: s = new (std::nothrow) push_t (parent_, tid_, sid_); break; - case ZMQ_XPUB: + case XS_XPUB: s = new (std::nothrow) xpub_t (parent_, tid_, sid_); break; - case ZMQ_XSUB: + case XS_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_, sid_); break; default: @@ -114,7 +114,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, return s; } -zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : +xs::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : own_t (parent_, tid_), tag (0xbaddecaf), ctx_terminated (false), @@ -126,32 +126,32 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) : options.socket_id = sid_; } -zmq::socket_base_t::~socket_base_t () +xs::socket_base_t::~socket_base_t () { - zmq_assert (destroyed); + xs_assert (destroyed); // Mark the socket as dead. tag = 0xdeadbeef; } -zmq::mailbox_t *zmq::socket_base_t::get_mailbox () +xs::mailbox_t *xs::socket_base_t::get_mailbox () { return &mailbox; } -void zmq::socket_base_t::stop () +void xs::socket_base_t::stop () { - // Called by ctx when it is terminated (zmq_term). - // 'stop' command is sent from the threads that called zmq_term to + // Called by ctx when it is terminated (xs_term). + // 'stop' command is sent from the threads that called xs_term to // the thread owning the socket. This way, blocking call in the // owner thread can be interrupted. send_stop (); } -int zmq::socket_base_t::parse_uri (const char *uri_, +int xs::socket_base_t::parse_uri (const char *uri_, std::string &protocol_, std::string &address_) { - zmq_assert (uri_ != NULL); + xs_assert (uri_ != NULL); std::string uri (uri_); std::string::size_type pos = uri.find ("://"); @@ -168,7 +168,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_, return 0; } -int zmq::socket_base_t::check_protocol (const std::string &protocol_) +int xs::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && @@ -177,9 +177,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return -1; } - // If 0MQ is not compiled with OpenPGM, pgm and epgm transports + // If Crossroads is not compiled with OpenPGM, pgm and epgm transports // are not avaialble. -#if !defined ZMQ_HAVE_OPENPGM +#if !defined XS_HAVE_OPENPGM if (protocol_ == "pgm" || protocol_ == "epgm") { errno = EPROTONOSUPPORT; return -1; @@ -187,7 +187,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) #endif // IPC transport is not available on Windows and OpenVMS. -#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +#if defined XS_HAVE_WINDOWS || defined XS_HAVE_OPENVMS if (protocol_ == "ipc") { // Unknown protocol. errno = EPROTONOSUPPORT; @@ -199,8 +199,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) // Specifically, multicast protocols can't be combined with // bi-directional messaging patterns (socket types). if ((protocol_ == "pgm" || protocol_ == "epgm") && - options.type != ZMQ_PUB && options.type != ZMQ_SUB && - options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) { + options.type != XS_PUB && options.type != XS_SUB && + options.type != XS_XPUB && options.type != XS_XSUB) { errno = ENOCOMPATPROTO; return -1; } @@ -209,7 +209,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipe (pipe_t *pipe_) +void xs::socket_base_t::attach_pipe (pipe_t *pipe_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); @@ -226,7 +226,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_) } } -int zmq::socket_base_t::setsockopt (int option_, const void *optval_, +int xs::socket_base_t::setsockopt (int option_, const void *optval_, size_t optvallen_) { if (unlikely (ctx_terminated)) { @@ -244,7 +244,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, return options.setsockopt (option_, optval_, optvallen_); } -int zmq::socket_base_t::getsockopt (int option_, void *optval_, +int xs::socket_base_t::getsockopt (int option_, void *optval_, size_t *optvallen_) { if (unlikely (ctx_terminated)) { @@ -252,7 +252,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return -1; } - if (option_ == ZMQ_RCVMORE) { + if (option_ == XS_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; return -1; @@ -262,7 +262,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return 0; } - if (option_ == ZMQ_FD) { + if (option_ == XS_FD) { if (*optvallen_ < sizeof (fd_t)) { errno = EINVAL; return -1; @@ -272,7 +272,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return 0; } - if (option_ == ZMQ_EVENTS) { + if (option_ == XS_EVENTS) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; return -1; @@ -283,9 +283,9 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, errno_assert (rc == 0); *((int*) optval_) = 0; if (has_out ()) - *((int*) optval_) |= ZMQ_POLLOUT; + *((int*) optval_) |= XS_POLLOUT; if (has_in ()) - *((int*) optval_) |= ZMQ_POLLIN; + *((int*) optval_) |= XS_POLLIN; *optvallen_ = sizeof (int); return 0; } @@ -293,7 +293,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return options.getsockopt (option_, optval_, optvallen_); } -int zmq::socket_base_t::bind (const char *addr_) +int xs::socket_base_t::bind (const char *addr_) { if (unlikely (ctx_terminated)) { errno = ETERM; @@ -344,7 +344,7 @@ int zmq::socket_base_t::bind (const char *addr_) return 0; } -#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#if !defined XS_HAVE_WINDOWS && !defined XS_HAVE_OPENVMS if (protocol == "ipc") { ipc_listener_t *listener = new (std::nothrow) ipc_listener_t ( io_thread, this, options); @@ -359,11 +359,11 @@ int zmq::socket_base_t::bind (const char *addr_) } #endif - zmq_assert (false); + xs_assert (false); return -1; } -int zmq::socket_base_t::connect (const char *addr_) +int xs::socket_base_t::connect (const char *addr_) { if (unlikely (ctx_terminated)) { errno = ETERM; @@ -420,11 +420,11 @@ int zmq::socket_base_t::connect (const char *addr_) if (options.send_identity) { msg_t id; rc = id.init_size (options.identity_size); - zmq_assert (rc == 0); + xs_assert (rc == 0); memcpy (id.data (), options.identity, options.identity_size); id.set_flags (msg_t::identity); bool written = pipes [0]->write (&id); - zmq_assert (written); + xs_assert (written); } // Attach remote end of the pipe to the peer socket. Note that peer's @@ -467,7 +467,7 @@ int zmq::socket_base_t::connect (const char *addr_) return 0; } -int zmq::socket_base_t::send (msg_t *msg_, int flags_) +int xs::socket_base_t::send (msg_t *msg_, int flags_) { // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { @@ -490,7 +490,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) msg_->reset_flags (msg_t::more); // At this point we impose the flags on the message. - if (flags_ & ZMQ_SNDMORE) + if (flags_ & XS_SNDMORE) msg_->set_flags (msg_t::more); // Try to send the message. @@ -502,7 +502,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) // In case of non-blocking send we'll simply propagate // the error - including EAGAIN - up the stack. - if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) + if (flags_ & XS_DONTWAIT || options.sndtimeo == 0) return -1; // Compute the time when the timeout should occur. @@ -533,7 +533,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) return 0; } -int zmq::socket_base_t::recv (msg_t *msg_, int flags_) +int xs::socket_base_t::recv (msg_t *msg_, int flags_) { // Check whether the library haven't been shut down yet. if (unlikely (ctx_terminated)) { @@ -576,7 +576,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // For non-blocking recv, commands are processed in case there's an // activate_reader command already waiting int a command pipe. // If it's not, return EAGAIN. - if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { + if (flags_ & XS_DONTWAIT || options.rcvtimeo == 0) { if (unlikely (process_commands (0, false) != 0)) return -1; ticks = 0; @@ -621,7 +621,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) return 0; } -int zmq::socket_base_t::close () +int xs::socket_base_t::close () { // Transfer the ownership of the socket from this application thread // to the reaper thread which will take care of the rest of shutdown @@ -631,17 +631,17 @@ int zmq::socket_base_t::close () return 0; } -bool zmq::socket_base_t::has_in () +bool xs::socket_base_t::has_in () { return xhas_in (); } -bool zmq::socket_base_t::has_out () +bool xs::socket_base_t::has_out () { return xhas_out (); } -void zmq::socket_base_t::start_reaping (poller_t *poller_) +void xs::socket_base_t::start_reaping (poller_t *poller_) { // Plug the socket to the reaper thread. poller = poller_; @@ -654,7 +654,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) check_destroy (); } -int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) +int xs::socket_base_t::process_commands (int timeout_, bool throttle_) { int rc; command_t cmd; @@ -669,7 +669,7 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) // commands recently, so that we can throttle the new commands. // Get the CPU's tick counter. If 0, the counter is not available. - uint64_t tsc = zmq::clock_t::rdtsc (); + uint64_t tsc = xs::clock_t::rdtsc (); // Optimised version of command processing - it doesn't have to check // for incoming commands each time. It does so only if certain time @@ -710,25 +710,25 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) return 0; } -void zmq::socket_base_t::process_stop () +void xs::socket_base_t::process_stop () { - // Here, someone have called zmq_term while the socket was still alive. + // Here, someone have called xs_term while the socket was still alive. // We'll remember the fact so that any blocking call is interrupted and any // further attempt to use the socket will return ETERM. The user is still - // responsible for calling zmq_close on the socket though! + // responsible for calling xs_close on the socket though! ctx_terminated = true; } -void zmq::socket_base_t::process_bind (pipe_t *pipe_) +void xs::socket_base_t::process_bind (pipe_t *pipe_) { attach_pipe (pipe_); } -void zmq::socket_base_t::process_unplug () +void xs::socket_base_t::process_unplug () { } -void zmq::socket_base_t::process_term (int linger_) +void xs::socket_base_t::process_term (int linger_) { // Unregister all inproc endpoints associated with this socket. // Doing this we make sure that no new pipes from other sockets (inproc) @@ -744,55 +744,55 @@ void zmq::socket_base_t::process_term (int linger_) own_t::process_term (linger_); } -void zmq::socket_base_t::process_destroy () +void xs::socket_base_t::process_destroy () { destroyed = true; } -int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, +int xs::socket_base_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { errno = EINVAL; return -1; } -bool zmq::socket_base_t::xhas_out () +bool xs::socket_base_t::xhas_out () { return false; } -int zmq::socket_base_t::xsend (msg_t *msg_, int flags_) +int xs::socket_base_t::xsend (msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; } -bool zmq::socket_base_t::xhas_in () +bool xs::socket_base_t::xhas_in () { return false; } -int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_) +int xs::socket_base_t::xrecv (msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; } -void zmq::socket_base_t::xread_activated (pipe_t *pipe_) +void xs::socket_base_t::xread_activated (pipe_t *pipe_) { - zmq_assert (false); + xs_assert (false); } -void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_) +void xs::socket_base_t::xwrite_activated (pipe_t *pipe_) { - zmq_assert (false); + xs_assert (false); } -void zmq::socket_base_t::xhiccuped (pipe_t *pipe_) +void xs::socket_base_t::xhiccuped (pipe_t *pipe_) { - zmq_assert (false); + xs_assert (false); } -void zmq::socket_base_t::in_event () +void xs::socket_base_t::in_event () { // This function is invoked only once the socket is running in the context // of the reaper thread. Process any commands from other threads/sockets @@ -802,17 +802,17 @@ void zmq::socket_base_t::in_event () check_destroy (); } -void zmq::socket_base_t::out_event () +void xs::socket_base_t::out_event () { - zmq_assert (false); + xs_assert (false); } -void zmq::socket_base_t::timer_event (int id_) +void xs::socket_base_t::timer_event (int id_) { - zmq_assert (false); + xs_assert (false); } -void zmq::socket_base_t::check_destroy () +void xs::socket_base_t::check_destroy () { // If the object was already marked as destroyed, finish the deallocation. if (destroyed) { @@ -831,22 +831,22 @@ void zmq::socket_base_t::check_destroy () } } -void zmq::socket_base_t::read_activated (pipe_t *pipe_) +void xs::socket_base_t::read_activated (pipe_t *pipe_) { xread_activated (pipe_); } -void zmq::socket_base_t::write_activated (pipe_t *pipe_) +void xs::socket_base_t::write_activated (pipe_t *pipe_) { xwrite_activated (pipe_); } -void zmq::socket_base_t::hiccuped (pipe_t *pipe_) +void xs::socket_base_t::hiccuped (pipe_t *pipe_) { xhiccuped (pipe_); } -void zmq::socket_base_t::terminated (pipe_t *pipe_) +void xs::socket_base_t::terminated (pipe_t *pipe_) { // Notify the specific socket type about the pipe termination. xterminated (pipe_); @@ -858,11 +858,11 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) unregister_term_ack (); } -void zmq::socket_base_t::extract_flags (msg_t *msg_) +void xs::socket_base_t::extract_flags (msg_t *msg_) { // Test whether IDENTITY flag is valid for this socket type. if (unlikely (msg_->flags () & msg_t::identity)) - zmq_assert (options.recv_identity); + xs_assert (options.recv_identity); // Remove MORE flag. rcvmore = msg_->flags () & msg_t::more ? true : false; |