summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-28 21:29:31 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-28 21:29:31 +0100
commit72161fb075025410312c6735d681c3de9a36a4e5 (patch)
treed1db0ed04875ccb6b385238abe11a9375ee10f91
parentc97967ed4b70de700db38cc2661bbe43262bc029 (diff)
format of subscriptions changed (no * needed anymore)
-rw-r--r--src/socket_base.cpp11
-rw-r--r--src/sub.cpp90
-rw-r--r--src/sub.hpp9
-rw-r--r--src/zmq_engine.cpp7
4 files changed, 51 insertions, 66 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 43209d5..2348f67 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -326,17 +326,20 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (errno != EAGAIN)
return -1;
app_thread->process_commands (false, false);
- ticks = 0;
rc = xrecv (msg_, flags_);
+ ticks = 0;
}
else {
while (rc != 0) {
- if (errno != EAGAIN)
+ if (errno == EINPROGRESS)
+ app_thread->process_commands (false, true);
+ else if (errno == EAGAIN)
+ app_thread->process_commands (true, false);
+ else
return -1;
- app_thread->process_commands (true, false);
- ticks = 0;
rc = xrecv (msg_, flags_);
}
+ ticks = 0;
}
diff --git a/src/sub.cpp b/src/sub.cpp
index e5dbe76..95039d7 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "../bindings/c/zmq.h"
#include "sub.hpp"
@@ -67,41 +69,30 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ == ZMQ_SUBSCRIBE) {
- std::string subscription ((const char*) optval_, optvallen_);
- if (subscription == "*")
+ if (!optvallen_)
all_count++;
- else if (subscription [subscription.size () - 1] == '*')
- prefixes.insert (subscription.substr (0, subscription.size () - 1));
- else
- topics.insert (subscription);
+ else
+ subscriptions.insert (std::string ((const char*) optval_,
+ optvallen_));
return 0;
}
if (option_ == ZMQ_UNSUBSCRIBE) {
- std::string subscription ((const char*) optval_, optvallen_);
- if (subscription == "*") {
+ if (!optvallen_) {
if (!all_count) {
errno = EINVAL;
return -1;
}
all_count--;
}
- else if (subscription [subscription.size () - 1] == '*') {
- subscriptions_t::iterator it = prefixes.find (
- subscription.substr (0, subscription.size () - 1));
- if (it == prefixes.end ()) {
- errno = EINVAL;
- return -1;
- }
- prefixes.erase (it);
- }
else {
- subscriptions_t::iterator it = topics.find (subscription);
- if (it == topics.end ()) {
+ subscriptions_t::iterator it = subscriptions.find (
+ std::string ((const char*) optval_, optvallen_));
+ if (it == subscriptions.end ()) {
errno = EINVAL;
return -1;
}
- topics.erase (it);
+ subscriptions.erase (it);
}
return 0;
}
@@ -124,44 +115,37 @@ int zmq::sub_t::xflush ()
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- while (true) {
-
- // Get a message using fair queueing algorithm.
- int rc = fq.recv (msg_, flags_);
+ // Get a message using fair queueing algorithm.
+ int rc = fq.recv (msg_, flags_);
- // If there's no message available, return immediately.
- if (rc != 0 && errno == EAGAIN)
- return -1;
+ // If there's no message available, return immediately.
+ if (rc != 0 && errno == EAGAIN)
+ return -1;
- // If there is no subscription return -1/EAGAIN.
- if (!all_count && prefixes.empty () && topics.empty ()) {
- errno = EAGAIN;
- return -1;
- }
-
- // If there is at least one "*" subscription, the message matches.
- if (all_count)
- return 0;
+ // If there is at least one * subscription, the message matches.
+ if (all_count)
+ return 0;
- // Check the message format.
- // TODO: We should either ignore the message or drop the connection
- // if the message doesn't conform with the expected format.
- unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
- zmq_assert (*data <= zmq_msg_size (msg_) - 1);
- std::string topic ((const char*) (data + 1), *data);
-
- // Check whether the message matches at least one prefix subscription.
- for (subscriptions_t::iterator it = prefixes.begin ();
- it != prefixes.end (); it++)
- if (it->size () <= topic.size () &&
- *it == topic.substr (0, it->size ()))
- return 0;
-
- // Check whether the message matches an exact match subscription.
- subscriptions_t::iterator it = topics.find (topic);
- if (it != topics.end ())
+ // Check whether the message matches at least one prefix subscription.
+ // TODO: Make this efficient - O(log(n)) where n is number of characters in
+ // the longest subscription string.
+ for (subscriptions_t::iterator it = subscriptions.begin ();
+ it != subscriptions.end (); it++) {
+ size_t msg_size = zmq_msg_size (msg_);
+ size_t sub_size = it->size ();
+ if (sub_size <= msg_size &&
+ memcmp (zmq_msg_data (msg_), it->data (), sub_size) == 0)
return 0;
}
+
+ // The message did not pass the filter. Trim it.
+ // Note that we are returning a different error code so that the caller
+ // knows there are more messages available. We cannot loop here as
+ // a stream of non-matching messages would create a DoS situation.
+ zmq_msg_close (msg_);
+ zmq_msg_init (msg_);
+ errno = EINPROGRESS;
+ return -1;
}
bool zmq::sub_t::xhas_in ()
diff --git a/src/sub.hpp b/src/sub.hpp
index 1eafdac..a7cd134 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -56,16 +56,11 @@ namespace zmq
// Fair queueing object for inbound pipes.
fq_t fq;
- // Number of active "*" subscriptions.
+ // Number of active * subscriptions.
int all_count;
typedef std::multiset <std::string> subscriptions_t;
-
- // List of all prefix subscriptions.
- subscriptions_t prefixes;
-
- // List of all exact match subscriptions.
- subscriptions_t topics;
+ subscriptions_t subscriptions;
sub_t (const sub_t&);
void operator = (const sub_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index cfc87a7..b568993 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -101,8 +101,11 @@ void zmq::zmq_engine_t::in_event ()
insize -= processed;
// Stop polling for input if we got stuck.
- if (processed < insize)
- reset_pollin (handle);
+ if (processed < insize) {
+ zmq_assert (false);
+ // TODO: This may happen is queue limits are implemented.
+ // reset_pollin (handle);
+ }
// Flush all messages the decoder may have produced.
inout->flush ();