From 0b59866a84f733e5a53b0d2f32570581691747ef Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 30 May 2011 10:07:34 +0200 Subject: Patches from sub-forward branch incorporated Signed-off-by: Martin Sustrik --- src/xsub.hpp | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) (limited to 'src/xsub.hpp') diff --git a/src/xsub.hpp b/src/xsub.hpp index ebd6259..03b3178 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -22,7 +22,10 @@ #define __ZMQ_XSUB_HPP_INCLUDED__ #include "socket_base.hpp" +#include "dist.hpp" #include "fq.hpp" +#include "trie.hpp" +#include "msg.hpp" namespace zmq { @@ -39,18 +42,43 @@ namespace zmq // Overloads of functions from socket_base_t. void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); - int xsend (class msg_t *msg_, int options_); + int xsend (class msg_t *msg_, int flags_); bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); void xread_activated (class pipe_t *pipe_); + void xwrite_activated (class pipe_t *pipe_); + void xhiccuped (pipe_t *pipe_); void xterminated (class pipe_t *pipe_); private: + // Check whether the message matches at least one subscription. + bool match (class msg_t *msg_); + + // Function to be applied to the trie to send all the subsciptions + // upstream. + static void send_subscription (unsigned char *data_, size_t size_, + void *arg_); + // Fair queueing object for inbound pipes. fq_t fq; + // Object for distributing the subscriptions upstream. + dist_t dist; + + // The repository of subscriptions. + trie_t subscriptions; + + // If true, 'message' contains a matching message to return on the + // next recv call. + bool has_message; + msg_t message; + + // If true, part of a multipart message was already received, but + // there are following parts still waiting. + bool more; + xsub_t (const xsub_t&); const xsub_t &operator = (const xsub_t&); }; -- cgit v1.2.3