summaryrefslogtreecommitdiff
path: root/src/socket_base.hpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 10:54:28 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-27 10:54:28 +0200
commit99c5d9283622a0b37ee80f83ff4875c059fc5990 (patch)
tree3460ec503898d2184dc807e47eea679d24d96d5c /src/socket_base.hpp
parentaacdb7a454686bfac93164d0e67e785658d48a3c (diff)
pipes added
Diffstat (limited to 'src/socket_base.hpp')
-rw-r--r--src/socket_base.hpp42
1 files changed, 41 insertions, 1 deletions
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 20ac4e2..1f04bda 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -22,8 +22,11 @@
#include <set>
#include <map>
+#include <vector>
#include <string>
+#include <utility>
+#include "i_endpoint.hpp"
#include "object.hpp"
#include "mutex.hpp"
#include "options.hpp"
@@ -32,7 +35,7 @@
namespace zmq
{
- class socket_base_t : public object_t
+ class socket_base_t : public object_t, public i_endpoint
{
public:
@@ -57,22 +60,59 @@ namespace zmq
bool unregister_session (const char *name_);
class session_t *find_session (const char *name_);
+ // i_endpoint interface implementation.
+ void revive (class reader_t *pipe_);
+
private:
// Handlers for incoming commands.
void process_own (class owned_t *object_);
+ void process_bind (class owned_t *session_,
+ class reader_t *in_pipe_, class writer_t *out_pipe_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
+ // Attempts to distribute the message to all the outbound pipes.
+ // Returns false if not possible because of pipe overflow.
+ bool distribute (struct zmq_msg_t *msg_, bool flush_);
+
+ // Gets a message from one of the inbound pipes. Implementation of
+ // fair queueing.
+ bool fetch (struct zmq_msg_t *msg_);
+
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
+ // Inbound pipes, i.e. those the socket is getting messages from.
+ // The second member in the pair indicates the object on the other
+ // side of the pipe.
+ typedef std::vector <std::pair <class reader_t*, owned_t*> >
+ in_pipes_t;
+ in_pipes_t in_pipes;
+
+ // Index of the next inbound pipe to read messages from.
+ in_pipes_t::size_type current;
+
+ // Number of active inbound pipes. Active pipes are stored in the
+ // initial section of the in_pipes array.
+ in_pipes_t::size_type active;
+
+ // Outbound pipes, i.e. those the socket is sending messages to.
+ // The second member in the pair indicates the object on the other
+ // side of the pipe.
+ typedef std::vector <std::pair <class writer_t*, owned_t*> >
+ out_pipes_t;
+ out_pipes_t out_pipes;
+
// Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet.
int pending_term_acks;
+ // Number of messages received since last command processing.
+ int ticks;
+
// Application thread the socket lives in.
class app_thread_t *app_thread;