diff options
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r-- | src/xpub.cpp | 153 |
1 files changed, 125 insertions, 28 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp index 255c063..fe0b9a7 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -21,20 +21,27 @@ #include <string.h> +#include "../include/xs.h" + #include "xpub.hpp" #include "pipe.hpp" +#include "wire.hpp" #include "err.hpp" #include "msg.hpp" xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), - more (false) + more (false), + tmp_filter_id (-1) { options.type = XS_XPUB; } xs::xpub_t::~xpub_t () { + // Deallocate all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); ++it) + it->type->pf_destroy ((void*) (core_t*) this, it->instance); } void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) @@ -46,8 +53,27 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // to all data on this pipe, implicitly. Also, if we are using // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, // we need to subscribe for all messages automatically. - if (icanhasall_ || pipe_->get_protocol () == 1) - subscriptions.add (NULL, 0, pipe_); + if (icanhasall_|| pipe_->get_protocol () == 1) { + + // Find the prefix filter. + // TODO: Change this to ALL filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == XS_FILTER_PREFIX) + break; + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (XS_FILTER_PREFIX); + xs_assert (f.type); + f.instance = f.type->pf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } + + it->type->pf_subscribe ((void*) (core_t*) this, it->instance, pipe_, + NULL, 0); + } // The pipe is active when attached. Let's read the subscriptions from // it, if any. @@ -73,16 +99,72 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) // TODO: In the case of malformed subscription we will simply ignore // it for now. However, we should close the connection instead. - if (size <= 0 || (*data == 0 && *data == 1)) { + if (size < 4) { + sub.close (); + return; + } + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + if (size < 1) { sub.close (); return; } +#endif + + int cmd = get_uint16 (data); + int filter_id = get_uint16 (data + 2); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. + int cmd = data [0] ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE; + int filter_id = XS_FILTER_PREFIX; +#endif + + if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { + sub.close (); + return; + } + + // Find the relevant filter. + filters_t::iterator it; + for (it = filters.begin (); it != filters.end (); ++it) + if (it->type->id (NULL) == filter_id) + break; bool unique; - if (*data == 0) - unique = subscriptions.rm (data + 1, size - 1, pipe_); - else - unique = subscriptions.add (data + 1, size - 1, pipe_); + if (cmd == XS_CMD_UNSUBSCRIBE) { + xs_assert (it != filters.end ()); + unique = it->type->pf_unsubscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 4, size - 4) ? true : false; +#if 0 + // TODO: This is 0MQ/3.1 protocol. + unique = it->type->pf_unsubscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 1, size - 1) ? true : false; +#endif + } + else { + + // If the filter of the specified type does not exist yet, + // create it. + if (it == filters.end ()) { + filter_t f; + f.type = get_filter (filter_id); + xs_assert (f.type); + f.instance = f.type->pf_create ((void*) (core_t*) this); + xs_assert (f.instance); + filters.push_back (f); + it = filters.end () - 1; + } + + unique = it->type->pf_subscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 4, size - 4) ? true : false; +#if 0 + // TODO: This is 0MQ/3.1 protocol. + unique = it->type->pf_subscribe ((void*) (core_t*) this, + it->instance, pipe_, data + 1, size - 1) ? true : false; +#endif + } // If the subscription is not a duplicate store it so that it can be // passed to used on next recv call. @@ -100,28 +182,29 @@ void xs::xpub_t::xwrite_activated (pipe_t *pipe_) void xs::xpub_t::xterminated (pipe_t *pipe_) { - // Remove the pipe from the trie. If there are topics that nobody - // is interested in anymore, send corresponding unsubscriptions - // upstream. - subscriptions.rm (pipe_, send_unsubscription, this); + // Remove the pipe from all the filters. + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) { + tmp_filter_id = it->type->id (NULL); + it->type->pf_unsubscribe_all ((void*) (core_t*) this, it->instance, + (void*) pipe_); + tmp_filter_id = -1; + } dist.terminated (pipe_); } -void xs::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) -{ - xpub_t *self = (xpub_t*) arg_; - self->dist.match (pipe_); -} - int xs::xpub_t::xsend (msg_t *msg_, int flags_) { bool msg_more = msg_->flags () & msg_t::more ? true : false; // For the first part of multi-part message, find the matching pipes. - if (!more) - subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), - mark_as_matching, this); + if (!more) { + for (filters_t::iterator it = filters.begin (); it != filters.end (); + ++it) + it->type->pf_match ((void*) (core_t*) this, it->instance, + (unsigned char*) msg_->data (), msg_->size ()); + } // Send the message to all the pipes that were marked as matching // in the previous step. @@ -167,21 +250,35 @@ bool xs::xpub_t::xhas_in () return !pending.empty (); } -void xs::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, - void *arg_) +int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) { - xpub_t *self = (xpub_t*) arg_; - - if (self->options.type != XS_PUB) { + // In XS_PUB socket, the subscriptions are not passed upstream. + if (options.type != XS_PUB) { // Place the unsubscription to the queue of pending (un)sunscriptions // to be retrived by the user later on. - xpub_t *self = (xpub_t*) arg_; + blob_t unsub (size_ + 4, 0); + put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); + put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id); + memcpy ((void*) (unsub.data () + 4), data_, size_); + +#if 0 + // TODO: This is 0MQ/3.1 protocol. blob_t unsub (size_ + 1, 0); unsub [0] = 0; memcpy ((void*) (unsub.data () + 1), data_, size_); - self->pending.push_back (unsub); +#endif + + pending.push_back (unsub); } + + return 0; +} + +int xs::xpub_t::filter_matching (void *subscriber_) +{ + dist.match ((xs::pipe_t*) subscriber_); + return 0; } xs::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_, |