From 7c1dca546d9e49e7af372e4fff9e6a87058a7f12 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 24 Jul 2011 18:25:30 +0200 Subject: Session classes merged into a single class Removal of ZMQ_IDENTITY resulted in various session classes doing almost the same thing. This patch merges the classes into a single class. Signed-off-by: Martin Sustrik --- src/pgm_receiver.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'src/pgm_receiver.cpp') diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index b859241..dcd002e 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -29,9 +29,10 @@ #endif #include "pgm_receiver.hpp" -#include "err.hpp" +#include "session.hpp" #include "stdint.hpp" #include "wire.hpp" +#include "err.hpp" zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, const options_t &options_) : @@ -39,7 +40,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, has_rx_timer (false), pgm_socket (true, options_), options (options_), - sink (NULL), + session (NULL), mru_decoder (NULL), pending_bytes (0) { @@ -56,7 +57,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) return pgm_socket.init (udp_encapsulation_, network_); } -void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) +void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_) { // Retrieve PGM fds and start polling. int socket_fd; @@ -67,7 +68,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) set_pollin (pipe_handle); set_pollin (socket_handle); - sink = sink_; + session = session_; // If there are any subscriptions already queued in the session, drop them. drop_subscriptions (); @@ -93,7 +94,7 @@ void zmq::pgm_receiver_t::unplug () rm_fd (socket_handle); rm_fd (pipe_handle); - sink = NULL; + session = NULL; } void zmq::pgm_receiver_t::terminate () @@ -220,7 +221,7 @@ void zmq::pgm_receiver_t::in_event () it->second.decoder = new (std::nothrow) decoder_t (0, options.maxmsgsize); alloc_assert (it->second.decoder); - it->second.decoder->set_sink (sink); + it->second.decoder->set_session (session); } mru_decoder = it->second.decoder; @@ -246,7 +247,7 @@ void zmq::pgm_receiver_t::in_event () } // Flush any messages decoder may have produced. - sink->flush (); + session->flush (); } void zmq::pgm_receiver_t::timer_event (int token) @@ -261,7 +262,7 @@ void zmq::pgm_receiver_t::timer_event (int token) void zmq::pgm_receiver_t::drop_subscriptions () { msg_t msg; - while (sink->read (&msg)) + while (session->read (&msg)) msg.close (); } -- cgit v1.2.3