summaryrefslogtreecommitdiff
path: root/src/xpub.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-30 10:07:34 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-30 10:07:34 +0200
commit0b59866a84f733e5a53b0d2f32570581691747ef (patch)
tree8861d97915544dc4385177931f299a6f27603c92 /src/xpub.hpp
parent311fb0d852374e769d8ff791c9df38f0464960c6 (diff)
Patches from sub-forward branch incorporated
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/xpub.hpp')
-rw-r--r--src/xpub.hpp26
1 files changed, 26 insertions, 0 deletions
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 8a6ff73..b824548 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -21,9 +21,14 @@
#ifndef __ZMQ_XPUB_HPP_INCLUDED__
#define __ZMQ_XPUB_HPP_INCLUDED__
+#include <deque>
+
#include "socket_base.hpp"
+#include "mtrie.hpp"
#include "array.hpp"
+#include "blob.hpp"
#include "dist.hpp"
+#include "fq.hpp"
namespace zmq
{
@@ -42,14 +47,35 @@ namespace zmq
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
+ void xread_activated (class pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_);
void xterminated (class pipe_t *pipe_);
private:
+ // Applies the subscription to the trie. Return false if it is a
+ // duplicate.
+ bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_);
+
+ // Function to be applied to the trie to send all the subsciptions
+ // upstream.
+ static void send_unsubscription (unsigned char *data_, size_t size_,
+ void *arg_);
+
+ // List of all subscriptions mapped to corresponding pipes.
+ mtrie_t subscriptions;
+
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
+ // Object to fair-queue the subscription requests.
+ fq_t fq;
+
+ // List of pending (un)subscriptions, ie. those that were already
+ // applied to the trie, but not yet received by the user.
+ typedef std::deque <blob_t> pending_t;
+ pending_t pending;
+
xpub_t (const xpub_t&);
const xpub_t &operator = (const xpub_t&);
};