From c79abee6bcaa996f50be71bd1d3075e3affb469d Mon Sep 17 00:00:00 2001 From: AJ Lewis Date: Wed, 9 Nov 2011 15:22:20 +0100 Subject: Get AIX 6.1 compiling again by making msg_t class explicit Older versions of gcc have problems with in-line forward declarations when there's a naming conflict with a global symbol. Signed-off-by: AJ Lewis Expand the original patch to all such forward declarations. Signed-off-by: Martin Sustrik --- src/command.hpp | 16 ++++++++---- src/ctx.hpp | 22 ++++++++++------ src/decoder.hpp | 6 +++-- src/devpoll.hpp | 6 +++-- src/dist.hpp | 21 +++++++++------- src/encoder.hpp | 6 +++-- src/epoll.hpp | 6 +++-- src/i_engine.hpp | 4 ++- src/io_object.hpp | 6 +++-- src/io_thread.hpp | 4 ++- src/ipc_connecter.hpp | 9 ++++--- src/ipc_listener.hpp | 9 ++++--- src/kqueue.hpp | 6 +++-- src/lb.hpp | 2 +- src/mtrie.hpp | 18 ++++++++------ src/object.hpp | 69 +++++++++++++++++++++++++++++---------------------- src/own.hpp | 7 ++++-- src/pair.hpp | 25 +++++++++++-------- src/pgm_receiver.hpp | 11 +++++--- src/pgm_sender.hpp | 9 ++++--- src/pipe.hpp | 17 +++++++------ src/poll.hpp | 6 +++-- src/poller_base.hpp | 8 +++--- src/pub.hpp | 13 +++++++--- src/pull.hpp | 19 ++++++++------ src/push.hpp | 19 ++++++++------ src/reaper.hpp | 7 ++++-- src/rep.hpp | 15 +++++++---- src/req.hpp | 15 +++++++---- src/select.hpp | 6 +++-- src/session_base.hpp | 34 ++++++++++++++----------- src/socket_base.hpp | 22 +++++++++------- src/stream_engine.hpp | 11 +++++--- src/sub.hpp | 13 +++++++--- src/tcp_connecter.hpp | 9 ++++--- src/tcp_listener.hpp | 9 ++++--- src/xpub.hpp | 25 +++++++++++-------- src/xrep.hpp | 25 +++++++++++-------- src/xreq.hpp | 24 +++++++++++------- src/xsub.hpp | 23 +++++++++-------- 40 files changed, 361 insertions(+), 221 deletions(-) diff --git a/src/command.hpp b/src/command.hpp index ecf2d93..8378369 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -27,12 +27,18 @@ namespace zmq { + class object_t; + class own_t; + struct i_engine; + class pipe_t; + class socket_base_t; + // This structure defines the commands that can be sent between threads. struct command_t { // Object to process the command. - class object_t *destination; + zmq::object_t *destination; enum type_t { @@ -67,7 +73,7 @@ namespace zmq // Sent to socket to let it know about the newly created object. struct { - class own_t *object; + zmq::own_t *object; } own; // Attach the engine to the session. If engine is NULL, it informs @@ -79,7 +85,7 @@ namespace zmq // Sent from session to socket to establish pipe(s) between them. // Caller have used inc_seqnum beforehand sending the command. struct { - class pipe_t *pipe; + zmq::pipe_t *pipe; } bind; // Sent by pipe writer to inform dormant pipe reader that there @@ -112,7 +118,7 @@ namespace zmq // Sent by I/O object ot the socket to request the shutdown of // the I/O object. struct { - class own_t *object; + zmq::own_t *object; } term_req; // Sent by socket to I/O object to start its shutdown. @@ -128,7 +134,7 @@ namespace zmq // Transfers the ownership of the closed socket // to the reaper thread. struct { - class socket_base_t *socket; + zmq::socket_base_t *socket; } reap; // Closed socket notifies the reaper that it's already deallocated. diff --git a/src/ctx.hpp b/src/ctx.hpp index 619d57e..e09149d 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -36,6 +36,12 @@ namespace zmq { + + class object_t; + class io_thread_t; + class socket_base_t; + class reaper_t; + // Information associated with inproc endpoint. Note that endpoint options // are registered as well so that the peer can access them without a need // for synchronisation, handshaking or similar. @@ -66,8 +72,8 @@ namespace zmq int terminate (); // Create and destroy a socket. - class socket_base_t *create_socket (int type_); - void destroy_socket (class socket_base_t *socket_); + zmq::socket_base_t *create_socket (int type_); + void destroy_socket (zmq::socket_base_t *socket_); // Send command to the destination thread. void send_command (uint32_t tid_, const command_t &command_); @@ -75,14 +81,14 @@ namespace zmq // Returns the I/O thread that is the least busy at the moment. // Affinity specifies which I/O threads are eligible (0 = all). // Returns NULL is no I/O thread is available. - class io_thread_t *choose_io_thread (uint64_t affinity_); + zmq::io_thread_t *choose_io_thread (uint64_t affinity_); // Returns reaper thread object. - class object_t *get_reaper (); + zmq::object_t *get_reaper (); // Management of inproc endpoints. int register_endpoint (const char *addr_, endpoint_t &endpoint_); - void unregister_endpoints (class socket_base_t *socket_); + void unregister_endpoints (zmq::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); // Logging. @@ -120,10 +126,10 @@ namespace zmq mutex_t slot_sync; // The reaper thread. - class reaper_t *reaper; + zmq::reaper_t *reaper; // I/O threads. - typedef std::vector io_threads_t; + typedef std::vector io_threads_t; io_threads_t io_threads; // Array of pointers to mailboxes for both application and I/O threads. @@ -142,7 +148,7 @@ namespace zmq // PUB socket for logging. The socket is shared among all the threads, // thus it is synchronised by a mutex. - class socket_base_t *log_socket; + zmq::socket_base_t *log_socket; mutex_t log_sync; ctx_t (const ctx_t&); diff --git a/src/decoder.hpp b/src/decoder.hpp index c6f9100..4afd018 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -34,6 +34,8 @@ namespace zmq { + class session_base_t; + // Helper base class for decoders that know the amount of data to read // in advance at any moment. Knowing the amount in advance is a property // of the protocol used. 0MQ framing protocol is based size-prefixed @@ -193,7 +195,7 @@ namespace zmq decoder_t (size_t bufsize_, int64_t maxmsgsize_); ~decoder_t (); - void set_session (class session_base_t *session_); + void set_session (zmq::session_base_t *session_); private: @@ -202,7 +204,7 @@ namespace zmq bool flags_ready (); bool message_ready (); - class session_base_t *session; + zmq::session_base_t *session; unsigned char tmpbuf [8]; msg_t in_progress; diff --git a/src/devpoll.hpp b/src/devpoll.hpp index 1de1af0..382be0f 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -35,6 +35,8 @@ namespace zmq { + struct i_poll_events; + // Implements socket polling mechanism using the "/dev/poll" interface. class devpoll_t : public poller_base_t @@ -47,7 +49,7 @@ namespace zmq ~devpoll_t (); // "poller" concept. - handle_t add_fd (fd_t fd_, struct i_poll_events *events_); + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); @@ -70,7 +72,7 @@ namespace zmq struct fd_entry_t { short events; - struct i_poll_events *reactor; + zmq::i_poll_events *reactor; bool valid; bool accepted; }; diff --git a/src/dist.hpp b/src/dist.hpp index a72de6e..be86ab2 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -29,6 +29,9 @@ namespace zmq { + class pipe_t; + class msg_t; + // Class manages a set of outbound pipes. It sends each messages to // each of them. class dist_t @@ -39,26 +42,26 @@ namespace zmq ~dist_t (); // Adds the pipe to the distributor object. - void attach (class pipe_t *pipe_); + void attach (zmq::pipe_t *pipe_); // Activates pipe that have previously reached high watermark. - void activated (class pipe_t *pipe_); + void activated (zmq::pipe_t *pipe_); // Mark the pipe as matching. Subsequent call to send_to_matching // will send message also to this pipe. - void match (class pipe_t *pipe_); + void match (zmq::pipe_t *pipe_); // Mark all pipes as non-matching. void unmatch (); // Removes the pipe from the distributor object. - void terminated (class pipe_t *pipe_); + void terminated (zmq::pipe_t *pipe_); // Send the message to the matching outbound pipes. - int send_to_matching (class msg_t *msg_, int flags_); + int send_to_matching (zmq::msg_t *msg_, int flags_); // Send the message to all the outbound pipes. - int send_to_all (class msg_t *msg_, int flags_); + int send_to_all (zmq::msg_t *msg_, int flags_); bool has_out (); @@ -66,13 +69,13 @@ namespace zmq // Write the message to the pipe. Make the pipe inactive if writing // fails. In such a case false is returned. - bool write (class pipe_t *pipe_, class msg_t *msg_); + bool write (zmq::pipe_t *pipe_, zmq::msg_t *msg_); // Put the message to all active pipes. - void distribute (class msg_t *msg_, int flags_); + void distribute (zmq::msg_t *msg_, int flags_); // List of outbound pipes. - typedef array_t pipes_t; + typedef array_t pipes_t; pipes_t pipes; // Number of all the pipes to send the next message to. diff --git a/src/encoder.hpp b/src/encoder.hpp index 8001c4e..f521b6b 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -33,6 +33,8 @@ namespace zmq { + class session_base_t; + // Helper base class for encoders. It implements the state machine that // fills the outgoing buffer. Derived classes should implement individual // state machine actions. @@ -173,14 +175,14 @@ namespace zmq encoder_t (size_t bufsize_); ~encoder_t (); - void set_session (class session_base_t *session_); + void set_session (zmq::session_base_t *session_); private: bool size_ready (); bool message_ready (); - class session_base_t *session; + zmq::session_base_t *session; msg_t in_progress; unsigned char tmpbuf [10]; diff --git a/src/epoll.hpp b/src/epoll.hpp index 9bc31a5..3fd4262 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -36,6 +36,8 @@ namespace zmq { + struct i_poll_events; + // This class implements socket polling mechanism using the Linux-specific // epoll mechanism. @@ -49,7 +51,7 @@ namespace zmq ~epoll_t (); // "poller" concept. - handle_t add_fd (fd_t fd_, struct i_poll_events *events_); + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); @@ -73,7 +75,7 @@ namespace zmq { fd_t fd; epoll_event ev; - struct i_poll_events *events; + zmq::i_poll_events *events; }; // List of retired event sources. diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 19359b7..93cd8b1 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -25,6 +25,8 @@ namespace zmq { + class io_thread_t; + // Abstract interface to be implemented by various engines. struct i_engine @@ -32,7 +34,7 @@ namespace zmq virtual ~i_engine () {} // Plug the engine to the session. - virtual void plug (class io_thread_t *io_thread_, + virtual void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_) = 0; // Unplug the engine from the session. diff --git a/src/io_object.hpp b/src/io_object.hpp index bf7a625..689b221 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -31,6 +31,8 @@ namespace zmq { + class io_thread_t; + // Simple base class for objects that live in I/O threads. // It makes communication with the poller object easier and // makes defining unneeded event handlers unnecessary. @@ -39,12 +41,12 @@ namespace zmq { public: - io_object_t (class io_thread_t *io_thread_ = NULL); + io_object_t (zmq::io_thread_t *io_thread_ = NULL); ~io_object_t (); // When migrating an object from one I/O thread to another, first // unplug it, then migrate it, then plug it to the new thread. - void plug (class io_thread_t *io_thread_); + void plug (zmq::io_thread_t *io_thread_); void unplug (); protected: diff --git a/src/io_thread.hpp b/src/io_thread.hpp index 986c88d..00bd9a6 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -33,6 +33,8 @@ namespace zmq { + class ctx_t; + // Generic part of the I/O thread. Polling-mechanism-specific features // are implemented in separate "polling objects". @@ -40,7 +42,7 @@ namespace zmq { public: - io_thread_t (class ctx_t *ctx_, uint32_t tid_); + io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index c02245a..d2d1752 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -34,14 +34,17 @@ namespace zmq { + class io_thread_t; + class session_base_t; + class ipc_connecter_t : public own_t, public io_object_t { public: // If 'delay' is true connecter first waits for a while, then starts // connection process. - ipc_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, const options_t &options_, + ipc_connecter_t (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_, const options_t &options_, const char *address_, bool delay_); ~ipc_connecter_t (); @@ -101,7 +104,7 @@ namespace zmq bool wait; // Reference to the session we belong to. - class session_base_t *session; + zmq::session_base_t *session; // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index 0f06d23..e1f4817 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -35,12 +35,15 @@ namespace zmq { + class io_thread_t; + class socket_base_t; + class ipc_listener_t : public own_t, public io_object_t { public: - ipc_listener_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_); + ipc_listener_t (zmq::io_thread_t *io_thread_, + zmq::socket_base_t *socket_, const options_t &options_); ~ipc_listener_t (); // Set address to listen on. @@ -76,7 +79,7 @@ namespace zmq handle_t handle; // Socket the listerner belongs to. - class socket_base_t *socket; + zmq::socket_base_t *socket; ipc_listener_t (const ipc_listener_t&); const ipc_listener_t &operator = (const ipc_listener_t&); diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 14f4e49..b1352df 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -35,6 +35,8 @@ namespace zmq { + struct i_poll_events; + // Implements socket polling mechanism using the BSD-specific // kqueue interface. @@ -48,7 +50,7 @@ namespace zmq ~kqueue_t (); // "poller" concept. - handle_t add_fd (fd_t fd_, struct i_poll_events *events_); + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); @@ -79,7 +81,7 @@ namespace zmq fd_t fd; bool flag_pollin; bool flag_pollout; - i_poll_events *reactor; + zmq::i_poll_events *reactor; }; // List of retired event sources. diff --git a/src/lb.hpp b/src/lb.hpp index 1de8549..91189f5 100644 --- a/src/lb.hpp +++ b/src/lb.hpp @@ -48,7 +48,7 @@ namespace zmq private: // List of outbound pipes. - typedef array_t pipes_t; + typedef array_t pipes_t; pipes_t pipes; // Number of active pipes. All the active pipes are located at the diff --git a/src/mtrie.hpp b/src/mtrie.hpp index 8bbc22d..11ef940 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -29,6 +29,8 @@ namespace zmq { + class pipe_t; + // Multi-trie. Each node in the trie is a set of pointers to pipes. class mtrie_t @@ -40,35 +42,35 @@ namespace zmq // Add key to the trie. Returns true if it's a new subscription // rather than a duplicate. - bool add (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); + bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_); // Remove all subscriptions for a specific peer from the trie. // If there are no subscriptions left on some topics, invoke the // supplied callback function. - void rm (class pipe_t *pipe_, + void rm (zmq::pipe_t *pipe_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_); // Remove specific subscription from the trie. Return true is it was // actually removed rather than de-duplicated. - bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); + bool rm (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_); // Signal all the matching pipes. void match (unsigned char *data_, size_t size_, - void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_); + void (*func_) (zmq::pipe_t *pipe_, void *arg_), void *arg_); private: bool add_helper (unsigned char *prefix_, size_t size_, - class pipe_t *pipe_); - void rm_helper (class pipe_t *pipe_, unsigned char **buff_, + zmq::pipe_t *pipe_); + void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), void *arg_); bool rm_helper (unsigned char *prefix_, size_t size_, - class pipe_t *pipe_); + zmq::pipe_t *pipe_); - typedef std::set pipes_t; + typedef std::set pipes_t; pipes_t pipes; unsigned char min; diff --git a/src/object.hpp b/src/object.hpp index f832596..932cea7 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -26,6 +26,17 @@ namespace zmq { + + struct i_engine; + struct endpoint_t; + struct command_t; + class ctx_t; + class pipe_t; + class socket_base_t; + class session_base_t; + class io_thread_t; + class own_t; + // Base class for all objects that participate in inter-thread // communication. @@ -33,51 +44,51 @@ namespace zmq { public: - object_t (class ctx_t *ctx_, uint32_t tid_); + object_t (zmq::ctx_t *ctx_, uint32_t tid_); object_t (object_t *parent_); virtual ~object_t (); uint32_t get_tid (); ctx_t *get_ctx (); - void process_command (struct command_t &cmd_); + void process_command (zmq::command_t &cmd_); protected: // Using following function, socket is able to access global // repository of inproc endpoints. - int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); - void unregister_endpoints (class socket_base_t *socket_); - struct endpoint_t find_endpoint (const char *addr_); - void destroy_socket (class socket_base_t *socket_); + int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_); + void unregister_endpoints (zmq::socket_base_t *socket_); + zmq::endpoint_t find_endpoint (const char *addr_); + void destroy_socket (zmq::socket_base_t *socket_); // Logs an message. void log (const char *format_, ...); // Chooses least loaded I/O thread. - class io_thread_t *choose_io_thread (uint64_t affinity_); + zmq::io_thread_t *choose_io_thread (uint64_t affinity_); // Derived object can use these functions to send commands // to other objects. void send_stop (); - void send_plug (class own_t *destination_, + void send_plug (zmq::own_t *destination_, bool inc_seqnum_ = true); - void send_own (class own_t *destination_, - class own_t *object_); - void send_attach (class session_base_t *destination_, - struct i_engine *engine_, bool inc_seqnum_ = true); - void send_bind (class own_t *destination_, class pipe_t *pipe_, + void send_own (zmq::own_t *destination_, + zmq::own_t *object_); + void send_attach (zmq::session_base_t *destination_, + zmq::i_engine *engine_, bool inc_seqnum_ = true); + void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true); - void send_activate_read (class pipe_t *destination_); - void send_activate_write (class pipe_t *destination_, + void send_activate_read (zmq::pipe_t *destination_); + void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); - void send_hiccup (class pipe_t *destination_, void *pipe_); - void send_pipe_term (class pipe_t *destination_); - void send_pipe_term_ack (class pipe_t *destination_); - void send_term_req (class own_t *destination_, - class own_t *object_); - void send_term (class own_t *destination_, int linger_); - void send_term_ack (class own_t *destination_); - void send_reap (class socket_base_t *socket_); + void send_hiccup (zmq::pipe_t *destination_, void *pipe_); + void send_pipe_term (zmq::pipe_t *destination_); + void send_pipe_term_ack (zmq::pipe_t *destination_); + void send_term_req (zmq::own_t *destination_, + zmq::own_t *object_); + void send_term (zmq::own_t *destination_, int linger_); + void send_term_ack (zmq::own_t *destination_); + void send_reap (zmq::socket_base_t *socket_); void send_reaped (); void send_done (); @@ -85,18 +96,18 @@ namespace zmq // called when command arrives from another thread. virtual void process_stop (); virtual void process_plug (); - virtual void process_own (class own_t *object_); - virtual void process_attach (struct i_engine *engine_); - virtual void process_bind (class pipe_t *pipe_); + virtual void process_own (zmq::own_t *object_); + virtual void process_attach (zmq::i_engine *engine_); + virtual void process_bind (zmq::pipe_t *pipe_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); - virtual void process_term_req (class own_t *object_); + virtual void process_term_req (zmq::own_t *object_); virtual void process_term (int linger_); virtual void process_term_ack (); - virtual void process_reap (class socket_base_t *socket_); + virtual void process_reap (zmq::socket_base_t *socket_); virtual void process_reaped (); // Special handler called after a command that requires a seqnum @@ -107,7 +118,7 @@ namespace zmq private: // Context provides access to the global state. - class ctx_t *ctx; + zmq::ctx_t *ctx; // Thread ID of the thread the object belongs to. uint32_t tid; diff --git a/src/own.hpp b/src/own.hpp index ad5c452..2969ffd 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -32,6 +32,9 @@ namespace zmq { + class ctx_t; + class io_thread_t; + // Base class for objects forming a part of ownership hierarchy. // It handles initialisation and destruction of such objects. @@ -44,10 +47,10 @@ namespace zmq // The object is not living within an I/O thread. It has it's own // thread outside of 0MQ infrastructure. - own_t (class ctx_t *parent_, uint32_t tid_); + own_t (zmq::ctx_t *parent_, uint32_t tid_); // The object is living within I/O thread. - own_t (class io_thread_t *io_thread_, const options_t &options_); + own_t (zmq::io_thread_t *io_thread_, const options_t &options_); // When another owned object wants to send command to this object // it calls this function to let it know it should not shut down diff --git a/src/pair.hpp b/src/pair.hpp index 67de2fd..fb447f1 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -28,27 +28,32 @@ namespace zmq { + class ctx_t; + class msg_t; + class pipe_t; + class io_thread_t; + class pair_t : public socket_base_t { public: - pair_t (class ctx_t *parent_, uint32_t tid_); + pair_t (zmq::ctx_t *parent_, uint32_t tid_); ~pair_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); + void xattach_pipe (zmq::pipe_t *pipe_); + int xsend (zmq::msg_t *msg_, int flags_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xterminated (zmq::pipe_t *pipe_); private: - class pipe_t *pipe; + zmq::pipe_t *pipe; pair_t (const pair_t&); const pair_t &operator = (const pair_t&); @@ -58,8 +63,8 @@ namespace zmq { public: - pair_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + pair_session_t (zmq::io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~pair_session_t (); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 3c1d394..7c97d5a 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -43,19 +43,22 @@ namespace zmq { + class io_thread_t; + class session_base_t; + class pgm_receiver_t : public io_object_t, public i_engine { public: - pgm_receiver_t (class io_thread_t *parent_, const options_t &options_); + pgm_receiver_t (zmq::io_thread_t *parent_, const options_t &options_); ~pgm_receiver_t (); int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, - class session_base_t *session_); + void plug (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_); void unplug (); void terminate (); void activate_in (); @@ -108,7 +111,7 @@ namespace zmq options_t options; // Associated session. - class session_base_t *session; + zmq::session_base_t *session; // Most recently used decoder. decoder_t *mru_decoder; diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index d8f046d..99405f0 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -41,19 +41,22 @@ namespace zmq { + class io_thread_t; + class session_base_t; + class pgm_sender_t : public io_object_t, public i_engine { public: - pgm_sender_t (class io_thread_t *parent_, const options_t &options_); + pgm_sender_t (zmq::io_thread_t *parent_, const options_t &options_); ~pgm_sender_t (); int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, - class session_base_t *session_); + void plug (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_); void unplug (); void terminate (); void activate_in (); diff --git a/src/pipe.hpp b/src/pipe.hpp index 75a2021..f438257 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -34,23 +34,26 @@ namespace zmq { + class object_t; + class pipe_t; + // Create a pipepair for bi-directional transfer of messages. // First HWM is for messages passed from first pipe to the second pipe. // Second HWM is for messages passed from second pipe to the first pipe. // Delay specifies how the pipe behaves when the peer terminates. If true // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. - int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], + int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); struct i_pipe_events { virtual ~i_pipe_events () {} - virtual void read_activated (class pipe_t *pipe_) = 0; - virtual void write_activated (class pipe_t *pipe_) = 0; - virtual void hiccuped (class pipe_t *pipe_) = 0; - virtual void terminated (class pipe_t *pipe_) = 0; + virtual void read_activated (zmq::pipe_t *pipe_) = 0; + virtual void write_activated (zmq::pipe_t *pipe_) = 0; + virtual void hiccuped (zmq::pipe_t *pipe_) = 0; + virtual void terminated (zmq::pipe_t *pipe_) = 0; }; // Note that pipe can be stored in three different arrays. @@ -64,8 +67,8 @@ namespace zmq public array_item_t <3> { // This allows pipepair to create pipe objects. - friend int pipepair (class object_t *parents_ [2], - class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); + friend int pipepair (zmq::object_t *parents_ [2], + zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); public: diff --git a/src/poll.hpp b/src/poll.hpp index 700256d..9a7fbc3 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -37,6 +37,8 @@ namespace zmq { + struct i_poll_events; + // Implements socket polling mechanism using the POSIX.1-2001 // poll() system call. @@ -50,7 +52,7 @@ namespace zmq ~poll_t (); // "poller" concept. - handle_t add_fd (fd_t fd_, struct i_poll_events *events_); + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); @@ -70,7 +72,7 @@ namespace zmq struct fd_entry_t { fd_t index; - struct i_poll_events *events; + zmq::i_poll_events *events; }; // This table stores data for registered descriptors. diff --git a/src/poller_base.hpp b/src/poller_base.hpp index 808ed38..c184358 100644 --- a/src/poller_base.hpp +++ b/src/poller_base.hpp @@ -29,6 +29,8 @@ namespace zmq { + struct i_poll_events; + class poller_base_t { public: @@ -43,10 +45,10 @@ namespace zmq // Add a timeout to expire in timeout_ milliseconds. After the // expiration timer_event on sink_ object will be called with // argument set to id_. - void add_timer (int timeout_, struct i_poll_events *sink_, int id_); + void add_timer (int timeout_, zmq::i_poll_events *sink_, int id_); // Cancel the timer created by sink_ object with ID equal to id_. - void cancel_timer (struct i_poll_events *sink_, int id_); + void cancel_timer (zmq::i_poll_events *sink_, int id_); protected: @@ -65,7 +67,7 @@ namespace zmq // List of active timers. struct timer_info_t { - struct i_poll_events *sink; + zmq::i_poll_events *sink; int id; }; typedef std::multimap timers_t; diff --git a/src/pub.hpp b/src/pub.hpp index d418fd4..4672600 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -27,15 +27,20 @@ namespace zmq { + class ctx_t; + class io_thread_t; + class socket_base_t; + class msg_t; + class pub_t : public xpub_t { public: - pub_t (class ctx_t *parent_, uint32_t tid_); + pub_t (zmq::ctx_t *parent_, uint32_t tid_); ~pub_t (); // Implementations of virtual functions from socket_base_t. - int xrecv (class msg_t *msg_, int flags_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); private: @@ -48,8 +53,8 @@ namespace zmq { public: - pub_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + pub_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~pub_session_t (); diff --git a/src/pull.hpp b/src/pull.hpp index fa36d49..da3bee1 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -29,22 +29,27 @@ namespace zmq { + class ctx_t; + class pipe_t; + class msg_t; + class io_thread_t; + class pull_t : public socket_base_t { public: - pull_t (class ctx_t *parent_, uint32_t tid_); + pull_t (zmq::ctx_t *parent_, uint32_t tid_); ~pull_t (); protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xrecv (class msg_t *msg_, int flags_); + void xattach_pipe (zmq::pipe_t *pipe_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); - void xread_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); + void xread_activated (zmq::pipe_t *pipe_); + void xterminated (zmq::pipe_t *pipe_); private: @@ -60,8 +65,8 @@ namespace zmq { public: - pull_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + pull_session_t (zmq::io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~pull_session_t (); diff --git a/src/push.hpp b/src/push.hpp index ea93693..edee19d 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -29,22 +29,27 @@ namespace zmq { + class ctx_t; + class pipe_t; + class msg_t; + class io_thread_t; + class push_t : public socket_base_t { public: - push_t (class ctx_t *parent_, uint32_t tid_); + push_t (zmq::ctx_t *parent_, uint32_t tid_); ~push_t (); protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xsend (class msg_t *msg_, int flags_); + void xattach_pipe (zmq::pipe_t *pipe_); + int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xterminated (zmq::pipe_t *pipe_); private: @@ -59,8 +64,8 @@ namespace zmq { public: - push_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + push_session_t (zmq::io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~push_session_t (); diff --git a/src/reaper.hpp b/src/reaper.hpp index 1c1533f..66a3db9 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -29,11 +29,14 @@ namespace zmq { + class ctx_t; + class socket_base_t; + class reaper_t : public object_t, public i_poll_events { public: - reaper_t (class ctx_t *ctx_, uint32_t tid_); + reaper_t (zmq::ctx_t *ctx_, uint32_t tid_); ~reaper_t (); mailbox_t *get_mailbox (); @@ -50,7 +53,7 @@ namespace zmq // Command handlers. void process_stop (); - void process_reap (class socket_base_t *socket_); + void process_reap (zmq::socket_base_t *socket_); void process_reaped (); // Reaper thread accesses incoming commands via this mailbox. diff --git a/src/rep.hpp b/src/rep.hpp index de9c2b8..04b3d86 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -27,16 +27,21 @@ namespace zmq { + class ctx_t; + class msg_t; + class io_thread_t; + class socket_base_t; + class rep_t : public xrep_t { public: - rep_t (class ctx_t *parent_, uint32_t tid_); + rep_t (zmq::ctx_t *parent_, uint32_t tid_); ~rep_t (); // Overloads of functions from socket_base_t. - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); + int xsend (zmq::msg_t *msg_, int flags_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); @@ -59,8 +64,8 @@ namespace zmq { public: - rep_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + rep_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~rep_session_t (); diff --git a/src/req.hpp b/src/req.hpp index 8fae9d4..e743cb8 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -29,16 +29,21 @@ namespace zmq { + class ctx_t; + class msg_t; + class io_thread_t; + class socket_base_t; + class req_t : public xreq_t { public: - req_t (class ctx_t *parent_, uint32_t tid_); + req_t (zmq::ctx_t *parent_, uint32_t tid_); ~req_t (); // Overloads of functions from socket_base_t. - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); + int xsend (zmq::msg_t *msg_, int flags_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); @@ -60,8 +65,8 @@ namespace zmq { public: - req_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + req_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~req_session_t (); diff --git a/src/select.hpp b/src/select.hpp index 9231b6c..9f19a0f 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -47,6 +47,8 @@ namespace zmq { + struct i_poll_events; + // Implements socket polling mechanism using POSIX.1-2001 select() // function. @@ -60,7 +62,7 @@ namespace zmq ~select_t (); // "poller" concept. - handle_t add_fd (fd_t fd_, struct i_poll_events *events_); + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); void rm_fd (handle_t handle_); void set_pollin (handle_t handle_); void reset_pollin (handle_t handle_); @@ -80,7 +82,7 @@ namespace zmq struct fd_entry_t { fd_t fd; - struct i_poll_events *events; + zmq::i_poll_events *events; }; // Checks if an fd_entry_t is retired. diff --git a/src/session_base.hpp b/src/session_base.hpp index c89628f..6be110b 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -26,13 +26,17 @@ #include #include "own.hpp" -#include "i_engine.hpp" #include "io_object.hpp" #include "pipe.hpp" namespace zmq { + class pipe_t; + class io_thread_t; + class socket_base_t; + struct i_engine; + class session_base_t : public own_t, public io_object_t, @@ -41,13 +45,13 @@ namespace zmq public: // Create a session of the particular type. - static session_base_t *create (class io_thread_t *io_thread_, - bool connect_, class socket_base_t *socket_, + static session_base_t *create (zmq::io_thread_t *io_thread_, + bool connect_, zmq::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); // To be used once only, when creating the session. - void attach_pipe (class pipe_t *pipe_); + void attach_pipe (zmq::pipe_t *pipe_); // Following functions are the interface exposed towards the engine. virtual int read (msg_t *msg_); @@ -56,15 +60,15 @@ namespace zmq void detach (); // i_pipe_events interface implementation. - void read_activated (class pipe_t *pipe_); - void write_activated (class pipe_t *pipe_); - void hiccuped (class pipe_t *pipe_); - void terminated (class pipe_t *pipe_); + void read_activated (zmq::pipe_t *pipe_); + void write_activated (zmq::pipe_t *pipe_); + void hiccuped (zmq::pipe_t *pipe_); + void terminated (zmq::pipe_t *pipe_); protected: - session_base_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + session_base_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~session_base_t (); @@ -76,7 +80,7 @@ namespace zmq // Handlers for incoming commands. void process_plug (); - void process_attach (struct i_engine *engine_); + void process_attach (zmq::i_engine *engine_); void process_term (int linger_); // i_poll_events handlers. @@ -94,7 +98,7 @@ namespace zmq bool connect; // Pipe connecting the session to its socket. - class pipe_t *pipe; + zmq::pipe_t *pipe; // This flag is true if the remainder of the message being processed // is still in the in pipe. @@ -105,14 +109,14 @@ namespace zmq bool pending; // The protocol I/O engine connected to the session. - struct i_engine *engine; + zmq::i_engine *engine; // The socket the session belongs to. - class socket_base_t *socket; + zmq::socket_base_t *socket; // I/O thread the session is living in. It will be used to plug in // the engines into the same thread. - class io_thread_t *io_thread; + zmq::io_thread_t *io_thread; // ID of the linger timer enum {linger_timer_id = 0x20}; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index bc978ba..dda87f4 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -38,6 +38,10 @@ namespace zmq { + class ctx_t; + class msg_t; + class pipe_t; + class socket_base_t : public own_t, public array_item_t <>, @@ -52,7 +56,7 @@ namespace zmq bool check_tag (); // Create a socket of a specified type. - static socket_base_t *create (int type_, class ctx_t *parent_, + static socket_base_t *create (int type_, zmq::ctx_t *parent_, uint32_t tid_); // Returns the mailbox associated with this socket. @@ -67,8 +71,8 @@ namespace zmq int getsockopt (int option_, void *optval_, size_t *optvallen_); int bind (const char *addr_); int connect (const char *addr_); - int send (class msg_t *msg_, int flags_); - int recv (class msg_t *msg_, int flags_); + int send (zmq::msg_t *msg_, int flags_); + int recv (zmq::msg_t *msg_, int flags_); int close (); // These functions are used by the polling mechanism to determine @@ -94,12 +98,12 @@ namespace zmq protected: - socket_base_t (class ctx_t *parent_, uint32_t tid_); + socket_base_t (zmq::ctx_t *parent_, uint32_t tid_); virtual ~socket_base_t (); // Concrete algorithms for the x- methods are to be defined by // individual socket types. - virtual void xattach_pipe (class pipe_t *pipe_) = 0; + virtual void xattach_pipe (zmq::pipe_t *pipe_) = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, overload this @@ -109,11 +113,11 @@ namespace zmq // The default implementation assumes that send is not supported. virtual bool xhas_out (); - virtual int xsend (class msg_t *msg_, int flags_); + virtual int xsend (zmq::msg_t *msg_, int flags_); // The default implementation assumes that recv in not supported. virtual bool xhas_in (); - virtual int xrecv (class msg_t *msg_, int flags_); + virtual int xrecv (zmq::msg_t *msg_, int flags_); // i_pipe_events will be forwarded to these functions. virtual void xread_activated (pipe_t *pipe_); @@ -154,7 +158,7 @@ namespace zmq int check_protocol (const std::string &protocol_); // Register the pipe with this socket. - void attach_pipe (class pipe_t *pipe_); + void attach_pipe (zmq::pipe_t *pipe_); // Processes commands sent to this socket (if any). If timeout is -1, // returns only after at least one command was processed. @@ -164,7 +168,7 @@ namespace zmq // Handlers for incoming commands. void process_stop (); - void process_bind (class pipe_t *pipe_); + void process_bind (zmq::pipe_t *pipe_); void process_unplug (); void process_term (int linger_); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 6d122ed..30b190b 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -34,6 +34,9 @@ namespace zmq { + class io_thread_t; + class session_base_t; + // This engine handles any socket with SOCK_STREAM semantics, // e.g. TCP socket or an UNIX domain socket. @@ -45,8 +48,8 @@ namespace zmq ~stream_engine_t (); // i_engine interface implementation. - void plug (class io_thread_t *io_thread_, - class session_base_t *session_); + void plug (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_); void unplug (); void terminate (); void activate_in (); @@ -86,10 +89,10 @@ namespace zmq encoder_t encoder; // The session this engine is attached to. - class session_base_t *session; + zmq::session_base_t *session; // Detached transient session. - class session_base_t *leftover_session; + zmq::session_base_t *leftover_session; options_t options; diff --git a/src/sub.hpp b/src/sub.hpp index bb46641..1c69b06 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -27,17 +27,22 @@ namespace zmq { + class ctx_t; + class msg_t; + class io_thread_t; + class socket_base_t; + class sub_t : public xsub_t { public: - sub_t (class ctx_t *parent_, uint32_t tid_); + sub_t (zmq::ctx_t *parent_, uint32_t tid_); ~sub_t (); protected: int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (class msg_t *msg_, int flags_); + int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); private: @@ -50,8 +55,8 @@ namespace zmq { public: - sub_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + sub_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~sub_session_t (); diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index e420c82..fc3b9f2 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -31,14 +31,17 @@ namespace zmq { + class io_thread_t; + class session_base_t; + class tcp_connecter_t : public own_t, public io_object_t { public: // If 'delay' is true connecter first waits for a while, then starts // connection process. - tcp_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, const options_t &options_, + tcp_connecter_t (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_, const options_t &options_, const char *address_, bool delay_); ~tcp_connecter_t (); @@ -98,7 +101,7 @@ namespace zmq bool wait; // Reference to the session we belong to. - class session_base_t *session; + zmq::session_base_t *session; // Current reconnect ivl, updated for backoff strategy int current_reconnect_ivl; diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index e712998..c2116b3 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -31,12 +31,15 @@ namespace zmq { + class io_thread_t; + class socket_base_t; + class tcp_listener_t : public own_t, public io_object_t { public: - tcp_listener_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_); + tcp_listener_t (zmq::io_thread_t *io_thread_, + zmq::socket_base_t *socket_, const options_t &options_); ~tcp_listener_t (); // Set address to listen on. @@ -72,7 +75,7 @@ namespace zmq handle_t handle; // Socket the listerner belongs to. - class socket_base_t *socket; + zmq::socket_base_t *socket; tcp_listener_t (const tcp_listener_t&); const tcp_listener_t &operator = (const tcp_listener_t&); diff --git a/src/xpub.hpp b/src/xpub.hpp index 14ffc58..3f80d4a 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -33,23 +33,28 @@ namespace zmq { + class ctx_t; + class msg_t; + class pipe_t; + class io_thread_t; + class xpub_t : public socket_base_t { public: - xpub_t (class ctx_t *parent_, uint32_t tid_); + xpub_t (zmq::ctx_t *parent_, uint32_t tid_); ~xpub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xsend (class msg_t *msg_, int flags_); + void xattach_pipe (zmq::pipe_t *pipe_); + int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); - int xrecv (class msg_t *msg_, int flags_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xterminated (zmq::pipe_t *pipe_); private: @@ -59,7 +64,7 @@ namespace zmq void *arg_); // Function to be applied to each matching pipes. - static void mark_as_matching (class pipe_t *pipe_, void *arg_); + static void mark_as_matching (zmq::pipe_t *pipe_, void *arg_); // List of all subscriptions mapped to corresponding pipes. mtrie_t subscriptions; @@ -84,8 +89,8 @@ namespace zmq { public: - xpub_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + xpub_session_t (zmq::io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xpub_session_t (); diff --git a/src/xrep.hpp b/src/xrep.hpp index fc02b11..df82d00 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -35,24 +35,27 @@ namespace zmq { + class ctx_t; + class pipe_t; + // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. class xrep_t : public socket_base_t { public: - xrep_t (class ctx_t *parent_, uint32_t tid_); + xrep_t (zmq::ctx_t *parent_, uint32_t tid_); ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); + void xattach_pipe (zmq::pipe_t *pipe_); + int xsend (msg_t *msg_, int flags_); + int xrecv (msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xterminated (zmq::pipe_t *pipe_); protected: @@ -75,7 +78,7 @@ namespace zmq struct outpipe_t { - class pipe_t *pipe; + zmq::pipe_t *pipe; bool active; }; @@ -84,7 +87,7 @@ namespace zmq outpipes_t outpipes; // The pipe we are currently writing to. - class pipe_t *current_out; + zmq::pipe_t *current_out; // If true, more outgoing message parts are expected. bool more_out; @@ -101,8 +104,8 @@ namespace zmq { public: - xrep_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + xrep_session_t (zmq::io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xrep_session_t (); diff --git a/src/xreq.hpp b/src/xreq.hpp index 1d979c5..4c94cad 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -29,25 +29,31 @@ namespace zmq { + class ctx_t; + class msg_t; + class pipe_t; + class io_thread_t; + class socket_base_t; + class xreq_t : public socket_base_t { public: - xreq_t (class ctx_t *parent_, uint32_t tid_); + xreq_t (zmq::ctx_t *parent_, uint32_t tid_); ~xreq_t (); protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); + void xattach_pipe (zmq::pipe_t *pipe_); + int xsend (zmq::msg_t *msg_, int flags_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xterminated (zmq::pipe_t *pipe_); private: @@ -64,8 +70,8 @@ namespace zmq { public: - xreq_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + xreq_session_t (zmq::io_thread_t *io_thread_, bool connect_, + zmq::socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xreq_session_t (); diff --git a/src/xsub.hpp b/src/xsub.hpp index 1eac390..24f8157 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -26,36 +26,39 @@ #include "dist.hpp" #include "fq.hpp" #include "trie.hpp" -#include "msg.hpp" namespace zmq { + class ctx_t; + class pipe_t; + class io_thread_t; + class xsub_t : public socket_base_t { public: - xsub_t (class ctx_t *parent_, uint32_t tid_); + xsub_t (zmq::ctx_t *parent_, uint32_t tid_); ~xsub_t (); protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xsend (class msg_t *msg_, int flags_); + void xattach_pipe (zmq::pipe_t *pipe_); + int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); - int xrecv (class msg_t *msg_, int flags_); + int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); void xhiccuped (pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); + void xterminated (zmq::pipe_t *pipe_); private: // Check whether the message matches at least one subscription. - bool match (class msg_t *msg_); + bool match (zmq::msg_t *msg_); // Function to be applied to the trie to send all the subsciptions // upstream. @@ -89,7 +92,7 @@ namespace zmq public: xsub_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, + socket_base_t *socket_, const options_t &options_, const char *protocol_, const char *address_); ~xsub_session_t (); -- cgit v1.2.3