From c7542981d18b13b251d5a3129f1ec7ba24aeb9a1 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 11 Jul 2011 10:18:30 +0200 Subject: PGM transport reconciled with subscription forwarding As PGM is not capable of passing subscriptions upstream, subscriptions are ignored at sub side and engine subscribes for all messages on pub side. Signed-off-by: Martin Sustrik --- src/pgm_receiver.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'src/pgm_receiver.cpp') diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 1fd687a..b859241 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -68,6 +68,9 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) set_pollin (socket_handle); sink = sink_; + + // If there are any subscriptions already queued in the session, drop them. + drop_subscriptions (); } void zmq::pgm_receiver_t::unplug () @@ -101,7 +104,7 @@ void zmq::pgm_receiver_t::terminate () void zmq::pgm_receiver_t::activate_out () { - zmq_assert (false); + drop_subscriptions (); } void zmq::pgm_receiver_t::activate_in () @@ -255,5 +258,12 @@ void zmq::pgm_receiver_t::timer_event (int token) in_event (); } +void zmq::pgm_receiver_t::drop_subscriptions () +{ + msg_t msg; + while (sink->read (&msg)) + msg.close (); +} + #endif -- cgit v1.2.3