diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/pgm_receiver.cpp | 12 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 4 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 9 |
3 files changed, 24 insertions, 1 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 1fd687a..b859241 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -68,6 +68,9 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) set_pollin (socket_handle); sink = sink_; + + // If there are any subscriptions already queued in the session, drop them. + drop_subscriptions (); } void zmq::pgm_receiver_t::unplug () @@ -101,7 +104,7 @@ void zmq::pgm_receiver_t::terminate () void zmq::pgm_receiver_t::activate_out () { - zmq_assert (false); + drop_subscriptions (); } void zmq::pgm_receiver_t::activate_in () @@ -255,5 +258,12 @@ void zmq::pgm_receiver_t::timer_event (int token) in_event (); } +void zmq::pgm_receiver_t::drop_subscriptions () +{ + msg_t msg; + while (sink->read (&msg)) + msg.close (); +} + #endif diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 825e0c1..aa010dd 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -64,6 +64,10 @@ namespace zmq private: + // PGM is not able to move subscriptions upstream. Thus, drop all + // the pending subscriptions. + void drop_subscriptions (); + // RX timeout timer ID. enum {rx_timer_id = 0xa1}; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 314a0b4..9b1e215 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -88,6 +88,15 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) // Set POLLOUT for downlink_socket_handle. set_pollout (handle); + + // PGM is not able to pass subscriptions upstream, thus we have no idea + // what messages are peers interested in. Because of that we have to + // subscribe for all the messages. + msg_t msg; + msg.init (); + bool ok = sink_->write (&msg); + zmq_assert (ok); + sink_->flush (); } void zmq::pgm_sender_t::unplug () |