summaryrefslogtreecommitdiff
path: root/src/sub.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/sub.hpp')
-rw-r--r--src/sub.hpp38
1 files changed, 33 insertions, 5 deletions
diff --git a/src/sub.hpp b/src/sub.hpp
index 14fa687..29da27a 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -24,6 +24,7 @@
#include <string>
#include "socket_base.hpp"
+#include "yarray.hpp"
namespace zmq
{
@@ -35,14 +36,38 @@ namespace zmq
sub_t (class app_thread_t *parent_);
~sub_t ();
- // Overloads of API functions from socket_base_t.
- int setsockopt (int option_, const void *optval_, size_t optvallen_);
- int send (struct zmq_msg_t *msg_, int flags_);
- int flush ();
- int recv (struct zmq_msg_t *msg_, int flags_);
+ protected:
+
+ // Overloads of functions from socket_base_t.
+ bool xrequires_in ();
+ bool xrequires_out ();
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (struct zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (struct zmq_msg_t *msg_, int flags_);
private:
+ // Helper function to return one message choosed using
+ // fair queueing algorithm.
+ int fq (struct zmq_msg_t *msg_, int flags_);
+
+ // Inbound pipes, i.e. those the socket is getting messages from.
+ typedef yarray_t <class reader_t> in_pipes_t;
+ in_pipes_t in_pipes;
+
+ // Number of active inbound pipes. Active pipes are stored in the
+ // initial section of the in_pipes array.
+ in_pipes_t::size_type active;
+
+ // Index of the next inbound pipe to read messages from.
+ in_pipes_t::size_type current;
+
// Number of active "*" subscriptions.
int all_count;
@@ -52,6 +77,9 @@ namespace zmq
// List of all exact match subscriptions.
subscriptions_t topics;
+
+ sub_t (const sub_t&);
+ void operator = (const sub_t&);
};
}