summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-07-24 18:25:30 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-07-24 18:25:30 +0200
commit7c1dca546d9e49e7af372e4fff9e6a87058a7f12 (patch)
treef00c6760dcd14b944457928405e7e2eca23b1ff8 /src/pgm_receiver.cpp
parentf716b571baf59c1b622c7666bb8bf2905126a3d4 (diff)
Session classes merged into a single class
Removal of ZMQ_IDENTITY resulted in various session classes doing almost the same thing. This patch merges the classes into a single class. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp17
1 files changed, 9 insertions, 8 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index b859241..dcd002e 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -29,9 +29,10 @@
#endif
#include "pgm_receiver.hpp"
-#include "err.hpp"
+#include "session.hpp"
#include "stdint.hpp"
#include "wire.hpp"
+#include "err.hpp"
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) :
@@ -39,7 +40,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
has_rx_timer (false),
pgm_socket (true, options_),
options (options_),
- sink (NULL),
+ session (NULL),
mru_decoder (NULL),
pending_bytes (0)
{
@@ -56,7 +57,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_);
}
-void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
+void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_)
{
// Retrieve PGM fds and start polling.
int socket_fd;
@@ -67,7 +68,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
set_pollin (pipe_handle);
set_pollin (socket_handle);
- sink = sink_;
+ session = session_;
// If there are any subscriptions already queued in the session, drop them.
drop_subscriptions ();
@@ -93,7 +94,7 @@ void zmq::pgm_receiver_t::unplug ()
rm_fd (socket_handle);
rm_fd (pipe_handle);
- sink = NULL;
+ session = NULL;
}
void zmq::pgm_receiver_t::terminate ()
@@ -220,7 +221,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder = new (std::nothrow) decoder_t (0,
options.maxmsgsize);
alloc_assert (it->second.decoder);
- it->second.decoder->set_sink (sink);
+ it->second.decoder->set_session (session);
}
mru_decoder = it->second.decoder;
@@ -246,7 +247,7 @@ void zmq::pgm_receiver_t::in_event ()
}
// Flush any messages decoder may have produced.
- sink->flush ();
+ session->flush ();
}
void zmq::pgm_receiver_t::timer_event (int token)
@@ -261,7 +262,7 @@ void zmq::pgm_receiver_t::timer_event (int token)
void zmq::pgm_receiver_t::drop_subscriptions ()
{
msg_t msg;
- while (sink->read (&msg))
+ while (session->read (&msg))
msg.close ();
}