summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-09-15 10:00:23 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-09-15 10:00:23 +0200
commitf78d9b6bfca13e298c29fadabbbc870b37a0a573 (patch)
tree89b110d56183ef2958b894c26dfe2e5fea980537
parent78b02d142e82015a2146b7d40f7e0a729ad0e89b (diff)
Session class separated into socket-type-specific sessions
This is a preliminary patch allowing for socket-type-specific functionality in the I/O thread. For example, message format can be checked asynchronously and misbehaved connections dropped straight away. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/Makefile.am4
-rw-r--r--src/decoder.cpp4
-rw-r--r--src/decoder.hpp4
-rw-r--r--src/encoder.cpp4
-rw-r--r--src/encoder.hpp4
-rw-r--r--src/i_engine.hpp2
-rw-r--r--src/ipc_connecter.cpp2
-rw-r--r--src/ipc_connecter.hpp4
-rw-r--r--src/ipc_listener.cpp8
-rw-r--r--src/object.cpp6
-rw-r--r--src/object.hpp2
-rw-r--r--src/pair.cpp12
-rw-r--r--src/pair.hpp16
-rw-r--r--src/pgm_receiver.cpp5
-rw-r--r--src/pgm_receiver.hpp5
-rw-r--r--src/pgm_sender.cpp4
-rw-r--r--src/pgm_sender.hpp3
-rw-r--r--src/pub.cpp12
-rw-r--r--src/pub.hpp15
-rw-r--r--src/pull.cpp12
-rw-r--r--src/pull.hpp16
-rw-r--r--src/push.cpp12
-rw-r--r--src/push.hpp16
-rw-r--r--src/rep.cpp12
-rw-r--r--src/rep.hpp15
-rw-r--r--src/req.cpp11
-rw-r--r--src/req.hpp15
-rwxr-xr-xsrc/router.cpp10
-rwxr-xr-xsrc/router.hpp16
-rw-r--r--src/session_base.cpp (renamed from src/session.cpp)116
-rw-r--r--src/session_base.hpp (renamed from src/session.hpp)27
-rw-r--r--src/socket_base.cpp8
-rw-r--r--src/stream_engine.cpp5
-rw-r--r--src/stream_engine.hpp7
-rw-r--r--src/sub.cpp12
-rw-r--r--src/sub.hpp15
-rw-r--r--src/tcp_connecter.cpp2
-rw-r--r--src/tcp_connecter.hpp4
-rw-r--r--src/tcp_listener.cpp8
-rw-r--r--src/vtcp_connecter.cpp2
-rw-r--r--src/vtcp_connecter.hpp4
-rw-r--r--src/vtcp_listener.cpp6
-rw-r--r--src/xpub.cpp12
-rw-r--r--src/xpub.hpp16
-rw-r--r--src/xrep.cpp10
-rw-r--r--src/xrep.hpp16
-rw-r--r--src/xreq.cpp12
-rw-r--r--src/xreq.hpp16
-rw-r--r--src/xsub.cpp11
-rw-r--r--src/xsub.hpp16
50 files changed, 494 insertions, 82 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 7992ab8..3b7dec6 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -57,7 +57,7 @@ libzmq_la_SOURCES = \
req.hpp \
router.hpp \
select.hpp \
- session.hpp \
+ session_base.hpp \
signaler.hpp \
socket_base.hpp \
stdint.hpp \
@@ -117,7 +117,7 @@ libzmq_la_SOURCES = \
rep.cpp \
req.cpp \
select.cpp \
- session.cpp \
+ session_base.cpp \
signaler.cpp \
socket_base.cpp \
stream_engine.cpp \
diff --git a/src/decoder.cpp b/src/decoder.cpp
index 01ce0bb..9e93b73 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -22,7 +22,7 @@
#include <string.h>
#include "decoder.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "wire.hpp"
#include "err.hpp"
@@ -44,7 +44,7 @@ zmq::decoder_t::~decoder_t ()
errno_assert (rc == 0);
}
-void zmq::decoder_t::set_session (session_t *session_)
+void zmq::decoder_t::set_session (session_base_t *session_)
{
session = session_;
}
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 3ac4f7c..01021c4 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -184,7 +184,7 @@ namespace zmq
decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~decoder_t ();
- void set_session (class session_t *session_);
+ void set_session (class session_base_t *session_);
private:
@@ -193,7 +193,7 @@ namespace zmq
bool flags_ready ();
bool message_ready ();
- class session_t *session;
+ class session_base_t *session;
unsigned char tmpbuf [8];
msg_t in_progress;
diff --git a/src/encoder.cpp b/src/encoder.cpp
index 087735d..6d09384 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -19,7 +19,7 @@
*/
#include "encoder.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "wire.hpp"
zmq::encoder_t::encoder_t (size_t bufsize_) :
@@ -39,7 +39,7 @@ zmq::encoder_t::~encoder_t ()
errno_assert (rc == 0);
}
-void zmq::encoder_t::set_session (session_t *session_)
+void zmq::encoder_t::set_session (session_base_t *session_)
{
session = session_;
}
diff --git a/src/encoder.hpp b/src/encoder.hpp
index b8784a3..f7e3cbc 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -163,14 +163,14 @@ namespace zmq
encoder_t (size_t bufsize_);
~encoder_t ();
- void set_session (class session_t *session_);
+ void set_session (class session_base_t *session_);
private:
bool size_ready ();
bool message_ready ();
- class session_t *session;
+ class session_base_t *session;
msg_t in_progress;
unsigned char tmpbuf [10];
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index c49a107..26e475b 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -32,7 +32,7 @@ namespace zmq
// Plug the engine to the session.
virtual void plug (class io_thread_t *io_thread_,
- class session_t *session_) = 0;
+ class session_base_t *session_) = 0;
// Unplug the engine from the session.
virtual void unplug () = 0;
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index 9b8520d..a54e8fe 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -38,7 +38,7 @@
#include <sys/un.h>
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
- class session_t *session_, const options_t &options_,
+ class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index 0bb9d69..721bcf4 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -41,7 +41,7 @@ namespace zmq
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
ipc_connecter_t (class io_thread_t *io_thread_,
- class session_t *session_, const options_t &options_,
+ class session_base_t *session_, const options_t &options_,
const char *address_, bool delay_);
~ipc_connecter_t ();
@@ -101,7 +101,7 @@ namespace zmq
bool wait;
// Reference to the session we belong to.
- class session_t *session;
+ class session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index cad58ba..5ba41be 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -29,7 +29,7 @@
#include "stream_engine.hpp"
#include "ipc_address.hpp"
#include "io_thread.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
@@ -87,9 +87,9 @@ void zmq::ipc_listener_t::in_event ()
zmq_assert (io_thread);
// Create and launch a session object.
- session_t *session = new (std::nothrow)
- session_t (io_thread, false, socket, options, NULL, NULL);
- alloc_assert (session);
+ session_base_t *session = session_base_t::create (io_thread, false, socket,
+ options, NULL, NULL);
+ errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
diff --git a/src/object.cpp b/src/object.cpp
index 7f7d7f8..807fb04 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -26,7 +26,7 @@
#include "err.hpp"
#include "pipe.hpp"
#include "io_thread.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "socket_base.hpp"
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
@@ -201,8 +201,8 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_)
send_command (cmd);
}
-void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- bool inc_seqnum_)
+void zmq::object_t::send_attach (session_base_t *destination_,
+ i_engine *engine_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
diff --git a/src/object.hpp b/src/object.hpp
index e05b958..1a38b24 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -62,7 +62,7 @@ namespace zmq
bool inc_seqnum_ = true);
void send_own (class own_t *destination_,
class own_t *object_);
- void send_attach (class session_t *destination_,
+ void send_attach (class session_base_t *destination_,
struct i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (class own_t *destination_, class pipe_t *pipe_,
bool inc_seqnum_ = true);
diff --git a/src/pair.cpp b/src/pair.cpp
index 12a1881..2fa4eac 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -116,3 +116,15 @@ bool zmq::pair_t::xhas_out ()
return result;
}
+zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::pair_session_t::~pair_session_t ()
+{
+}
+
diff --git a/src/pair.hpp b/src/pair.hpp
index 59300ae..e7390d6 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -22,6 +22,7 @@
#define __ZMQ_PAIR_HPP_INCLUDED__
#include "socket_base.hpp"
+#include "session_base.hpp"
namespace zmq
{
@@ -52,6 +53,21 @@ namespace zmq
const pair_t &operator = (const pair_t&);
};
+ class pair_session_t : public session_base_t
+ {
+ public:
+
+ pair_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~pair_session_t ();
+
+ private:
+
+ pair_session_t (const pair_session_t&);
+ const pair_session_t &operator = (const pair_session_t&);
+ };
+
}
#endif
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 5c1517d..6c292cd 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -29,7 +29,7 @@
#endif
#include "pgm_receiver.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "stdint.hpp"
#include "wire.hpp"
#include "err.hpp"
@@ -57,7 +57,8 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_);
}
-void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_)
+void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
+ session_base_t *session_)
{
// Retrieve PGM fds and start polling.
fd_t socket_fd = retired_fd;
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f66c592..b9e9a05 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -52,7 +52,8 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (class io_thread_t *io_thread_, class session_t *session_);
+ void plug (class io_thread_t *io_thread_,
+ class session_base_t *session_);
void unplug ();
void terminate ();
void activate_in ();
@@ -105,7 +106,7 @@ namespace zmq
options_t options;
// Associated session.
- class session_t *session;
+ class session_base_t *session;
// Most recently used decoder.
decoder_t *mru_decoder;
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index e103d9a..733b1ec 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -30,7 +30,7 @@
#include "io_thread.hpp"
#include "pgm_sender.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "err.hpp"
#include "wire.hpp"
#include "stdint.hpp"
@@ -62,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc;
}
-void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_t *session_)
+void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{
// Alocate 2 fds for PGM socket.
fd_t downlink_socket_fd = retired_fd;
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 366e385..d3d5924 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -50,7 +50,8 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
- void plug (class io_thread_t *io_thread_, class session_t *session_);
+ void plug (class io_thread_t *io_thread_,
+ class session_base_t *session_);
void unplug ();
void terminate ();
void activate_in ();
diff --git a/src/pub.cpp b/src/pub.cpp
index 4787c32..15ec291 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -43,3 +43,15 @@ bool zmq::pub_t::xhas_in ()
return false;
}
+zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xpub_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::pub_session_t::~pub_session_t ()
+{
+}
+
diff --git a/src/pub.hpp b/src/pub.hpp
index c8db55f..4a4da0f 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -43,6 +43,21 @@ namespace zmq
const pub_t &operator = (const pub_t&);
};
+ class pub_session_t : public xpub_session_t
+ {
+ public:
+
+ pub_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~pub_session_t ();
+
+ private:
+
+ pub_session_t (const pub_session_t&);
+ const pub_session_t &operator = (const pub_session_t&);
+ };
+
}
#endif
diff --git a/src/pull.cpp b/src/pull.cpp
index afde236..06575da 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -59,3 +59,15 @@ bool zmq::pull_t::xhas_in ()
return fq.has_in ();
}
+zmq::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::pull_session_t::~pull_session_t ()
+{
+}
+
diff --git a/src/pull.hpp b/src/pull.hpp
index be82af9..6a46ead 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -22,6 +22,7 @@
#define __ZMQ_PULL_HPP_INCLUDED__
#include "socket_base.hpp"
+#include "session_base.hpp"
#include "fq.hpp"
namespace zmq
@@ -54,6 +55,21 @@ namespace zmq
};
+ class pull_session_t : public session_base_t
+ {
+ public:
+
+ pull_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~pull_session_t ();
+
+ private:
+
+ pull_session_t (const pull_session_t&);
+ const pull_session_t &operator = (const pull_session_t&);
+ };
+
}
#endif
diff --git a/src/push.cpp b/src/push.cpp
index 77cc9d8..e91b789 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -59,3 +59,15 @@ bool zmq::push_t::xhas_out ()
return lb.has_out ();
}
+zmq::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::push_session_t::~push_session_t ()
+{
+}
+
diff --git a/src/push.hpp b/src/push.hpp
index 222a62d..1feb71d 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -22,6 +22,7 @@
#define __ZMQ_PUSH_HPP_INCLUDED__
#include "socket_base.hpp"
+#include "session_base.hpp"
#include "lb.hpp"
namespace zmq
@@ -53,6 +54,21 @@ namespace zmq
const push_t &operator = (const push_t&);
};
+ class push_session_t : public session_base_t
+ {
+ public:
+
+ push_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~push_session_t ();
+
+ private:
+
+ push_session_t (const push_session_t&);
+ const push_session_t &operator = (const push_session_t&);
+ };
+
}
#endif
diff --git a/src/rep.cpp b/src/rep.cpp
index 2ad494d..564fa89 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -110,3 +110,15 @@ bool zmq::rep_t::xhas_out ()
return xrep_t::xhas_out ();
}
+zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xrep_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::rep_session_t::~rep_session_t ()
+{
+}
+
diff --git a/src/rep.hpp b/src/rep.hpp
index a13853d..55d57bd 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -54,6 +54,21 @@ namespace zmq
};
+ class rep_session_t : public xrep_session_t
+ {
+ public:
+
+ rep_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~rep_session_t ();
+
+ private:
+
+ rep_session_t (const rep_session_t&);
+ const rep_session_t &operator = (const rep_session_t&);
+ };
+
}
#endif
diff --git a/src/req.cpp b/src/req.cpp
index b3a9359..323e058 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -146,4 +146,15 @@ bool zmq::req_t::xhas_out ()
return xreq_t::xhas_out ();
}
+zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xreq_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::req_session_t::~req_session_t ()
+{
+}
diff --git a/src/req.hpp b/src/req.hpp
index 50dcb44..2c2cbc4 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -58,6 +58,21 @@ namespace zmq
const req_t &operator = (const req_t&);
};
+ class req_session_t : public xreq_session_t
+ {
+ public:
+
+ req_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~req_session_t ();
+
+ private:
+
+ req_session_t (const req_session_t&);
+ const req_session_t &operator = (const req_session_t&);
+ };
+
}
#endif
diff --git a/src/router.cpp b/src/router.cpp
index 2c9ade9..c8cc278 100755
--- a/src/router.cpp
+++ b/src/router.cpp
@@ -270,5 +270,15 @@ bool zmq::router_t::xhas_out ()
return true;
}
+zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+zmq::router_session_t::~router_session_t ()
+{
+}
diff --git a/src/router.hpp b/src/router.hpp
index aeac865..9a5c0f9 100755
--- a/src/router.hpp
+++ b/src/router.hpp
@@ -25,6 +25,7 @@
#include <deque>
#include "socket_base.hpp"
+#include "session_base.hpp"
#include "stdint.hpp"
#include "msg.hpp"
#include "fq.hpp"
@@ -102,6 +103,21 @@ namespace zmq
const router_t &operator = (const router_t&);
};
+ class router_session_t : public session_base_t
+ {
+ public:
+
+ router_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~router_session_t ();
+
+ private:
+
+ router_session_t (const router_session_t&);
+ const router_session_t &operator = (const router_session_t&);
+ };
+
}
#endif
diff --git a/src/session.cpp b/src/session_base.cpp
index 8001ba8..7d4c5ab 100644
--- a/src/session.cpp
+++ b/src/session_base.cpp
@@ -18,7 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "session.hpp"
+#include "session_base.hpp"
#include "socket_base.hpp"
#include "i_engine.hpp"
#include "err.hpp"
@@ -30,8 +30,82 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
-zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_,
- class socket_base_t *socket_, const options_t &options_,
+#include "req.hpp"
+#include "xreq.hpp"
+#include "rep.hpp"
+#include "xrep.hpp"
+#include "pub.hpp"
+#include "xpub.hpp"
+#include "sub.hpp"
+#include "xsub.hpp"
+#include "push.hpp"
+#include "pull.hpp"
+#include "router.hpp"
+#include "pair.hpp"
+
+zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
+ bool connect_, class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_)
+{
+ session_base_t *s = NULL;
+ switch (options_.type) {
+ case ZMQ_REQ:
+ s = new (std::nothrow) req_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_XREQ:
+ s = new (std::nothrow) xreq_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ case ZMQ_REP:
+ s = new (std::nothrow) rep_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_XREP:
+ s = new (std::nothrow) xrep_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_PUB:
+ s = new (std::nothrow) pub_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_XPUB:
+ s = new (std::nothrow) xpub_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_SUB:
+ s = new (std::nothrow) sub_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_XSUB:
+ s = new (std::nothrow) xsub_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_PUSH:
+ s = new (std::nothrow) push_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_PULL:
+ s = new (std::nothrow) pull_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_ROUTER:
+ s = new (std::nothrow) router_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case ZMQ_PAIR:
+ s = new (std::nothrow) pair_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ default:
+ errno = EINVAL;
+ return NULL;
+ }
+ alloc_assert (s);
+ return s;
+}
+
+zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
+ bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
@@ -50,7 +124,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_,
address = address_;
}
-zmq::session_t::~session_t ()
+zmq::session_base_t::~session_base_t ()
{
zmq_assert (!pipe);
@@ -65,7 +139,7 @@ zmq::session_t::~session_t ()
engine->terminate ();
}
-void zmq::session_t::attach_pipe (pipe_t *pipe_)
+void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
zmq_assert (!is_terminating ());
zmq_assert (!pipe);
@@ -74,7 +148,7 @@ void zmq::session_t::attach_pipe (pipe_t *pipe_)
pipe->set_event_sink (this);
}
-bool zmq::session_t::read (msg_t *msg_)
+bool zmq::session_base_t::read (msg_t *msg_)
{
if (!pipe)
return false;
@@ -87,7 +161,7 @@ bool zmq::session_t::read (msg_t *msg_)
return true;
}
-bool zmq::session_t::write (msg_t *msg_)
+bool zmq::session_base_t::write (msg_t *msg_)
{
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
@@ -98,13 +172,13 @@ bool zmq::session_t::write (msg_t *msg_)
return false;
}
-void zmq::session_t::flush ()
+void zmq::session_base_t::flush ()
{
if (pipe)
pipe->flush ();
}
-void zmq::session_t::clean_pipes ()
+void zmq::session_base_t::clean_pipes ()
{
if (pipe) {
@@ -128,7 +202,7 @@ void zmq::session_t::clean_pipes ()
}
}
-void zmq::session_t::terminated (pipe_t *pipe_)
+void zmq::session_base_t::terminated (pipe_t *pipe_)
{
// Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_);
@@ -141,7 +215,7 @@ void zmq::session_t::terminated (pipe_t *pipe_)
proceed_with_term ();
}
-void zmq::session_t::read_activated (pipe_t *pipe_)
+void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
zmq_assert (pipe == pipe_);
@@ -151,7 +225,7 @@ void zmq::session_t::read_activated (pipe_t *pipe_)
pipe->check_read ();
}
-void zmq::session_t::write_activated (pipe_t *pipe_)
+void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
zmq_assert (pipe == pipe_);
@@ -159,20 +233,20 @@ void zmq::session_t::write_activated (pipe_t *pipe_)
engine->activate_in ();
}
-void zmq::session_t::hiccuped (pipe_t *pipe_)
+void zmq::session_base_t::hiccuped (pipe_t *pipe_)
{
// Hiccups are always sent from session to socket, not the other
// way round.
zmq_assert (false);
}
-void zmq::session_t::process_plug ()
+void zmq::session_base_t::process_plug ()
{
if (connect)
start_connecting (false);
}
-void zmq::session_t::process_attach (i_engine *engine_)
+void zmq::session_base_t::process_attach (i_engine *engine_)
{
// If some other object (e.g. init) notifies us that the connection failed
// without creating an engine we need to start the reconnection process.
@@ -208,7 +282,7 @@ void zmq::session_t::process_attach (i_engine *engine_)
engine->plug (io_thread, this);
}
-void zmq::session_t::detach ()
+void zmq::session_base_t::detach ()
{
// Engine is dead. Let's forget about it.
engine = NULL;
@@ -224,7 +298,7 @@ void zmq::session_t::detach ()
pipe->check_read ();
}
-void zmq::session_t::process_term (int linger_)
+void zmq::session_base_t::process_term (int linger_)
{
zmq_assert (!pending);
@@ -257,7 +331,7 @@ void zmq::session_t::process_term (int linger_)
pipe->check_read ();
}
-void zmq::session_t::proceed_with_term ()
+void zmq::session_base_t::proceed_with_term ()
{
// The pending phase have just ended.
pending = false;
@@ -266,7 +340,7 @@ void zmq::session_t::proceed_with_term ()
own_t::process_term (0);
}
-void zmq::session_t::timer_event (int id_)
+void zmq::session_base_t::timer_event (int id_)
{
// Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent.
@@ -278,7 +352,7 @@ void zmq::session_t::timer_event (int id_)
pipe->terminate (false);
}
-void zmq::session_t::detached ()
+void zmq::session_base_t::detached ()
{
// Transient session self-destructs after peer disconnects.
if (!connect) {
@@ -295,7 +369,7 @@ void zmq::session_t::detached ()
pipe->hiccup ();
}
-void zmq::session_t::start_connecting (bool wait_)
+void zmq::session_base_t::start_connecting (bool wait_)
{
zmq_assert (connect);
diff --git a/src/session.hpp b/src/session_base.hpp
index a155357..175a11d 100644
--- a/src/session.hpp
+++ b/src/session_base.hpp
@@ -18,8 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_SESSION_HPP_INCLUDED__
-#define __ZMQ_SESSION_HPP_INCLUDED__
+#ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
+#define __ZMQ_SESSION_BASE_HPP_INCLUDED__
#include <string>
@@ -31,16 +31,18 @@
namespace zmq
{
- class session_t :
+ class session_base_t :
public own_t,
public io_object_t,
public i_pipe_events
{
public:
- session_t (class io_thread_t *io_thread_, bool connect_,
- class socket_base_t *socket_, const options_t &options_,
- const char *protocol_, const char *address_);
+ // Create a session of the particular type.
+ static session_base_t *create (class io_thread_t *io_thread_,
+ bool connect_, class socket_base_t *socket_,
+ const options_t &options_, const char *protocol_,
+ const char *address_);
// To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_);
@@ -57,9 +59,14 @@ namespace zmq
void hiccuped (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_);
- private:
+ protected:
- ~session_t ();
+ session_base_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~session_base_t ();
+
+ private:
void start_connecting (bool wait_);
@@ -115,8 +122,8 @@ namespace zmq
std::string protocol;
std::string address;
- session_t (const session_t&);
- const session_t &operator = (const session_t&);
+ session_base_t (const session_base_t&);
+ const session_base_t &operator = (const session_base_t&);
};
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 4209a69..a4d89db 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -39,7 +39,7 @@
#include "vtcp_listener.hpp"
#include "tcp_connecter.hpp"
#include "io_thread.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "config.hpp"
#include "clock.hpp"
#include "pipe.hpp"
@@ -480,9 +480,9 @@ int zmq::socket_base_t::connect (const char *addr_)
}
// Create session.
- session_t *session = new (std::nothrow) session_t (
- io_thread, true, this, options, protocol.c_str (), address.c_str ());
- alloc_assert (session);
+ session_base_t *session = session_base_t::create (io_thread, true, this,
+ options, protocol.c_str (), address.c_str ());
+ errno_assert (session);
// Create a bi-directional pipe.
object_t *parents [2] = {this, session};
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 15e7c21..2647795 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -36,7 +36,7 @@
#include "stream_engine.hpp"
#include "io_thread.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
@@ -102,7 +102,8 @@ zmq::stream_engine_t::~stream_engine_t ()
}
}
-void zmq::stream_engine_t::plug (io_thread_t *io_thread_, session_t *session_)
+void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
+ session_base_t *session_)
{
zmq_assert (!plugged);
plugged = true;
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index ac9a5be..92fc55f 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -44,7 +44,8 @@ namespace zmq
~stream_engine_t ();
// i_engine interface implementation.
- void plug (class io_thread_t *io_thread_, class session_t *session_);
+ void plug (class io_thread_t *io_thread_,
+ class session_base_t *session_);
void unplug ();
void terminate ();
void activate_in ();
@@ -84,10 +85,10 @@ namespace zmq
encoder_t encoder;
// The session this engine is attached to.
- class session_t *session;
+ class session_base_t *session;
// Detached transient session.
- class session_t *leftover_session;
+ class session_base_t *leftover_session;
options_t options;
diff --git a/src/sub.cpp b/src/sub.cpp
index 81082a2..d9f2f2e 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -79,3 +79,15 @@ bool zmq::sub_t::xhas_out ()
return false;
}
+zmq::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xsub_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::sub_session_t::~sub_session_t ()
+{
+}
+
diff --git a/src/sub.hpp b/src/sub.hpp
index b5980ba..7d3cf0b 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -45,6 +45,21 @@ namespace zmq
const sub_t &operator = (const sub_t&);
};
+ class sub_session_t : public xsub_session_t
+ {
+ public:
+
+ sub_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~sub_session_t ();
+
+ private:
+
+ sub_session_t (const sub_session_t&);
+ const sub_session_t &operator = (const sub_session_t&);
+ };
+
}
#endif
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 487f4f5..fe99252 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -46,7 +46,7 @@
#endif
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
- class session_t *session_, const options_t &options_,
+ class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index 17f8a70..d1a93cd 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -37,7 +37,7 @@ namespace zmq
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
tcp_connecter_t (class io_thread_t *io_thread_,
- class session_t *session_, const options_t &options_,
+ class session_base_t *session_, const options_t &options_,
const char *address_, bool delay_);
~tcp_connecter_t ();
@@ -97,7 +97,7 @@ namespace zmq
bool wait;
// Reference to the session we belong to.
- class session_t *session;
+ class session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index da476a4..9b6068c 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -26,7 +26,7 @@
#include "tcp_listener.hpp"
#include "stream_engine.hpp"
#include "io_thread.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
@@ -97,9 +97,9 @@ void zmq::tcp_listener_t::in_event ()
zmq_assert (io_thread);
// Create and launch a session object.
- session_t *session = new (std::nothrow)
- session_t (io_thread, false, socket, options, NULL, NULL);
- alloc_assert (session);
+ session_base_t *session = session_base_t::create (io_thread, false, socket,
+ options, NULL, NULL);
+ errno_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp
index 3d2900e..5dc147e 100644
--- a/src/vtcp_connecter.cpp
+++ b/src/vtcp_connecter.cpp
@@ -50,7 +50,7 @@
#endif
zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_,
- class session_t *session_, const options_t &options_,
+ class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
diff --git a/src/vtcp_connecter.hpp b/src/vtcp_connecter.hpp
index f467b5f..fe5260e 100644
--- a/src/vtcp_connecter.hpp
+++ b/src/vtcp_connecter.hpp
@@ -43,7 +43,7 @@ namespace zmq
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
vtcp_connecter_t (class io_thread_t *io_thread_,
- class session_t *session_, const options_t &options_,
+ class session_base_t *session_, const options_t &options_,
const char *address_, bool delay_);
~vtcp_connecter_t ();
@@ -104,7 +104,7 @@ namespace zmq
bool wait;
// Reference to the session we belong to.
- class session_t *session;
+ class session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp
index b394833..7e496e5 100644
--- a/src/vtcp_listener.cpp
+++ b/src/vtcp_listener.cpp
@@ -27,7 +27,7 @@
#include <vtcp.h>
#include "stream_engine.hpp"
-#include "session.hpp"
+#include "session_base.hpp"
#include "stdint.hpp"
#include "err.hpp"
#include "ip.hpp"
@@ -113,8 +113,8 @@ void zmq::vtcp_listener_t::in_event ()
zmq_assert (io_thread);
// Create and launch a session object.
- session_t *session = new (std::nothrow)
- session_t (io_thread, false, socket, options, NULL, NULL);
+ session_base_t *session = session_base_t::create (io_thread, false, socket,
+ options, NULL, NULL);
alloc_assert (session);
session->inc_seqnum ();
launch_child (session);
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 8da9cf9..a245fea 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -169,3 +169,15 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
}
}
+zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::xpub_session_t::~xpub_session_t ()
+{
+}
+
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 001fa2d..b410e6c 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -25,6 +25,7 @@
#include <string>
#include "socket_base.hpp"
+#include "session_base.hpp"
#include "mtrie.hpp"
#include "array.hpp"
#include "dist.hpp"
@@ -79,6 +80,21 @@ namespace zmq
const xpub_t &operator = (const xpub_t&);
};
+ class xpub_session_t : public session_base_t
+ {
+ public:
+
+ xpub_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xpub_session_t ();
+
+ private:
+
+ xpub_session_t (const xpub_session_t&);
+ const xpub_session_t &operator = (const xpub_session_t&);
+ };
+
}
#endif
diff --git a/src/xrep.cpp b/src/xrep.cpp
index a11b8c1..c304463 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -243,5 +243,15 @@ bool zmq::xrep_t::xhas_out ()
return true;
}
+zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+zmq::xrep_session_t::~xrep_session_t ()
+{
+}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 07f10ba..562f87d 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -24,6 +24,7 @@
#include <map>
#include "socket_base.hpp"
+#include "session_base.hpp"
#include "stdint.hpp"
#include "msg.hpp"
#include "fq.hpp"
@@ -93,6 +94,21 @@ namespace zmq
const xrep_t &operator = (const xrep_t&);
};
+ class xrep_session_t : public session_base_t
+ {
+ public:
+
+ xrep_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xrep_session_t ();
+
+ private:
+
+ xrep_session_t (const xrep_session_t&);
+ const xrep_session_t &operator = (const xrep_session_t&);
+ };
+
}
#endif
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 7b66137..79b3b94 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -79,3 +79,15 @@ void zmq::xreq_t::xterminated (pipe_t *pipe_)
lb.terminated (pipe_);
}
+zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::xreq_session_t::~xreq_session_t ()
+{
+}
+
diff --git a/src/xreq.hpp b/src/xreq.hpp
index a427ba9..d7e28c4 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -23,6 +23,7 @@
#define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp"
+#include "session_base.hpp"
#include "fq.hpp"
#include "lb.hpp"
@@ -60,6 +61,21 @@ namespace zmq
const xreq_t &operator = (const xreq_t&);
};
+ class xreq_session_t : public session_base_t
+ {
+ public:
+
+ xreq_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xreq_session_t ();
+
+ private:
+
+ xreq_session_t (const xreq_session_t&);
+ const xreq_session_t &operator = (const xreq_session_t&);
+ };
+
}
#endif
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 4122c67..b24f082 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -213,4 +213,15 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
zmq_assert (sent);
}
+zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+zmq::xsub_session_t::~xsub_session_t ()
+{
+}
diff --git a/src/xsub.hpp b/src/xsub.hpp
index ea59cdb..310df6e 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -22,6 +22,7 @@
#define __ZMQ_XSUB_HPP_INCLUDED__
#include "socket_base.hpp"
+#include "session_base.hpp"
#include "dist.hpp"
#include "fq.hpp"
#include "trie.hpp"
@@ -83,6 +84,21 @@ namespace zmq
const xsub_t &operator = (const xsub_t&);
};
+ class xsub_session_t : public session_base_t
+ {
+ public:
+
+ xsub_session_t (class io_thread_t *io_thread_, bool connect_,
+ class socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xsub_session_t ();
+
+ private:
+
+ xsub_session_t (const xsub_session_t&);
+ const xsub_session_t &operator = (const xsub_session_t&);
+ };
+
}
#endif