summaryrefslogtreecommitdiff
path: root/src/sub.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-28 21:29:31 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-28 21:29:31 +0100
commit72161fb075025410312c6735d681c3de9a36a4e5 (patch)
treed1db0ed04875ccb6b385238abe11a9375ee10f91 /src/sub.cpp
parentc97967ed4b70de700db38cc2661bbe43262bc029 (diff)
format of subscriptions changed (no * needed anymore)
Diffstat (limited to 'src/sub.cpp')
-rw-r--r--src/sub.cpp90
1 files changed, 37 insertions, 53 deletions
diff --git a/src/sub.cpp b/src/sub.cpp
index e5dbe76..95039d7 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "../bindings/c/zmq.h"
#include "sub.hpp"
@@ -67,41 +69,30 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_SUBSCRIBE) {
- std::string subscription ((const char*) optval_, optvallen_);
- if (subscription == "*")
+ if (!optvallen_)
all_count++;
- else if (subscription [subscription.size () - 1] == '*')
- prefixes.insert (subscription.substr (0, subscription.size () - 1));
- else
- topics.insert (subscription);
+ else
+ subscriptions.insert (std::string ((const char*) optval_,
+ optvallen_));
return 0;
}
if (option_ == ZMQ_UNSUBSCRIBE) {
- std::string subscription ((const char*) optval_, optvallen_);
- if (subscription == "*") {
+ if (!optvallen_) {
if (!all_count) {
errno = EINVAL;
return -1;
}
all_count--;
}
- else if (subscription [subscription.size () - 1] == '*') {
- subscriptions_t::iterator it = prefixes.find (
- subscription.substr (0, subscription.size () - 1));
- if (it == prefixes.end ()) {
- errno = EINVAL;
- return -1;
- }
- prefixes.erase (it);
- }
else {
- subscriptions_t::iterator it = topics.find (subscription);
- if (it == topics.end ()) {
+ subscriptions_t::iterator it = subscriptions.find (
+ std::string ((const char*) optval_, optvallen_));
+ if (it == subscriptions.end ()) {
errno = EINVAL;
return -1;
}
- topics.erase (it);
+ subscriptions.erase (it);
}
return 0;
}
@@ -124,44 +115,37 @@ int zmq::sub_t::xflush ()
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (msg_, flags_);
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (msg_, flags_);
- // If there's no message available, return immediately.
- if (rc != 0 && errno == EAGAIN)
- return -1;
+ // If there's no message available, return immediately.
+ if (rc != 0 && errno == EAGAIN)
+ return -1;
- // If there is no subscription return -1/EAGAIN.
- if (!all_count && prefixes.empty () && topics.empty ()) {
- errno = EAGAIN;
- return -1;
- }
-
- // If there is at least one "*" subscription, the message matches.
- if (all_count)
- return 0;
+ // If there is at least one * subscription, the message matches.
+ if (all_count)
+ return 0;
- // Check the message format.
- // TODO: We should either ignore the message or drop the connection
- // if the message doesn't conform with the expected format.
- unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
- zmq_assert (*data <= zmq_msg_size (msg_) - 1);
- std::string topic ((const char*) (data + 1), *data);
-
- // Check whether the message matches at least one prefix subscription.
- for (subscriptions_t::iterator it = prefixes.begin ();
- it != prefixes.end (); it++)
- if (it->size () <= topic.size () &&
- *it == topic.substr (0, it->size ()))
- return 0;
-
- // Check whether the message matches an exact match subscription.
- subscriptions_t::iterator it = topics.find (topic);
- if (it != topics.end ())
+ // Check whether the message matches at least one prefix subscription.
+ // TODO: Make this efficient - O(log(n)) where n is number of characters in
+ // the longest subscription string.
+ for (subscriptions_t::iterator it = subscriptions.begin ();
+ it != subscriptions.end (); it++) {
+ size_t msg_size = zmq_msg_size (msg_);
+ size_t sub_size = it->size ();
+ if (sub_size <= msg_size &&
+ memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0)
return 0;
}
+
+ // The message did not pass the filter. Trim it.
+ // Note that we are returning a different error code so that the caller
+ // knows there are more messages available. We cannot loop here as
+ // a stream of non-matching messages would create a DoS situation.
+ zmq_msg_close (msg_);
+ zmq_msg_init (msg_);
+ errno = EINPROGRESS;
+ return -1;
}
bool zmq::sub_t::xhas_in ()