From 969522bbf55467b6f6e8113be28451d087060843 Mon Sep 17 00:00:00 2001 From: malosek Date: Wed, 16 Sep 2009 10:11:01 +0200 Subject: added OpenPGM receiver - ZMQ_SUB --- src/socket_base.cpp | 75 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 8 deletions(-) (limited to 'src/socket_base.cpp') diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 900f1c5..c195e91 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -37,6 +37,7 @@ #include "err.hpp" #include "platform.hpp" #include "pgm_sender.hpp" +#include "pgm_receiver.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : object_t (parent_), @@ -156,6 +157,14 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, options.recovery_ivl = (uint32_t) *((int64_t*) optval_); return 0; + case ZMQ_MCAST_LOOP: + if (optvallen_ != sizeof (bool)) { + errno = EINVAL; + return -1; + } + options.use_multicast_loop = optval_; + return 0; + default: errno = EINVAL; return -1; @@ -164,15 +173,43 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::bind (const char *addr_) { - zmq_listener_t *listener = new zmq_listener_t ( - choose_io_thread (options.affinity), this, options); - int rc = listener->set_address (addr_); - if (rc != 0) + // Parse addr_ string. + std::string addr_type; + std::string addr_args; + + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + + if (pos == std::string::npos) { + errno = EINVAL; return -1; + } - send_plug (listener); - send_own (this, listener); - return 0; + addr_type = addr.substr (0, pos); + addr_args = addr.substr (pos + 3); + + if (addr_type == "tcp") { + zmq_listener_t *listener = new zmq_listener_t ( + choose_io_thread (options.affinity), this, options); + int rc = listener->set_address (addr_args.c_str ()); + if (rc != 0) + return -1; + + send_plug (listener); + send_own (this, listener); + return 0; + } + +#if defined ZMQ_HAVE_OPENPGM + if (addr_type == "pgm") { + // In the case of PGM bind behaves the same like connect. + return connect (addr_); + } +#endif + + // Unknown address type. + errno = EFAULT; + return -1; } int zmq::socket_base_t::connect (const char *addr_) @@ -246,6 +283,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "pgm") { switch (type) { + + // PGM sender. case ZMQ_PUB: { pgm_sender_t *pgm_sender = @@ -266,9 +305,29 @@ int zmq::socket_base_t::connect (const char *addr_) break; } + + // PGM receiver. case ZMQ_SUB: - zmq_assert (false); + { + pgm_receiver_t *pgm_receiver = + new pgm_receiver_t (choose_io_thread (options.affinity), options, + session_name.c_str ()); + + int rc = pgm_receiver->init (addr_args.c_str ()); + if (rc != 0) { + delete pgm_receiver; + return -1; + } + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + send_attach (session, pgm_receiver); + + pgm_receiver = NULL; + break; + } + default: errno = EINVAL; return -1; -- cgit v1.2.3