summaryrefslogtreecommitdiff
path: root/src/pgm_receiver.cpp
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-05-20 07:40:11 +0200
committerMartin Lucina <martin@lucina.net>2012-05-20 07:40:11 +0200
commit1d76284dee8e9b0735a26ee98a3edcd9f5208f09 (patch)
treee6ac09d125e5353a3cfb4fbfd25b76f6dc7c308a /src/pgm_receiver.cpp
parent8c23de9f2abc2ec21d4b74785fd175050909176e (diff)
Implement SP wire protocolsp
Implements the SP wire protocol, and infrastructure for legacy wire protocol support. Also added an XS_SERVICE_ID socket option to set the service id and renamed the XS_PROTOCOL option to XS_PATTERN_VERSION. The following pattern versions are supported: PAIR: v3 PUBSUB: v1 (legacy), v4 REQREP: v2 PIPELINE: v3 SURVEY: v2 Note that all existing pattern versions have been bumped by 1 to allow for use of legacy protocols (otherwise there would be no way to distinguish between e.g. PUBSUB v3 and PUBSUB v3 using SP). Signed-off-by: Martin Lucina <martin@lucina.net>
Diffstat (limited to 'src/pgm_receiver.cpp')
-rw-r--r--src/pgm_receiver.cpp31
1 files changed, 25 insertions, 6 deletions
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 371c657..9aa1233 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -47,6 +47,11 @@ xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
pending_bytes (0),
rx_timer (NULL)
{
+ // If not using a legacy protocol, fill in desired protocol header.
+ if (!options.legacy_protocol) {
+ sp_get_header (desired_header, options.sp_service, options.sp_pattern,
+ options.sp_version, options.sp_complement);
+ }
}
xs::pgm_receiver_t::~pgm_receiver_t ()
@@ -173,7 +178,7 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
data = (unsigned char*) tmp;
// No data to process. This may happen if the packet received is
- // neither ODATA nor ODATA.
+ // neither RDATA nor ODATA.
if (received == 0) {
if (errno == ENOMEM || errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
@@ -200,18 +205,32 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
break;
}
- // New peer. Add it to the list of know but unjoint peers.
- if (it == peers.end ()) {
- peer_info_t peer_info = {false, NULL};
- it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
+ // Check protocol header.
+ if (unlikely (!options.legacy_protocol)) {
+ if (received < (ssize_t) sizeof desired_header)
+ // Ignore malformed datagram.
+ continue;
+ if (memcmp (data, desired_header, sizeof desired_header) != 0)
+ // Ignore datagram with incorrect protocol header.
+ continue;
+ data += sizeof desired_header;
+ received -= sizeof desired_header;
}
// Read the offset of the fist message in the current packet.
- xs_assert ((size_t) received >= sizeof (uint16_t));
+ if (received < (ssize_t) sizeof (uint16_t))
+ // Ignore malformed datagram.
+ continue;
uint16_t offset = get_uint16 (data);
data += sizeof (uint16_t);
received -= sizeof (uint16_t);
+ // New peer. Add it to the list of known but disjoint peers.
+ if (it == peers.end ()) {
+ peer_info_t peer_info = {false, NULL};
+ it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
+ }
+
// Join the stream if needed.
if (!it->second.joined) {