summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-16 10:11:01 +0200
committermalosek <malosek@fastmq.com>2009-09-16 10:11:01 +0200
commit969522bbf55467b6f6e8113be28451d087060843 (patch)
tree6a78392b4f76bca99b54c1eb7d44550b4dee34c5 /src/socket_base.cpp
parent0381a78c0484012e760d61051f325c71136df17e (diff)
added OpenPGM receiver - ZMQ_SUB
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp75
1 files changed, 67 insertions, 8 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 900f1c5..c195e91 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -37,6 +37,7 @@
#include "err.hpp"
#include "platform.hpp"
#include "pgm_sender.hpp"
+#include "pgm_receiver.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
object_t (parent_),
@@ -156,6 +157,14 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
+ case ZMQ_MCAST_LOOP:
+ if (optvallen_ != sizeof (bool)) {
+ errno = EINVAL;
+ return -1;
+ }
+ options.use_multicast_loop = optval_;
+ return 0;
+
default:
errno = EINVAL;
return -1;
@@ -164,15 +173,43 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::bind (const char *addr_)
{
- zmq_listener_t *listener = new zmq_listener_t (
- choose_io_thread (options.affinity), this, options);
- int rc = listener->set_address (addr_);
- if (rc != 0)
+ // Parse addr_ string.
+ std::string addr_type;
+ std::string addr_args;
+
+ std::string addr (addr_);
+ std::string::size_type pos = addr.find ("://");
+
+ if (pos == std::string::npos) {
+ errno = EINVAL;
return -1;
+ }
- send_plug (listener);
- send_own (this, listener);
- return 0;
+ addr_type = addr.substr (0, pos);
+ addr_args = addr.substr (pos + 3);
+
+ if (addr_type == "tcp") {
+ zmq_listener_t *listener = new zmq_listener_t (
+ choose_io_thread (options.affinity), this, options);
+ int rc = listener->set_address (addr_args.c_str ());
+ if (rc != 0)
+ return -1;
+
+ send_plug (listener);
+ send_own (this, listener);
+ return 0;
+ }
+
+#if defined ZMQ_HAVE_OPENPGM
+ if (addr_type == "pgm") {
+ // In the case of PGM bind behaves the same like connect.
+ return connect (addr_);
+ }
+#endif
+
+ // Unknown address type.
+ errno = EFAULT;
+ return -1;
}
int zmq::socket_base_t::connect (const char *addr_)
@@ -246,6 +283,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "pgm") {
switch (type) {
+
+ // PGM sender.
case ZMQ_PUB:
{
pgm_sender_t *pgm_sender =
@@ -266,9 +305,29 @@ int zmq::socket_base_t::connect (const char *addr_)
break;
}
+
+ // PGM receiver.
case ZMQ_SUB:
- zmq_assert (false);
+ {
+ pgm_receiver_t *pgm_receiver =
+ new pgm_receiver_t (choose_io_thread (options.affinity), options,
+ session_name.c_str ());
+
+ int rc = pgm_receiver->init (addr_args.c_str ());
+ if (rc != 0) {
+ delete pgm_receiver;
+ return -1;
+ }
+
+ // Reserve a sequence number for following 'attach' command.
+ session->inc_seqnum ();
+ send_attach (session, pgm_receiver);
+
+ pgm_receiver = NULL;
+
break;
+ }
+
default:
errno = EINVAL;
return -1;