From cb1b6fe32cbf3c7cf5961bb4156f2de743693a3a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 21 Sep 2009 14:39:59 +0200 Subject: initial version of req/rep sockets --- src/sub.hpp | 38 +++++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) (limited to 'src/sub.hpp') diff --git a/src/sub.hpp b/src/sub.hpp index 14fa687..29da27a 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -24,6 +24,7 @@ #include #include "socket_base.hpp" +#include "yarray.hpp" namespace zmq { @@ -35,14 +36,38 @@ namespace zmq sub_t (class app_thread_t *parent_); ~sub_t (); - // Overloads of API functions from socket_base_t. - int setsockopt (int option_, const void *optval_, size_t optvallen_); - int send (struct zmq_msg_t *msg_, int flags_); - int flush (); - int recv (struct zmq_msg_t *msg_, int flags_); + protected: + + // Overloads of functions from socket_base_t. + bool xrequires_in (); + bool xrequires_out (); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (struct zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (struct zmq_msg_t *msg_, int flags_); private: + // Helper function to return one message choosed using + // fair queueing algorithm. + int fq (struct zmq_msg_t *msg_, int flags_); + + // Inbound pipes, i.e. those the socket is getting messages from. + typedef yarray_t in_pipes_t; + in_pipes_t in_pipes; + + // Number of active inbound pipes. Active pipes are stored in the + // initial section of the in_pipes array. + in_pipes_t::size_type active; + + // Index of the next inbound pipe to read messages from. + in_pipes_t::size_type current; + // Number of active "*" subscriptions. int all_count; @@ -52,6 +77,9 @@ namespace zmq // List of all exact match subscriptions. subscriptions_t topics; + + sub_t (const sub_t&); + void operator = (const sub_t&); }; } -- cgit v1.2.3