summaryrefslogtreecommitdiff
path: root/src/xpub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r--src/xpub.cpp49
1 files changed, 45 insertions, 4 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp
index fbb45fb..bc307e6 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
tmp_filter_id (-1)
{
options.type = XS_XPUB;
+ options.sp_pattern = SP_PUBSUB;
+ options.sp_version = 4;
+ options.sp_role = SP_PUBSUB_PUB;
+ options.sp_complement = SP_PUBSUB_SUB;
}
xs::xpub_t::~xpub_t ()
@@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t ()
it->type->pf_destroy ((void*) (core_t*) this, it->instance);
}
+int xs::xpub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ switch (version) {
+ case 1:
+ options.legacy_protocol = true;
+ options.sp_version = 1;
+ break;
+ case 3:
+ options.legacy_protocol = false;
+ options.sp_version = 3;
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+
+ return 0;
+}
+
void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// to all data on this pipe, implicitly. Also, if we are using
// 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
// we need to subscribe for all messages automatically.
- if (icanhasall_|| pipe_->get_protocol () == 1) {
+ if (icanhasall_|| pipe_->get_sp_version () == 1) {
// Find the prefix filter.
// TODO: Change this to ALL filter.
@@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
int filter_id = XS_FILTER_PREFIX;
#endif
- if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
+ if (cmd != SP_PUBSUB_CMD_SUBSCRIBE &&
+ cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
sub.close ();
return;
}
@@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
break;
bool unique;
- if (cmd == XS_CMD_UNSUBSCRIBE) {
+ if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
it->instance, pipe_, data + 4, size - 4) ? true : false;
@@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
blob_t unsub (size_ + 4, 0);
- put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE);
+ put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);
memcpy ((void*) (unsub.data () + 4), data_, size_);