diff options
40 files changed, 361 insertions, 221 deletions
| diff --git a/src/command.hpp b/src/command.hpp index ecf2d93..8378369 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -27,12 +27,18 @@  namespace zmq  { +    class object_t; +    class own_t; +    struct i_engine; +    class pipe_t; +    class socket_base_t; +      //  This structure defines the commands that can be sent between threads.      struct command_t      {          //  Object to process the command. -        class object_t *destination; +        zmq::object_t *destination;          enum type_t          { @@ -67,7 +73,7 @@ namespace zmq              //  Sent to socket to let it know about the newly created object.              struct { -                class own_t *object; +                zmq::own_t *object;              } own;              //  Attach the engine to the session. If engine is NULL, it informs @@ -79,7 +85,7 @@ namespace zmq              //  Sent from session to socket to establish pipe(s) between them.              //  Caller have used inc_seqnum beforehand sending the command.              struct { -                class pipe_t *pipe; +                zmq::pipe_t *pipe;              } bind;              //  Sent by pipe writer to inform dormant pipe reader that there @@ -112,7 +118,7 @@ namespace zmq              //  Sent by I/O object ot the socket to request the shutdown of              //  the I/O object.              struct { -                class own_t *object; +                zmq::own_t *object;              } term_req;              //  Sent by socket to I/O object to start its shutdown. @@ -128,7 +134,7 @@ namespace zmq              //  Transfers the ownership of the closed socket              //  to the reaper thread.              struct { -                class socket_base_t *socket; +                zmq::socket_base_t *socket;              } reap;              //  Closed socket notifies the reaper that it's already deallocated. diff --git a/src/ctx.hpp b/src/ctx.hpp index 619d57e..e09149d 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -36,6 +36,12 @@  namespace zmq  { + +    class object_t; +    class io_thread_t; +    class socket_base_t; +    class reaper_t; +      //  Information associated with inproc endpoint. Note that endpoint options      //  are registered as well so that the peer can access them without a need      //  for synchronisation, handshaking or similar. @@ -66,8 +72,8 @@ namespace zmq          int terminate ();          //  Create and destroy a socket. -        class socket_base_t *create_socket (int type_); -        void destroy_socket (class socket_base_t *socket_); +        zmq::socket_base_t *create_socket (int type_); +        void destroy_socket (zmq::socket_base_t *socket_);          //  Send command to the destination thread.          void send_command (uint32_t tid_, const command_t &command_); @@ -75,14 +81,14 @@ namespace zmq          //  Returns the I/O thread that is the least busy at the moment.          //  Affinity specifies which I/O threads are eligible (0 = all).          //  Returns NULL is no I/O thread is available. -        class io_thread_t *choose_io_thread (uint64_t affinity_); +        zmq::io_thread_t *choose_io_thread (uint64_t affinity_);          //  Returns reaper thread object. -        class object_t *get_reaper (); +        zmq::object_t *get_reaper ();          //  Management of inproc endpoints.          int register_endpoint (const char *addr_, endpoint_t &endpoint_); -        void unregister_endpoints (class socket_base_t *socket_); +        void unregister_endpoints (zmq::socket_base_t *socket_);          endpoint_t find_endpoint (const char *addr_);          //  Logging. @@ -120,10 +126,10 @@ namespace zmq          mutex_t slot_sync;          //  The reaper thread. -        class reaper_t *reaper; +        zmq::reaper_t *reaper;          //  I/O threads. -        typedef std::vector <class io_thread_t*> io_threads_t; +        typedef std::vector <zmq::io_thread_t*> io_threads_t;          io_threads_t io_threads;          //  Array of pointers to mailboxes for both application and I/O threads. @@ -142,7 +148,7 @@ namespace zmq          //  PUB socket for logging. The socket is shared among all the threads,          //  thus it is synchronised by a mutex. -        class socket_base_t *log_socket; +        zmq::socket_base_t *log_socket;          mutex_t log_sync;          ctx_t (const ctx_t&); diff --git a/src/decoder.hpp b/src/decoder.hpp index c6f9100..4afd018 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -34,6 +34,8 @@  namespace zmq  { +    class session_base_t; +      //  Helper base class for decoders that know the amount of data to read      //  in advance at any moment. Knowing the amount in advance is a property      //  of the protocol used. 0MQ framing protocol is based size-prefixed @@ -193,7 +195,7 @@ namespace zmq          decoder_t (size_t bufsize_, int64_t maxmsgsize_);          ~decoder_t (); -        void set_session (class session_base_t *session_); +        void set_session (zmq::session_base_t *session_);      private: @@ -202,7 +204,7 @@ namespace zmq          bool flags_ready ();          bool message_ready (); -        class session_base_t *session; +        zmq::session_base_t *session;          unsigned char tmpbuf [8];          msg_t in_progress; diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 1de1af0..382be0f 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -35,6 +35,8 @@  namespace zmq  { +    struct i_poll_events; +      //  Implements socket polling mechanism using the "/dev/poll" interface.      class devpoll_t : public poller_base_t @@ -47,7 +49,7 @@ namespace zmq          ~devpoll_t ();          //  "poller" concept. -        handle_t add_fd (fd_t fd_, struct i_poll_events *events_); +        handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);          void rm_fd (handle_t handle_);          void set_pollin (handle_t handle_);          void reset_pollin (handle_t handle_); @@ -70,7 +72,7 @@ namespace zmq          struct fd_entry_t          {              short events; -            struct i_poll_events *reactor; +            zmq::i_poll_events *reactor;              bool valid;              bool accepted;          }; diff --git a/src/dist.hpp b/src/dist.hpp index a72de6e..be86ab2 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -29,6 +29,9 @@  namespace zmq  { +    class pipe_t; +    class msg_t; +      //  Class manages a set of outbound pipes. It sends each messages to      //  each of them.      class dist_t @@ -39,26 +42,26 @@ namespace zmq          ~dist_t ();          //  Adds the pipe to the distributor object. -        void attach (class pipe_t *pipe_); +        void attach (zmq::pipe_t *pipe_);          //  Activates pipe that have previously reached high watermark. -        void activated (class pipe_t *pipe_); +        void activated (zmq::pipe_t *pipe_);          //  Mark the pipe as matching. Subsequent call to send_to_matching          //  will send message also to this pipe. -        void match (class pipe_t *pipe_); +        void match (zmq::pipe_t *pipe_);          //  Mark all pipes as non-matching.          void unmatch ();          //  Removes the pipe from the distributor object. -        void terminated (class pipe_t *pipe_); +        void terminated (zmq::pipe_t *pipe_);          //  Send the message to the matching outbound pipes. -        int send_to_matching (class msg_t *msg_, int flags_); +        int send_to_matching (zmq::msg_t *msg_, int flags_);          //  Send the message to all the outbound pipes. -        int send_to_all (class msg_t *msg_, int flags_); +        int send_to_all (zmq::msg_t *msg_, int flags_);          bool has_out (); @@ -66,13 +69,13 @@ namespace zmq          //  Write the message to the pipe. Make the pipe inactive if writing          //  fails. In such a case false is returned. -        bool write (class pipe_t *pipe_, class msg_t *msg_); +        bool write (zmq::pipe_t *pipe_, zmq::msg_t *msg_);          //  Put the message to all active pipes. -        void distribute (class msg_t *msg_, int flags_); +        void distribute (zmq::msg_t *msg_, int flags_);          //  List of outbound pipes. -        typedef array_t <class pipe_t, 2> pipes_t; +        typedef array_t <zmq::pipe_t, 2> pipes_t;          pipes_t pipes;          //  Number of all the pipes to send the next message to. diff --git a/src/encoder.hpp b/src/encoder.hpp index 8001c4e..f521b6b 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -33,6 +33,8 @@  namespace zmq  { +    class session_base_t; +      //  Helper base class for encoders. It implements the state machine that      //  fills the outgoing buffer. Derived classes should implement individual      //  state machine actions. @@ -173,14 +175,14 @@ namespace zmq          encoder_t (size_t bufsize_);          ~encoder_t (); -        void set_session (class session_base_t *session_); +        void set_session (zmq::session_base_t *session_);      private:          bool size_ready ();          bool message_ready (); -        class session_base_t *session; +        zmq::session_base_t *session;          msg_t in_progress;          unsigned char tmpbuf [10]; diff --git a/src/epoll.hpp b/src/epoll.hpp index 9bc31a5..3fd4262 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -36,6 +36,8 @@  namespace zmq  { +    struct i_poll_events; +      //  This class implements socket polling mechanism using the Linux-specific      //  epoll mechanism. @@ -49,7 +51,7 @@ namespace zmq          ~epoll_t ();          //  "poller" concept. -        handle_t add_fd (fd_t fd_, struct i_poll_events *events_); +        handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);          void rm_fd (handle_t handle_);          void set_pollin (handle_t handle_);          void reset_pollin (handle_t handle_); @@ -73,7 +75,7 @@ namespace zmq          {              fd_t fd;              epoll_event ev; -            struct i_poll_events *events; +            zmq::i_poll_events *events;          };          //  List of retired event sources. diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 19359b7..93cd8b1 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -25,6 +25,8 @@  namespace zmq  { +    class io_thread_t; +      //  Abstract interface to be implemented by various engines.      struct i_engine @@ -32,7 +34,7 @@ namespace zmq          virtual ~i_engine () {}          //  Plug the engine to the session. -        virtual void plug (class io_thread_t *io_thread_, +        virtual void plug (zmq::io_thread_t *io_thread_,              class session_base_t *session_) = 0;          //  Unplug the engine from the session. diff --git a/src/io_object.hpp b/src/io_object.hpp index bf7a625..689b221 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -31,6 +31,8 @@  namespace zmq  { +    class io_thread_t; +      //  Simple base class for objects that live in I/O threads.      //  It makes communication with the poller object easier and      //  makes defining unneeded event handlers unnecessary. @@ -39,12 +41,12 @@ namespace zmq      {      public: -        io_object_t (class io_thread_t *io_thread_ = NULL); +        io_object_t (zmq::io_thread_t *io_thread_ = NULL);          ~io_object_t ();          //  When migrating an object from one I/O thread to another, first          //  unplug it, then migrate it, then plug it to the new thread. -        void plug (class io_thread_t *io_thread_); +        void plug (zmq::io_thread_t *io_thread_);          void unplug ();      protected: diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 986c88d..00bd9a6 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -33,6 +33,8 @@  namespace zmq  { +    class ctx_t; +      //  Generic part of the I/O thread. Polling-mechanism-specific features      //  are implemented in separate "polling objects". @@ -40,7 +42,7 @@ namespace zmq      {      public: -        io_thread_t (class ctx_t *ctx_, uint32_t tid_); +        io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);          //  Clean-up. If the thread was started, it's neccessary to call 'stop'          //  before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index c02245a..d2d1752 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -34,14 +34,17 @@  namespace zmq  { +    class io_thread_t; +    class session_base_t; +      class ipc_connecter_t : public own_t, public io_object_t      {      public:          //  If 'delay' is true connecter first waits for a while, then starts          //  connection process. -        ipc_connecter_t (class io_thread_t *io_thread_, -            class session_base_t *session_, const options_t &options_, +        ipc_connecter_t (zmq::io_thread_t *io_thread_, +            zmq::session_base_t *session_, const options_t &options_,              const char *address_, bool delay_);          ~ipc_connecter_t (); @@ -101,7 +104,7 @@ namespace zmq          bool wait;          //  Reference to the session we belong to. -        class session_base_t *session; +        zmq::session_base_t *session;          //  Current reconnect ivl, updated for backoff strategy          int current_reconnect_ivl; diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index 0f06d23..e1f4817 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -35,12 +35,15 @@  namespace zmq  { +    class io_thread_t; +    class socket_base_t; +      class ipc_listener_t : public own_t, public io_object_t      {      public: -        ipc_listener_t (class io_thread_t *io_thread_, -            class socket_base_t *socket_, const options_t &options_); +        ipc_listener_t (zmq::io_thread_t *io_thread_, +            zmq::socket_base_t *socket_, const options_t &options_);          ~ipc_listener_t ();          //  Set address to listen on. @@ -76,7 +79,7 @@ namespace zmq          handle_t handle;          //  Socket the listerner belongs to. -        class socket_base_t *socket; +        zmq::socket_base_t *socket;          ipc_listener_t (const ipc_listener_t&);          const ipc_listener_t &operator = (const ipc_listener_t&); diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 14f4e49..b1352df 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -35,6 +35,8 @@  namespace zmq  { +    struct i_poll_events; +      //  Implements socket polling mechanism using the BSD-specific      //  kqueue interface. @@ -48,7 +50,7 @@ namespace zmq          ~kqueue_t ();          //  "poller" concept. -        handle_t add_fd (fd_t fd_, struct i_poll_events *events_); +        handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);          void rm_fd (handle_t handle_);          void set_pollin (handle_t handle_);          void reset_pollin (handle_t handle_); @@ -79,7 +81,7 @@ namespace zmq              fd_t fd;              bool flag_pollin;              bool flag_pollout; -            i_poll_events *reactor; +            zmq::i_poll_events *reactor;          };          //  List of retired event sources. @@ -48,7 +48,7 @@ namespace zmq      private:          //  List of outbound pipes. -        typedef array_t <class pipe_t, 2> pipes_t; +        typedef array_t <pipe_t, 2> pipes_t;          pipes_t pipes;          //  Number of active pipes. All the active pipes are located at the diff --git a/src/mtrie.hpp b/src/mtrie.hpp index 8bbc22d..11ef940 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -29,6 +29,8 @@  namespace zmq  { +    class pipe_t; +      //  Multi-trie. Each node in the trie is a set of pointers to pipes.      class mtrie_t @@ -40,35 +42,35 @@ namespace zmq          //  Add key to the trie. Returns true if it's a new subscription          //  rather than a duplicate. -        bool add (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); +        bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);          //  Remove all subscriptions for a specific peer from the trie.          //  If there are no subscriptions left on some topics, invoke the          //  supplied callback function. -        void rm (class pipe_t *pipe_, +        void rm (zmq::pipe_t *pipe_,              void (*func_) (unsigned char *data_, size_t size_, void *arg_),              void *arg_);          //  Remove specific subscription from the trie. Return true is it was          //  actually removed rather than de-duplicated. -        bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); +        bool rm (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);          //  Signal all the matching pipes.          void match (unsigned char *data_, size_t size_, -            void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_); +            void (*func_) (zmq::pipe_t *pipe_, void *arg_), void *arg_);      private:          bool add_helper (unsigned char *prefix_, size_t size_, -            class pipe_t *pipe_); -        void rm_helper (class pipe_t *pipe_, unsigned char **buff_, +            zmq::pipe_t *pipe_); +        void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_,              size_t buffsize_, size_t maxbuffsize_,              void (*func_) (unsigned char *data_, size_t size_, void *arg_),              void *arg_);          bool rm_helper (unsigned char *prefix_, size_t size_, -            class pipe_t *pipe_); +            zmq::pipe_t *pipe_); -        typedef std::set <class pipe_t*> pipes_t; +        typedef std::set <zmq::pipe_t*> pipes_t;          pipes_t pipes;          unsigned char min; diff --git a/src/object.hpp b/src/object.hpp index f832596..932cea7 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -26,6 +26,17 @@  namespace zmq  { + +    struct i_engine; +    struct endpoint_t; +    struct command_t; +    class ctx_t; +    class pipe_t; +    class socket_base_t; +    class session_base_t; +    class io_thread_t; +    class own_t; +      //  Base class for all objects that participate in inter-thread      //  communication. @@ -33,51 +44,51 @@ namespace zmq      {      public: -        object_t (class ctx_t *ctx_, uint32_t tid_); +        object_t (zmq::ctx_t *ctx_, uint32_t tid_);          object_t (object_t *parent_);          virtual ~object_t ();          uint32_t get_tid ();          ctx_t *get_ctx (); -        void process_command (struct command_t &cmd_); +        void process_command (zmq::command_t &cmd_);      protected:          //  Using following function, socket is able to access global          //  repository of inproc endpoints. -        int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); -        void unregister_endpoints (class socket_base_t *socket_); -        struct endpoint_t find_endpoint (const char *addr_); -        void destroy_socket (class socket_base_t *socket_); +        int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_); +        void unregister_endpoints (zmq::socket_base_t *socket_); +        zmq::endpoint_t find_endpoint (const char *addr_); +        void destroy_socket (zmq::socket_base_t *socket_);          //  Logs an message.          void log (const char *format_, ...);          //  Chooses least loaded I/O thread. -        class io_thread_t *choose_io_thread (uint64_t affinity_); +        zmq::io_thread_t *choose_io_thread (uint64_t affinity_);          //  Derived object can use these functions to send commands          //  to other objects.          void send_stop (); -        void send_plug (class own_t *destination_, +        void send_plug (zmq::own_t *destination_,              bool inc_seqnum_ = true); -        void send_own (class own_t *destination_, -            class own_t *object_); -        void send_attach (class session_base_t *destination_, -             struct i_engine *engine_, bool inc_seqnum_ = true); -        void send_bind (class own_t *destination_, class pipe_t *pipe_, +        void send_own (zmq::own_t *destination_, +            zmq::own_t *object_); +        void send_attach (zmq::session_base_t *destination_, +             zmq::i_engine *engine_, bool inc_seqnum_ = true); +        void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,               bool inc_seqnum_ = true); -        void send_activate_read (class pipe_t *destination_); -        void send_activate_write (class pipe_t *destination_, +        void send_activate_read (zmq::pipe_t *destination_); +        void send_activate_write (zmq::pipe_t *destination_,               uint64_t msgs_read_); -        void send_hiccup (class pipe_t *destination_, void *pipe_); -        void send_pipe_term (class pipe_t *destination_); -        void send_pipe_term_ack (class pipe_t *destination_); -        void send_term_req (class own_t *destination_, -            class own_t *object_); -        void send_term (class own_t *destination_, int linger_); -        void send_term_ack (class own_t *destination_); -        void send_reap (class socket_base_t *socket_); +        void send_hiccup (zmq::pipe_t *destination_, void *pipe_); +        void send_pipe_term (zmq::pipe_t *destination_); +        void send_pipe_term_ack (zmq::pipe_t *destination_); +        void send_term_req (zmq::own_t *destination_, +            zmq::own_t *object_); +        void send_term (zmq::own_t *destination_, int linger_); +        void send_term_ack (zmq::own_t *destination_); +        void send_reap (zmq::socket_base_t *socket_);          void send_reaped ();          void send_done (); @@ -85,18 +96,18 @@ namespace zmq          //  called when command arrives from another thread.          virtual void process_stop ();          virtual void process_plug (); -        virtual void process_own (class own_t *object_); -        virtual void process_attach (struct i_engine *engine_); -        virtual void process_bind (class pipe_t *pipe_); +        virtual void process_own (zmq::own_t *object_); +        virtual void process_attach (zmq::i_engine *engine_); +        virtual void process_bind (zmq::pipe_t *pipe_);          virtual void process_activate_read ();          virtual void process_activate_write (uint64_t msgs_read_);          virtual void process_hiccup (void *pipe_);          virtual void process_pipe_term ();          virtual void process_pipe_term_ack (); -        virtual void process_term_req (class own_t *object_); +        virtual void process_term_req (zmq::own_t *object_);          virtual void process_term (int linger_);          virtual void process_term_ack (); -        virtual void process_reap (class socket_base_t *socket_); +        virtual void process_reap (zmq::socket_base_t *socket_);          virtual void process_reaped ();          //  Special handler called after a command that requires a seqnum @@ -107,7 +118,7 @@ namespace zmq      private:          //  Context provides access to the global state. -        class ctx_t *ctx; +        zmq::ctx_t *ctx;          //  Thread ID of the thread the object belongs to.          uint32_t tid; diff --git a/src/own.hpp b/src/own.hpp index ad5c452..2969ffd 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -32,6 +32,9 @@  namespace zmq  { +    class ctx_t; +    class io_thread_t; +      //  Base class for objects forming a part of ownership hierarchy.      //  It handles initialisation and destruction of such objects. @@ -44,10 +47,10 @@ namespace zmq          //  The object is not living within an I/O thread. It has it's own          //  thread outside of 0MQ infrastructure. -        own_t (class ctx_t *parent_, uint32_t tid_); +        own_t (zmq::ctx_t *parent_, uint32_t tid_);          //  The object is living within I/O thread. -        own_t (class io_thread_t *io_thread_, const options_t &options_); +        own_t (zmq::io_thread_t *io_thread_, const options_t &options_);          //  When another owned object wants to send command to this object          //  it calls this function to let it know it should not shut down diff --git a/src/pair.hpp b/src/pair.hpp index 67de2fd..fb447f1 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -28,27 +28,32 @@  namespace zmq  { +    class ctx_t; +    class msg_t; +    class pipe_t; +    class io_thread_t; +      class pair_t :          public socket_base_t      {      public: -        pair_t (class ctx_t *parent_, uint32_t tid_); +        pair_t (zmq::ctx_t *parent_, uint32_t tid_);          ~pair_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipe (class pipe_t *pipe_); -        int xsend (class msg_t *msg_, int flags_); -        int xrecv (class msg_t *msg_, int flags_); +        void xatta | 
