diff options
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r-- | src/pipe.cpp | 76 |
1 files changed, 38 insertions, 38 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 25dd51c..4911dd0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -1,17 +1,17 @@ /* - Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2009-2012 250bpm s.r.o. Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - This file is part of 0MQ. + This file is part of Crossroads project. - 0MQ is free software; you can redistribute it and/or modify it under + Crossroads is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. - 0MQ is distributed in the hope that it will be useful, + Crossroads is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. @@ -26,7 +26,7 @@ #include "pipe.hpp" #include "err.hpp" -int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], +int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]) { // Creates two pipe objects. These objects are connected by two ypipes, @@ -50,7 +50,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], return 0; } -zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, +xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, int inhwm_, int outhwm_, bool delay_) : object_t (parent_), inpipe (inpipe_), @@ -69,35 +69,35 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, { } -zmq::pipe_t::~pipe_t () +xs::pipe_t::~pipe_t () { } -void zmq::pipe_t::set_peer (pipe_t *peer_) +void xs::pipe_t::set_peer (pipe_t *peer_) { // Peer can be set once only. - zmq_assert (!peer); + xs_assert (!peer); peer = peer_; } -void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) +void xs::pipe_t::set_event_sink (i_pipe_events *sink_) { // Sink can be set once only. - zmq_assert (!sink); + xs_assert (!sink); sink = sink_; } -void zmq::pipe_t::set_identity (const blob_t &identity_) +void xs::pipe_t::set_identity (const blob_t &identity_) { identity = identity_; } -zmq::blob_t zmq::pipe_t::get_identity () +xs::blob_t xs::pipe_t::get_identity () { return identity; } -bool zmq::pipe_t::check_read () +bool xs::pipe_t::check_read () { if (unlikely (!in_active || (state != active && state != pending))) return false; @@ -113,7 +113,7 @@ bool zmq::pipe_t::check_read () if (inpipe->probe (is_delimiter)) { msg_t msg; bool ok = inpipe->read (&msg); - zmq_assert (ok); + xs_assert (ok); delimit (); return false; } @@ -121,7 +121,7 @@ bool zmq::pipe_t::check_read () return true; } -bool zmq::pipe_t::read (msg_t *msg_) +bool xs::pipe_t::read (msg_t *msg_) { if (unlikely (!in_active || (state != active && state != pending))) return false; @@ -146,7 +146,7 @@ bool zmq::pipe_t::read (msg_t *msg_) return true; } -bool zmq::pipe_t::check_write (msg_t *msg_) +bool xs::pipe_t::check_write (msg_t *msg_) { if (unlikely (!out_active || state != active)) return false; @@ -161,7 +161,7 @@ bool zmq::pipe_t::check_write (msg_t *msg_) return true; } -bool zmq::pipe_t::write (msg_t *msg_) +bool xs::pipe_t::write (msg_t *msg_) { if (unlikely (!check_write (msg_))) return false; @@ -174,20 +174,20 @@ bool zmq::pipe_t::write (msg_t *msg_) return true; } -void zmq::pipe_t::rollback () +void xs::pipe_t::rollback () { // Remove incomplete message from the outbound pipe. msg_t msg; if (outpipe) { while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); + xs_assert (msg.flags () & msg_t::more); int rc = msg.close (); errno_assert (rc == 0); } } } -void zmq::pipe_t::flush () +void xs::pipe_t::flush () { // If terminate() was already called do nothing. if (state == terminated && state == double_terminated) @@ -201,7 +201,7 @@ void zmq::pipe_t::flush () send_activate_read (peer); } -void zmq::pipe_t::process_activate_read () +void xs::pipe_t::process_activate_read () { if (!in_active && (state == active || state == pending)) { in_active = true; @@ -209,7 +209,7 @@ void zmq::pipe_t::process_activate_read () } } -void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) +void xs::pipe_t::process_activate_write (uint64_t msgs_read_) { // Remember the peers's message sequence number. peers_msgs_read = msgs_read_; @@ -220,11 +220,11 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) } } -void zmq::pipe_t::process_hiccup (void *pipe_) +void xs::pipe_t::process_hiccup (void *pipe_) { // Destroy old outpipe. Note that the read end of the pipe was already // migrated to this thread. - zmq_assert (outpipe); + xs_assert (outpipe); outpipe->flush (); msg_t msg; while (outpipe->read (&msg)) { @@ -234,7 +234,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_) delete outpipe; // Plug in the new outpipe. - zmq_assert (pipe_); + xs_assert (pipe_); outpipe = (upipe_t*) pipe_; out_active = true; @@ -243,7 +243,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_) sink->hiccuped (this); } -void zmq::pipe_t::process_pipe_term () +void xs::pipe_t::process_pipe_term () { // This is the simple case of peer-induced termination. If there are no // more pending messages to read, or if the pipe was configured to drop @@ -283,13 +283,13 @@ void zmq::pipe_t::process_pipe_term () } // pipe_term is invalid in other states. - zmq_assert (false); + xs_assert (false); } -void zmq::pipe_t::process_pipe_term_ack () +void xs::pipe_t::process_pipe_term_ack () { // Notify the user that all the references to the pipe should be dropped. - zmq_assert (sink); + xs_assert (sink); sink->terminated (this); // In terminating and double_terminated states there's nothing to do. @@ -303,7 +303,7 @@ void zmq::pipe_t::process_pipe_term_ack () send_pipe_term_ack (peer); } else - zmq_assert (false); + xs_assert (false); // We'll deallocate the inbound pipe, the peer will deallocate the outbound // pipe (which is an inbound pipe from its point of view). @@ -321,7 +321,7 @@ void zmq::pipe_t::process_pipe_term_ack () delete this; } -void zmq::pipe_t::terminate (bool delay_) +void xs::pipe_t::terminate (bool delay_) { // Overload the value specified at pipe creation. delay = delay_; @@ -364,7 +364,7 @@ void zmq::pipe_t::terminate (bool delay_) // There are no other states. else - zmq_assert (false); + xs_assert (false); // Stop outbound flow of messages. out_active = false; @@ -383,12 +383,12 @@ void zmq::pipe_t::terminate (bool delay_) } } -bool zmq::pipe_t::is_delimiter (msg_t &msg_) +bool xs::pipe_t::is_delimiter (msg_t &msg_) { return msg_.is_delimiter (); } -int zmq::pipe_t::compute_lwm (int hwm_) +int xs::pipe_t::compute_lwm (int hwm_) { // Compute the low water mark. Following point should be taken // into consideration: @@ -416,7 +416,7 @@ int zmq::pipe_t::compute_lwm (int hwm_) return result; } -void zmq::pipe_t::delimit () +void xs::pipe_t::delimit () { if (state == active) { state = delimited; @@ -431,10 +431,10 @@ void zmq::pipe_t::delimit () } // Delimiter in any other state is invalid. - zmq_assert (false); + xs_assert (false); } -void zmq::pipe_t::hiccup () +void xs::pipe_t::hiccup () { // If termination is already under way do nothing. if (state != active) |