summaryrefslogtreecommitdiff
path: root/src/sub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-21 14:39:59 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-21 14:39:59 +0200
commitcb1b6fe32cbf3c7cf5961bb4156f2de743693a3a (patch)
treefad2a866ccb543fd4676c4539c68fb32c52dc3a3 /src/sub.cpp
parent7668b246fc3cf4a2a3b3ee9b1283ad8a4b12ac4f (diff)
initial version of req/rep sockets
Diffstat (limited to 'src/sub.cpp')
-rw-r--r--src/sub.cpp88
1 files changed, 81 insertions, 7 deletions
diff --git a/src/sub.cpp b/src/sub.cpp
index 515a843..73510c6 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -21,18 +21,69 @@
#include "sub.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_SUB),
+ active (0),
+ current (0),
all_count (0)
{
}
zmq::sub_t::~sub_t ()
{
+ for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
+ in_pipes [i]->term ();
+ in_pipes.clear ();
}
-int zmq::sub_t::setsockopt (int option_, const void *optval_,
+bool zmq::sub_t::xrequires_in ()
+{
+ return true;
+}
+
+bool zmq::sub_t::xrequires_out ()
+{
+ return false;
+}
+
+void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!outpipe_);
+ in_pipes.push_back (inpipe_);
+ in_pipes.swap (active, in_pipes.size () - 1);
+ active++;
+}
+
+void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ if (in_pipes.index (pipe_) < active)
+ active--;
+ in_pipes.erase (pipe_);
+}
+
+void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::sub_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ in_pipes.swap (in_pipes.index (pipe_), active - 1);
+ active--;
+}
+
+void zmq::sub_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ in_pipes.swap (in_pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_SUBSCRIBE) {
@@ -75,27 +126,28 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
return 0;
}
- return socket_base_t::setsockopt (option_, optval_, optvallen_);
+ errno = EINVAL;
+ return -1;
}
-int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_)
+int zmq::sub_t::xsend (struct zmq_msg_t *msg_, int flags_)
{
errno = EFAULT;
return -1;
}
-int zmq::sub_t::flush ()
+int zmq::sub_t::xflush ()
{
errno = EFAULT;
return -1;
}
-int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
+int zmq::sub_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{
while (true) {
- // Get a message.
- int rc = socket_base_t::recv (msg_, flags_);
+ // Get a message using fair queueing algorithm.
+ int rc = fq (msg_, flags_);
// If there's no message available, return immediately.
if (rc != 0 && errno == EAGAIN)
@@ -131,3 +183,25 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
return 0;
}
}
+
+int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = in_pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}