summaryrefslogtreecommitdiff
path: root/src/xpub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xpub.cpp')
-rw-r--r--src/xpub.cpp49
1 files changed, 4 insertions, 45 deletions
diff --git a/src/xpub.cpp b/src/xpub.cpp
index bc307e6..fbb45fb 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -35,10 +35,6 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
tmp_filter_id (-1)
{
options.type = XS_XPUB;
- options.sp_pattern = SP_PUBSUB;
- options.sp_version = 4;
- options.sp_role = SP_PUBSUB_PUB;
- options.sp_complement = SP_PUBSUB_SUB;
}
xs::xpub_t::~xpub_t ()
@@ -48,42 +44,6 @@ xs::xpub_t::~xpub_t ()
it->type->pf_destroy ((void*) (core_t*) this, it->instance);
}
-int xs::xpub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- switch (version) {
- case 1:
- options.legacy_protocol = true;
- options.sp_version = 1;
- break;
- case 3:
- options.legacy_protocol = false;
- options.sp_version = 3;
- break;
- default:
- errno = EINVAL;
- return -1;
- }
-
- return 0;
-}
-
void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -93,7 +53,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// to all data on this pipe, implicitly. Also, if we are using
// 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
// we need to subscribe for all messages automatically.
- if (icanhasall_|| pipe_->get_sp_version () == 1) {
+ if (icanhasall_|| pipe_->get_protocol () == 1) {
// Find the prefix filter.
// TODO: Change this to ALL filter.
@@ -161,8 +121,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
int filter_id = XS_FILTER_PREFIX;
#endif
- if (cmd != SP_PUBSUB_CMD_SUBSCRIBE &&
- cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
sub.close ();
return;
}
@@ -174,7 +133,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
break;
bool unique;
- if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd == XS_CMD_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
it->instance, pipe_, data + 4, size - 4) ? true : false;
@@ -299,7 +258,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
blob_t unsub (size_ + 4, 0);
- put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE);
put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);
memcpy ((void*) (unsub.data () + 4), data_, size_);