From 1d76284dee8e9b0735a26ee98a3edcd9f5208f09 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Sun, 20 May 2012 07:40:11 +0200 Subject: Implement SP wire protocol Implements the SP wire protocol, and infrastructure for legacy wire protocol support. Also added an XS_SERVICE_ID socket option to set the service id and renamed the XS_PROTOCOL option to XS_PATTERN_VERSION. The following pattern versions are supported: PAIR: v3 PUBSUB: v1 (legacy), v4 REQREP: v2 PIPELINE: v3 SURVEY: v2 Note that all existing pattern versions have been bumped by 1 to allow for use of legacy protocols (otherwise there would be no way to distinguish between e.g. PUBSUB v3 and PUBSUB v3 using SP). Signed-off-by: Martin Lucina --- include/xs.h | 3 ++- src/options.cpp | 42 ++++++++++++++++++++++++------------- src/options.hpp | 12 +++++++++-- src/pair.cpp | 33 +++++++++++++++++++++++++++++ src/pair.hpp | 1 + src/pgm_receiver.cpp | 31 +++++++++++++++++++++------ src/pgm_receiver.hpp | 4 ++++ src/pgm_sender.cpp | 39 ++++++++++++++++++++++++---------- src/pgm_sender.hpp | 15 ++++++++++--- src/pipe.cpp | 14 ++++++------- src/pipe.hpp | 16 +++++++------- src/pull.cpp | 33 +++++++++++++++++++++++++++++ src/pull.hpp | 1 + src/push.cpp | 33 +++++++++++++++++++++++++++++ src/push.hpp | 1 + src/session_base.cpp | 2 +- src/socket_base.cpp | 4 ++-- src/stream_engine.cpp | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/stream_engine.hpp | 17 +++++++++++++++ src/sub.cpp | 7 +++---- src/wire.hpp | 45 +++++++++++++++++++++++++++++++++++++-- src/xpub.cpp | 49 +++++++++++++++++++++++++++++++++++++++---- src/xpub.hpp | 1 + src/xrep.cpp | 32 ++++++++++++++++++++++++++++ src/xrep.hpp | 1 + src/xreq.cpp | 33 +++++++++++++++++++++++++++++ src/xreq.hpp | 1 + src/xrespondent.cpp | 32 ++++++++++++++++++++++++++++ src/xrespondent.hpp | 1 + src/xsub.cpp | 55 +++++++++++++++++++++++++++++++++++++++++------- src/xsub.hpp | 1 + src/xsurveyor.cpp | 33 +++++++++++++++++++++++++++++ src/xsurveyor.hpp | 1 + tests/libzmq21.cpp | 10 +++++---- tests/wireformat.cpp | 19 +++++++++++------ 35 files changed, 597 insertions(+), 83 deletions(-) diff --git a/include/xs.h b/include/xs.h index 56efa52..9b09812 100644 --- a/include/xs.h +++ b/include/xs.h @@ -206,8 +206,9 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval, #define XS_SNDTIMEO 28 #define XS_IPV4ONLY 31 #define XS_KEEPALIVE 32 -#define XS_PROTOCOL 33 +#define XS_PATTERN_VERSION 33 #define XS_SURVEY_TIMEOUT 35 +#define XS_SERVICE_ID 36 /* Message options */ #define XS_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index f7bbdc4..bda0c01 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -48,7 +48,12 @@ xs::options_t::options_t () : sndtimeo (-1), ipv4only (1), keepalive (0), - protocol (0), + legacy_protocol (false), + sp_service (0), + sp_pattern (-1), + sp_version (-1), + sp_role (-1), + sp_complement (-1), filter (XS_FILTER_PREFIX), survey_timeout (-1), delay_on_close (true), @@ -236,29 +241,29 @@ int xs::options_t::setsockopt (int option_, const void *optval_, return 0; } - case XS_PROTOCOL: + case XS_FILTER: + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + filter = *((int*) optval_); + return 0; + + case XS_SERVICE_ID: { if (optvallen_ != sizeof (int)) { errno = EINVAL; return -1; } int val = *((int*) optval_); - if (val < 0) { + if (val < 0 || val > 0xffff) { errno = EINVAL; return -1; } - protocol = val; + sp_service = val; return 0; } - case XS_FILTER: - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - filter = *((int*) optval_); - return 0; - case XS_SURVEY_TIMEOUT: if (type != XS_SURVEYOR) { errno = ENOTSUP; @@ -452,12 +457,21 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; - case XS_PROTOCOL: + case XS_PATTERN_VERSION: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = sp_version; + *optvallen_ = sizeof (int); + return 0; + + case XS_SERVICE_ID: if (*optvallen_ < sizeof (int)) { errno = EINVAL; return -1; } - *((int*) optval_) = protocol; + *((int*) optval_) = sp_service; *optvallen_ = sizeof (int); return 0; diff --git a/src/options.hpp b/src/options.hpp index 805f793..9070c0f 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -92,8 +92,16 @@ namespace xs // If 1, keepalives are to be sent periodically. int keepalive; - // Version of wire protocol to use. - int protocol; + // If true, the legacy non-SP wire protocol is in use. + bool legacy_protocol; + + // SP protocol service id, pattern, version, role and complementary + // role. + int sp_service; + int sp_pattern; + int sp_version; + int sp_role; + int sp_complement; // Filter ID to be used with subscriptions and unsubscriptions. int filter; diff --git a/src/pair.cpp b/src/pair.cpp index 531bfc5..bfb48d3 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -23,12 +23,17 @@ #include "err.hpp" #include "pipe.hpp" #include "msg.hpp" +#include "wire.hpp" xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), pipe (NULL) { options.type = XS_PAIR; + options.sp_pattern = SP_PAIR; + options.sp_role = SP_PAIR_PAIR; + options.sp_version = 3; + options.sp_complement = SP_PAIR_PAIR; } xs::pair_t::~pair_t () @@ -36,6 +41,34 @@ xs::pair_t::~pair_t () xs_assert (!pipe); } +int xs::pair_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + if (version != 2) { + errno = EINVAL; + return -1; + } + + options.sp_version = version; + return 0; +} + void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_ != NULL); diff --git a/src/pair.hpp b/src/pair.hpp index 07ed6c5..6a5f167 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -42,6 +42,7 @@ namespace xs ~pair_t (); // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 371c657..9aa1233 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -47,6 +47,11 @@ xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, pending_bytes (0), rx_timer (NULL) { + // If not using a legacy protocol, fill in desired protocol header. + if (!options.legacy_protocol) { + sp_get_header (desired_header, options.sp_service, options.sp_pattern, + options.sp_version, options.sp_complement); + } } xs::pgm_receiver_t::~pgm_receiver_t () @@ -173,7 +178,7 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) data = (unsigned char*) tmp; // No data to process. This may happen if the packet received is - // neither ODATA nor ODATA. + // neither RDATA nor ODATA. if (received == 0) { if (errno == ENOMEM || errno == EBUSY) { const long timeout = pgm_socket.get_rx_timeout (); @@ -200,18 +205,32 @@ void xs::pgm_receiver_t::in_event (fd_t fd_) break; } - // New peer. Add it to the list of know but unjoint peers. - if (it == peers.end ()) { - peer_info_t peer_info = {false, NULL}; - it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; + // Check protocol header. + if (unlikely (!options.legacy_protocol)) { + if (received < (ssize_t) sizeof desired_header) + // Ignore malformed datagram. + continue; + if (memcmp (data, desired_header, sizeof desired_header) != 0) + // Ignore datagram with incorrect protocol header. + continue; + data += sizeof desired_header; + received -= sizeof desired_header; } // Read the offset of the fist message in the current packet. - xs_assert ((size_t) received >= sizeof (uint16_t)); + if (received < (ssize_t) sizeof (uint16_t)) + // Ignore malformed datagram. + continue; uint16_t offset = get_uint16 (data); data += sizeof (uint16_t); received -= sizeof (uint16_t); + // New peer. Add it to the list of known but disjoint peers. + if (it == peers.end ()) { + peer_info_t peer_info = {false, NULL}; + it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; + } + // Join the stream if needed. if (!it->second.joined) { diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 38c7a7b..c44594a 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -39,6 +39,7 @@ #include "options.hpp" #include "decoder.hpp" #include "pgm_socket.hpp" +#include "wire.hpp" namespace xs { @@ -125,6 +126,9 @@ namespace xs // Receive timer, if active, otherwise NULL. handle_t rx_timer; + // Desired protocol header. + sp_header_t desired_header; + pgm_receiver_t (const pgm_receiver_t&); const pgm_receiver_t &operator = (const pgm_receiver_t&); }; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index d476cbb..528ac7a 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -60,6 +60,25 @@ int xs::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) out_buffer_size = pgm_socket.get_max_tsdu_size (); out_buffer = (unsigned char*) malloc (out_buffer_size); alloc_assert (out_buffer); + encode_buffer = out_buffer; + encode_buffer_size = out_buffer_size; + header_size = 0; + + // If not using a legacy protocol, fill in our datagram header and reserve + // space for it in the datagram. + if (!options.legacy_protocol) { + sp_get_header (out_buffer, options.sp_service, options.sp_pattern, + options.sp_version, options.sp_role); + encode_buffer += SP_HEADER_LENGTH; + encode_buffer_size -= SP_HEADER_LENGTH; + header_size += SP_HEADER_LENGTH; + } + + // Reserve space in the datagram for the offset of the first message. + offset_p = encode_buffer; + encode_buffer += sizeof (uint16_t); + encode_buffer_size -= sizeof (uint16_t); + header_size += sizeof (uint16_t); return rc; } @@ -156,27 +175,25 @@ void xs::pgm_sender_t::in_event (fd_t fd_) void xs::pgm_sender_t::out_event (fd_t fd_) { - // POLLOUT event from send socket. If write buffer is empty, - // try to read new data from the encoder. + // POLLOUT event from send socket. If write buffer is empty (which means + // that the last write succeeded), try to read new data from the encoder. if (write_size == 0) { - // First two bytes (sizeof uint16_t) are used to store message - // offset in following steps. Note that by passing our buffer to - // the get data function we prevent it from returning its own buffer. - unsigned char *bf = out_buffer + sizeof (uint16_t); - size_t bfsz = out_buffer_size - sizeof (uint16_t); + // Pass our own buffer to the get_data () function to prevent it from + // returning its own buffer. int offset = -1; - encoder.get_data (&bf, &bfsz, &offset); + size_t data_size = encode_buffer_size; + encoder.get_data (&encode_buffer, &data_size, &offset); // If there are no data to write stop polling for output. - if (!bfsz) { + if (!data_size) { reset_pollout (handle); return; } // Put offset information in the buffer. - write_size = bfsz + sizeof (uint16_t); - put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); + write_size = header_size + data_size; + put_uint16 (offset_p, offset == -1 ? 0xffff : (uint16_t) offset); } if (tx_timer) { diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 89f81bf..5a808b6 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -84,12 +84,21 @@ namespace xs handle_t rdata_notify_handle; handle_t pending_notify_handle; - // Output buffer from pgm_socket. + // Output buffer and size for pgm_socket. unsigned char *out_buffer; - - // Output buffer size. size_t out_buffer_size; + // Size of header in each datagram. + size_t header_size; + + // Encoder buffer and size, adjusted from output buffer by size of + // datagram header(s). + unsigned char *encode_buffer; + size_t encode_buffer_size; + + // Position of offset to first message in output buffer. + unsigned char *offset_p; + // Number of bytes in the buffer to be written to the socket. // If zero, there are no data to be sent. size_t write_size; diff --git a/src/pipe.cpp b/src/pipe.cpp index 0a15cc0..c14642f 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,7 +27,7 @@ #include "err.hpp" int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2], int protocol_) + int hwms_ [2], bool delays_ [2], int sp_version_) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. @@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0], protocol_); + hwms_ [1], hwms_ [0], delays_ [0], sp_version_); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1], protocol_); + hwms_ [0], hwms_ [1], delays_ [1], sp_version_); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_, int protocol_) : + int inhwm_, int outhwm_, bool delay_, int sp_version_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, sink (NULL), state (active), delay (delay_), - protocol (protocol_) + sp_version (sp_version_) { } @@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_) } } -int xs::pipe_t::get_protocol () +int xs::pipe_t::get_sp_version () { - return protocol; + return sp_version; } bool xs::pipe_t::is_delimiter (msg_t &msg_) diff --git a/src/pipe.hpp b/src/pipe.hpp index c298154..dbc6b7e 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,7 +44,7 @@ namespace xs // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2], int protocol_); + int hwms_ [2], bool delays_ [2], int sp_version_); struct i_pipe_events { @@ -69,7 +69,7 @@ namespace xs // This allows pipepair to create pipe objects. friend int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], - int protocol_); + int sp_version_); public: @@ -111,8 +111,8 @@ namespace xs // before actual shutdown. void terminate (bool delay_); - // Returns the ID of the protocol associated with the pipe. - int get_protocol (); + // Returns the SP pattern version in use on this pipe. + int get_sp_version (); private: @@ -132,7 +132,7 @@ namespace xs // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_, int protocol_); + int inhwm_, int outhwm_, bool delay_, int sp_version_); // Pipepair uses this function to let us know about // the peer pipe object. @@ -192,9 +192,9 @@ namespace xs // asks us to. bool delay; - // ID of the protocol to use. This value is not used by the pipe - // itself, rather it's used by the users of the pipe. - int protocol; + // SP pattern version in use on this pipe. This value is used by the + // pattern classes using the pipe, not the pipe itself. + int sp_version; // Identity of the writer. Used uniquely by the reader side. blob_t identity; diff --git a/src/pull.cpp b/src/pull.cpp index 8ae8208..0cedc39 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -23,17 +23,50 @@ #include "err.hpp" #include "msg.hpp" #include "pipe.hpp" +#include "wire.hpp" xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_PULL; + options.sp_pattern = SP_PIPELINE; + options.sp_version = 3; + options.sp_role = SP_PIPELINE_PULL; + options.sp_complement = SP_PIPELINE_PUSH; } xs::pull_t::~pull_t () { } +int xs::pull_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + if (version != 2) { + errno = EINVAL; + return -1; + } + + options.sp_version = version; + return 0; +} + void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index 04da465..5453bbd 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -45,6 +45,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xrecv (xs::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/push.cpp b/src/push.cpp index 59f508f..729c97a 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -23,17 +23,50 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "wire.hpp" xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_PUSH; + options.sp_pattern = SP_PIPELINE; + options.sp_version = 3; + options.sp_role = SP_PIPELINE_PUSH; + options.sp_complement = SP_PIPELINE_PULL; } xs::push_t::~push_t () { } +int xs::push_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + if (version != 2) { + errno = EINVAL; + return -1; + } + + options.sp_version = version; + return 0; +} + void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/push.hpp b/src/push.hpp index a112f31..ffe16cc 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -45,6 +45,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); diff --git a/src/session_base.cpp b/src/session_base.cpp index 48d8811..da9ee8e 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -298,7 +298,7 @@ void xs::session_base_t::process_attach (i_engine *engine_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays, options.protocol); + int rc = pipepair (parents, pipes, hwms, delays, options.sp_version); errno_assert (rc == 0); // Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 86c01b0..960bd2c 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -451,7 +451,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *ppair [2] = {NULL, NULL}; int hwms [2] = {sndhwm, rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, ppair, hwms, delays, options.protocol); + rc = pipepair (parents, ppair, hwms, delays, options.sp_version); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -535,7 +535,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *ppair [2] = {NULL, NULL}; int hwms [2] = {options.sndhwm, options.rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, ppair, hwms, delays, options.protocol); + rc = pipepair (parents, ppair, hwms, delays, options.sp_version); errno_assert (rc == 0); // PGM does not support subscription forwarding; ask for all data to be diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index e70d1d3..45a6af6 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -53,8 +53,21 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : session (NULL), leftover_session (NULL), options (options_), - plugged (false) + plugged (false), + header_pos (in_header), + header_remaining (sizeof in_header), + header_received (false), + header_sent (false) { + // Fill in outgoing SP protocol header and the complementary (desired) + // header. + if (!options.legacy_protocol) { + sp_get_header (out_header, options.sp_service, options.sp_pattern, + options.sp_version, options.sp_role); + sp_get_header (desired_header, options.sp_service, options.sp_pattern, + options.sp_version, options.sp_complement); + } + // Get the socket into non-blocking mode. unblock_socket (s); @@ -155,6 +168,35 @@ void xs::stream_engine_t::in_event (fd_t fd_) { bool disconnection = false; + // If we have not yet received the full protocol header... + if (unlikely (!options.legacy_protocol && !header_received)) { + + // Read remaining header bytes. + int hbytes = read (header_pos, header_remaining); + + // Check whether the peer has closed the connection. + if (hbytes == -1) { + error (); + return; + } + + header_remaining -= hbytes; + header_pos += hbytes; + + // If we did not read the whole header, poll for more. + if (header_remaining) + return; + + // If the protocol headers do not match, close the connection. + if (memcmp (in_header, desired_header, sizeof in_header) != 0) { + error (); + return; + } + + // Done with protocol header; proceed to read data. + header_received = true; + } + // If there's no data to process in the buffer... if (!insize) { @@ -210,6 +252,20 @@ void xs::stream_engine_t::out_event (fd_t fd_) { bool more_data = true; + // If protocol header was not yet sent... + if (unlikely (!options.legacy_protocol && !header_sent)) { + int hbytes = write (out_header, sizeof out_header); + + // It should always be possible to write the full protocol header to a + // freshly connected TCP socket. Therefore, if we get an error or + // partial write here the peer has disconnected. + if (hbytes != sizeof out_header) { + error (); + return; + } + header_sent = true; + } + // If write buffer is empty, try to read new data from the encoder. if (!outsize) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 46a6080..70c9ddb 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -30,6 +30,7 @@ #include "encoder.hpp" #include "decoder.hpp" #include "options.hpp" +#include "wire.hpp" namespace xs { @@ -98,6 +99,22 @@ namespace xs bool plugged; + // Outgoing protocol header. + sp_header_t out_header; + + // Desired protocol header. + sp_header_t desired_header; + + // Incoming protocol header. + sp_header_t in_header; + + unsigned char *header_pos; + size_t header_remaining; + + // Protocol header has been received/sent. + bool header_received; + bool header_sent; + stream_engine_t (const stream_engine_t&); const stream_engine_t &operator = (const stream_engine_t&); }; diff --git a/src/sub.cpp b/src/sub.cpp index d29ae8d..79cb63c 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -48,8 +48,7 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) { - errno = EINVAL; - return -1; + return xsub_t::xsetsockopt (option_, optval_, optvallen_); } if (optvallen_ > 0 && !optval_) { @@ -202,7 +201,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_) int rc = msg.init_size (size_ + 4); errno_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, XS_CMD_SUBSCRIBE); + put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE); put_uint16 (data + 2, options.filter); memcpy (data + 4, data_, size_); @@ -225,7 +224,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) int rc = msg.init_size (size_ + 4); errno_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, XS_CMD_UNSUBSCRIBE); + put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE); put_uint16 (data + 2, options.filter); memcpy (data + 4, data_, size_); diff --git a/src/wire.hpp b/src/wire.hpp index f840fce..014021b 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -24,11 +24,52 @@ #include "stdint.hpp" // Protocol-related constants. -#define XS_CMD_SUBSCRIBE 1 -#define XS_CMD_UNSUBSCRIBE 2 + +// Protocol header. +#define SP_HEADER_LENGTH 8 + +// Patterns. +#define SP_PAIR 1 +#define SP_PUBSUB 2 +#define SP_REQREP 3 +#define SP_PIPELINE 4 +#define SP_SURVEY 5 + +// Roles. +#define SP_PAIR_PAIR 1 +#define SP_PUBSUB_PUB 1 +#define SP_PUBSUB_SUB 2 +#define SP_REQREP_REQ 1 +#define SP_REQREP_REP 2 +#define SP_PIPELINE_PUSH 1 +#define SP_PIPELINE_PULL 2 +#define SP_SURVEY_SURVEYOR 1 +#define SP_SURVEY_RESPONDENT 2 + +// PUBSUB pattern commands. +#define SP_PUBSUB_CMD_SUBSCRIBE 1 +#define SP_PUBSUB_CMD_UNSUBSCRIBE 2 namespace xs { + // Protocol header type. + typedef unsigned char sp_header_t [SP_HEADER_LENGTH]; + + // Get the SP protocol header for the specified service, pattern, version + // and role. + inline void sp_get_header (sp_header_t header_, int service_, int pattern_, + int version_, int role_) + { + header_ [0] = 0; // Protocol identifier + header_ [1] = 'S'; // " + header_ [2] = 'P'; // " + header_ [3] = 0; // Reserved, must be zero. + header_ [4] = (service_ >> 8) & 0xff; // Service id high byte + header_ [5] = service_ & 0xff; // Service id low byte + header_ [6] = pattern_ & 0xff; // Pattern + header_ [7] = (version_ & 0xf) << 4; // Pattern version + header_ [7] |= role_ & 0xf; // Pattern role + } // Helper functions to convert different integer types to/from network // byte order. diff --git a/src/xpub.cpp b/src/xpub.cpp index fe0b9a7..17626b6 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : tmp_filter_id (-1) { options.type = XS_XPUB; + options.sp_pattern = SP_PUBSUB; + options.sp_version = 4; + options.sp_role = SP_PUBSUB_PUB; + options.sp_complement = SP_PUBSUB_SUB; } xs::xpub_t::~xpub_t () @@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t () it->type->pf_destroy ((void*) (core_t*) this, it->instance); } +int xs::xpub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + switch (version) { + case 1: + options.legacy_protocol = true; + options.sp_version = 1; + break; + case 3: + options.legacy_protocol = false; + options.sp_version = 3; + break; + default: + errno = EINVAL; + return -1; + } + + return 0; +} + void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // to all data on this pipe, implicitly. Also, if we are using // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, // we need to subscribe for all messages automatically. - if (icanhasall_|| pipe_->get_protocol () == 1) { + if (icanhasall_|| pipe_->get_sp_version () == 1) { // Find the prefix filter. // TODO: Change this to ALL filter. @@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) int filter_id = XS_FILTER_PREFIX; #endif - if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { + if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && + cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { sub.close (); return; } @@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) break; bool unique; - if (cmd == XS_CMD_UNSUBSCRIBE) { + if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { xs_assert (it != filters.end ()); unique = it->type->pf_unsubscribe ((void*) (core_t*) this, it->instance, pipe_, data + 4, size - 4) ? true : false; @@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) // Place the unsubscription to the queue of pending (un)sunscriptions // to be retrived by the user later on. blob_t unsub (size_ + 4, 0); - put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); + put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE); put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id); memcpy ((void*) (unsub.data () + 4), data_, size_); diff --git a/src/xpub.hpp b/src/xpub.hpp index c0e47ff..f9cb737 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -49,6 +49,7 @@ namespace xs ~xpub_t (); // Implementations of virtual functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); diff --git a/src/xrep.cpp b/src/xrep.cpp index 007ed27..30f1419 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -36,6 +36,10 @@ xs::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_peer_id (generate_random ()) { options.type = XS_XREP; + options.sp_pattern = SP_REQREP; + options.sp_version = 2; + options.sp_role = SP_REQREP_REP; + options.sp_complement = SP_REQREP_REQ; // TODO: Uncomment the following line when XREP will become true XREP // rather than generic router socket. @@ -55,6 +59,34 @@ xs::xrep_t::~xrep_t () prefetched_msg.close (); } +int xs::xrep_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + if (version != 1) { + errno = EINVAL; + return -1; + } + + options.sp_version = version; + return 0; +} + void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xrep.hpp b/src/xrep.hpp index 8e16a4c..71a7bf4 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -49,6 +49,7 @@ namespace xs ~xrep_t (); // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); diff --git a/src/xreq.cpp b/src/xreq.cpp index 1c6af9d..ac6ba4d 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -22,12 +22,17 @@ #include "xreq.hpp" #include "err.hpp" #include "msg.hpp" +#include "wire.hpp" xs::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), prefetched (false) { options.type = XS_XREQ; + options.sp_pattern = SP_REQREP; + options.sp_version = 2; + options.sp_role = SP_REQREP_REQ; + options.sp_complement = SP_REQREP_REP; // TODO: Uncomment the following line when XREQ will become true XREQ // rather than generic dealer socket. @@ -46,6 +51,34 @@ xs::xreq_t::~xreq_t () prefetched_msg.close (); } +int xs::xreq_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + if (version != 1) { + errno = EINVAL; + return -1; + } + + options.sp_version = version; + return 0; +} + void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index c4f9baf..9fe6ebb 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -46,6 +46,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp index c16d900..9d949ce 100644 --- a/src/xrespondent.cpp +++ b/src/xrespondent.cpp @@ -35,6 +35,10 @@ xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_, next_peer_id (generate_random ()) { options.type = XS_XRESPONDENT; + options.sp_pattern = SP_SURVEY; + options.sp_version = 2; + options.sp_role = SP_SURVEY_RESPONDENT; + options.sp_complement = SP_SURVEY_SURVEYOR; // If the connection disappears it makes no sense to read any more surveys // from it. The responses will be unroutable anyway. @@ -49,6 +53,34 @@ xs::xrespondent_t::~xrespondent_t () prefetched_msg.close (); } +int xs::xrespondent_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + if (version != 1) { + errno = EINVAL; + return -1; + } + + options.sp_version = version; + return 0; +} + void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp index eb6ac9a..4143e36 100644 --- a/src/xrespondent.hpp +++ b/src/xrespondent.hpp @@ -47,6 +47,7 @@ namespace xs ~xrespondent_t (); // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); diff --git a/src/xsub.cpp b/src/xsub.cpp index 7272a8e..9255ceb 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -29,6 +29,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_XSUB; + options.sp_pattern = SP_PUBSUB; + options.sp_version = 4; + options.sp_role = SP_PUBSUB_SUB; + options.sp_complement = SP_PUBSUB_PUB; // When socket is being closed down we don't want to wait till pending // subscription commands are sent to the wire. @@ -42,6 +46,42 @@ xs::xsub_t::~xsub_t () { } +int xs::xsub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + switch (version) { + case 1: + options.legacy_protocol = true; + options.sp_version = 1; + break; + case 3: + options.legacy_protocol = false; + options.sp_version = 3; + break; + default: + errno = EINVAL; + return -1; + } + + return 0; +} + void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -49,7 +89,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // Pipes with 0MQ/2.1-style protocol are not eligible for accepting // subscriptions. - if (pipe_->get_protocol () == 1) + if (pipe_->get_sp_version () == 1) return; dist.attach (pipe_); @@ -75,14 +115,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_) void xs::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); - if (pipe_->get_protocol () != 1) + if (pipe_->get_sp_version () != 1) dist.terminated (pipe_); } void xs::xsub_t::xhiccuped (pipe_t *pipe_) { // In 0MQ/2.1 protocol there is no subscription forwarding. - if (pipe_->get_protocol () == 1) + if (pipe_->get_sp_version () == 1) return; // Send all the cached subscriptions to the new upstream peer. @@ -104,13 +144,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) } int cmd = get_uint16 (data); int filter_id = get_uint16 (data + 2); - if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { + if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { errno = EINVAL; return -1; } // Process the subscription. - if (cmd == XS_CMD_SUBSCRIBE) { + if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) { subscriptions_t::iterator it = subscriptions.insert ( std::make_pair (std::make_pair (filter_id, blob_t (data + 4, size - 4)), 0)).first; @@ -118,7 +158,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) if (it->second == 1) return dist.send_to_all (msg_, flags_); } - else if (cmd == XS_CMD_UNSUBSCRIBE) { + else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { subscriptions_t::iterator it = subscriptions.find ( std::make_pair (filter_id, blob_t (data + 4, size - 4))); if (it != subscriptions.end ()) { @@ -162,7 +202,8 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_, int rc = msg.init_size (size_ + 4); xs_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE); + put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE : + SP_PUBSUB_CMD_UNSUBSCRIBE); put_uint16 (data + 2, filter_id_); memcpy (data + 4, data_, size_); diff --git a/src/xsub.hpp b/src/xsub.hpp index 5bc11c4..fcaf61c 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -47,6 +47,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp index 985b6d8..7016550 100644 --- a/src/xsurveyor.cpp +++ b/src/xsurveyor.cpp @@ -21,11 +21,16 @@ #include "xsurveyor.hpp" #include "err.hpp" #include "msg.hpp" +#include "wire.hpp" xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_XSURVEYOR; + options.sp_pattern = SP_SURVEY; + options.sp_version = 2; + options.sp_role = SP_SURVEY_SURVEYOR; + options.sp_complement = SP_SURVEY_RESPONDENT; // When the XSURVEYOR socket is close it makes no sense to send any pending // surveys. The responses will be unroutable anyway. @@ -36,6 +41,34 @@ xs::xsurveyor_t::~xsurveyor_t () { } +int xs::xsurveyor_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != XS_PATTERN_VERSION) { + errno = EINVAL; + return -1; + } + + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + + if (!optval_) { + errno = EFAULT; + return -1; + } + + int version = *(int *) optval_; + if (version != 1) { + errno = EINVAL; + return -1; + } + + options.sp_version = version; + return 0; +} + void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp index b0c3f9c..e312667 100644 --- a/src/xsurveyor.hpp +++ b/src/xsurveyor.hpp @@ -45,6 +45,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp index ecf271b..3323ef0 100644 --- a/tests/libzmq21.cpp +++ b/tests/libzmq21.cpp @@ -53,8 +53,9 @@ int XS_TEST_MAIN () assert (ctx); void *pub = xs_socket (ctx, XS_PUB); assert (pub); - int protocol = 1; - int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol)); + int pattern_version = 1; + int rc = xs_setsockopt (pub, XS_PATTERN_VERSION, &pattern_version, + sizeof (pattern_version)); assert (rc == 0); rc = xs_bind (pub, "tcp://127.0.0.1:5560"); assert (rc != -1); @@ -99,8 +100,9 @@ int XS_TEST_MAIN () assert (ctx); void *sub = xs_socket (ctx, XS_SUB); assert (sub); - protocol = 1; - rc = xs_setsockopt (sub, XS_PROTOCOL, &protocol, sizeof (protocol)); + pattern_version = 1; + rc = xs_setsockopt (sub, XS_PATTERN_VERSION, &pattern_version, + sizeof (pattern_version)); assert (rc == 0); rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); assert (rc == 0); diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp index f3e0f96..f12f5aa 100644 --- a/tests/wireformat.cpp +++ b/tests/wireformat.cpp @@ -49,7 +49,11 @@ int XS_TEST_MAIN () assert (push); // Bind the peer and get the message. - int rc = xs_bind (pull, "tcp://127.0.0.1:5560"); + int service_id = 0x1234; + int rc = xs_setsockopt (pull, XS_SERVICE_ID, &service_id, + sizeof service_id); + assert (rc == 0); + rc = xs_bind (pull, "tcp://127.0.0.1:5560"); assert (rc != -1); rc = xs_bind (push, "tcp://127.0.0.1:5561"); assert (rc != -1); @@ -71,10 +75,10 @@ int XS_TEST_MAIN () assert (rc == 0); // Let's send some data and check if it arrived - rc = send (rpush, "\x04\0abc", 5, 0); - assert (rc == 5); + rc = send (rpush, "\0SP\0\x12\x34\x04\x31\x04\0abc", 13, 0); + assert (rc == 13); unsigned char buf [3]; - unsigned char buf2 [3]; + unsigned char buf2 [8]; rc = xs_recv (pull, buf, sizeof (buf), 0); assert (rc == 3); assert (!memcmp (buf, "abc", 3)); @@ -83,8 +87,11 @@ int XS_TEST_MAIN () rc = xs_send (push, buf, sizeof (buf), 0); assert (rc == 3); rc = recv (rpull, (char*) buf2, sizeof (buf2), 0); - assert (rc == 3); - assert (!memcmp (buf2, "\x04\0abc", 3)); + assert (rc == 8); + assert (!memcmp (buf2, "\0SP\0\0\0\x04\x31", 8)); + rc = recv (rpull, (char*) buf2, 5, 0); + assert (rc == 5); + assert (!memcmp (buf2, "\x04\0abc", 5)); #if defined XS_HAVE_WINDOWS rc = closesocket (rpush); -- cgit v1.2.3