From 85cbd7f83c10c70da8fa44fe7673143703f9710d Mon Sep 17 00:00:00 2001 From: malosek Date: Tue, 22 Sep 2009 15:12:51 +0200 Subject: added PGM bus functionality --- src/pgm_socket.cpp | 159 ++++++++++++++--------------------------------------- 1 file changed, 40 insertions(+), 119 deletions(-) (limited to 'src/pgm_socket.cpp') diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 8ceff6c..88ef68e 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -23,10 +23,7 @@ #ifdef ZMQ_HAVE_LINUX #include -#else -#include -#include -#include +#include #endif #include @@ -36,6 +33,7 @@ #include "pgm_socket.hpp" #include "config.hpp" #include "err.hpp" +#include "uuid.hpp" //#define PGM_SOCKET_DEBUG //#define PGM_SOCKET_DEBUG_LEVEL 1 @@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : } +int zmq::pgm_socket_t::pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_) +{ + + unsigned char result_md5 [16]; + + MD5_CTX ctx; + MD5_Init (&ctx); + MD5_Update (&ctx, data_, strlen (data_)); + MD5_Final (result_md5, &ctx); + + memcpy (gsi_, result_md5 + 10, 6); + + return 0; +} + int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { udp_encapsulation = udp_encapsulation_; @@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void) // Can not open transport before destroying old one. zmq_assert (g_transport == NULL); - // Set actual_tsi and prev_tsi to zeros. - memset (&tsi, '\0', sizeof (pgm_tsi_t)); - memset (&retired_tsi, '\0', sizeof (pgm_tsi_t)); - // Zero counter used in msgrecv. nbytes_rec = 0; nbytes_processed = 0; @@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void) struct group_source_req recv_gsr, send_gsr; size_t recv_gsr_len = 1; - rc = pgm_create_md5_gsi (&gsi); + if (options.identity.size () > 0) { + + // Create gsi from identity string. + rc = pgm_create_custom_gsi (options.identity.c_str (), &gsi); + + } else { + + // Generate random gsi. + rc = pgm_create_custom_gsi (uuid_t ().to_string (), &gsi); + } + if (rc != 0) { errno = EINVAL; return -1; } + zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi), + __FILE__, __LINE__); + // On success, 0 is returned. On invalid arguments, -EINVAL is returned. // If more multicast groups are found than the recv_len parameter, // -ENOMEM is returned. @@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void) // Receiver transport. if (receiver) { - // Set transport->may_close_on_failure to true, - // after data los recvmsgv returns -1 errno set to ECONNRESET. - rc = pgm_transport_set_close_on_failure (g_transport, TRUE); - if (rc != 0) { - errno = EINVAL; - return -1; - } - // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. rc = pgm_transport_set_recv_only (g_transport, false); @@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_) // pgm_transport_recvmsgv is called to fill the pgm_msgv array up to // pgm_msgv_len. In subsequent calls data from pgm_msgv structure are // returned. -ssize_t zmq::pgm_socket_t::receive (void **raw_data_) +ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) { // We just sent all data from pgm_transport_recvmsgv up // and have to return 0 that another engine in this thread is scheduled. @@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) // For data loss nbytes_rec == -1 errno == ECONNRESET. if (nbytes_rec == -1 && errno == ECONNRESET) { - + + // Save lost data TSI. + *tsi_ = &(g_transport->lost_data_tsi); + // In case of dala loss -1 is returned. - zmq_log (1, "Data loss detected, %s(%i)\n", __FILE__, __LINE__); + zmq_log (1, "Data loss detected %s, %s(%i)\n", + pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__); nbytes_rec = 0; return -1; } @@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) *raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base; size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len; - // Check if peer TSI did not change, this is detection of peer restart. - const pgm_tsi_t *current_tsi = pgm_msgv [pgm_msgv_processed].msgv_tsi; - - // If empty store new TSI. - if (tsi_empty (&tsi)) { - // Store current peer TSI. - memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t)); -#ifdef PGM_SOCKET_DEBUG - uint8_t *gsi = (uint8_t*)(&tsi)->gsi.identifier; -#endif - - zmq_log (1, "First peer TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n", - gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5], - ntohs (tsi.sport), __FILE__, __LINE__); - } - - // Compare stored TSI with actual. - if (!tsi_equal (&tsi, current_tsi)) { - // Peer change detected. - zmq_log (1, "Peer change detected, %s(%i)\n", __FILE__, __LINE__); - - // Compare with retired TSI, in case of match ignore APDU. - if (tsi_equal (&retired_tsi, current_tsi)) { - zmq_log (1, "Retired TSI - ignoring APDU, %s(%i)\n", - __FILE__, __LINE__); - - // Move the the next pgm_msgv_t structure. - pgm_msgv_processed++; - nbytes_processed +=raw_data_len; - - return 0; + // Save current TSI. + *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi; - } else { - zmq_log (1, "New TSI, %s(%i)\n", __FILE__, __LINE__); - - // Store new TSI and move last valid to retired_tsi - memcpy (&retired_tsi, &tsi, sizeof (pgm_tsi_t)); - memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t)); - -#ifdef PGM_SOCKET_DEBUG - uint8_t *gsi = (uint8_t*)(&retired_tsi)->gsi.identifier; -#endif - zmq_log (1, "retired TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n", - gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5], - ntohs (retired_tsi.sport), __FILE__, __LINE__); - -#ifdef PGM_SOCKET_DEBUG - gsi = (uint8_t*)(&tsi)->gsi.identifier; -#endif - zmq_log (1, " TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n", - gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5], - ntohs (tsi.sport), __FILE__, __LINE__); - - // Peers change is recognized as a GAP. - return -1; - } - - } - // Move the the next pgm_msgv_t structure. pgm_msgv_processed++; nbytes_processed +=raw_data_len; @@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void) zmq_assert (dummy_bytes == -1 && errno == EAGAIN); } -bool zmq::pgm_socket_t::tsi_equal (const pgm_tsi_t *tsi_a_, - const pgm_tsi_t *tsi_b_) -{ - // Compare 6B GSI. - const uint8_t *gsi_a = tsi_a_->gsi.identifier; - const uint8_t *gsi_b = tsi_b_->gsi.identifier; - - if (gsi_a [0] != gsi_b [0] || gsi_a [1] != gsi_b [1] || - gsi_a [2] != gsi_b [2] || gsi_a [3] != gsi_b [3] || - gsi_a [4] != gsi_b [4] || gsi_a [5] != gsi_b [5]) { - - return false; - } - - // Compare source port. - if (tsi_a_->sport != tsi_b_->sport) { - return false; - } - - return true; -} - -bool zmq::pgm_socket_t::tsi_empty (const pgm_tsi_t *tsi_) -{ - - uint8_t *gsi = (uint8_t*)tsi_->gsi.identifier; - - // GSI. - if (gsi [0] != 0 || gsi [1] != 0 || gsi [2] != 0 || - gsi [3] != 0 || gsi [4] != 0 || gsi [5] != 0) { - return false; - } - - // Source port. - if (tsi_->sport != 0) { - return false; - } - - return true; -} - #endif #endif -- cgit v1.2.3