diff options
Diffstat (limited to 'src/socket_base.hpp')
-rw-r--r-- | src/socket_base.hpp | 42 |
1 files changed, 41 insertions, 1 deletions
diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 20ac4e2..1f04bda 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -22,8 +22,11 @@ #include <set> #include <map> +#include <vector> #include <string> +#include <utility> +#include "i_endpoint.hpp" #include "object.hpp" #include "mutex.hpp" #include "options.hpp" @@ -32,7 +35,7 @@ namespace zmq { - class socket_base_t : public object_t + class socket_base_t : public object_t, public i_endpoint { public: @@ -57,22 +60,59 @@ namespace zmq bool unregister_session (const char *name_); class session_t *find_session (const char *name_); + // i_endpoint interface implementation. + void revive (class reader_t *pipe_); + private: // Handlers for incoming commands. void process_own (class owned_t *object_); + void process_bind (class owned_t *session_, + class reader_t *in_pipe_, class writer_t *out_pipe_); void process_term_req (class owned_t *object_); void process_term_ack (); + // Attempts to distribute the message to all the outbound pipes. + // Returns false if not possible because of pipe overflow. + bool distribute (struct zmq_msg_t *msg_, bool flush_); + + // Gets a message from one of the inbound pipes. Implementation of + // fair queueing. + bool fetch (struct zmq_msg_t *msg_); + // List of all I/O objects owned by this socket. The socket is // responsible for deallocating them before it quits. typedef std::set <class owned_t*> io_objects_t; io_objects_t io_objects; + // Inbound pipes, i.e. those the socket is getting messages from. + // The second member in the pair indicates the object on the other + // side of the pipe. + typedef std::vector <std::pair <class reader_t*, owned_t*> > + in_pipes_t; + in_pipes_t in_pipes; + + // Index of the next inbound pipe to read messages from. + in_pipes_t::size_type current; + + // Number of active inbound pipes. Active pipes are stored in the + // initial section of the in_pipes array. + in_pipes_t::size_type active; + + // Outbound pipes, i.e. those the socket is sending messages to. + // The second member in the pair indicates the object on the other + // side of the pipe. + typedef std::vector <std::pair <class writer_t*, owned_t*> > + out_pipes_t; + out_pipes_t out_pipes; + // Number of I/O objects that were already asked to terminate // but haven't acknowledged it yet. int pending_term_acks; + // Number of messages received since last command processing. + int ticks; + // Application thread the socket lives in. class app_thread_t *app_thread; |