diff options
Diffstat (limited to 'src/xpub.cpp')
| -rw-r--r-- | src/xpub.cpp | 49 | 
1 files changed, 45 insertions, 4 deletions
| diff --git a/src/xpub.cpp b/src/xpub.cpp index fbb45fb..bc307e6 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      tmp_filter_id (-1)  {      options.type = XS_XPUB; +    options.sp_pattern = SP_PUBSUB; +    options.sp_version = 4; +    options.sp_role = SP_PUBSUB_PUB; +    options.sp_complement = SP_PUBSUB_SUB;  }  xs::xpub_t::~xpub_t () @@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t ()          it->type->pf_destroy ((void*) (core_t*) this, it->instance);  } +int xs::xpub_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PATTERN_VERSION) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    switch (version) { +    case 1: +        options.legacy_protocol = true; +        options.sp_version = 1; +        break; +    case 3: +        options.legacy_protocol = false; +        options.sp_version = 3; +        break; +    default: +        errno = EINVAL; +        return -1; +    } + +    return 0; +} +  void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); @@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)      //  to all data on this pipe, implicitly. Also, if we are using      //  0MQ/2.1-style protocol, there's no subscription forwarding. Thus,      //  we need to subscribe for all messages automatically. -    if (icanhasall_|| pipe_->get_protocol () == 1) { +    if (icanhasall_|| pipe_->get_sp_version () == 1) {          //  Find the prefix filter.          //  TODO: Change this to ALL filter. @@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)          int filter_id = XS_FILTER_PREFIX;  #endif -        if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { +        if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && +              cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {              sub.close ();              return;          } @@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)                  break;          bool unique; -		if (cmd == XS_CMD_UNSUBSCRIBE) { +		if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {              xs_assert (it != filters.end ());              unique = it->type->pf_unsubscribe ((void*) (core_t*) this,                  it->instance, pipe_, data + 4, size - 4) ? true : false; @@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)  		//  Place the unsubscription to the queue of pending (un)sunscriptions  		//  to be retrived by the user later on.  		blob_t unsub (size_ + 4, 0); -        put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); +        put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);          put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);  		memcpy ((void*) (unsub.data () + 4), data_, size_); | 
