diff options
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r-- | src/xpub.cpp | 49 |
1 files changed, 45 insertions, 4 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp index fbb45fb..bc307e6 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : tmp_filter_id (-1) { options.type = XS_XPUB; + options.sp_pattern = SP_PUBSUB; + options.sp_version = 4; + options.sp_role = SP_PUBSUB_PUB; + options.sp_complement = SP_PUBSUB_SUB; } xs::xpub_t::~xpub_t () @@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t () it->type->pf_destroy ((void*) (core_t*) this, it->instance); } +int xs::xpub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + switch (version) { + case 1: + options.legacy_protocol = true; + options.sp_version = 1; + break; + case 3: + options.legacy_protocol = false; + options.sp_version = 3; + break; + default: + errno = EINVAL; + return -1; + } + + return 0; +} + void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // to all data on this pipe, implicitly. Also, if we are using // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, // we need to subscribe for all messages automatically. - if (icanhasall_|| pipe_->get_protocol () == 1) { + if (icanhasall_|| pipe_->get_sp_version () == 1) { // Find the prefix filter. // TODO: Change this to ALL filter. @@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) int filter_id = XS_FILTER_PREFIX; #endif - if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { + if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && + cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { sub.close (); return; } @@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) break; bool unique; - if (cmd == XS_CMD_UNSUBSCRIBE) { + if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { xs_assert (it != filters.end ()); unique = it->type->pf_unsubscribe ((void*) (core_t*) this, it->instance, pipe_, data + 4, size - 4) ? true : false; @@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) // Place the unsubscription to the queue of pending (un)sunscriptions // to be retrived by the user later on. blob_t unsub (size_ + 4, 0); - put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); + put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE); put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id); memcpy ((void*) (unsub.data () + 4), data_, size_); |