summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pair.cpp2
-rw-r--r--src/pair.hpp2
-rw-r--r--src/pgm_sender.cpp10
-rw-r--r--src/pull.cpp2
-rw-r--r--src/pull.hpp2
-rw-r--r--src/push.cpp2
-rw-r--r--src/push.hpp2
-rw-r--r--src/socket_base.cpp12
-rw-r--r--src/socket_base.hpp5
-rw-r--r--src/xpub.cpp7
-rw-r--r--src/xpub.hpp2
-rw-r--r--src/xrep.cpp2
-rw-r--r--src/xrep.hpp2
-rw-r--r--src/xreq.cpp2
-rw-r--r--src/xreq.hpp2
-rw-r--r--src/xsub.cpp2
-rw-r--r--src/xsub.hpp2
17 files changed, 31 insertions, 29 deletions
diff --git a/src/pair.cpp b/src/pair.cpp
index aa64ce9..dd14d8d 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -36,7 +36,7 @@ xs::pair_t::~pair_t ()
xs_assert (!pipe);
}
-void xs::pair_t::xattach_pipe (pipe_t *pipe_)
+void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (!pipe);
pipe = pipe_;
diff --git a/src/pair.hpp b/src/pair.hpp
index c03ee1e..8ae3acf 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -42,7 +42,7 @@ namespace xs
~pair_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipe (xs::pipe_t *pipe_);
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index f68c044..dde6284 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -92,16 +92,6 @@ void xs::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
// Set POLLOUT for downlink_socket_handle.
set_pollout (handle);
-
- // PGM is not able to pass subscriptions upstream, thus we have no idea
- // what messages are peers interested in. Because of that we have to
- // subscribe for all the messages.
- msg_t msg;
- msg.init_size (1);
- *(unsigned char*) msg.data () = 1;
- int rc = session_->write (&msg);
- errno_assert (rc == 0);
- session_->flush ();
}
void xs::pgm_sender_t::unplug ()
diff --git a/src/pull.cpp b/src/pull.cpp
index 9495e12..e168c03 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -34,7 +34,7 @@ xs::pull_t::~pull_t ()
{
}
-void xs::pull_t::xattach_pipe (pipe_t *pipe_)
+void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
fq.attach (pipe_);
diff --git a/src/pull.hpp b/src/pull.hpp
index 446956c..29ca4d9 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -45,7 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (xs::pipe_t *pipe_);
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
void xread_activated (xs::pipe_t *pipe_);
diff --git a/src/push.cpp b/src/push.cpp
index b0002e9..45f0c62 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -34,7 +34,7 @@ xs::push_t::~push_t ()
{
}
-void xs::push_t::xattach_pipe (pipe_t *pipe_)
+void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
lb.attach (pipe_);
diff --git a/src/push.hpp b/src/push.hpp
index f90479c..85c2279 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -45,7 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (xs::pipe_t *pipe_);
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
void xwrite_activated (xs::pipe_t *pipe_);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index a321a71..f3ae291 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -206,14 +206,14 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
-void xs::socket_base_t::attach_pipe (pipe_t *pipe_)
+void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
{
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
// Let the derived socket type know about new pipe.
- xattach_pipe (pipe_);
+ xattach_pipe (pipe_, icanhasall_);
// If the socket is already being closed, ask any new pipes to terminate
// straight away.
@@ -452,8 +452,14 @@ int xs::socket_base_t::connect (const char *addr_)
rc = pipepair (parents, pipes, hwms, delays);
errno_assert (rc == 0);
+ // PGM does not support subscription forwarding; ask for all data to be
+ // sent to this pipe.
+ bool icanhasall = false;
+ if (protocol == "pgm" || protocol == "epgm")
+ icanhasall = true;
+
// Attach local end of the pipe to the socket object.
- attach_pipe (pipes [0]);
+ attach_pipe (pipes [0], icanhasall);
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 01a1564..1e35ffa 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -102,7 +102,8 @@ namespace xs
// Concrete algorithms for the x- methods are to be defined by
// individual socket types.
- virtual void xattach_pipe (xs::pipe_t *pipe_) = 0;
+ virtual void xattach_pipe (xs::pipe_t *pipe_,
+ bool icanhasall_ = false) = 0;
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, overload this
@@ -157,7 +158,7 @@ namespace xs
int check_protocol (const std::string &protocol_);
// Register the pipe with this socket.
- void attach_pipe (xs::pipe_t *pipe_);
+ void attach_pipe (xs::pipe_t *pipe_, bool icanhasall_ = false);
// Processes commands sent to this socket (if any). If timeout is -1,
// returns only after at least one command was processed.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index bf73399..b4bc135 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -37,11 +37,16 @@ xs::xpub_t::~xpub_t ()
{
}
-void xs::xpub_t::xattach_pipe (pipe_t *pipe_)
+void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
dist.attach (pipe_);
+ // If icanhasall_ is specified, the caller would like to subscribe
+ // to all data on this pipe, implicitly.
+ if (icanhasall_)
+ subscriptions.add (NULL, 0, pipe_);
+
// The pipe is active when attached. Let's read the subscriptions from
// it, if any.
xread_activated (pipe_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 92f97a1..a26076e 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -48,7 +48,7 @@ namespace xs
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
- void xattach_pipe (xs::pipe_t *pipe_);
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/xrep.cpp b/src/xrep.cpp
index d65f21b..051159d 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -55,7 +55,7 @@ xs::xrep_t::~xrep_t ()
prefetched_msg.close ();
}
-void xs::xrep_t::xattach_pipe (pipe_t *pipe_)
+void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrep.hpp b/src/xrep.hpp
index d73549b..d8723db 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -48,7 +48,7 @@ namespace xs
~xrep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipe (xs::pipe_t *pipe_);
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/xreq.cpp b/src/xreq.cpp
index eaf318a..d16f60e 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -46,7 +46,7 @@ xs::xreq_t::~xreq_t ()
prefetched_msg.close ();
}
-void xs::xreq_t::xattach_pipe (pipe_t *pipe_)
+void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
fq.attach (pipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 837cc8a..ab3a360 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -46,7 +46,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (xs::pipe_t *pipe_);
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 7797bdf..80cc205 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -48,7 +48,7 @@ xs::xsub_t::~xsub_t ()
errno_assert (rc == 0);
}
-void xs::xsub_t::xattach_pipe (pipe_t *pipe_)
+void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
fq.attach (pipe_);
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 6b1e26b..f9e5210 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -45,7 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (xs::pipe_t *pipe_);
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
int xrecv (xs::msg_t *msg_, int flags_);