diff options
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r-- | src/xsub.cpp | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp index 16e1042..21cf8dd 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -54,24 +54,34 @@ void zmq::xsub_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::xsub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) +int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_) { - if (option_ == ZMQ_SUBSCRIBE) { - subscriptions.add ((unsigned char*) optval_, optvallen_); - return 0; - } - - if (option_ == ZMQ_UNSUBSCRIBE) { - if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { - errno = EINVAL; - return -1; - } - return 0; + size_t size = zmq_msg_size (msg_); + unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + + // Malformed subscriptions are dropped silently. + if (size >= 1) { + + // Process a subscription. + if (*data == 1) + subscriptions.add (data + 1, size - 1); + + // Process an unsubscription. Invalid unsubscription is ignored. + if (*data == 0) + subscriptions.rm (data + 1, size - 1); } - errno = EINVAL; - return -1; + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; +} + +bool zmq::xsub_t::xhas_out () +{ + // Subscription can be added/removed anytime. + return true; } int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_) |