summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp76
1 files changed, 38 insertions, 38 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 25dd51c..4911dd0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -1,17 +1,17 @@
/*
- Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2009-2012 250bpm s.r.o.
Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
- This file is part of 0MQ.
+ This file is part of Crossroads project.
- 0MQ is free software; you can redistribute it and/or modify it under
+ Crossroads is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
- 0MQ is distributed in the hope that it will be useful,
+ Crossroads is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
@@ -26,7 +26,7 @@
#include "pipe.hpp"
#include "err.hpp"
-int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
+int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
int hwms_ [2], bool delays_ [2])
{
// Creates two pipe objects. These objects are connected by two ypipes,
@@ -50,7 +50,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
return 0;
}
-zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
+xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool delay_) :
object_t (parent_),
inpipe (inpipe_),
@@ -69,35 +69,35 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
{
}
-zmq::pipe_t::~pipe_t ()
+xs::pipe_t::~pipe_t ()
{
}
-void zmq::pipe_t::set_peer (pipe_t *peer_)
+void xs::pipe_t::set_peer (pipe_t *peer_)
{
// Peer can be set once only.
- zmq_assert (!peer);
+ xs_assert (!peer);
peer = peer_;
}
-void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
+void xs::pipe_t::set_event_sink (i_pipe_events *sink_)
{
// Sink can be set once only.
- zmq_assert (!sink);
+ xs_assert (!sink);
sink = sink_;
}
-void zmq::pipe_t::set_identity (const blob_t &identity_)
+void xs::pipe_t::set_identity (const blob_t &identity_)
{
identity = identity_;
}
-zmq::blob_t zmq::pipe_t::get_identity ()
+xs::blob_t xs::pipe_t::get_identity ()
{
return identity;
}
-bool zmq::pipe_t::check_read ()
+bool xs::pipe_t::check_read ()
{
if (unlikely (!in_active || (state != active && state != pending)))
return false;
@@ -113,7 +113,7 @@ bool zmq::pipe_t::check_read ()
if (inpipe->probe (is_delimiter)) {
msg_t msg;
bool ok = inpipe->read (&msg);
- zmq_assert (ok);
+ xs_assert (ok);
delimit ();
return false;
}
@@ -121,7 +121,7 @@ bool zmq::pipe_t::check_read ()
return true;
}
-bool zmq::pipe_t::read (msg_t *msg_)
+bool xs::pipe_t::read (msg_t *msg_)
{
if (unlikely (!in_active || (state != active && state != pending)))
return false;
@@ -146,7 +146,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return true;
}
-bool zmq::pipe_t::check_write (msg_t *msg_)
+bool xs::pipe_t::check_write (msg_t *msg_)
{
if (unlikely (!out_active || state != active))
return false;
@@ -161,7 +161,7 @@ bool zmq::pipe_t::check_write (msg_t *msg_)
return true;
}
-bool zmq::pipe_t::write (msg_t *msg_)
+bool xs::pipe_t::write (msg_t *msg_)
{
if (unlikely (!check_write (msg_)))
return false;
@@ -174,20 +174,20 @@ bool zmq::pipe_t::write (msg_t *msg_)
return true;
}
-void zmq::pipe_t::rollback ()
+void xs::pipe_t::rollback ()
{
// Remove incomplete message from the outbound pipe.
msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & msg_t::more);
+ xs_assert (msg.flags () & msg_t::more);
int rc = msg.close ();
errno_assert (rc == 0);
}
}
}
-void zmq::pipe_t::flush ()
+void xs::pipe_t::flush ()
{
// If terminate() was already called do nothing.
if (state == terminated && state == double_terminated)
@@ -201,7 +201,7 @@ void zmq::pipe_t::flush ()
send_activate_read (peer);
}
-void zmq::pipe_t::process_activate_read ()
+void xs::pipe_t::process_activate_read ()
{
if (!in_active && (state == active || state == pending)) {
in_active = true;
@@ -209,7 +209,7 @@ void zmq::pipe_t::process_activate_read ()
}
}
-void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
+void xs::pipe_t::process_activate_write (uint64_t msgs_read_)
{
// Remember the peers's message sequence number.
peers_msgs_read = msgs_read_;
@@ -220,11 +220,11 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
}
}
-void zmq::pipe_t::process_hiccup (void *pipe_)
+void xs::pipe_t::process_hiccup (void *pipe_)
{
// Destroy old outpipe. Note that the read end of the pipe was already
// migrated to this thread.
- zmq_assert (outpipe);
+ xs_assert (outpipe);
outpipe->flush ();
msg_t msg;
while (outpipe->read (&msg)) {
@@ -234,7 +234,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_)
delete outpipe;
// Plug in the new outpipe.
- zmq_assert (pipe_);
+ xs_assert (pipe_);
outpipe = (upipe_t*) pipe_;
out_active = true;
@@ -243,7 +243,7 @@ void zmq::pipe_t::process_hiccup (void *pipe_)
sink->hiccuped (this);
}
-void zmq::pipe_t::process_pipe_term ()
+void xs::pipe_t::process_pipe_term ()
{
// This is the simple case of peer-induced termination. If there are no
// more pending messages to read, or if the pipe was configured to drop
@@ -283,13 +283,13 @@ void zmq::pipe_t::process_pipe_term ()
}
// pipe_term is invalid in other states.
- zmq_assert (false);
+ xs_assert (false);
}
-void zmq::pipe_t::process_pipe_term_ack ()
+void xs::pipe_t::process_pipe_term_ack ()
{
// Notify the user that all the references to the pipe should be dropped.
- zmq_assert (sink);
+ xs_assert (sink);
sink->terminated (this);
// In terminating and double_terminated states there's nothing to do.
@@ -303,7 +303,7 @@ void zmq::pipe_t::process_pipe_term_ack ()
send_pipe_term_ack (peer);
}
else
- zmq_assert (false);
+ xs_assert (false);
// We'll deallocate the inbound pipe, the peer will deallocate the outbound
// pipe (which is an inbound pipe from its point of view).
@@ -321,7 +321,7 @@ void zmq::pipe_t::process_pipe_term_ack ()
delete this;
}
-void zmq::pipe_t::terminate (bool delay_)
+void xs::pipe_t::terminate (bool delay_)
{
// Overload the value specified at pipe creation.
delay = delay_;
@@ -364,7 +364,7 @@ void zmq::pipe_t::terminate (bool delay_)
// There are no other states.
else
- zmq_assert (false);
+ xs_assert (false);
// Stop outbound flow of messages.
out_active = false;
@@ -383,12 +383,12 @@ void zmq::pipe_t::terminate (bool delay_)
}
}
-bool zmq::pipe_t::is_delimiter (msg_t &msg_)
+bool xs::pipe_t::is_delimiter (msg_t &msg_)
{
return msg_.is_delimiter ();
}
-int zmq::pipe_t::compute_lwm (int hwm_)
+int xs::pipe_t::compute_lwm (int hwm_)
{
// Compute the low water mark. Following point should be taken
// into consideration:
@@ -416,7 +416,7 @@ int zmq::pipe_t::compute_lwm (int hwm_)
return result;
}
-void zmq::pipe_t::delimit ()
+void xs::pipe_t::delimit ()
{
if (state == active) {
state = delimited;
@@ -431,10 +431,10 @@ void zmq::pipe_t::delimit ()
}
// Delimiter in any other state is invalid.
- zmq_assert (false);
+ xs_assert (false);
}
-void zmq::pipe_t::hiccup ()
+void xs::pipe_t::hiccup ()
{
// If termination is already under way do nothing.
if (state != active)