summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp176
1 files changed, 91 insertions, 85 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index f68a909..50a8ff9 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -46,26 +46,21 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_, const char *session_name_) :
io_object_t (parent_),
- decoder (NULL),
pgm_socket (true, options_),
options (options_),
session_name (session_name_),
- joined (false),
inout (NULL)
{
}
zmq::pgm_receiver_t::~pgm_receiver_t ()
{
- if (decoder)
- delete decoder;
+ // Destructor should not be called before unplug.
+ zmq_assert (peers.empty ());
}
int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
{
- decoder = new zmq_decoder_t;
- zmq_assert (decoder);
-
return pgm_socket.init (udp_encapsulation_, network_);
}
@@ -75,8 +70,6 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
int socket_fd;
int waiting_pipe_fd;
- decoder->set_inout (inout_);
-
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
@@ -95,9 +88,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
void zmq::pgm_receiver_t::unplug ()
{
+ // Delete decoders.
+ for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) {
+ if (it->second.decoder != NULL)
+ delete it->second.decoder;
+ }
+
+ peers.clear ();
+
rm_fd (socket_handle);
rm_fd (pipe_handle);
- decoder->set_inout (NULL);
inout = NULL;
}
@@ -106,102 +106,108 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::reconnect ()
-{
- // Save inout ptr.
- i_inout *inout_tmp = inout;
-
- // PGM receiver is not joined anymore.
- joined = false;
-
- // Unplug - plug PGM transport.
- unplug ();
- delete decoder;
- decoder = new zmq_decoder_t;
- zmq_assert (decoder);
- plug (inout_tmp);
-}
-
// POLLIN event from socket or waiting_pipe.
void zmq::pgm_receiver_t::in_event ()
{
- void *data_with_offset;
+ // Iterator to peers map.
+ peer_t::iterator it;
+
+ // Data from PGM socket.
+ unsigned char *raw_data = NULL;
+ const pgm_tsi_t *tsi = NULL;
ssize_t nbytes = 0;
- // Read all data from pgm socket.
- while ((nbytes = receive_with_offset (&data_with_offset)) > 0) {
-
- // Push all the data to the decoder.
- decoder->write ((unsigned char*)data_with_offset, nbytes);
- }
+ do {
- // Flush any messages decoder may have produced to the dispatcher.
- inout->flush ();
+ // Read data from underlying pgm_socket.
+ nbytes = pgm_socket.receive ((void**) &raw_data, &tsi);
- // Data loss detected.
- if (nbytes == -1) {
+ // No ODATA or RDATA.
+ if (!nbytes)
+ break;
- // Recreate PGM transport.
- reconnect ();
- }
-}
+ // Fid TSI in peers list.
+ it = peers.find (*tsi);
-void zmq::pgm_receiver_t::out_event ()
-{
- zmq_assert (false);
-}
+ // Data loss.
+ if (nbytes == -1) {
-ssize_t zmq::pgm_receiver_t::receive_with_offset
- (void **data_)
-{
+ zmq_assert (it != peers.end ());
- // Data from PGM socket.
- void *rd = NULL;
- unsigned char *raw_data = NULL;
+ // Delete decoder and set joined to false.
+ it->second.joined = false;
+
+ if (it->second.decoder != NULL) {
+ delete it->second.decoder;
+ it->second.decoder = NULL;
+ }
- // Read data from underlying pgm_socket.
- ssize_t nbytes = pgm_socket.receive ((void**) &rd);
- raw_data = (unsigned char*) rd;
+ break;
+ }
- // No ODATA or RDATA.
- if (!nbytes)
- return 0;
+ // Read offset of the fist message in current APDU.
+ zmq_assert ((size_t) nbytes >= sizeof (uint16_t));
+ uint16_t apdu_offset = get_uint16 (raw_data);
- // Data loss.
- if (nbytes == -1) {
- return -1;
- }
+ // Shift raw_data & decrease nbytes by the first message offset
+ // information (sizeof uint16_t).
+ raw_data += sizeof (uint16_t);
+ nbytes -= sizeof (uint16_t);
+ zmq_assert (apdu_offset <= nbytes);
- // Read offset of the fist message in current APDU.
- uint16_t apdu_offset = get_uint16 (raw_data);
+ // New peer.
+ if (it == peers.end ()) {
- // Shift raw_data & decrease nbytes by the first message offset
- // information (sizeof uint16_t).
- *data_ = raw_data + sizeof (uint16_t);
- nbytes -= sizeof (uint16_t);
+ peer_info_t peer_info = {false, NULL};
+ it = peers.insert (std::make_pair (*tsi, peer_info)).first;
- // There is not beginning of the message in current APDU and we
- // are not joined jet -> throwing data.
- if (apdu_offset == 0xFFFF && !joined) {
- *data_ = NULL;
- return 0;
- }
+ zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi),
+ __FILE__, __LINE__);
+ }
- // Now is the possibility to join the stream.
- if (!joined) {
-
- // We have to move data to the begining of the first message.
- *data_ = (unsigned char *)*data_ + apdu_offset;
- nbytes -= apdu_offset;
+ // There is not beginning of the message in current APDU and we
+ // are not joined jet -> throwing data.
+ if (apdu_offset == 0xFFFF && !it->second.joined) {
+ break;
+ }
- // Joined the stream.
- joined = true;
+ // Now is the possibility to join the stream.
+ if (!it->second.joined) {
+
+ zmq_assert (it->second.decoder == NULL);
- zmq_log (2, "joined into the stream, %s(%i)\n",
- __FILE__, __LINE__);
- }
+ // We have to move data to the begining of the first message.
+ raw_data += apdu_offset;
+ nbytes -= apdu_offset;
+
+ // Joined the stream.
+ it->second.joined = true;
+
+ // Create and connect decoder for joined peer.
+ it->second.decoder = new zmq_decoder_t;
+ it->second.decoder->set_inout (inout);
+
+ zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",
+ pgm_print_tsi (tsi), __FILE__, __LINE__);
+ }
+
+ if (nbytes > 0) {
+
+ // Push all the data to the decoder.
+ it->second.decoder->write (raw_data, nbytes);
+ }
+
+ } while (nbytes > 0);
+
+ // Flush any messages decoder may have produced to the dispatcher.
+ inout->flush ();
+
+}
- return nbytes;
+void zmq::pgm_receiver_t::out_event ()
+{
+ zmq_assert (false);
}
+
#endif