From e49115224a7957b0e5d49326bc02ae6af186eaf9 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 15 Dec 2009 09:09:19 +0100 Subject: zmq_encoder/decoder are able to add/trim prefixes from messages; fair queueing and load balancing algorithms factorised into separate classes --- src/xreq.cpp | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) (limited to 'src/xreq.cpp') diff --git a/src/xreq.cpp b/src/xreq.cpp index d359dc0..9b95393 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -21,7 +21,6 @@ #include "xreq.hpp" #include "err.hpp" -#include "pipe.hpp" zmq::xreq_t::xreq_t (class app_thread_t *parent_) : socket_base_t (parent_) @@ -37,27 +36,31 @@ zmq::xreq_t::~xreq_t () void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_) { - zmq_assert (false); + zmq_assert (inpipe_ && outpipe_); + fq.attach (inpipe_); + lb.attach (outpipe_); } void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_) { - zmq_assert (false); + zmq_assert (pipe_); + fq.detach (pipe_); } void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_) { - zmq_assert (false); + zmq_assert (pipe_); + lb.detach (pipe_); } void zmq::xreq_t::xkill (class reader_t *pipe_) { - zmq_assert (false); + fq.kill (pipe_); } void zmq::xreq_t::xrevive (class reader_t *pipe_) { - zmq_assert (false); + fq.revive (pipe_); } int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, @@ -69,32 +72,29 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); - return -1; + return lb.send (msg_, flags_); } int zmq::xreq_t::xflush () { + // TODO: Implement flushing. zmq_assert (false); return -1; } int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); - return -1; + return fq.recv (msg_, flags_); } bool zmq::xreq_t::xhas_in () { - zmq_assert (false); - return false; + return fq.has_in (); } bool zmq::xreq_t::xhas_out () { - zmq_assert (false); - return false; + return lb.has_out (); } -- cgit v1.2.3