From 1d76284dee8e9b0735a26ee98a3edcd9f5208f09 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Sun, 20 May 2012 07:40:11 +0200 Subject: Implement SP wire protocol Implements the SP wire protocol, and infrastructure for legacy wire protocol support. Also added an XS_SERVICE_ID socket option to set the service id and renamed the XS_PROTOCOL option to XS_PATTERN_VERSION. The following pattern versions are supported: PAIR: v3 PUBSUB: v1 (legacy), v4 REQREP: v2 PIPELINE: v3 SURVEY: v2 Note that all existing pattern versions have been bumped by 1 to allow for use of legacy protocols (otherwise there would be no way to distinguish between e.g. PUBSUB v3 and PUBSUB v3 using SP). Signed-off-by: Martin Lucina --- src/xsub.cpp | 55 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 7 deletions(-) (limited to 'src/xsub.cpp') diff --git a/src/xsub.cpp b/src/xsub.cpp index 7272a8e..9255ceb 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -29,6 +29,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_XSUB; + options.sp_pattern = SP_PUBSUB; + options.sp_version = 4; + options.sp_role = SP_PUBSUB_SUB; + options.sp_complement = SP_PUBSUB_PUB; // When socket is being closed down we don't want to wait till pending // subscription commands are sent to the wire. @@ -42,6 +46,42 @@ xs::xsub_t::~xsub_t () { } +int xs::xsub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + switch (version) { + case 1: + options.legacy_protocol = true; + options.sp_version = 1; + break; + case 3: + options.legacy_protocol = false; + options.sp_version = 3; + break; + default: + errno = EINVAL; + return -1; + } + + return 0; +} + void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -49,7 +89,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // Pipes with 0MQ/2.1-style protocol are not eligible for accepting // subscriptions. - if (pipe_->get_protocol () == 1) + if (pipe_->get_sp_version () == 1) return; dist.attach (pipe_); @@ -75,14 +115,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_) void xs::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); - if (pipe_->get_protocol () != 1) + if (pipe_->get_sp_version () != 1) dist.terminated (pipe_); } void xs::xsub_t::xhiccuped (pipe_t *pipe_) { // In 0MQ/2.1 protocol there is no subscription forwarding. - if (pipe_->get_protocol () == 1) + if (pipe_->get_sp_version () == 1) return; // Send all the cached subscriptions to the new upstream peer. @@ -104,13 +144,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) } int cmd = get_uint16 (data); int filter_id = get_uint16 (data + 2); - if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { + if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { errno = EINVAL; return -1; } // Process the subscription. - if (cmd == XS_CMD_SUBSCRIBE) { + if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) { subscriptions_t::iterator it = subscriptions.insert ( std::make_pair (std::make_pair (filter_id, blob_t (data + 4, size - 4)), 0)).first; @@ -118,7 +158,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) if (it->second == 1) return dist.send_to_all (msg_, flags_); } - else if (cmd == XS_CMD_UNSUBSCRIBE) { + else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { subscriptions_t::iterator it = subscriptions.find ( std::make_pair (filter_id, blob_t (data + 4, size - 4))); if (it != subscriptions.end ()) { @@ -162,7 +202,8 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_, int rc = msg.init_size (size_ + 4); xs_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE); + put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE : + SP_PUBSUB_CMD_UNSUBSCRIBE); put_uint16 (data + 2, filter_id_); memcpy (data + 4, data_, size_); -- cgit v1.2.3