From cb1b6fe32cbf3c7cf5961bb4156f2de743693a3a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 21 Sep 2009 14:39:59 +0200 Subject: initial version of req/rep sockets --- src/sub.cpp | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 81 insertions(+), 7 deletions(-) (limited to 'src/sub.cpp') diff --git a/src/sub.cpp b/src/sub.cpp index 515a843..73510c6 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -21,18 +21,69 @@ #include "sub.hpp" #include "err.hpp" +#include "pipe.hpp" zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_, ZMQ_SUB), + active (0), + current (0), all_count (0) { } zmq::sub_t::~sub_t () { + for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) + in_pipes [i]->term (); + in_pipes.clear (); } -int zmq::sub_t::setsockopt (int option_, const void *optval_, +bool zmq::sub_t::xrequires_in () +{ + return true; +} + +bool zmq::sub_t::xrequires_out () +{ + return false; +} + +void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!outpipe_); + in_pipes.push_back (inpipe_); + in_pipes.swap (active, in_pipes.size () - 1); + active++; +} + +void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_) +{ + if (in_pipes.index (pipe_) < active) + active--; + in_pipes.erase (pipe_); +} + +void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::sub_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + in_pipes.swap (in_pipes.index (pipe_), active - 1); + active--; +} + +void zmq::sub_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + in_pipes.swap (in_pipes.index (pipe_), active); + active++; +} + +int zmq::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ == ZMQ_SUBSCRIBE) { @@ -75,27 +126,28 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_, return 0; } - return socket_base_t::setsockopt (option_, optval_, optvallen_); + errno = EINVAL; + return -1; } -int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_) +int zmq::sub_t::xsend (struct zmq_msg_t *msg_, int flags_) { errno = EFAULT; return -1; } -int zmq::sub_t::flush () +int zmq::sub_t::xflush () { errno = EFAULT; return -1; } -int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) +int zmq::sub_t::xrecv (struct zmq_msg_t *msg_, int flags_) { while (true) { - // Get a message. - int rc = socket_base_t::recv (msg_, flags_); + // Get a message using fair queueing algorithm. + int rc = fq (msg_, flags_); // If there's no message available, return immediately. if (rc != 0 && errno == EAGAIN) @@ -131,3 +183,25 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) return 0; } } + +int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = in_pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} -- cgit v1.2.3