From 1d76284dee8e9b0735a26ee98a3edcd9f5208f09 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Sun, 20 May 2012 07:40:11 +0200 Subject: Implement SP wire protocol Implements the SP wire protocol, and infrastructure for legacy wire protocol support. Also added an XS_SERVICE_ID socket option to set the service id and renamed the XS_PROTOCOL option to XS_PATTERN_VERSION. The following pattern versions are supported: PAIR: v3 PUBSUB: v1 (legacy), v4 REQREP: v2 PIPELINE: v3 SURVEY: v2 Note that all existing pattern versions have been bumped by 1 to allow for use of legacy protocols (otherwise there would be no way to distinguish between e.g. PUBSUB v3 and PUBSUB v3 using SP). Signed-off-by: Martin Lucina --- src/xpub.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) (limited to 'src/xpub.cpp') diff --git a/src/xpub.cpp b/src/xpub.cpp index fe0b9a7..17626b6 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : tmp_filter_id (-1) { options.type = XS_XPUB; + options.sp_pattern = SP_PUBSUB; + options.sp_version = 4; + options.sp_role = SP_PUBSUB_PUB; + options.sp_complement = SP_PUBSUB_SUB; } xs::xpub_t::~xpub_t () @@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t () it->type->pf_destroy ((void*) (core_t*) this, it->instance); } +int xs::xpub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + switch (version) { + case 1: + options.legacy_protocol = true; + options.sp_version = 1; + break; + case 3: + options.legacy_protocol = false; + options.sp_version = 3; + break; + default: + errno = EINVAL; + return -1; + } + + return 0; +} + void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // to all data on this pipe, implicitly. Also, if we are using // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, // we need to subscribe for all messages automatically. - if (icanhasall_|| pipe_->get_protocol () == 1) { + if (icanhasall_|| pipe_->get_sp_version () == 1) { // Find the prefix filter. // TODO: Change this to ALL filter. @@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) int filter_id = XS_FILTER_PREFIX; #endif - if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { + if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && + cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { sub.close (); return; } @@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) break; bool unique; - if (cmd == XS_CMD_UNSUBSCRIBE) { + if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { xs_assert (it != filters.end ()); unique = it->type->pf_unsubscribe ((void*) (core_t*) this, it->instance, pipe_, data + 4, size - 4) ? true : false; @@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) // Place the unsubscription to the queue of pending (un)sunscriptions // to be retrived by the user later on. blob_t unsub (size_ + 4, 0); - put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); + put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE); put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id); memcpy ((void*) (unsub.data () + 4), data_, size_); -- cgit v1.2.3