summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp68
1 files changed, 55 insertions, 13 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 048c529..4fadadc 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -1,19 +1,20 @@
/*
- Copyright (c) 2007-2010 iMatix Corporation
+ Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
+ the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
+ GNU Lesser General Public License for more details.
- You should have received a copy of the Lesser GNU General Public License
+ You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
@@ -36,6 +37,7 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
+ has_rx_timer (false),
pgm_socket (true, options_),
options (options_),
inout (NULL),
@@ -55,7 +57,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_);
}
-void zmq::pgm_receiver_t::plug (i_inout *inout_)
+void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_inout *inout_)
{
// Retrieve PGM fds and start polling.
int socket_fd;
@@ -72,7 +74,7 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
void zmq::pgm_receiver_t::unplug ()
{
// Delete decoders.
- for (peers_t::iterator it = peers.begin (); it != peers.end (); it++) {
+ for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
if (it->second.decoder != NULL)
delete it->second.decoder;
}
@@ -81,19 +83,29 @@ void zmq::pgm_receiver_t::unplug ()
mru_decoder = NULL;
pending_bytes = 0;
- // Stop polling.
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
rm_fd (socket_handle);
rm_fd (pipe_handle);
inout = NULL;
}
-void zmq::pgm_receiver_t::revive ()
+void zmq::pgm_receiver_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
+void zmq::pgm_receiver_t::activate_out ()
{
zmq_assert (false);
}
-void zmq::pgm_receiver_t::resume_input ()
+void zmq::pgm_receiver_t::activate_in ()
{
// It is possible that the most recently used decoder
// processed the whole buffer but failed to write
@@ -129,17 +141,31 @@ void zmq::pgm_receiver_t::in_event ()
zmq_assert (pending_bytes == 0);
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
// TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load.
while (true) {
// Get new batch of data.
- ssize_t received = pgm_socket.receive ((void**) &data, &tsi);
+ // Note the workaround made not to break strict-aliasing rules.
+ void *tmp = NULL;
+ ssize_t received = pgm_socket.receive (&tmp, &tsi);
+ data = (unsigned char*) tmp;
// No data to process. This may happen if the packet received is
// neither ODATA nor ODATA.
- if (received == 0)
+ if (received == 0) {
+ if (errno == ENOMEM || errno == EBUSY) {
+ const long timeout = pgm_socket.get_rx_timeout ();
+ add_timer (timeout, rx_timer_id);
+ has_rx_timer = true;
+ }
break;
+ }
// Find the peer based on its TSI.
peers_t::iterator it = peers.find (*tsi);
@@ -161,7 +187,7 @@ void zmq::pgm_receiver_t::in_event ()
// New peer. Add it to the list of know but unjoint peers.
if (it == peers.end ()) {
peer_info_t peer_info = {false, NULL};
- it = peers.insert (std::make_pair (*tsi, peer_info)).first;
+ it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
}
// Read the offset of the fist message in the current packet.
@@ -189,7 +215,8 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for the peer.
- it->second.decoder = new (std::nothrow) zmq_decoder_t (0);
+ it->second.decoder = new (std::nothrow) decoder_t (0);
+ alloc_assert (it->second.decoder);
it->second.decoder->set_inout (inout);
}
@@ -205,6 +232,12 @@ void zmq::pgm_receiver_t::in_event ()
reset_pollin (pipe_handle);
reset_pollin (socket_handle);
+ // Reset outstanding timer.
+ if (has_rx_timer) {
+ cancel_timer (rx_timer_id);
+ has_rx_timer = false;
+ }
+
break;
}
}
@@ -213,5 +246,14 @@ void zmq::pgm_receiver_t::in_event ()
inout->flush ();
}
+void zmq::pgm_receiver_t::timer_event (int token)
+{
+ zmq_assert (token == rx_timer_id);
+
+ // Timer cancels on return by poller_base.
+ has_rx_timer = false;
+ in_event ();
+}
+
#endif