summaryrefslogtreecommitdiff
path: root/src/sub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/sub.cpp')
-rw-r--r--src/sub.cpp92
1 files changed, 67 insertions, 25 deletions
diff --git a/src/sub.cpp b/src/sub.cpp
index bdc27da..b8e3990 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -26,14 +26,17 @@
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
- all_count (0)
+ all_count (0),
+ has_message (false)
{
options.requires_in = true;
options.requires_out = false;
+ zmq_msg_init (&message);
}
zmq::sub_t::~sub_t ()
{
+ zmq_msg_close (&message);
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
@@ -115,16 +118,72 @@ int zmq::sub_t::xflush ()
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (msg_, flags_);
+ // If there's already a message prepared by a previous call to zmq_poll,
+ // return it straight ahead.
+ if (has_message) {
+ zmq_msg_move (msg_, &message);
+ has_message = false;
+ return 0;
+ }
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages which breaks the non-blocking recv
+ // semantics.
+ while (true) {
- // If there's no message available, return immediately.
- if (rc != 0 && errno == EAGAIN)
- return -1;
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (msg_, flags_);
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0)
+ return -1;
+
+ // Check whether the message matches at least one subscription.
+ if (match (msg_))
+ return 0;
+ }
+}
+
+bool zmq::sub_t::xhas_in ()
+{
+ // If there's already a message prepared by a previous call to zmq_poll,
+ // return straight ahead.
+ if (has_message)
+ return true;
+
+ // TODO: This can result in infinite loop in the case of continuous
+ // stream of non-matching messages.
+ while (true) {
+
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (&message, ZMQ_NOBLOCK);
+
+ // If there's no message available, return immediately.
+ // The same when error occurs.
+ if (rc != 0) {
+ zmq_assert (errno == EAGAIN);
+ return false;
+ }
+
+ // Check whether the message matches at least one subscription.
+ if (match (&message)) {
+ has_message = true;
+ return true;
+ }
+ }
+}
+
+bool zmq::sub_t::xhas_out ()
+{
+ return false;
+}
+
+bool zmq::sub_t::match (zmq_msg_t *msg_)
+{
// If there is at least one * subscription, the message matches.
if (all_count)
- return 0;
+ return true;
// Check whether the message matches at least one prefix subscription.
// TODO: Make this efficient - O(log(n)) where n is number of characters in
@@ -135,25 +194,8 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
size_t sub_size = it->size ();
if (sub_size <= msg_size &&
memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0)
- return 0;
+ return true;
}
- // The message did not pass the filter. Trim it.
- // Note that we are returning a different error code so that the caller
- // knows there are more messages available. We cannot loop here as
- // a stream of non-matching messages would create a DoS situation.
- zmq_msg_close (msg_);
- zmq_msg_init (msg_);
- errno = EINPROGRESS;
- return -1;
-}
-
-bool zmq::sub_t::xhas_in ()
-{
- return fq.has_in ();
-}
-
-bool zmq::sub_t::xhas_out ()
-{
return false;
}