summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pgm_receiver.cpp12
-rw-r--r--src/pgm_receiver.hpp4
-rw-r--r--src/pgm_sender.cpp9
3 files changed, 24 insertions, 1 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 1fd687a..b859241 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -68,6 +68,9 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
set_pollin (socket_handle);
sink = sink_;
+
+ // If there are any subscriptions already queued in the session, drop them.
+ drop_subscriptions ();
}
void zmq::pgm_receiver_t::unplug ()
@@ -101,7 +104,7 @@ void zmq::pgm_receiver_t::terminate ()
void zmq::pgm_receiver_t::activate_out ()
{
- zmq_assert (false);
+ drop_subscriptions ();
}
void zmq::pgm_receiver_t::activate_in ()
@@ -255,5 +258,12 @@ void zmq::pgm_receiver_t::timer_event (int token)
in_event ();
}
+void zmq::pgm_receiver_t::drop_subscriptions ()
+{
+ msg_t msg;
+ while (sink->read (&msg))
+ msg.close ();
+}
+
#endif
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 825e0c1..aa010dd 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -64,6 +64,10 @@ namespace zmq
private:
+ // PGM is not able to move subscriptions upstream. Thus, drop all
+ // the pending subscriptions.
+ void drop_subscriptions ();
+
// RX timeout timer ID.
enum {rx_timer_id = 0xa1};
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 314a0b4..9b1e215 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -88,6 +88,15 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
// Set POLLOUT for downlink_socket_handle.
set_pollout (handle);
+
+ // PGM is not able to pass subscriptions upstream, thus we have no idea
+ // what messages are peers interested in. Because of that we have to
+ // subscribe for all the messages.
+ msg_t msg;
+ msg.init ();
+ bool ok = sink_->write (&msg);
+ zmq_assert (ok);
+ sink_->flush ();
}
void zmq::pgm_sender_t::unplug ()