summaryrefslogtreecommitdiff
path: root/src/sub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/sub.cpp')
-rw-r--r--src/sub.cpp73
1 files changed, 50 insertions, 23 deletions
diff --git a/src/sub.cpp b/src/sub.cpp
index 2aa3901..d29ae8d 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -78,37 +78,18 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
int rc = it->type->sf_subscribe ((void*) (core_t*) this, it->instance,
(const unsigned char*) optval_, optvallen_);
errno_assert (rc == 0);
+ return 0;
}
else if (option_ == XS_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
int rc = it->type->sf_unsubscribe ((void*) (core_t*) this, it->instance,
(const unsigned char*) optval_, optvallen_);
errno_assert (rc == 0);
+ return 0;
}
-
- // Create the subscription message.
- msg_t msg;
- int rc = msg.init_size (optvallen_ + 4);
- errno_assert (rc == 0);
- unsigned char *data = (unsigned char*) msg.data ();
- if (option_ == XS_SUBSCRIBE)
- put_uint16 (data, XS_CMD_SUBSCRIBE);
- else if (option_ == XS_UNSUBSCRIBE)
- put_uint16 (data, XS_CMD_UNSUBSCRIBE);
- put_uint16 (data + 2, options.filter);
- memcpy (data + 4, optval_, optvallen_);
-
- // Pass it further on in the stack.
- int err = 0;
- rc = xsub_t::xsend (&msg, 0);
- if (rc != 0)
- err = errno;
- int rc2 = msg.close ();
- errno_assert (rc2 == 0);
- if (rc != 0)
- errno = err;
- return rc;
+ xs_assert (false);
+ return -1;
}
int xs::sub_t::xsend (msg_t *msg_, int flags_)
@@ -214,6 +195,52 @@ bool xs::sub_t::match (msg_t *msg_)
return false;
}
+int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)
+{
+ // Create the subscription message.
+ msg_t msg;
+ int rc = msg.init_size (size_ + 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ put_uint16 (data, XS_CMD_SUBSCRIBE);
+ put_uint16 (data + 2, options.filter);
+ memcpy (data + 4, data_, size_);
+
+ // Pass it further on in the stack.
+ int err = 0;
+ rc = xsub_t::xsend (&msg, 0);
+ if (rc != 0)
+ err = errno;
+ int rc2 = msg.close ();
+ errno_assert (rc2 == 0);
+ if (rc != 0)
+ errno = err;
+ return rc;
+}
+
+int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
+{
+ // Create the unsubscription message.
+ msg_t msg;
+ int rc = msg.init_size (size_ + 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) msg.data ();
+ put_uint16 (data, XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data + 2, options.filter);
+ memcpy (data + 4, data_, size_);
+
+ // Pass it further on in the stack.
+ int err = 0;
+ rc = xsub_t::xsend (&msg, 0);
+ if (rc != 0)
+ err = errno;
+ int rc2 = msg.close ();
+ errno_assert (rc2 == 0);
+ if (rc != 0)
+ errno = err;
+ return rc;
+}
+
xs::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :