diff options
Diffstat (limited to 'src/xsub.cpp')
| -rw-r--r-- | src/xsub.cpp | 55 | 
1 files changed, 48 insertions, 7 deletions
| diff --git a/src/xsub.cpp b/src/xsub.cpp index 7272a8e..18acfea 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -29,6 +29,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_)  {      options.type = XS_XSUB; +    options.sp_pattern = SP_PUBSUB; +    options.sp_version = 3; +    options.sp_role = SP_PUBSUB_SUB; +    options.sp_complement = SP_PUBSUB_PUB;      //  When socket is being closed down we don't want to wait till pending      //  subscription commands are sent to the wire. @@ -42,6 +46,42 @@ xs::xsub_t::~xsub_t ()  {  } +int xs::xsub_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PROTOCOL) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    switch (version) { +    case 1: +        options.legacy_protocol = true; +        options.sp_version = 1; +        break; +    case 3: +        options.legacy_protocol = false; +        options.sp_version = 3; +        break; +    default: +        errno = EINVAL; +        return -1; +    } + +    return 0; +} +  void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); @@ -49,7 +89,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)      //  Pipes with 0MQ/2.1-style protocol are not eligible for accepting      //  subscriptions. -    if (pipe_->get_protocol () == 1) +    if (pipe_->get_sp_version () == 1)          return;      dist.attach (pipe_); @@ -75,14 +115,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)  void xs::xsub_t::xterminated (pipe_t *pipe_)  {      fq.terminated (pipe_); -    if (pipe_->get_protocol () != 1) +    if (pipe_->get_sp_version () != 1)          dist.terminated (pipe_);  }  void xs::xsub_t::xhiccuped (pipe_t *pipe_)  {      //  In 0MQ/2.1 protocol there is no subscription forwarding. -    if (pipe_->get_protocol () == 1) +    if (pipe_->get_sp_version () == 1)          return;      //  Send all the cached subscriptions to the new upstream peer. @@ -104,13 +144,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)      }      int cmd = get_uint16 (data);      int filter_id = get_uint16 (data + 2); -    if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { +    if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {          errno = EINVAL;          return -1;      }      //  Process the subscription. -    if (cmd == XS_CMD_SUBSCRIBE) { +    if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) {          subscriptions_t::iterator it = subscriptions.insert (             std::make_pair (std::make_pair (filter_id,             blob_t (data + 4, size - 4)), 0)).first; @@ -118,7 +158,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)          if (it->second == 1)              return dist.send_to_all (msg_, flags_);      } -    else if (cmd == XS_CMD_UNSUBSCRIBE) { +    else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {          subscriptions_t::iterator it = subscriptions.find (              std::make_pair (filter_id, blob_t (data + 4, size - 4)));          if (it != subscriptions.end ()) { @@ -162,7 +202,8 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,      int rc = msg.init_size (size_ + 4);      xs_assert (rc == 0);      unsigned char *data = (unsigned char*) msg.data (); -    put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE); +    put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE : +        SP_PUBSUB_CMD_UNSUBSCRIBE);      put_uint16 (data + 2, filter_id_);      memcpy (data + 4, data_, size_); | 
