diff options
50 files changed, 494 insertions, 82 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 7992ab8..3b7dec6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -57,7 +57,7 @@ libzmq_la_SOURCES = \      req.hpp \      router.hpp \      select.hpp \ -    session.hpp \ +    session_base.hpp \      signaler.hpp \      socket_base.hpp \      stdint.hpp \ @@ -117,7 +117,7 @@ libzmq_la_SOURCES = \      rep.cpp \      req.cpp \      select.cpp \ -    session.cpp \ +    session_base.cpp \      signaler.cpp \      socket_base.cpp \      stream_engine.cpp \ diff --git a/src/decoder.cpp b/src/decoder.cpp index 01ce0bb..9e93b73 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -22,7 +22,7 @@  #include <string.h>  #include "decoder.hpp" -#include "session.hpp" +#include "session_base.hpp"  #include "wire.hpp"  #include "err.hpp" @@ -44,7 +44,7 @@ zmq::decoder_t::~decoder_t ()      errno_assert (rc == 0);  } -void zmq::decoder_t::set_session (session_t *session_) +void zmq::decoder_t::set_session (session_base_t *session_)  {      session = session_;  } diff --git a/src/decoder.hpp b/src/decoder.hpp index 3ac4f7c..01021c4 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -184,7 +184,7 @@ namespace zmq          decoder_t (size_t bufsize_, int64_t maxmsgsize_);          ~decoder_t (); -        void set_session (class session_t *session_); +        void set_session (class session_base_t *session_);      private: @@ -193,7 +193,7 @@ namespace zmq          bool flags_ready ();          bool message_ready (); -        class session_t *session; +        class session_base_t *session;          unsigned char tmpbuf [8];          msg_t in_progress; diff --git a/src/encoder.cpp b/src/encoder.cpp index 087735d..6d09384 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -19,7 +19,7 @@  */  #include "encoder.hpp" -#include "session.hpp" +#include "session_base.hpp"  #include "wire.hpp"  zmq::encoder_t::encoder_t (size_t bufsize_) : @@ -39,7 +39,7 @@ zmq::encoder_t::~encoder_t ()      errno_assert (rc == 0);  } -void zmq::encoder_t::set_session (session_t *session_) +void zmq::encoder_t::set_session (session_base_t *session_)  {      session = session_;  } diff --git a/src/encoder.hpp b/src/encoder.hpp index b8784a3..f7e3cbc 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -163,14 +163,14 @@ namespace zmq          encoder_t (size_t bufsize_);          ~encoder_t (); -        void set_session (class session_t *session_); +        void set_session (class session_base_t *session_);      private:          bool size_ready ();          bool message_ready (); -        class session_t *session; +        class session_base_t *session;          msg_t in_progress;          unsigned char tmpbuf [10]; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index c49a107..26e475b 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -32,7 +32,7 @@ namespace zmq          //  Plug the engine to the session.          virtual void plug (class io_thread_t *io_thread_, -            class session_t *session_) = 0; +            class session_base_t *session_) = 0;          //  Unplug the engine from the session.          virtual void unplug () = 0; diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 9b8520d..a54e8fe 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -38,7 +38,7 @@  #include <sys/un.h>  zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, -      class session_t *session_, const options_t &options_, +      class session_base_t *session_, const options_t &options_,        const char *address_, bool wait_) :      own_t (io_thread_, options_),      io_object_t (io_thread_), diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 0bb9d69..721bcf4 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -41,7 +41,7 @@ namespace zmq          //  If 'delay' is true connecter first waits for a while, then starts          //  connection process.          ipc_connecter_t (class io_thread_t *io_thread_, -            class session_t *session_, const options_t &options_, +            class session_base_t *session_, const options_t &options_,              const char *address_, bool delay_);          ~ipc_connecter_t (); @@ -101,7 +101,7 @@ namespace zmq          bool wait;          //  Reference to the session we belong to. -        class session_t *session; +        class session_base_t *session;          //  Current reconnect ivl, updated for backoff strategy          int current_reconnect_ivl; diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index cad58ba..5ba41be 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -29,7 +29,7 @@  #include "stream_engine.hpp"  #include "ipc_address.hpp"  #include "io_thread.hpp" -#include "session.hpp" +#include "session_base.hpp"  #include "config.hpp"  #include "err.hpp"  #include "ip.hpp" @@ -87,9 +87,9 @@ void zmq::ipc_listener_t::in_event ()      zmq_assert (io_thread);      //  Create and launch a session object.  -    session_t *session = new (std::nothrow) -        session_t (io_thread, false, socket, options, NULL, NULL); -    alloc_assert (session); +    session_base_t *session = session_base_t::create (io_thread, false, socket, +        options, NULL, NULL); +    errno_assert (session);      session->inc_seqnum ();      launch_child (session);      send_attach (session, engine, false); diff --git a/src/object.cpp b/src/object.cpp index 7f7d7f8..807fb04 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -26,7 +26,7 @@  #include "err.hpp"  #include "pipe.hpp"  #include "io_thread.hpp" -#include "session.hpp" +#include "session_base.hpp"  #include "socket_base.hpp"  zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : @@ -201,8 +201,8 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_)      send_command (cmd);  } -void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, -    bool inc_seqnum_) +void zmq::object_t::send_attach (session_base_t *destination_, +    i_engine *engine_, bool inc_seqnum_)  {      if (inc_seqnum_)          destination_->inc_seqnum (); diff --git a/src/object.hpp b/src/object.hpp index e05b958..1a38b24 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -62,7 +62,7 @@ namespace zmq              bool inc_seqnum_ = true);          void send_own (class own_t *destination_,              class own_t *object_); -        void send_attach (class session_t *destination_, +        void send_attach (class session_base_t *destination_,               struct i_engine *engine_, bool inc_seqnum_ = true);          void send_bind (class own_t *destination_, class pipe_t *pipe_,               bool inc_seqnum_ = true); diff --git a/src/pair.cpp b/src/pair.cpp index 12a1881..2fa4eac 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -116,3 +116,15 @@ bool zmq::pair_t::xhas_out ()      return result;  } +zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_, +      socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    session_base_t (io_thread_, connect_, socket_, options_, protocol_, +         address_) +{ +} + +zmq::pair_session_t::~pair_session_t () +{ +} + diff --git a/src/pair.hpp b/src/pair.hpp index 59300ae..e7390d6 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -22,6 +22,7 @@  #define __ZMQ_PAIR_HPP_INCLUDED__  #include "socket_base.hpp" +#include "session_base.hpp"  namespace zmq  { @@ -52,6 +53,21 @@ namespace zmq          const pair_t &operator = (const pair_t&);      }; +    class pair_session_t : public session_base_t +    { +    public: + +        pair_session_t (class io_thread_t *io_thread_, bool connect_, +            class socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~pair_session_t (); + +    private: + +        pair_session_t (const pair_session_t&); +        const pair_session_t &operator = (const pair_session_t&); +    }; +  }  #endif diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 5c1517d..6c292cd 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -29,7 +29,7 @@  #endif  #include "pgm_receiver.hpp" -#include "session.hpp" +#include "session_base.hpp"  #include "stdint.hpp"  #include "wire.hpp"  #include "err.hpp" @@ -57,7 +57,8 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)      return pgm_socket.init (udp_encapsulation_, network_);  } -void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, +    session_base_t *session_)  {      //  Retrieve PGM fds and start polling.      fd_t socket_fd = retired_fd; diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f66c592..b9e9a05 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -52,7 +52,8 @@ namespace zmq          int init (bool udp_encapsulation_, const char *network_);          //  i_engine interface implementation. -        void plug (class io_thread_t *io_thread_, class session_t *session_); +        void plug (class io_thread_t *io_thread_, +            class session_base_t *session_);          void unplug ();          void terminate ();          void activate_in (); @@ -105,7 +106,7 @@ namespace zmq          options_t options;          //  Associated session. -        class session_t *session; +        class session_base_t *session;          //  Most recently used decoder.          decoder_t *mru_decoder; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index e103d9a..733b1ec 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -30,7 +30,7 @@  #include "io_thread.hpp"  #include "pgm_sender.hpp" -#include "session.hpp" +#include "session_base.hpp"  #include "err.hpp"  #include "wire.hpp"  #include "stdint.hpp" @@ -62,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)      return rc;  } -void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_t *session_) +void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)  {      //  Alocate 2 fds for PGM socket.      fd_t downlink_socket_fd = retired_fd; diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 366e385..d3d5924 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -50,7 +50,8 @@ namespace zmq          int init (bool udp_encapsulation_, const char *network_);          //  i_engine interface implementation. -        void plug (class io_thread_t *io_thread_, class session_t *session_); +        void plug (class io_thread_t *io_thread_, +            class session_base_t *session_);          void unplug ();          void terminate ();          void activate_in (); diff --git a/src/pub.cpp b/src/pub.cpp index 4787c32..15ec291 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -43,3 +43,15 @@ bool zmq::pub_t::xhas_in ()      return false;  } +zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_, +      socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    xpub_session_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} + +zmq::pub_session_t::~pub_session_t () +{ +} + diff --git a/src/pub.hpp b/src/pub.hpp index c8db55f..4a4da0f 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -43,6 +43,21 @@ namespace zmq          const pub_t &operator = (const pub_t&);      }; +    class pub_session_t : public xpub_session_t +    { +    public: + +        pub_session_t (class io_thread_t *io_thread_, bool connect_, +            class socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~pub_session_t (); + +    private: + +        pub_session_t (const pub_session_t&); +        const pub_session_t &operator = (const pub_session_t&); +    }; +  }  #endif diff --git a/src/pull.cpp b/src/pull.cpp index afde236..06575da 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -59,3 +59,15 @@ bool zmq::pull_t::xhas_in ()      return fq.has_in ();  } +zmq::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_, +      socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    session_base_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} + +zmq::pull_session_t::~pull_session_t () +{ +} + diff --git a/src/pull.hpp b/src/pull.hpp index be82af9..6a46ead 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -22,6 +22,7 @@  #define __ZMQ_PULL_HPP_INCLUDED__  #include "socket_base.hpp" +#include "session_base.hpp"  #include "fq.hpp"  namespace zmq @@ -54,6 +55,21 @@ namespace zmq      }; +    class pull_session_t : public session_base_t +    { +    public: + +        pull_session_t (class io_thread_t *io_thread_, bool connect_, +            class socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~pull_session_t (); + +    private: + +        pull_session_t (const pull_session_t&); +        const pull_session_t &operator = (const pull_session_t&); +    }; +  }  #endif diff --git a/src/push.cpp b/src/push.cpp index 77cc9d8..e91b789 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -59,3 +59,15 @@ bool zmq::push_t::xhas_out ()      return lb.has_out ();  } +zmq::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_, +      socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    session_base_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} + +zmq::push_session_t::~push_session_t () +{ +} + diff --git a/src/push.hpp b/src/push.hpp index 222a62d..1feb71d 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -22,6 +22,7 @@  #define __ZMQ_PUSH_HPP_INCLUDED__  #include "socket_base.hpp" +#include "session_base.hpp"  #include "lb.hpp"  namespace zmq @@ -53,6 +54,21 @@ namespace zmq          const push_t &operator = (const push_t&);      }; +    class push_session_t : public session_base_t +    { +    public: + +        push_session_t (class io_thread_t *io_thread_, bool connect_, +            class socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~push_session_t (); + +    private: + +        push_session_t (const push_session_t&); +        const push_session_t &operator = (const push_session_t&); +    }; +  }  #endif diff --git a/src/rep.cpp b/src/rep.cpp index 2ad494d..564fa89 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -110,3 +110,15 @@ bool zmq::rep_t::xhas_out ()      return xrep_t::xhas_out ();  } +zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_, +      socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    xrep_session_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} + +zmq::rep_session_t::~rep_session_t () +{ +} + diff --git a/src/rep.hpp b/src/rep.hpp index a13853d..55d57bd 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -54,6 +54,21 @@ namespace zmq      }; +    class rep_session_t : public xrep_session_t +    { +    public: + +        rep_session_t (class io_thread_t *io_thread_, bool connect_, +            class socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~rep_session_t (); + +    private: + +        rep_session_t (const rep_session_t&); +        const rep_session_t &operator = (const rep_session_t&); +    }; +  }  #endif diff --git a/src/req.cpp b/src/req.cpp index b3a9359..323e058 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -146,4 +146,15 @@ bool zmq::req_t::xhas_out ()      return xreq_t::xhas_out ();  } +zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, +      socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    xreq_session_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} + +zmq::req_session_t::~req_session_t () +{ +} diff --git a/src/req.hpp b/src/req.hpp index 50dcb44..2c2cbc4 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -58,6 +58,21 @@ namespace zmq          const req_t &operator = (const req_t&);      }; +    class req_session_t : public xreq_session_t +    { +    public: + +        req_session_t (class io_thread_t *io_thread_, bool connect_, +            class socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~req_session_t (); + +    private: + +        req_session_t (const req_session_t&); +        const req_session_t &operator = (const req_session_t&); +    }; +  }  #endif diff --git a/src/router.cpp b/src/router.cpp index 2c9ade9..c8cc278 100755 --- a/src/router.cpp +++ b/src/router.cpp @@ -270,5 +270,15 @@ bool zmq::router_t::xhas_out ()      return true;  } +zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_, +      socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    session_base_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} +zmq::router_session_t::~router_session_t () +{ +} diff --git a/src/router.hpp b/src/router.hpp index aeac865..9a5c0f9 100755 --- a/src/router.hpp +++ b/src/router.hpp @@ -25,6 +25,7 @@  #include <deque>  #include "socket_base.hpp" +#include "session_base.hpp"  #include "stdint.hpp"  #include "msg.hpp"  #include "fq.hpp" @@ -102,6 +103,21 @@ namespace zmq          const router_t &operator = (const router_t&);      }; +    class router_session_t : public session_base_t +    { +    public: + +        router_session_t (class io_thread_t *io_thread_, bool connect_, +            class socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~router_session_t (); + +    private: + +        router_session_t (const router_session_t&); +        const router_session_t &operator = (const router_session_t&); +    }; +  }  #endif diff --git a/src/session.cpp b/src/session_base.cpp index 8001ba8..7d4c5ab 100644 --- a/src/session.cpp +++ b/src/session_base.cpp @@ -18,7 +18,7 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ -#include "session.hpp" +#include "session_base.hpp"  #include "socket_base.hpp"  #include "i_engine.hpp"  #include "err.hpp" @@ -30,8 +30,82 @@  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp" -zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_, -      class socket_base_t *socket_, const options_t &options_, +#include "req.hpp" +#include "xreq.hpp" +#include "rep.hpp" +#include "xrep.hpp" +#include "pub.hpp" +#include "xpub.hpp" +#include "sub.hpp" +#include "xsub.hpp" +#include "push.hpp" +#include "pull.hpp" +#include "router.hpp" +#include "pair.hpp" + +zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, +    bool connect_, class socket_base_t *socket_, const options_t &options_, +    const char *protocol_, const char *address_) +{ +    session_base_t *s = NULL; +    switch (options_.type) { +    case ZMQ_REQ: +        s = new (std::nothrow) req_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    case ZMQ_XREQ: +        s = new (std::nothrow) xreq_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +    case ZMQ_REP: +        s = new (std::nothrow) rep_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    case ZMQ_XREP: +        s = new (std::nothrow) xrep_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    case ZMQ_PUB: +        s = new (std::nothrow) pub_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    case ZMQ_XPUB: +        s = new (std::nothrow) xpub_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    cas | 
