summaryrefslogtreecommitdiff
path: root/src/fair_aggregator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/fair_aggregator.cpp')
-rw-r--r--src/fair_aggregator.cpp30
1 files changed, 15 insertions, 15 deletions
diff --git a/src/fair_aggregator.cpp b/src/fair_aggregator.cpp
index 65bfac0..1e6937f 100644
--- a/src/fair_aggregator.cpp
+++ b/src/fair_aggregator.cpp
@@ -17,7 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "../include/zs.h"
+#include "../include/zmq.h"
#include "fair_aggregator.hpp"
#include "err.hpp"
@@ -30,27 +30,27 @@
pipes [i1_]->set_index (i1_);\
pipes [i2_]->set_index (i2_);
-zs::fair_aggregator_t::fair_aggregator_t () :
+zmq::fair_aggregator_t::fair_aggregator_t () :
session (NULL),
active (0),
current (0)
{
}
-void zs::fair_aggregator_t::set_session (session_t *session_)
+void zmq::fair_aggregator_t::set_session (session_t *session_)
{
- zs_assert (!session);
+ zmq_assert (!session);
session = session_;
}
-void zs::fair_aggregator_t::shutdown ()
+void zmq::fair_aggregator_t::shutdown ()
{
// No need to deallocate pipes here. They'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
-void zs::fair_aggregator_t::terminate ()
+void zmq::fair_aggregator_t::terminate ()
{
// Pipe unregisters itself during the call to terminate, so the pipes
// list shinks by one in each iteration.
@@ -60,11 +60,11 @@ void zs::fair_aggregator_t::terminate ()
delete this;
}
-zs::fair_aggregator_t::~fair_aggregator_t ()
+zmq::fair_aggregator_t::~fair_aggregator_t ()
{
}
-void zs::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
+void zmq::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
{
// Associate new pipe with the mux object.
pipe_->set_mux (this);
@@ -76,7 +76,7 @@ void zs::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
session->revive ();
}
-void zs::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
+void zmq::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
{
// Move the pipe from the list of active pipes to the list of idle pipes.
deactivate (pipe_);
@@ -86,15 +86,15 @@ void zs::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
pipes.pop_back ();
}
-bool zs::fair_aggregator_t::empty ()
+bool zmq::fair_aggregator_t::empty ()
{
return pipes.empty ();
}
-bool zs::fair_aggregator_t::recv (zs_msg *msg_)
+bool zmq::fair_aggregator_t::recv (zmq_msg *msg_)
{
// Deallocate old content of the message.
- zs_msg_close (msg_);
+ zmq_msg_close (msg_);
// O(1) fair queueing. Round-robin over the active pipes to get
// next message.
@@ -110,11 +110,11 @@ bool zs::fair_aggregator_t::recv (zs_msg *msg_)
// No message is available. Initialise the output parameter
// to be a 0-byte message.
- zs_msg_init (msg_);
+ zmq_msg_init (msg_);
return false;
}
-void zs::fair_aggregator_t::deactivate (pipe_reader_t *pipe_)
+void zmq::fair_aggregator_t::deactivate (pipe_reader_t *pipe_)
{
int index = pipe_->get_index ();
@@ -133,7 +133,7 @@ void zs::fair_aggregator_t::deactivate (pipe_reader_t *pipe_)
}
}
-void zs::fair_aggregator_t::reactivate (pipe_reader_t *pipe_)
+void zmq::fair_aggregator_t::reactivate (pipe_reader_t *pipe_)
{
// Revive an idle pipe.
swap_pipes (pipe_->get_index (), active);