From 2daa0bb49d52aeb1aa60c94505bdad72348e5d8e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 5 Dec 2010 09:48:52 +0100 Subject: XSUB accepts (un)subscriptions in form of messages. Signed-off-by: Martin Sustrik --- src/xsub.cpp | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) (limited to 'src/xsub.cpp') diff --git a/src/xsub.cpp b/src/xsub.cpp index 16e1042..21cf8dd 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -54,24 +54,34 @@ void zmq::xsub_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::xsub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) +int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_) { - if (option_ == ZMQ_SUBSCRIBE) { - subscriptions.add ((unsigned char*) optval_, optvallen_); - return 0; - } - - if (option_ == ZMQ_UNSUBSCRIBE) { - if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { - errno = EINVAL; - return -1; - } - return 0; + size_t size = zmq_msg_size (msg_); + unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + + // Malformed subscriptions are dropped silently. + if (size >= 1) { + + // Process a subscription. + if (*data == 1) + subscriptions.add (data + 1, size - 1); + + // Process an unsubscription. Invalid unsubscription is ignored. + if (*data == 0) + subscriptions.rm (data + 1, size - 1); } - errno = EINVAL; - return -1; + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; +} + +bool zmq::xsub_t::xhas_out () +{ + // Subscription can be added/removed anytime. + return true; } int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_) -- cgit v1.2.3