summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/options.cpp2
-rw-r--r--src/pgm_receiver.cpp176
-rw-r--r--src/pgm_receiver.hpp37
-rw-r--r--src/pgm_socket.cpp159
-rw-r--r--src/pgm_socket.hpp19
-rw-r--r--src/platform.hpp.in6
6 files changed, 170 insertions, 229 deletions
diff --git a/src/options.cpp b/src/options.cpp
index 8d3de45..120ae7c 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -29,7 +29,7 @@ zmq::options_t::options_t () :
affinity (0),
rate (100),
recovery_ivl (10),
- use_multicast_loop (false),
+ use_multicast_loop (true),
requires_in (false),
requires_out (false)
{
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index f68a909..50a8ff9 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -46,26 +46,21 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_, const char *session_name_) :
io_object_t (parent_),
- decoder (NULL),
pgm_socket (true, options_),
options (options_),
session_name (session_name_),
- joined (false),
inout (NULL)
{
}
zmq::pgm_receiver_t::~pgm_receiver_t ()
{
- if (decoder)
- delete decoder;
+ // Destructor should not be called before unplug.
+ zmq_assert (peers.empty ());
}
int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
{
- decoder = new zmq_decoder_t;
- zmq_assert (decoder);
-
return pgm_socket.init (udp_encapsulation_, network_);
}
@@ -75,8 +70,6 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
int socket_fd;
int waiting_pipe_fd;
- decoder->set_inout (inout_);
-
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
@@ -95,9 +88,16 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
void zmq::pgm_receiver_t::unplug ()
{
+ // Delete decoders.
+ for (peer_t::iterator it = peers.begin (); it != peers.end (); it++) {
+ if (it->second.decoder != NULL)
+ delete it->second.decoder;
+ }
+
+ peers.clear ();
+
rm_fd (socket_handle);
rm_fd (pipe_handle);
- decoder->set_inout (NULL);
inout = NULL;
}
@@ -106,102 +106,108 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::reconnect ()
-{
- // Save inout ptr.
- i_inout *inout_tmp = inout;
-
- // PGM receiver is not joined anymore.
- joined = false;
-
- // Unplug - plug PGM transport.
- unplug ();
- delete decoder;
- decoder = new zmq_decoder_t;
- zmq_assert (decoder);
- plug (inout_tmp);
-}
-
// POLLIN event from socket or waiting_pipe.
void zmq::pgm_receiver_t::in_event ()
{
- void *data_with_offset;
+ // Iterator to peers map.
+ peer_t::iterator it;
+
+ // Data from PGM socket.
+ unsigned char *raw_data = NULL;
+ const pgm_tsi_t *tsi = NULL;
ssize_t nbytes = 0;
- // Read all data from pgm socket.
- while ((nbytes = receive_with_offset (&data_with_offset)) > 0) {
-
- // Push all the data to the decoder.
- decoder->write ((unsigned char*)data_with_offset, nbytes);
- }
+ do {
- // Flush any messages decoder may have produced to the dispatcher.
- inout->flush ();
+ // Read data from underlying pgm_socket.
+ nbytes = pgm_socket.receive ((void**) &raw_data, &tsi);
- // Data loss detected.
- if (nbytes == -1) {
+ // No ODATA or RDATA.
+ if (!nbytes)
+ break;
- // Recreate PGM transport.
- reconnect ();
- }
-}
+ // Fid TSI in peers list.
+ it = peers.find (*tsi);
-void zmq::pgm_receiver_t::out_event ()
-{
- zmq_assert (false);
-}
+ // Data loss.
+ if (nbytes == -1) {
-ssize_t zmq::pgm_receiver_t::receive_with_offset
- (void **data_)
-{
+ zmq_assert (it != peers.end ());
- // Data from PGM socket.
- void *rd = NULL;
- unsigned char *raw_data = NULL;
+ // Delete decoder and set joined to false.
+ it->second.joined = false;
+
+ if (it->second.decoder != NULL) {
+ delete it->second.decoder;
+ it->second.decoder = NULL;
+ }
- // Read data from underlying pgm_socket.
- ssize_t nbytes = pgm_socket.receive ((void**) &rd);
- raw_data = (unsigned char*) rd;
+ break;
+ }
- // No ODATA or RDATA.
- if (!nbytes)
- return 0;
+ // Read offset of the fist message in current APDU.
+ zmq_assert ((size_t) nbytes >= sizeof (uint16_t));
+ uint16_t apdu_offset = get_uint16 (raw_data);
- // Data loss.
- if (nbytes == -1) {
- return -1;
- }
+ // Shift raw_data & decrease nbytes by the first message offset
+ // information (sizeof uint16_t).
+ raw_data += sizeof (uint16_t);
+ nbytes -= sizeof (uint16_t);
+ zmq_assert (apdu_offset <= nbytes);
- // Read offset of the fist message in current APDU.
- uint16_t apdu_offset = get_uint16 (raw_data);
+ // New peer.
+ if (it == peers.end ()) {
- // Shift raw_data & decrease nbytes by the first message offset
- // information (sizeof uint16_t).
- *data_ = raw_data + sizeof (uint16_t);
- nbytes -= sizeof (uint16_t);
+ peer_info_t peer_info = {false, NULL};
+ it = peers.insert (std::make_pair (*tsi, peer_info)).first;
- // There is not beginning of the message in current APDU and we
- // are not joined jet -> throwing data.
- if (apdu_offset == 0xFFFF && !joined) {
- *data_ = NULL;
- return 0;
- }
+ zmq_log (1, "New peer TSI: %s, %s(%i).\n", pgm_print_tsi (tsi),
+ __FILE__, __LINE__);
+ }
- // Now is the possibility to join the stream.
- if (!joined) {
-
- // We have to move data to the begining of the first message.
- *data_ = (unsigned char *)*data_ + apdu_offset;
- nbytes -= apdu_offset;
+ // There is not beginning of the message in current APDU and we
+ // are not joined jet -> throwing data.
+ if (apdu_offset == 0xFFFF && !it->second.joined) {
+ break;
+ }
- // Joined the stream.
- joined = true;
+ // Now is the possibility to join the stream.
+ if (!it->second.joined) {
+
+ zmq_assert (it->second.decoder == NULL);
- zmq_log (2, "joined into the stream, %s(%i)\n",
- __FILE__, __LINE__);
- }
+ // We have to move data to the begining of the first message.
+ raw_data += apdu_offset;
+ nbytes -= apdu_offset;
+
+ // Joined the stream.
+ it->second.joined = true;
+
+ // Create and connect decoder for joined peer.
+ it->second.decoder = new zmq_decoder_t;
+ it->second.decoder->set_inout (inout);
+
+ zmq_log (1, "Peer %s joined into the stream, %s(%i)\n",
+ pgm_print_tsi (tsi), __FILE__, __LINE__);
+ }
+
+ if (nbytes > 0) {
+
+ // Push all the data to the decoder.
+ it->second.decoder->write (raw_data, nbytes);
+ }
+
+ } while (nbytes > 0);
+
+ // Flush any messages decoder may have produced to the dispatcher.
+ inout->flush ();
+
+}
- return nbytes;
+void zmq::pgm_receiver_t::out_event ()
+{
+ zmq_assert (false);
}
+
#endif
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 05b27e2..b573081 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -30,6 +30,8 @@
#include "zmq_decoder.hpp"
#include "pgm_socket.hpp"
+#include <map>
+
namespace zmq
{
@@ -45,7 +47,6 @@ namespace zmq
~pgm_receiver_t ();
int init (bool udp_encapsulation_, const char *network_);
- void reconnect ();
// i_engine interface implementation.
void plug (struct i_inout *inout_);
@@ -57,15 +58,28 @@ namespace zmq
void out_event ();
private:
- // Read exactly iov_len_ count APDUs, function returns number
- // of bytes received. Note that if we did not join message stream
- // before and there is not message beginning in the APDUs being
- // received iov_len for such a APDUs will be 0.
- ssize_t receive_with_offset (void **data_);
-
- // Message decoder.
- zmq_decoder_t *decoder;
-
+
+ // Map to hold TSI, joined and decoder for each peer.
+ struct peer_info_t {
+ bool joined;
+ zmq_decoder_t *decoder;
+ };
+
+ struct tsi_comp {
+ bool operator () (const pgm_tsi_t &ltsi, const pgm_tsi_t &rtsi) const
+ {
+ if (ltsi.sport < rtsi.sport)
+ return true;
+
+ return (std::lexicographical_compare (ltsi.gsi.identifier,
+ ltsi.gsi.identifier + 6,
+ rtsi.gsi.identifier, rtsi.gsi.identifier + 6));
+ }
+ };
+
+ typedef std::map <pgm_tsi_t, peer_info_t, tsi_comp> peer_t;
+ peer_t peers;
+
// PGM socket.
pgm_socket_t pgm_socket;
@@ -75,9 +89,6 @@ namespace zmq
// Name of the session associated with the connecter.
std::string session_name;
- // If receiver joined the messages stream.
- bool joined;
-
// Parent session.
i_inout *inout;
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 8ceff6c..88ef68e 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -23,10 +23,7 @@
#ifdef ZMQ_HAVE_LINUX
#include <pgm/pgm.h>
-#else
-#include <Winsock2.h>
-#include <Wsrm.h>
-#include <ws2spi.h>
+#include <openssl/md5.h>
#endif
#include <string>
@@ -36,6 +33,7 @@
#include "pgm_socket.hpp"
#include "config.hpp"
#include "err.hpp"
+#include "uuid.hpp"
//#define PGM_SOCKET_DEBUG
//#define PGM_SOCKET_DEBUG_LEVEL 1
@@ -68,6 +66,21 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
}
+int zmq::pgm_socket_t::pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_)
+{
+
+ unsigned char result_md5 [16];
+
+ MD5_CTX ctx;
+ MD5_Init (&ctx);
+ MD5_Update (&ctx, data_, strlen (data_));
+ MD5_Final (result_md5, &ctx);
+
+ memcpy (gsi_, result_md5 + 10, 6);
+
+ return 0;
+}
+
int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
{
udp_encapsulation = udp_encapsulation_;
@@ -118,10 +131,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Can not open transport before destroying old one.
zmq_assert (g_transport == NULL);
- // Set actual_tsi and prev_tsi to zeros.
- memset (&tsi, '\0', sizeof (pgm_tsi_t));
- memset (&retired_tsi, '\0', sizeof (pgm_tsi_t));
-
// Zero counter used in msgrecv.
nbytes_rec = 0;
nbytes_processed = 0;
@@ -146,12 +155,25 @@ int zmq::pgm_socket_t::open_transport (void)
struct group_source_req recv_gsr, send_gsr;
size_t recv_gsr_len = 1;
- rc = pgm_create_md5_gsi (&gsi);
+ if (options.identity.size () > 0) {
+
+ // Create gsi from identity string.
+ rc = pgm_create_custom_gsi (options.identity.c_str (), &gsi);
+
+ } else {
+
+ // Generate random gsi.
+ rc = pgm_create_custom_gsi (uuid_t ().to_string (), &gsi);
+ }
+
if (rc != 0) {
errno = EINVAL;
return -1;
}
+ zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi),
+ __FILE__, __LINE__);
+
// On success, 0 is returned. On invalid arguments, -EINVAL is returned.
// If more multicast groups are found than the recv_len parameter,
// -ENOMEM is returned.
@@ -204,14 +226,6 @@ int zmq::pgm_socket_t::open_transport (void)
// Receiver transport.
if (receiver) {
- // Set transport->may_close_on_failure to true,
- // after data los recvmsgv returns -1 errno set to ECONNRESET.
- rc = pgm_transport_set_close_on_failure (g_transport, TRUE);
- if (rc != 0) {
- errno = EINVAL;
- return -1;
- }
-
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
rc = pgm_transport_set_recv_only (g_transport, false);
@@ -543,7 +557,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// returned.
-ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
+ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
{
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
@@ -583,9 +597,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
// For data loss nbytes_rec == -1 errno == ECONNRESET.
if (nbytes_rec == -1 && errno == ECONNRESET) {
-
+
+ // Save lost data TSI.
+ *tsi_ = &(g_transport->lost_data_tsi);
+
// In case of dala loss -1 is returned.
- zmq_log (1, "Data loss detected, %s(%i)\n", __FILE__, __LINE__);
+ zmq_log (1, "Data loss detected %s, %s(%i)\n",
+ pgm_print_tsi (&(g_transport->lost_data_tsi)), __FILE__, __LINE__);
nbytes_rec = 0;
return -1;
}
@@ -610,65 +628,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
*raw_data_ = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_base;
size_t raw_data_len = pgm_msgv[pgm_msgv_processed].msgv_iov->iov_len;
- // Check if peer TSI did not change, this is detection of peer restart.
- const pgm_tsi_t *current_tsi = pgm_msgv [pgm_msgv_processed].msgv_tsi;
-
- // If empty store new TSI.
- if (tsi_empty (&tsi)) {
- // Store current peer TSI.
- memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t));
-#ifdef PGM_SOCKET_DEBUG
- uint8_t *gsi = (uint8_t*)(&tsi)->gsi.identifier;
-#endif
-
- zmq_log (1, "First peer TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
- gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
- ntohs (tsi.sport), __FILE__, __LINE__);
- }
-
- // Compare stored TSI with actual.
- if (!tsi_equal (&tsi, current_tsi)) {
- // Peer change detected.
- zmq_log (1, "Peer change detected, %s(%i)\n", __FILE__, __LINE__);
-
- // Compare with retired TSI, in case of match ignore APDU.
- if (tsi_equal (&retired_tsi, current_tsi)) {
- zmq_log (1, "Retired TSI - ignoring APDU, %s(%i)\n",
- __FILE__, __LINE__);
-
- // Move the the next pgm_msgv_t structure.
- pgm_msgv_processed++;
- nbytes_processed +=raw_data_len;
-
- return 0;
+ // Save current TSI.
+ *tsi_ = pgm_msgv [pgm_msgv_processed].msgv_tsi;
- } else {
- zmq_log (1, "New TSI, %s(%i)\n", __FILE__, __LINE__);
-
- // Store new TSI and move last valid to retired_tsi
- memcpy (&retired_tsi, &tsi, sizeof (pgm_tsi_t));
- memcpy (&tsi, current_tsi, sizeof (pgm_tsi_t));
-
-#ifdef PGM_SOCKET_DEBUG
- uint8_t *gsi = (uint8_t*)(&retired_tsi)->gsi.identifier;
-#endif
- zmq_log (1, "retired TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
- gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
- ntohs (retired_tsi.sport), __FILE__, __LINE__);
-
-#ifdef PGM_SOCKET_DEBUG
- gsi = (uint8_t*)(&tsi)->gsi.identifier;
-#endif
- zmq_log (1, " TSI: %i.%i.%i.%i.%i.%i.%i, %s(%i)\n",
- gsi [0], gsi [1], gsi [2], gsi [3], gsi [4], gsi [5],
- ntohs (tsi.sport), __FILE__, __LINE__);
-
- // Peers change is recognized as a GAP.
- return -1;
- }
-
- }
-
// Move the the next pgm_msgv_t structure.
pgm_msgv_processed++;
nbytes_processed +=raw_data_len;
@@ -692,47 +654,6 @@ void zmq::pgm_socket_t::process_upstream (void)
zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
}
-bool zmq::pgm_socket_t::tsi_equal (const pgm_tsi_t *tsi_a_,
- const pgm_tsi_t *tsi_b_)
-{
- // Compare 6B GSI.
- const uint8_t *gsi_a = tsi_a_->gsi.identifier;
- const uint8_t *gsi_b = tsi_b_->gsi.identifier;
-
- if (gsi_a [0] != gsi_b [0] || gsi_a [1] != gsi_b [1] ||
- gsi_a [2] != gsi_b [2] || gsi_a [3] != gsi_b [3] ||
- gsi_a [4] != gsi_b [4] || gsi_a [5] != gsi_b [5]) {
-
- return false;
- }
-
- // Compare source port.
- if (tsi_a_->sport != tsi_b_->sport) {
- return false;
- }
-
- return true;
-}
-
-bool zmq::pgm_socket_t::tsi_empty (const pgm_tsi_t *tsi_)
-{
-
- uint8_t *gsi = (uint8_t*)tsi_->gsi.identifier;
-
- // GSI.
- if (gsi [0] != 0 || gsi [1] != 0 || gsi [2] != 0 ||
- gsi [3] != 0 || gsi [4] != 0 || gsi [5] != 0) {
- return false;
- }
-
- // Source port.
- if (tsi_->sport != 0) {
- return false;
- }
-
- return true;
-}
-
#endif
#endif
diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp
index fe4468b..c9d0feb 100644
--- a/src/pgm_socket.hpp
+++ b/src/pgm_socket.hpp
@@ -77,7 +77,7 @@ namespace zmq
void free_buffer (void *data_);
// Receive data from pgm socket.
- ssize_t receive (void **data_);
+ ssize_t receive (void **data_, const pgm_tsi_t **tsi_);
// POLLIN on sender side should mean NAK or SPMR receiving.
// process_upstream function is used to handle such a situation.
@@ -90,21 +90,18 @@ namespace zmq
private:
- // Associated socket options.
- options_t options;
-
// Returns max tsdu size without fragmentation.
size_t get_max_tsdu_size (void);
// Returns maximum count of apdus which fills readbuf_size_
size_t get_max_apdu_at_once (size_t readbuf_size_);
- // Return true if TSI has empty GSI ('\0') and sport 0.
- bool tsi_empty (const pgm_tsi_t *tsi_);
+ // Compute gsi from string.
+ int pgm_create_custom_gsi (const char *data_, pgm_gsi_t *gsi_);
- // Compare TSIs, return true if equal.
- bool tsi_equal (const pgm_tsi_t *tsi_a_, const pgm_tsi_t *tsi_b_);
-
+ // Associated socket options.
+ options_t options;
+
// true when pgm_socket should create receiving side.
bool receiver;
@@ -140,10 +137,10 @@ namespace zmq
enum {pgm_receiver_fd_count = 2};
// TSI of the actual peer.
- pgm_tsi_t tsi;
+// pgm_tsi_t tsi;
// Previous peer TSI.
- pgm_tsi_t retired_tsi;
+// pgm_tsi_t retired_tsi;
#endif
};
diff --git a/src/platform.hpp.in b/src/platform.hpp.in
index 8e345fc..e6bb10c 100644
--- a/src/platform.hpp.in
+++ b/src/platform.hpp.in
@@ -36,6 +36,9 @@
/* Define to 1 if you have the `socket' library (-lsocket). */
#undef HAVE_LIBSOCKET
+/* Define to 1 if you have the `ssl' library (-lssl). */
+#undef HAVE_LIBSSL
+
/* Define to 1 if you have the `stdc++' library (-lstdc++). */
#undef HAVE_LIBSTDC__
@@ -61,6 +64,9 @@
/* Define to 1 if you have the <netinet/tcp.h> header file. */
#undef HAVE_NETINET_TCP_H
+/* Define to 1 if you have the <openssl/md5.h> header file. */
+#undef HAVE_OPENSSL_MD5_H
+
/* Define to 1 if you have the `perror' function. */
#undef HAVE_PERROR