diff options
Diffstat (limited to 'src/pipe.cpp')
| -rw-r--r-- | src/pipe.cpp | 76 | 
1 files changed, 38 insertions, 38 deletions
| diff --git a/src/pipe.cpp b/src/pipe.cpp index 25dd51c..4911dd0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -1,17 +1,17 @@  /* -    Copyright (c) 2009-2011 250bpm s.r.o. +    Copyright (c) 2009-2012 250bpm s.r.o.      Copyright (c) 2007-2009 iMatix Corporation      Copyright (c) 2011 VMware, Inc.      Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file -    This file is part of 0MQ. +    This file is part of Crossroads project. -    0MQ is free software; you can redistribute it and/or modify it under +    Crossroads is free software; you can redistribute it and/or modify it under      the terms of the GNU Lesser General Public License as published by      the Free Software Foundation; either version 3 of the License, or      (at your option) any later version. -    0MQ is distributed in the hope that it will be useful, +    Crossroads is distributed in the hope that it will be useful,      but WITHOUT ANY WARRANTY; without even the implied warranty of      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the      GNU Lesser General Public License for more details. @@ -26,7 +26,7 @@  #include "pipe.hpp"  #include "err.hpp" -int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], +int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],      int hwms_ [2], bool delays_ [2])  {      //   Creates two pipe objects. These objects are connected by two ypipes, @@ -50,7 +50,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],      return 0;  } -zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, +xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,        int inhwm_, int outhwm_, bool delay_) :      object_t (parent_),      inpipe (inpipe_), @@ -69,35 +69,35 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,  {  } -zmq::pipe_t::~pipe_t () +xs::pipe_t::~pipe_t ()  {  } -void zmq::pipe_t::set_peer (pipe_t *peer_) +void xs::pipe_t::set_peer (pipe_t *peer_)  {      //  Peer can be set once only. -    zmq_assert (!peer); +    xs_assert (!peer);      peer = peer_;  } -void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) +void xs::pipe_t::set_event_sink (i_pipe_events *sink_)  {      // Sink can be set once only. -    zmq_assert (!sink); +    xs_assert (!sink);      sink = sink_;  } -void zmq::pipe_t::set_identity (const blob_t &identity_) +void xs::pipe_t::set_identity (const blob_t &identity_)  {      identity = identity_;  } -zmq::blob_t zmq::pipe_t::get_identity () +xs::blob_t xs::pipe_t::get_identity ()  {      return identity;  } -bool zmq::pipe_t::check_read () +bool xs::pipe_t::check_read ()  {      if (unlikely (!in_active || (state != active && state != pending)))          return false; @@ -113,7 +113,7 @@ bool zmq::pipe_t::check_read ()      if (inpipe->probe (is_delimiter)) {          msg_t msg;          bool ok = inpipe->read (&msg); -        zmq_assert (ok); +        xs_assert (ok);          delimit ();          return false;      } @@ -121,7 +121,7 @@ bool zmq::pipe_t::check_read ()      return true;  } -bool zmq::pipe_t::read (msg_t *msg_) +bool xs::pipe_t::read (msg_t *msg_)  {      if (unlikely (!in_active || (state != active && state != pending)))          return false; @@ -146,7 +146,7 @@ bool zmq::pipe_t::read (msg_t *msg_)      return true;  } -bool zmq::pipe_t::check_write (msg_t *msg_) +bool xs::pipe_t::check_write (msg_t *msg_)  {      if (unlikely (!out_active || state != active))          return false; @@ -161,7 +161,7 @@ bool zmq::pipe_t::check_write (msg_t *msg_)      return true;  } -bool zmq::pipe_t::write (msg_t *msg_) +bool xs::pipe_t::write (msg_t *msg_)  {      if (unlikely (!check_write (msg_)))          return false; @@ -174,20 +174,20 @@ bool zmq::pipe_t::write (msg_t *msg_)      return true;  } -void zmq::pipe_t::rollback () +void xs::pipe_t::rollback ()  {      //  Remove incomplete message from the outbound pipe.      msg_t msg;      if (outpipe) {  		while (outpipe->unwrite (&msg)) { -		    zmq_assert (msg.flags () & msg_t::more); +		    xs_assert (msg.flags () & msg_t::more);  		    int rc = msg.close ();  		    errno_assert (rc == 0);  		}      }  } -void zmq::pipe_t::flush () +void xs::pipe_t::flush ()  {      //  If terminate() was already called do nothing.      if (state == terminated && state == double_terminated) @@ -201,7 +201,7 @@ void zmq::pipe_t::flush ()          send_activate_read (peer);  } -void zmq::pipe_t::process_activate_read () +void xs::pipe_t::process_activate_read ()  {      if (!in_active && (state == active || state == pending)) {          in_active = true; @@ -209,7 +209,7 @@ void zmq::pipe_t::process_activate_read ()      }  } -void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) +void xs::pipe_t::process_activate_write (uint64_t msgs_read_)  {      //  Remember the peers's message sequence number.      peers_msgs_read = msgs_read_; @@ -220,11 +220,11 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)      }  } -void zmq::pipe_t::process_hiccup (void *pipe_) +void xs::pipe_t::process_hiccup (void *pipe_)  {      //  Destroy old outpipe. Note that the read end of the pipe was already      //  migrated to this thread. -    zmq_assert (outpipe); +    xs_assert (outpipe);      outpipe->flush ();      msg_t msg;      while (outpipe->read (&msg)) { @@ -234,7 +234,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_)      delete outpipe;      //  Plug in the new outpipe. -    zmq_assert (pipe_); +    xs_assert (pipe_);      outpipe = (upipe_t*) pipe_;      out_active = true; @@ -243,7 +243,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_)          sink->hiccuped (this);  } -void zmq::pipe_t::process_pipe_term () +void xs::pipe_t::process_pipe_term ()  {      //  This is the simple case of peer-induced termination. If there are no      //  more pending messages to read, or if the pipe was configured to drop @@ -283,13 +283,13 @@ void zmq::pipe_t::process_pipe_term ()      }      //  pipe_term is invalid in other states. -    zmq_assert (false); +    xs_assert (false);  } -void zmq::pipe_t::process_pipe_term_ack () +void xs::pipe_t::process_pipe_term_ack ()  {      //  Notify the user that all the references to the pipe should be dropped. -    zmq_assert (sink); +    xs_assert (sink);      sink->terminated (this);      //  In terminating and double_terminated states there's nothing to do. @@ -303,7 +303,7 @@ void zmq::pipe_t::process_pipe_term_ack ()          send_pipe_term_ack (peer);      }      else -        zmq_assert (false); +        xs_assert (false);      //  We'll deallocate the inbound pipe, the peer will deallocate the outbound      //  pipe (which is an inbound pipe from its point of view). @@ -321,7 +321,7 @@ void zmq::pipe_t::process_pipe_term_ack ()      delete this;  } -void zmq::pipe_t::terminate (bool delay_) +void xs::pipe_t::terminate (bool delay_)  {      //  Overload the value specified at pipe creation.      delay = delay_; @@ -364,7 +364,7 @@ void zmq::pipe_t::terminate (bool delay_)      //  There are no other states.      else -        zmq_assert (false); +        xs_assert (false);      //  Stop outbound flow of messages.      out_active = false; @@ -383,12 +383,12 @@ void zmq::pipe_t::terminate (bool delay_)      }  } -bool zmq::pipe_t::is_delimiter (msg_t &msg_) +bool xs::pipe_t::is_delimiter (msg_t &msg_)  {      return msg_.is_delimiter ();  } -int zmq::pipe_t::compute_lwm (int hwm_) +int xs::pipe_t::compute_lwm (int hwm_)  {      //  Compute the low water mark. Following point should be taken      //  into consideration: @@ -416,7 +416,7 @@ int zmq::pipe_t::compute_lwm (int hwm_)      return result;  } -void zmq::pipe_t::delimit () +void xs::pipe_t::delimit ()  {      if (state == active) {          state = delimited; @@ -431,10 +431,10 @@ void zmq::pipe_t::delimit ()      }      //  Delimiter in any other state is invalid. -    zmq_assert (false); +    xs_assert (false);  } -void zmq::pipe_t::hiccup () +void xs::pipe_t::hiccup ()  {      //  If termination is already under way do nothing.      if (state != active) | 
