From 1d76284dee8e9b0735a26ee98a3edcd9f5208f09 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Sun, 20 May 2012 07:40:11 +0200 Subject: Implement SP wire protocol Implements the SP wire protocol, and infrastructure for legacy wire protocol support. Also added an XS_SERVICE_ID socket option to set the service id and renamed the XS_PROTOCOL option to XS_PATTERN_VERSION. The following pattern versions are supported: PAIR: v3 PUBSUB: v1 (legacy), v4 REQREP: v2 PIPELINE: v3 SURVEY: v2 Note that all existing pattern versions have been bumped by 1 to allow for use of legacy protocols (otherwise there would be no way to distinguish between e.g. PUBSUB v3 and PUBSUB v3 using SP). Signed-off-by: Martin Lucina --- src/pgm_receiver.cpp | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) (limited to 'src/pgm_receiver.cpp') diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 371c657..9aa1233 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -47,6 +47,11 @@ xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, pending_bytes (0), rx_timer (NULL) { + // If not using a legacy protocol, fill in desired protocol header. + if (!options.legacy_protocol) { + sp_get_header (desired_header, options.sp_service, options.sp_pattern, + options.sp_version, options.sp_complement); + } } xs::pgm_receiver_t::~pgm_receiver_t () @@ -173,7 +178,7 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) data = (unsigned char*) tmp; // No data to process. This may happen if the packet received is - // neither ODATA nor ODATA. + // neither RDATA nor ODATA. if (received == 0) { if (errno == ENOMEM || errno == EBUSY) { const long timeout = pgm_socket.get_rx_timeout (); @@ -200,18 +205,32 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) break; } - // New peer. Add it to the list of know but unjoint peers. - if (it == peers.end ()) { - peer_info_t peer_info = {false, NULL}; - it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; + // Check protocol header. + if (unlikely (!options.legacy_protocol)) { + if (received < (ssize_t) sizeof desired_header) + // Ignore malformed datagram. + continue; + if (memcmp (data, desired_header, sizeof desired_header) != 0) + // Ignore datagram with incorrect protocol header. + continue; + data += sizeof desired_header; + received -= sizeof desired_header; } // Read the offset of the fist message in the current packet. - xs_assert ((size_t) received >= sizeof (uint16_t)); + if (received < (ssize_t) sizeof (uint16_t)) + // Ignore malformed datagram. + continue; uint16_t offset = get_uint16 (data); data += sizeof (uint16_t); received -= sizeof (uint16_t); + // New peer. Add it to the list of known but disjoint peers. + if (it == peers.end ()) { + peer_info_t peer_info = {false, NULL}; + it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; + } + // Join the stream if needed. if (!it->second.joined) { -- cgit v1.2.3