summaryrefslogtreecommitdiff
path: root/src/pgm_socket.cpp
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-22 15:12:51 +0200
committermalosek <malosek@fastmq.com>2009-09-22 15:12:51 +0200
commit85cbd7f83c10c70da8fa44fe7673143703f9710d (patch)
tree4552f0e9d0258c9b5f39e25ca7c8a607d7f28497 /src/pgm_socket.cpp
parent3bd8f83f6d412221e4673ceb90b8ca7fa74ff2f1 (diff)
added PGM bus functionality
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r--src/pgm_socket.cpp159
1 files changed, 40 insertions, 119 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 8ceff6c..88ef68e 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -23,10 +23,7 @@
#ifdef ZMQ_HAVE_LINUX
#include <pgm/pgm.h>
-#else
-#include <Winsock2.h>
-#include <Wsrm.h>
-#include <ws2spi.h>
+#include <openssl/md5.h>
#endif
#include <string>
@@ -36,6 +33,7 @@
#include "pgm_socket.hpp"
#include "config.hpp"
#include "err.hpp"
+#include "uuid.hpp"
//#define PGM_SOCKET_DEBUG
//#define PGM_SOCKET_DEBUG_LEVEL 1
@@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
}
+int zmq::pgm_socket_t::pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_)
+{
+
+ unsigned char result_md5 [16];
+
+ MD5_CTX ctx;
+ MD5_Init (&ctx);
+ MD5_Update (&ctx, data_, strlen (data_));
+ MD5_Final (result_md5, &ctx);
+
+ memcpy (gsi_, result_md5 + 10, 6);
+
+ return 0;
+}
+
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{
udp_encapsulation = udp_encapsulation_;
@@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Can not open transport before destroying old one.
zmq_assert (g_transport == NULL);
- // Set actual_tsi and prev_tsi to zeros.
- memset (&tsi, '\0', sizeof (pgm_tsi_t));
- memset (&retired_tsi, '\0', sizeof (pgm_tsi_t));
-
// Zero counter used in msgrecv.
nbytes_rec = 0;
nbytes_processed = 0;
@@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void)
struct group_source_req recv_gsr, send_gsr;
size_t recv_gsr_len = 1;
- rc = pgm_create_md5_gsi (&gsi);
+ if (options.identity.size () > 0) {
+
+ // Create gsi from identity string.
+ rc = pgm_create_custom_gsi (options.identity.c_str (), &gsi);
+
+ } else {
+
+ // Generate random gsi.
+ rc = pgm_create_custom_gsi (uuid_t ().to_string (), &gsi);
+ }
+
if (rc != 0) {
errno = EINVAL;
return -1;
}
+ zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi),
+ __FILE__, __LINE__);
+
// On success, 0 is returned. On invalid arguments, -EINVAL is returned.
// If more multicast groups are found than the recv_len parameter,
// -ENOMEM is returned.
@@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Receiver transport.
if (receiver) {
- // Set transport->may_close_on_failure to true,
- // after data los recvmsgv returns -1 errno set to ECONNRESET.
- rc = pgm_transport_set_close_on_failure (g_transport, TRUE);
- if (rc != 0) {
- errno = EINVAL;
- return -1;
- }
-
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
rc = pgm_transport_set_recv_only (g_transport, false);
@@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// returned.
-ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
+ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
{
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
@@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
// For data loss nbytes_rec == -1 errno == ECONNRESET.
if (nbytes_rec == -1 && errno == ECONNRESET) {
-
+
+ // Save lost data TSI.
+ *tsi_ = &(g_transport->lost_data_tsi);
+
// In case of dala loss -1 is returned.
- zmq_log (1, "Data loss detected, %s(%i)\n", __FILE__, __LINE__);
+ zmq_log (1, "Data loss detected %s, %s(%i)\n",
+ pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__);
nbytes_rec = 0;
return -1;
}
@@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
*raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base;
size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len;
- // Check if peer TSI did not change, this is detection of peer restart.
- const pgm_tsi_t *current_tsi = pgm_msgv [pgm_msgv_processed].msgv_tsi;
-
- // If empty store new TSI.
- if (tsi_empty (&tsi)) {
- // Store current peer TSI.
- memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t));
-#ifdef PGM_SOCKET_DEBUG
- uint8_t *gsi = (uint8_t*)(&tsi)->gsi.identifier;
-#endif
-
- zmq_log (1, "First peer TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
- gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
- ntohs (tsi.sport), __FILE__, __LINE__);
- }
-
- // Compare stored TSI with actual.
- if (!tsi_equal (&tsi, current_tsi)) {
- // Peer change detected.
- zmq_log (1, "Peer change detected, %s(%i)\n", __FILE__, __LINE__);
-
- // Compare with retired TSI, in case of match ignore APDU.
- if (tsi_equal (&retired_tsi, current_tsi)) {
- zmq_log (1, "Retired TSI - ignoring APDU, %s(%i)\n",
- __FILE__, __LINE__);
-
- // Move the the next pgm_msgv_t structure.
- pgm_msgv_processed++;
- nbytes_processed +=raw_data_len;
-
- return 0;
+ // Save current TSI.
+ *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi;
- } else {
- zmq_log (1, "New TSI, %s(%i)\n", __FILE__, __LINE__);
-
- // Store new TSI and move last valid to retired_tsi
- memcpy (&retired_tsi, &tsi, sizeof (pgm_tsi_t));
- memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t));
-
-#ifdef PGM_SOCKET_DEBUG
- uint8_t *gsi = (uint8_t*)(&retired_tsi)->gsi.identifier;
-#endif
- zmq_log (1, "retired TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
- gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
- ntohs (retired_tsi.sport), __FILE__, __LINE__);
-
-#ifdef PGM_SOCKET_DEBUG
- gsi = (uint8_t*)(&tsi)->gsi.identifier;
-#endif
- zmq_log (1, " TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
- gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
- ntohs (tsi.sport), __FILE__, __LINE__);
-
- // Peers change is recognized as a GAP.
- return -1;
- }
-
- }
-
// Move the the next pgm_msgv_t structure.
pgm_msgv_processed++;
nbytes_processed +=raw_data_len;
@@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void)
zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
}
-bool zmq::pgm_socket_t::tsi_equal (const pgm_tsi_t *tsi_a_,
- const pgm_tsi_t *tsi_b_)
-{
- // Compare 6B GSI.
- const uint8_t *gsi_a = tsi_a_->gsi.identifier;
- const uint8_t *gsi_b = tsi_b_->gsi.identifier;
-
- if (gsi_a [0] != gsi_b [0] || gsi_a [1] != gsi_b [1] ||
- gsi_a [2] != gsi_b [2] || gsi_a [3] != gsi_b [3] ||
- gsi_a [4] != gsi_b [4] || gsi_a [5] != gsi_b [5]) {
-
- return false;
- }
-
- // Compare source port.
- if (tsi_a_->sport != tsi_b_->sport) {
- return false;
- }
-
- return true;
-}
-
-bool zmq::pgm_socket_t::tsi_empty (const pgm_tsi_t *tsi_)
-{
-
- uint8_t *gsi = (uint8_t*)tsi_->gsi.identifier;
-
- // GSI.
- if (gsi [0] != 0 || gsi [1] != 0 || gsi [2] != 0 ||
- gsi [3] != 0 || gsi [4] != 0 || gsi [5] != 0) {
- return false;
- }
-
- // Source port.
- if (tsi_->sport != 0) {
- return false;
- }
-
- return true;
-}
-
#endif
#endif