From 85cbd7f83c10c70da8fa44fe7673143703f9710d Mon Sep 17 00:00:00 2001 From: malosek Date: Tue, 22 Sep 2009 15:12:51 +0200 Subject: added PGM bus functionality --- src/pgm_receiver.cpp | 176 ++++++++++++++++++++++++++------------------------- src/pgm_receiver.hpp | 37 +++++++---- src/pgm_socket.cpp | 159 ++++++++++++---------------------------------- src/pgm_socket.hpp | 19 +++--- src/platform.hpp.in | 6 ++ 5 files changed, 169 insertions(+), 228 deletions(-) (limited to 'src') diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index f68a909..50a8ff9 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -46,26 +46,21 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, const options_t &options_, const char *session_name_) : io_object_t (parent_), - decoder (NULL), pgm_socket (true, options_), options (options_), session_name (session_name_), - joined (false), inout (NULL) { } zmq::pgm_receiver_t::~pgm_receiver_t () { - if (decoder) - delete decoder; + // Destructor should not be called before unplug. + zmq_assert (peers.empty ()); } int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) { - decoder = new zmq_decoder_t; - zmq_assert (decoder); - return pgm_socket.init (udp_encapsulation_, network_); } @@ -75,8 +70,6 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_) int socket_fd; int waiting_pipe_fd; - decoder->set_inout (inout_); - // Fill socket_fd and waiting_pipe_fd from PGM transport pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); @@ -95,9 +88,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_) void zmq::pgm_receiver_t::unplug () { + // Delete decoders. + for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) { + if (it->second.decoder != NULL) + delete it->second.decoder; + } + + peers.clear (); + rm_fd (socket_handle); rm_fd (pipe_handle); - decoder->set_inout (NULL); inout = NULL; } @@ -106,102 +106,108 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -void zmq::pgm_receiver_t::reconnect () -{ - // Save inout ptr. - i_inout *inout_tmp = inout; - - // PGM receiver is not joined anymore. - joined = false; - - // Unplug - plug PGM transport. - unplug (); - delete decoder; - decoder = new zmq_decoder_t; - zmq_assert (decoder); - plug (inout_tmp); -} - // POLLIN event from socket or waiting_pipe. void zmq::pgm_receiver_t::in_event () { - void *data_with_offset; + // Iterator to peers map. + peer_t::iterator it; + + // Data from PGM socket. + unsigned char *raw_data = NULL; + const pgm_tsi_t *tsi = NULL; ssize_t nbytes = 0; - // Read all data from pgm socket. - while ((nbytes = receive_with_offset (&data_with_offset)) > 0) { - - // Push all the data to the decoder. - decoder->write ((unsigned char*)data_with_offset, nbytes); - } + do { - // Flush any messages decoder may have produced to the dispatcher. - inout->flush (); + // Read data from underlying pgm_socket. + nbytes = pgm_socket.receive ((void**) &raw_data, &tsi); - // Data loss detected. - if (nbytes == -1) { + // No ODATA or RDATA. + if (!nbytes) + break; - // Recreate PGM transport. - reconnect (); - } -} + // Fid TSI in peers list. + it = peers.find (*tsi); -void zmq::pgm_receiver_t::out_event () -{ - zmq_assert (false); -} + // Data loss. + if (nbytes == -1) { -ssize_t zmq::pgm_receiver_t::receive_with_offset - (void **data_) -{ + zmq_assert (it != peers.end ()); - // Data from PGM socket. - void *rd = NULL; - unsigned char *raw_data = NULL; + // Delete decoder and set joined to false. + it->second.joined = false; + + if (it->second.decoder != NULL) { + delete it->second.decoder; + it->second.decoder = NULL; + } - // Read data from underlying pgm_socket. - ssize_t nbytes = pgm_socket.receive ((void**) &rd); - raw_data = (unsigned char*) rd; + break; + } - // No ODATA or RDATA. - if (!nbytes) - return 0; + // Read offset of the fist message in current APDU. + zmq_assert ((size_t) nbytes >= sizeof (uint16_t)); + uint16_t apdu_offset = get_uint16 (raw_data); - // Data loss. - if (nbytes == -1) { - return -1; - } + // Shift raw_data & decrease nbytes by the first message offset + // information (sizeof uint16_t). + raw_data += sizeof (uint16_t); + nbytes -= sizeof (uint16_t); + zmq_assert (apdu_offset <= nbytes); - // Read offset of the fist message in current APDU. - uint16_t apdu_offset = get_uint16 (raw_data); + // New peer. + if (it == peers.end ()) { - // Shift raw_data & decrease nbytes by the first message offset - // information (sizeof uint16_t). - *data_ = raw_data + sizeof (uint16_t); - nbytes -= sizeof (uint16_t); + peer_info_t peer_info = {false, NULL}; + it = peers.insert (std::make_pair (*tsi, peer_info)).first; - // There is not beginning of the message in current APDU and we - // are not joined jet -> throwing data. - if (apdu_offset == 0xFFFF && !joined) { - *data_ = NULL; - return 0; - } + zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi), + __FILE__, __LINE__); + } - // Now is the possibility to join the stream. - if (!joined) { - - // We have to move data to the begining of the first message. - *data_ = (unsigned char *)*data_ + apdu_offset; - nbytes -= apdu_offset; + // There is not beginning of the message in current APDU and we + // are not joined jet -> throwing data. + if (apdu_offset == 0xFFFF && !it->second.joined) { + break; + } - // Joined the stream. - joined = true; + // Now is the possibility to join the stream. + if (!it->second.joined) { + + zmq_assert (it->second.decoder == NULL); - zmq_log (2, "joined into the stream, %s(%i)\n", - __FILE__, __LINE__); - } + // We have to move data to the begining of the first message. + raw_data += apdu_offset; + nbytes -= apdu_offset; + + // Joined the stream. + it->second.joined = true; + + // Create and connect decoder for joined peer. + it->second.decoder = new zmq_decoder_t; + it->second.decoder->set_inout (inout); + + zmq_log (1, "Peer %s joined into the stream, %s(%i)\n", + pgm_print_tsi (tsi), __FILE__, __LINE__); + } + + if (nbytes > 0) { + + // Push all the data to the decoder. + it->second.decoder->write (raw_data, nbytes); + } + + } while (nbytes > 0); + + // Flush any messages decoder may have produced to the dispatcher. + inout->flush (); + +} - return nbytes; +void zmq::pgm_receiver_t::out_event () +{ + zmq_assert (false); } + #endif diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 05b27e2..b573081 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -30,6 +30,8 @@ #include "zmq_decoder.hpp" #include "pgm_socket.hpp" +#include + namespace zmq { @@ -45,7 +47,6 @@ namespace zmq ~pgm_receiver_t (); int init (bool udp_encapsulation_, const char *network_); - void reconnect (); // i_engine interface implementation. void plug (struct i_inout *inout_); @@ -57,15 +58,28 @@ namespace zmq void out_event (); private: - // Read exactly iov_len_ count APDUs, function returns number - // of bytes received. Note that if we did not join message stream - // before and there is not message beginning in the APDUs being - // received iov_len for such a APDUs will be 0. - ssize_t receive_with_offset (void **data_); - - // Message decoder. - zmq_decoder_t *decoder; - + + // Map to hold TSI, joined and decoder for each peer. + struct peer_info_t { + bool joined; + zmq_decoder_t *decoder; + }; + + struct tsi_comp { + bool operator () (const pgm_tsi_t <si, const pgm_tsi_t &rtsi) const + { + if (ltsi.sport < rtsi.sport) + return true; + + return (std::lexicographical_compare (ltsi.gsi.identifier, + ltsi.gsi.identifier + 6, + rtsi.gsi.identifier, rtsi.gsi.identifier + 6)); + } + }; + + typedef std::map peer_t; + peer_t peers; + // PGM socket. pgm_socket_t pgm_socket; @@ -75,9 +89,6 @@ namespace zmq // Name of the session associated with the connecter. std::string session_name; - // If receiver joined the messages stream. - bool joined; - // Parent session. i_inout *inout; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 8ceff6c..88ef68e 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -23,10 +23,7 @@ #ifdef ZMQ_HAVE_LINUX #include -#else -#include -#include -#include +#include #endif #include @@ -36,6 +33,7 @@ #include "pgm_socket.hpp" #include "config.hpp" #include "err.hpp" +#include "uuid.hpp" //#define PGM_SOCKET_DEBUG //#define PGM_SOCKET_DEBUG_LEVEL 1 @@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : } +int zmq::pgm_socket_t::pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_) +{ + + unsigned char result_md5 [16]; + + MD5_CTX ctx; + MD5_Init (&ctx); + MD5_Update (&ctx, data_, strlen (data_)); + MD5_Final (result_md5, &ctx); + + memcpy (gsi_, result_md5 + 10, 6); + + return 0; +} + int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { udp_encapsulation = udp_encapsulation_; @@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void) // Can not open transport before destroying old one. zmq_assert (g_transport == NULL); - // Set actual_tsi and prev_tsi to zeros. - memset (&tsi, '\0', sizeof (pgm_tsi_t)); - memset (&retired_tsi, '\0', sizeof (pgm_tsi_t)); - // Zero counter used in msgrecv. nbytes_rec = 0; nbytes_processed = 0; @@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void) struct group_source_req recv_gsr, send_gsr; size_t recv_gsr_len = 1; - rc = pgm_create_md5_gsi (&gsi); + if (options.identity.size () > 0) { + + // Create gsi from identity string. + rc = pgm_create_custom_gsi (options.identity.c_str (), &gsi); + + } else { + + // Generate random gsi. + rc = pgm_create_custom_gsi (uuid_t ().to_string (), &gsi); + } + if (rc != 0) { errno = EINVAL; return -1; } + zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), + __FILE__, __LINE__); + // On success, 0 is returned. On invalid arguments, -EINVAL is returned. // If more multicast groups are found than the recv_len parameter, // -ENOMEM is returned. @@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void) // Receiver transport. if (receiver) { - // Set transport->may_close_on_failure to true, - // after data los recvmsgv returns -1 errno set to ECONNRESET. - rc = pgm_transport_set_close_on_failure (g_transport, TRUE); - if (rc != 0) { - errno = EINVAL; - return -1; - } - // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. rc = pgm_transport_set_recv_only (g_transport, false); @@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_) // pgm_transport_recvmsgv is called to fill the pgm_msgv array up to // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are // returned. -ssize_t zmq::pgm_socket_t::receive (void **raw_data_) +ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) { // We just sent all data from pgm_transport_recvmsgv up // and have to return 0 that another engine in this thread is scheduled. @@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) // For data loss nbytes_rec == -1 errno == ECONNRESET. if (nbytes_rec == -1 && errno == ECONNRESET) { - + + // Save lost data TSI. + *tsi_ = &(g_transport->lost_data_tsi); + // In case of dala loss -1 is returned. - zmq_log (1, "Data loss detected, %s(%i)\n", __FILE__, __LINE__); + zmq_log (1, "Data loss detected %s, %s(%i)\n", + pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__); nbytes_rec = 0; return -1; } @@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) *raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base; size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len; - // Check if peer TSI did not change, this is detection of peer restart. - const pgm_tsi_t *current_tsi = pgm_msgv [pgm_msgv_processed].msgv_tsi; - - // If empty store new TSI. - if (tsi_empty (&tsi)) { - // Store current peer TSI. - memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t)); -#ifdef PGM_SOCKET_DEBUG - uint8_t *gsi = (uint8_t*)(&tsi)->gsi.identifier; -#endif - - zmq_log (1, "First peer TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n", - gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5], - ntohs (tsi.sport), __FILE__, __LINE__); - } - - // Compare stored TSI with actual. - if (!tsi_equal (&tsi, current_tsi)) { - // Peer change detected. - zmq_log (1, "Peer change detected, %s(%i)\n", __FILE__, __LINE__); - - // Compare with retired TSI, in case of match ignore APDU. - if (tsi_equal (&retired_tsi, current_tsi)) { - zmq_log (1, "Retired TSI - ignoring APDU, %s(%i)\n", - __FILE__, __LINE__); - - // Move the the next pgm_msgv_t structure. - pgm_msgv_processed++; - nbytes_processed +=raw_data_len; - - return 0; + // Save current TSI. + *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi; - } else { - zmq_log (1, "New TSI, %s(%i)\n", __FILE__, __LINE__); - - // Store new TSI and move last valid to retired_tsi - memcpy (&retired_tsi, &tsi, sizeof (pgm_tsi_t)); - memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t)); - -#ifdef PGM_SOCKET_DEBUG - uint8_t *gsi = (uint8_t*)(&retired_tsi)->gsi.identifier; -#endif - zmq_log (1, "retired TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n", - gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5], - ntohs (retired_tsi.sport), __FILE__, __LINE__); - -#ifdef PGM_SOCKET_DEBUG - gsi = (uint8_t*)(&tsi)->gsi.identifier; -#endif - zmq_log (1, " TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n", - gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5], - ntohs (tsi.sport), __FILE__, __LINE__); - - // Peers change is recognized as a GAP. - return -1; - } - - } - // Move the the next pgm_msgv_t structure. pgm_msgv_processed++; nbytes_processed +=raw_data_len; @@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void) zmq_assert (dummy_bytes == -1 && errno == EAGAIN); } -bool zmq::pgm_socket_t::tsi_equal (const pgm_tsi_t *tsi_a_, - const pgm_tsi_t *tsi_b_) -{ - // Compare 6B GSI. - const uint8_t *gsi_a = tsi_a_->gsi.identifier; - const uint8_t *gsi_b = tsi_b_->gsi.identifier; - - if (gsi_a [0] != gsi_b [0] || gsi_a [1] != gsi_b [1] || - gsi_a [2] != gsi_b [2] || gsi_a [3] != gsi_b [3] || - gsi_a [4] != gsi_b [4] || gsi_a [5] != gsi_b [5]) { - - return false; - } - - // Compare source port. - if (tsi_a_->sport != tsi_b_->sport) { - return false; - } - - return true; -} - -bool zmq::pgm_socket_t::tsi_empty (const pgm_tsi_t *tsi_) -{ - - uint8_t *gsi = (uint8_t*)tsi_->gsi.identifier; - - // GSI. - if (gsi [0] != 0 || gsi [1] != 0 || gsi [2] != 0 || - gsi [3] != 0 || gsi [4] != 0 || gsi [5] != 0) { - return false; - } - - // Source port. - if (tsi_->sport != 0) { - return false; - } - - return true; -} - #endif #endif diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index fe4468b..c9d0feb 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -77,7 +77,7 @@ namespace zmq void free_buffer (void *data_); // Receive data from pgm socket. - ssize_t receive (void **data_); + ssize_t receive (void **data_, const pgm_tsi_t **tsi_); // POLLIN on sender side should mean NAK or SPMR receiving. // process_upstream function is used to handle such a situation. @@ -90,21 +90,18 @@ namespace zmq private: - // Associated socket options. - options_t options; - // Returns max tsdu size without fragmentation. size_t get_max_tsdu_size (void); // Returns maximum count of apdus which fills readbuf_size_ size_t get_max_apdu_at_once (size_t readbuf_size_); - // Return true if TSI has empty GSI ('\0') and sport 0. - bool tsi_empty (const pgm_tsi_t *tsi_); + // Compute gsi from string. + int pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_); - // Compare TSIs, return true if equal. - bool tsi_equal (const pgm_tsi_t *tsi_a_, const pgm_tsi_t *tsi_b_); - + // Associated socket options. + options_t options; + // true when pgm_socket should create receiving side. bool receiver; @@ -140,10 +137,10 @@ namespace zmq enum {pgm_receiver_fd_count = 2}; // TSI of the actual peer. - pgm_tsi_t tsi; +// pgm_tsi_t tsi; // Previous peer TSI. - pgm_tsi_t retired_tsi; +// pgm_tsi_t retired_tsi; #endif }; diff --git a/src/platform.hpp.in b/src/platform.hpp.in index 8e345fc..e6bb10c 100644 --- a/src/platform.hpp.in +++ b/src/platform.hpp.in @@ -36,6 +36,9 @@ /* Define to 1 if you have the `socket' library (-lsocket). */ #undef HAVE_LIBSOCKET +/* Define to 1 if you have the `ssl' library (-lssl). */ +#undef HAVE_LIBSSL + /* Define to 1 if you have the `stdc++' library (-lstdc++). */ #undef HAVE_LIBSTDC__ @@ -61,6 +64,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_NETINET_TCP_H +/* Define to 1 if you have the header file. */ +#undef HAVE_OPENSSL_MD5_H + /* Define to 1 if you have the `perror' function. */ #undef HAVE_PERROR -- cgit v1.2.3