summaryrefslogtreecommitdiff
path: root/src/socket_base.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.hpp')
-rw-r--r--src/socket_base.hpp77
1 files changed, 35 insertions, 42 deletions
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 398cd32..120c932 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -27,6 +27,7 @@
#include "i_endpoint.hpp"
#include "object.hpp"
+#include "yarray_item.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
@@ -34,41 +35,59 @@
namespace zmq
{
- class socket_base_t : public object_t, public i_endpoint
+ class socket_base_t :
+ public object_t, public i_endpoint, public yarray_item_t
{
public:
socket_base_t (class app_thread_t *parent_, int type_);
- virtual ~socket_base_t ();
// Interface for communication with the API layer.
- virtual int setsockopt (int option_, const void *optval_,
+ int setsockopt (int option_, const void *optval_,
size_t optvallen_);
- virtual int bind (const char *addr_);
- virtual int connect (const char *addr_);
- virtual int send (struct zmq_msg_t *msg_, int flags_);
- virtual int flush ();
- virtual int recv (struct zmq_msg_t *msg_, int flags_);
- virtual int close ();
+ int bind (const char *addr_);
+ int connect (const char *addr_);
+ int send (struct zmq_msg_t *msg_, int flags_);
+ int flush ();
+ int recv (struct zmq_msg_t *msg_, int flags_);
+ int close ();
// The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
- // thread to 0MQ.
+ // thread to 0MQ. Locking is used instead.
bool register_session (const char *name_, class session_t *session_);
bool unregister_session (const char *name_);
class session_t *find_session (const char *name_);
// i_endpoint interface implementation.
- void attach_inpipe (class reader_t *pipe_);
- void attach_outpipe (class writer_t *pipe_);
- void revive (class reader_t *pipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
+ void kill (class reader_t *pipe_);
+ void revive (class reader_t *pipe_);
- // Manipulating index in the app_thread's list of sockets.
- void set_index (int index);
- int get_index ();
+ protected:
+
+ // Destructor is protected. Socket is closed using 'close' function.
+ virtual ~socket_base_t ();
+
+ // Pipe management is done by individual socket types.
+ virtual bool xrequires_in () = 0;
+ virtual bool xrequires_out () = 0;
+ virtual void xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_) = 0;
+ virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
+ virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
+ virtual void xkill (class reader_t *pipe_) = 0;
+ virtual void xrevive (class reader_t *pipe_) = 0;
+
+ // Actual algorithms are to be defined by individual socket types.
+ virtual int xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_) = 0;
+ virtual int xsend (struct zmq_msg_t *msg_, int options_) = 0;
+ virtual int xflush () = 0;
+ virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0;
private:
@@ -79,14 +98,6 @@ namespace zmq
void process_term_req (class owned_t *object_);
void process_term_ack ();
- // Attempts to distribute the message to all the outbound pipes.
- // Returns false if not possible because of pipe overflow.
- bool distribute (struct zmq_msg_t *msg_, bool flush_);
-
- // Gets a message from one of the inbound pipes. Implementation of
- // fair queueing.
- bool fetch (struct zmq_msg_t *msg_);
-
// Type of the socket.
int type;
@@ -95,21 +106,6 @@ namespace zmq
typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
- // Inbound pipes, i.e. those the socket is getting messages from.
- typedef std::vector <class reader_t*> in_pipes_t;
- in_pipes_t in_pipes;
-
- // Index of the next inbound pipe to read messages from.
- in_pipes_t::size_type current;
-
- // Number of active inbound pipes. Active pipes are stored in the
- // initial section of the in_pipes array.
- in_pipes_t::size_type active;
-
- // Outbound pipes, i.e. those the socket is sending messages to.
- typedef std::vector <class writer_t*> out_pipes_t;
- out_pipes_t out_pipes;
-
// Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet.
int pending_term_acks;
@@ -138,9 +134,6 @@ namespace zmq
sessions_t sessions;
mutex_t sessions_sync;
- // Index of the socket in the app_thread's list of sockets.
- int index;
-
socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&);
};