diff options
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r-- | src/xsub.cpp | 55 |
1 files changed, 7 insertions, 48 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp index 0a9ff7e..599b947 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -29,10 +29,6 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_XSUB; - options.sp_pattern = SP_PUBSUB; - options.sp_version = 4; - options.sp_role = SP_PUBSUB_SUB; - options.sp_complement = SP_PUBSUB_PUB; // When socket is being closed down we don't want to wait till pending // subscription commands are sent to the wire. @@ -46,42 +42,6 @@ xs::xsub_t::~xsub_t () { } -int xs::xsub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PATTERN_VERSION) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - switch (version) { - case 1: - options.legacy_protocol = true; - options.sp_version = 1; - break; - case 3: - options.legacy_protocol = false; - options.sp_version = 3; - break; - default: - errno = EINVAL; - return -1; - } - - return 0; -} - void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -89,7 +49,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // Pipes with 0MQ/2.1-style protocol are not eligible for accepting // subscriptions. - if (pipe_->get_sp_version () == 1) + if (pipe_->get_protocol () == 1) return; dist.attach (pipe_); @@ -115,14 +75,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_) void xs::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); - if (pipe_->get_sp_version () != 1) + if (pipe_->get_protocol () != 1) dist.terminated (pipe_); } void xs::xsub_t::xhiccuped (pipe_t *pipe_) { // In 0MQ/2.1 protocol there is no subscription forwarding. - if (pipe_->get_sp_version () == 1) + if (pipe_->get_protocol () == 1) return; // Send all the cached subscriptions to the new upstream peer. @@ -144,13 +104,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) } int cmd = get_uint16 (data); int filter_id = get_uint16 (data + 2); - if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { + if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { errno = EINVAL; return -1; } // Process the subscription. - if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) { + if (cmd == XS_CMD_SUBSCRIBE) { subscriptions_t::iterator it = subscriptions.insert ( std::make_pair (std::make_pair (filter_id, blob_t (data + 4, size - 4)), 0)).first; @@ -158,7 +118,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) if (it->second == 1) return dist.send_to_all (msg_, flags_); } - else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { + else if (cmd == XS_CMD_UNSUBSCRIBE) { subscriptions_t::iterator it = subscriptions.find ( std::make_pair (filter_id, blob_t (data + 4, size - 4))); if (it != subscriptions.end ()) { @@ -202,8 +162,7 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_, int rc = msg.init_size (size_ + 4); errno_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE : - SP_PUBSUB_CMD_UNSUBSCRIBE); + put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE); put_uint16 (data + 2, filter_id_); memcpy (data + 4, data_, size_); |