From 1a4d6f91194c52795808baa07dcd61a20ff599be Mon Sep 17 00:00:00 2001 From: malosek Date: Fri, 11 Sep 2009 17:58:37 +0200 Subject: added OpenPGM sender - ZMQ_PUB --- src/socket_base.cpp | 105 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 92 insertions(+), 13 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b4f7d6b..9412d67 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include #include @@ -35,9 +37,12 @@ #include "uuid.hpp" #include "pipe.hpp" #include "err.hpp" +#include "platform.hpp" +#include "pgm_sender.hpp" -zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : +zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : object_t (parent_), + type (type_), current (0), active (0), pending_term_acks (0), @@ -145,6 +150,22 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, errno = EFAULT; return -1; + case ZMQ_RATE: + if (optvallen_ != sizeof (uint32_t)) { + errno = EINVAL; + return -1; + } + options.rate = *((int32_t*) optval_); + return 0; + + case ZMQ_RECOVERY_IVL: + if (optvallen_ != sizeof (uint32_t)) { + errno = EINVAL; + return -1; + } + options.recovery_ivl = *((int32_t*) optval_); + return 0; + default: errno = EINVAL; return -1; @@ -170,6 +191,21 @@ int zmq::socket_base_t::connect (const char *addr_) std::string session_name ("#"); session_name += uuid_t ().to_string (); + // Parse addr_ string. + std::string addr_type; + std::string addr_args; + + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + + if (pos == std::string::npos) { + errno = EINVAL; + return -1; + } + + addr_type = addr.substr (0, pos); + addr_args = addr.substr (pos + 3); + // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); session_t *session = new session_t (io_thread, this, session_name.c_str (), @@ -198,20 +234,63 @@ int zmq::socket_base_t::connect (const char *addr_) send_plug (session); send_own (this, session); - // Create the connecter object. Supply it with the session name so that - // it can bind the new connection to the session once it is established. - zmq_connecter_t *connecter = new zmq_connecter_t ( - choose_io_thread (options.affinity), this, options, - session_name.c_str ()); - int rc = connecter->set_address (addr_); - if (rc != 0) { - delete connecter; - return -1; + if (addr_type == "tcp") { + + // Create the connecter object. Supply it with the session name so that + // it can bind the new connection to the session once it is established. + zmq_connecter_t *connecter = new zmq_connecter_t ( + choose_io_thread (options.affinity), this, options, + session_name.c_str ()); + int rc = connecter->set_address (addr_args.c_str ()); + if (rc != 0) { + delete connecter; + return -1; + } + send_plug (connecter); + send_own (this, connecter); + + return 0; } - send_plug (connecter); - send_own (this, connecter); - return 0; +#if defined ZMQ_HAVE_OPENPGM + if (addr_type == "pgm") { + + switch (type) { + case ZMQ_PUB: + { + pgm_sender_t *pgm_sender = + new pgm_sender_t (choose_io_thread (options.affinity), options, + session_name.c_str ()); + + int rc = pgm_sender->init (addr_args.c_str ()); + if (rc != 0) { + delete pgm_sender; + return -1; + } + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + send_attach (session, pgm_sender); + + pgm_sender = NULL; + + break; + } + case ZMQ_SUB: + zmq_assert (false); + break; + default: + errno = EINVAL; + return -1; + } + + return 0; + } +#endif + + // Unknown address type. + errno = ENOTSUP; + return -1; } int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) -- cgit v1.2.3