diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2010-12-05 09:48:52 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2010-12-05 09:48:52 +0100 |
commit | 2daa0bb49d52aeb1aa60c94505bdad72348e5d8e (patch) | |
tree | 930b7c24d4c73c0fa8840ee524dd23d047ec12f2 | |
parent | c80e7b80cc726ca7c29493c2553c8d19792bb6e5 (diff) |
XSUB accepts (un)subscriptions in form of messages.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | src/sub.cpp | 31 | ||||
-rw-r--r-- | src/sub.hpp | 4 | ||||
-rw-r--r-- | src/xsub.cpp | 40 | ||||
-rw-r--r-- | src/xsub.hpp | 3 |
4 files changed, 62 insertions, 16 deletions
diff --git a/src/sub.cpp b/src/sub.cpp index f763558..14f6730 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "../include/zmq.h" + #include "sub.hpp" zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : @@ -27,3 +29,32 @@ zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t tid_) : zmq::sub_t::~sub_t () { } + +int zmq::sub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != ZMQ_SUBSCRIBE && option_ != ZMQ_UNSUBSCRIBE) { + errno = EINVAL; + return -1; + } + + // Create the subscription message. + zmq_msg_t msg; + zmq_msg_init_size (&msg, optvallen_ + 1); + unsigned char *data = (unsigned char*) zmq_msg_data (&msg); + if (option_ == ZMQ_SUBSCRIBE) + *data = 1; + else if (option_ == ZMQ_UNSUBSCRIBE) + *data = 0; + memcpy (data + 1, optval_, optvallen_); + + // Pass it further on in the stack. + int err; + int rc = xsend (&msg, 0); + if (rc != 0) + err = errno; + zmq_msg_close (&msg); + if (rc != 0) + errno = err; + return rc; +} diff --git a/src/sub.hpp b/src/sub.hpp index 0ea1fc4..44fbc9c 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -32,6 +32,10 @@ namespace zmq sub_t (class ctx_t *parent_, uint32_t tid_); ~sub_t (); + protected: + + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + private: sub_t (const sub_t&); diff --git a/src/xsub.cpp b/src/xsub.cpp index 16e1042..21cf8dd 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -54,24 +54,34 @@ void zmq::xsub_t::process_term (int linger_) socket_base_t::process_term (linger_); } -int zmq::xsub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) +int zmq::xsub_t::xsend (zmq_msg_t *msg_, int options_) { - if (option_ == ZMQ_SUBSCRIBE) { - subscriptions.add ((unsigned char*) optval_, optvallen_); - return 0; - } - - if (option_ == ZMQ_UNSUBSCRIBE) { - if (!subscriptions.rm ((unsigned char*) optval_, optvallen_)) { - errno = EINVAL; - return -1; - } - return 0; + size_t size = zmq_msg_size (msg_); + unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + + // Malformed subscriptions are dropped silently. + if (size >= 1) { + + // Process a subscription. + if (*data == 1) + subscriptions.add (data + 1, size - 1); + + // Process an unsubscription. Invalid unsubscription is ignored. + if (*data == 0) + subscriptions.rm (data + 1, size - 1); } - errno = EINVAL; - return -1; + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; +} + +bool zmq::xsub_t::xhas_out () +{ + // Subscription can be added/removed anytime. + return true; } int zmq::xsub_t::xrecv (zmq_msg_t *msg_, int flags_) diff --git a/src/xsub.hpp b/src/xsub.hpp index 3d7b08f..5937ae1 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -41,7 +41,8 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int options_); + bool xhas_out (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); |