summaryrefslogtreecommitdiff
path: root/src/xsub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r--src/xsub.cpp40
1 files changed, 25 insertions, 15 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 16e1042..21cf8dd 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -54,24 +54,34 @@ void zmq::xsub_t::process_term (int linger_)
socket_base_t::process_term (linger_);
}
-int zmq::xsub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
+int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_)
{
- if (option_ == ZMQ_SUBSCRIBE) {
- subscriptions.add ((unsigned char*) optval_, optvallen_);
- return 0;
- }
-
- if (option_ == ZMQ_UNSUBSCRIBE) {
- if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) {
- errno = EINVAL;
- return -1;
- }
- return 0;
+ size_t size = zmq_msg_size (msg_);
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+
+ // Malformed subscriptions are dropped silently.
+ if (size >= 1) {
+
+ // Process a subscription.
+ if (*data == 1)
+ subscriptions.add (data + 1, size - 1);
+
+ // Process an unsubscription. Invalid unsubscription is ignored.
+ if (*data == 0)
+ subscriptions.rm (data + 1, size - 1);
}
- errno = EINVAL;
- return -1;
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+}
+
+bool zmq::xsub_t::xhas_out ()
+{
+ // Subscription can be added/removed anytime.
+ return true;
}
int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_)