summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/xs/xs.h3
-rw-r--r--src/options.cpp42
-rw-r--r--src/options.hpp12
-rw-r--r--src/pair.cpp33
-rw-r--r--src/pair.hpp1
-rw-r--r--src/pgm_receiver.cpp31
-rw-r--r--src/pgm_receiver.hpp4
-rw-r--r--src/pgm_sender.cpp39
-rw-r--r--src/pgm_sender.hpp15
-rw-r--r--src/pipe.cpp14
-rw-r--r--src/pipe.hpp16
-rw-r--r--src/pull.cpp33
-rw-r--r--src/pull.hpp1
-rw-r--r--src/push.cpp33
-rw-r--r--src/push.hpp1
-rw-r--r--src/session_base.cpp2
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/stream_engine.cpp58
-rw-r--r--src/stream_engine.hpp17
-rw-r--r--src/sub.cpp7
-rw-r--r--src/wire.hpp45
-rw-r--r--src/xpub.cpp49
-rw-r--r--src/xpub.hpp1
-rw-r--r--src/xrep.cpp32
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/xreq.cpp33
-rw-r--r--src/xreq.hpp1
-rw-r--r--src/xrespondent.cpp32
-rw-r--r--src/xrespondent.hpp1
-rw-r--r--src/xsub.cpp55
-rw-r--r--src/xsub.hpp1
-rw-r--r--src/xsurveyor.cpp33
-rw-r--r--src/xsurveyor.hpp1
-rw-r--r--tests/libzmq21.cpp10
-rw-r--r--tests/wireformat.cpp19
35 files changed, 597 insertions, 83 deletions
diff --git a/include/xs/xs.h b/include/xs/xs.h
index 56efa52..9b09812 100644
--- a/include/xs/xs.h
+++ b/include/xs/xs.h
@@ -206,8 +206,9 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
#define XS_SNDTIMEO 28
#define XS_IPV4ONLY 31
#define XS_KEEPALIVE 32
-#define XS_PROTOCOL 33
+#define XS_PATTERN_VERSION 33
#define XS_SURVEY_TIMEOUT 35
+#define XS_SERVICE_ID 36
/* Message options */
#define XS_MORE 1
diff --git a/src/options.cpp b/src/options.cpp
index 62c03a2..26fa62c 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -48,7 +48,12 @@ xs::options_t::options_t () :
sndtimeo (-1),
ipv4only (1),
keepalive (0),
- protocol (0),
+ legacy_protocol (false),
+ sp_service (0),
+ sp_pattern (-1),
+ sp_version (-1),
+ sp_role (-1),
+ sp_complement (-1),
filter (XS_FILTER_PREFIX),
survey_timeout (-1),
delay_on_close (true),
@@ -236,29 +241,29 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
- case XS_PROTOCOL:
+ case XS_FILTER:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ filter = *((int*) optval_);
+ return 0;
+
+ case XS_SERVICE_ID:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
- if (val < 0) {
+ if (val < 0 || val > 0xffff) {
errno = EINVAL;
return -1;
}
- protocol = val;
+ sp_service = val;
return 0;
}
- case XS_FILTER:
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- filter = *((int*) optval_);
- return 0;
-
case XS_SURVEY_TIMEOUT:
if (type != XS_SURVEYOR) {
errno = ENOTSUP;
@@ -452,12 +457,21 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
- case XS_PROTOCOL:
+ case XS_PATTERN_VERSION:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = sp_version;
+ *optvallen_ = sizeof (int);
+ return 0;
+
+ case XS_SERVICE_ID:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
- *((int*) optval_) = protocol;
+ *((int*) optval_) = sp_service;
*optvallen_ = sizeof (int);
return 0;
diff --git a/src/options.hpp b/src/options.hpp
index 805f793..9070c0f 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -92,8 +92,16 @@ namespace xs
// If 1, keepalives are to be sent periodically.
int keepalive;
- // Version of wire protocol to use.
- int protocol;
+ // If true, the legacy non-SP wire protocol is in use.
+ bool legacy_protocol;
+
+ // SP protocol service id, pattern, version, role and complementary
+ // role.
+ int sp_service;
+ int sp_pattern;
+ int sp_version;
+ int sp_role;
+ int sp_complement;
// Filter ID to be used with subscriptions and unsubscriptions.
int filter;
diff --git a/src/pair.cpp b/src/pair.cpp
index 531bfc5..bfb48d3 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -23,12 +23,17 @@
#include "err.hpp"
#include "pipe.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
pipe (NULL)
{
options.type = XS_PAIR;
+ options.sp_pattern = SP_PAIR;
+ options.sp_role = SP_PAIR_PAIR;
+ options.sp_version = 3;
+ options.sp_complement = SP_PAIR_PAIR;
}
xs::pair_t::~pair_t ()
@@ -36,6 +41,34 @@ xs::pair_t::~pair_t ()
xs_assert (!pipe);
}
+int xs::pair_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 2) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_ != NULL);
diff --git a/src/pair.hpp b/src/pair.hpp
index 07ed6c5..6a5f167 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -42,6 +42,7 @@ namespace xs
~pair_t ();
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 371c657..9aa1233 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -47,6 +47,11 @@ xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
pending_bytes (0),
rx_timer (NULL)
{
+ // If not using a legacy protocol, fill in desired protocol header.
+ if (!options.legacy_protocol) {
+ sp_get_header (desired_header, options.sp_service, options.sp_pattern,
+ options.sp_version, options.sp_complement);
+ }
}
xs::pgm_receiver_t::~pgm_receiver_t ()
@@ -173,7 +178,7 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
data = (unsigned char*) tmp;
// No data to process. This may happen if the packet received is
- // neither ODATA nor ODATA.
+ // neither RDATA nor ODATA.
if (received == 0) {
if (errno == ENOMEM || errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
@@ -200,18 +205,32 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
break;
}
- // New peer. Add it to the list of know but unjoint peers.
- if (it == peers.end ()) {
- peer_info_t peer_info = {false, NULL};
- it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
+ // Check protocol header.
+ if (unlikely (!options.legacy_protocol)) {
+ if (received < (ssize_t) sizeof desired_header)
+ // Ignore malformed datagram.
+ continue;
+ if (memcmp (data, desired_header, sizeof desired_header) != 0)
+ // Ignore datagram with incorrect protocol header.
+ continue;
+ data += sizeof desired_header;
+ received -= sizeof desired_header;
}
// Read the offset of the fist message in the current packet.
- xs_assert ((size_t) received >= sizeof (uint16_t));
+ if (received < (ssize_t) sizeof (uint16_t))
+ // Ignore malformed datagram.
+ continue;
uint16_t offset = get_uint16 (data);
data += sizeof (uint16_t);
received -= sizeof (uint16_t);
+ // New peer. Add it to the list of known but disjoint peers.
+ if (it == peers.end ()) {
+ peer_info_t peer_info = {false, NULL};
+ it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
+ }
+
// Join the stream if needed.
if (!it->second.joined) {
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 38c7a7b..c44594a 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -39,6 +39,7 @@
#include "options.hpp"
#include "decoder.hpp"
#include "pgm_socket.hpp"
+#include "wire.hpp"
namespace xs
{
@@ -125,6 +126,9 @@ namespace xs
// Receive timer, if active, otherwise NULL.
handle_t rx_timer;
+ // Desired protocol header.
+ sp_header_t desired_header;
+
pgm_receiver_t (const pgm_receiver_t&);
const pgm_receiver_t &operator = (const pgm_receiver_t&);
};
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index d476cbb..528ac7a 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -60,6 +60,25 @@ int xs::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
out_buffer_size = pgm_socket.get_max_tsdu_size ();
out_buffer = (unsigned char*) malloc (out_buffer_size);
alloc_assert (out_buffer);
+ encode_buffer = out_buffer;
+ encode_buffer_size = out_buffer_size;
+ header_size = 0;
+
+ // If not using a legacy protocol, fill in our datagram header and reserve
+ // space for it in the datagram.
+ if (!options.legacy_protocol) {
+ sp_get_header (out_buffer, options.sp_service, options.sp_pattern,
+ options.sp_version, options.sp_role);
+ encode_buffer += SP_HEADER_LENGTH;
+ encode_buffer_size -= SP_HEADER_LENGTH;
+ header_size += SP_HEADER_LENGTH;
+ }
+
+ // Reserve space in the datagram for the offset of the first message.
+ offset_p = encode_buffer;
+ encode_buffer += sizeof (uint16_t);
+ encode_buffer_size -= sizeof (uint16_t);
+ header_size += sizeof (uint16_t);
return rc;
}
@@ -156,27 +175,25 @@ void xs::pgm_sender_t::in_event (fd_t fd_)
void xs::pgm_sender_t::out_event (fd_t fd_)
{
- // POLLOUT event from send socket. If write buffer is empty,
- // try to read new data from the encoder.
+ // POLLOUT event from send socket. If write buffer is empty (which means
+ // that the last write succeeded), try to read new data from the encoder.
if (write_size == 0) {
- // First two bytes (sizeof uint16_t) are used to store message
- // offset in following steps. Note that by passing our buffer to
- // the get data function we prevent it from returning its own buffer.
- unsigned char *bf = out_buffer + sizeof (uint16_t);
- size_t bfsz = out_buffer_size - sizeof (uint16_t);
+ // Pass our own buffer to the get_data () function to prevent it from
+ // returning its own buffer.
int offset = -1;
- encoder.get_data (&bf, &bfsz, &offset);
+ size_t data_size = encode_buffer_size;
+ encoder.get_data (&encode_buffer, &data_size, &offset);
// If there are no data to write stop polling for output.
- if (!bfsz) {
+ if (!data_size) {
reset_pollout (handle);
return;
}
// Put offset information in the buffer.
- write_size = bfsz + sizeof (uint16_t);
- put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
+ write_size = header_size + data_size;
+ put_uint16 (offset_p, offset == -1 ? 0xffff : (uint16_t) offset);
}
if (tx_timer) {
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89f81bf..5a808b6 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -84,12 +84,21 @@ namespace xs
handle_t rdata_notify_handle;
handle_t pending_notify_handle;
- // Output buffer from pgm_socket.
+ // Output buffer and size for pgm_socket.
unsigned char *out_buffer;
-
- // Output buffer size.
size_t out_buffer_size;
+ // Size of header in each datagram.
+ size_t header_size;
+
+ // Encoder buffer and size, adjusted from output buffer by size of
+ // datagram header(s).
+ unsigned char *encode_buffer;
+ size_t encode_buffer_size;
+
+ // Position of offset to first message in output buffer.
+ unsigned char *offset_p;
+
// Number of bytes in the buffer to be written to the socket.
// If zero, there are no data to be sent.
size_t write_size;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 0a15cc0..c14642f 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -27,7 +27,7 @@
#include "err.hpp"
int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int protocol_)
+ int hwms_ [2], bool delays_ [2], int sp_version_)
{
// Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction.
@@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
- hwms_ [1], hwms_ [0], delays_ [0], protocol_);
+ hwms_ [1], hwms_ [0], delays_ [0], sp_version_);
alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
- hwms_ [0], hwms_ [1], delays_ [1], protocol_);
+ hwms_ [0], hwms_ [1], delays_ [1], sp_version_);
alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]);
@@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
}
xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int protocol_) :
+ int inhwm_, int outhwm_, bool delay_, int sp_version_) :
object_t (parent_),
inpipe (inpipe_),
outpipe (outpipe_),
@@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
sink (NULL),
state (active),
delay (delay_),
- protocol (protocol_)
+ sp_version (sp_version_)
{
}
@@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_)
}
}
-int xs::pipe_t::get_protocol ()
+int xs::pipe_t::get_sp_version ()
{
- return protocol;
+ return sp_version;
}
bool xs::pipe_t::is_delimiter (msg_t &msg_)
diff --git a/src/pipe.hpp b/src/pipe.hpp
index c298154..dbc6b7e 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -44,7 +44,7 @@ namespace xs
// pipe receives all the pending messages before terminating, otherwise it
// terminates straight away.
int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int protocol_);
+ int hwms_ [2], bool delays_ [2], int sp_version_);
struct i_pipe_events
{
@@ -69,7 +69,7 @@ namespace xs
// This allows pipepair to create pipe objects.
friend int pipepair (xs::object_t *parents_ [2],
xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
- int protocol_);
+ int sp_version_);
public:
@@ -111,8 +111,8 @@ namespace xs
// before actual shutdown.
void terminate (bool delay_);
- // Returns the ID of the protocol associated with the pipe.
- int get_protocol ();
+ // Returns the SP pattern version in use on this pipe.
+ int get_sp_version ();
private:
@@ -132,7 +132,7 @@ namespace xs
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int protocol_);
+ int inhwm_, int outhwm_, bool delay_, int sp_version_);
// Pipepair uses this function to let us know about
// the peer pipe object.
@@ -192,9 +192,9 @@ namespace xs
// asks us to.
bool delay;
- // ID of the protocol to use. This value is not used by the pipe
- // itself, rather it's used by the users of the pipe.
- int protocol;
+ // SP pattern version in use on this pipe. This value is used by the
+ // pattern classes using the pipe, not the pipe itself.
+ int sp_version;
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
diff --git a/src/pull.cpp b/src/pull.cpp
index 8ae8208..0cedc39 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -23,17 +23,50 @@
#include "err.hpp"
#include "msg.hpp"
#include "pipe.hpp"
+#include "wire.hpp"
xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PULL;
+ options.sp_pattern = SP_PIPELINE;
+ options.sp_version = 3;
+ options.sp_role = SP_PIPELINE_PULL;
+ options.sp_complement = SP_PIPELINE_PUSH;
}
xs::pull_t::~pull_t ()
{
}
+int xs::pull_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 2) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/pull.hpp b/src/pull.hpp
index 04da465..5453bbd 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -45,6 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/push.cpp b/src/push.cpp
index 59f508f..729c97a 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -23,17 +23,50 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PUSH;
+ options.sp_pattern = SP_PIPELINE;
+ options.sp_version = 3;
+ options.sp_role = SP_PIPELINE_PUSH;
+ options.sp_complement = SP_PIPELINE_PULL;
}
xs::push_t::~push_t ()
{
}
+int xs::push_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 2) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/push.hpp b/src/push.hpp
index a112f31..ffe16cc 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -45,6 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 48d8811..da9ee8e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -298,7 +298,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
- int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
+ int rc = pipepair (parents, pipes, hwms, delays, options.sp_version);
errno_assert (rc == 0);
// Plug the local end of the pipe.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 86c01b0..960bd2c 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -451,7 +451,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.protocol);
+ rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
@@ -535,7 +535,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.protocol);
+ rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
errno_assert (rc == 0);
// PGM does not support subscription forwarding; ask for all data to be
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index e70d1d3..45a6af6 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -53,8 +53,21 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
session (NULL),
leftover_session (NULL),
options (options_),
- plugged (false)
+ plugged (false),
+ header_pos (in_header),
+ header_remaining (sizeof in_header),
+ header_received (false),
+ header_sent (false)
{
+ // Fill in outgoing SP protocol header and the complementary (desired)
+ // header.
+ if (!options.legacy_protocol) {
+ sp_get_header (out_header, options.sp_service, options.sp_pattern,
+ options.sp_version, options.sp_role);
+ sp_get_header (desired_header, options.sp_service, options.sp_pattern,
+ options.sp_version, options.sp_complement);
+ }
+
// Get the socket into non-blocking mode.
unblock_socket (s);
@@ -155,6 +168,35 @@ void xs::stream_engine_t::in_event (fd_t fd_)
{
bool disconnection = false;
+ // If we have not yet received the full protocol header...
+ if (unlikely (!options.legacy_protocol && !header_received)) {
+
+ // Read remaining header bytes.
+ int hbytes = read (header_pos, header_remaining);
+
+ // Check whether the peer has closed the connection.
+ if (hbytes == -1) {
+ error ();
+ return;
+ }
+
+ header_remaining -= hbytes;
+ header_pos += hbytes;
+
+ // If we did not read the whole header, poll for more.
+ if (header_remaining)
+ return;
+
+ // If the protocol headers do not match, close the connection.
+ if (memcmp (in_header, desired_header, sizeof in_header) != 0) {
+ error ();
+ return;
+ }
+
+ // Done with protocol header; proceed to read data.
+ header_received = true;
+ }
+
// If there's no data to process in the buffer...
if (!insize) {
@@ -210,6 +252,20 @@ void xs::stream_engine_t::out_event (fd_t fd_)
{
bool more_data = true;
+ // If protocol header was not yet sent...
+ if (unlikely (!options.legacy_protocol && !header_sent)) {
+ int hbytes = write (out_header, sizeof out_header);
+
+ // It should always be possible to write the full protocol header to a
+ // freshly connected TCP socket. Therefore, if we get an error or
+ // partial write here the peer has disconnected.
+ if (hbytes != sizeof out_header) {
+ error ();
+ return;
+ }
+ header_sent = true;
+ }
+
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 46a6080..70c9ddb 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -30,6 +30,7 @@
#include "encoder.hpp"
#include "decoder.hpp"
#include "options.hpp"
+#include "wire.hpp"
namespace xs
{
@@ -98,6 +99,22 @@ namespace xs
bool plugged;
+ // Outgoing protocol header.
+ sp_header_t out_header;
+
+ // Desired protocol header.
+ sp_header_t desired_header;
+
+ // Incoming protocol header.
+ sp_header_t in_header;
+
+ unsigned char *header_pos;
+ size_t header_remaining;
+
+ // Protocol header has been received/sent.
+ bool header_received;
+ bool header_sent;
+
stream_engine_t (const stream_engine_t&);
const stream_engine_t &operator = (const stream_engine_t&);
};
diff --git a/src/sub.cpp b/src/sub.cpp
index d29ae8d..79cb63c 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -48,8 +48,7 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) {
- errno = EINVAL;
- return -1;
+ return xsub_t::xsetsockopt (option_, optval_, optvallen_);
}
if (optvallen_ > 0 && !optval_) {
@@ -202,7 +201,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, XS_CMD_SUBSCRIBE);
+ put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
@@ -225,7 +224,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
diff --git a/src/wire.hpp b/src/wire.hpp
index f840fce..014021b 100644
--- a/src/wire.hpp
+++ b/src/wire.hpp
@@ -24,11 +24,52 @@
#include "stdint.hpp"
// Protocol-related constants.
-#define XS_CMD_SUBSCRIBE 1
-#define XS_CMD_UNSUBSCRIBE 2
+
+// Protocol header.
+#define SP_HEADER_LENGTH 8
+
+// Patterns.
+#define SP_PAIR 1
+#define SP_PUBSUB 2
+#define SP_REQREP 3
+#define SP_PIPELINE 4
+#define SP_SURVEY 5
+
+// Roles.
+#define SP_PAIR_PAIR 1
+#define SP_PUBSUB_PUB 1
+#define SP_PUBSUB_SUB 2
+#define SP_REQREP_REQ 1
+#define SP_REQREP_REP 2
+#define SP_PIPELINE_PUSH 1
+#define SP_PIPELINE_PULL 2
+#define SP_SURVEY_SURVEYOR 1
+#define SP_SURVEY_RESPONDENT 2
+
+// PUBSUB pattern commands.
+#define SP_PUBSUB_CMD_SUBSCRIBE 1
+#define SP_PUBSUB_CMD_UNSUBSCRIBE 2
namespace xs
{
+ // Protocol header type.
+ typedef unsigned char sp_header_t [SP_HEADER_LENGTH];
+
+ // Get the SP protocol header for the specified service, pattern, version
+ // and role.
+ inline void sp_get_header (sp_header_t header_, int service_, int pattern_,
+ int version_, int role_)
+ {
+ header_ [0] = 0; // Protocol identifier
+ header_ [1] = 'S'; // "
+ header_ [2] = 'P'; // "
+ header_ [3] = 0; // Reserved, must be zero.
+ header_ [4] = (service_ >> 8) & 0xff; // Service id high byte
+ header_ [5] = service_ & 0xff; // Service id low byte
+ header_ [6] = pattern_ & 0xff; // Pattern
+ header_ [7] = (version_ & 0xf) << 4; // Pattern version
+ header_ [7] |= role_ & 0xf; // Pattern role
+ }
// Helper functions to convert different integer types to/from network
// byte order.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index fbb45fb..bc307e6 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
tmp_filter_id (-1)
{
options.type = XS_XPUB;
+ options.sp_pattern = SP_PUBSUB;
+ options.sp_version = 4;
+ options.sp_role = SP_PUBSUB_PUB;
+ options.sp_complement = SP_PUBSUB_SUB;
}
xs::xpub_t::~xpub_t ()
@@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t ()
it->type->pf_destroy ((void*) (core_t*) this, it->instance);
}
+int xs::xpub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ switch (version) {
+ case 1:
+ options.legacy_protocol = true;
+ options.sp_version = 1;
+ break;
+ case 3:
+ options.legacy_protocol = false;
+ options.sp_version = 3;
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+
+ return 0;
+}
+
void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// to all data on this pipe, implicitly. Also, if we are using
// 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
// we need to subscribe for all messages automatically.
- if (icanhasall_|| pipe_->get_protocol () == 1) {
+ if (icanhasall_|| pipe_->get_sp_version () == 1) {
// Find the prefix filter.
// TODO: Change this to ALL filter.
@@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
int filter_id = XS_FILTER_PREFIX;
#endif
- if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
+ if (cmd != SP_PUBSUB_CMD_SUBSCRIBE &&
+ cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
sub.close ();
return;
}
@@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
break;
bool unique;
- if (cmd == XS_CMD_UNSUBSCRIBE) {
+ if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
it->instance, pipe_, data + 4, size - 4) ? true : false;
@@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
blob_t unsub (size_ + 4, 0);
- put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE);
+ put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);
memcpy ((void*) (unsub.data () + 4), data_, size_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index c76f03c..87cf6f5 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -49,6 +49,7 @@ namespace xs
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 007ed27..30f1419 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -36,6 +36,10 @@ xs::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ())
{
options.type = XS_XREP;
+ options.sp_pattern = SP_REQREP;
+ options.sp_version = 2;
+ options.sp_role = SP_REQREP_REP;
+ options.sp_complement = SP_REQREP_REQ;
// TODO: Uncomment the following line when XREP will become true XREP
// rather than generic router socket.
@@ -55,6 +59,34 @@ xs::xrep_t::~xrep_t ()
prefetched_msg.close ();
}
+int xs::xrep_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 8e16a4c..71a7bf4 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -49,6 +49,7 @@ namespace xs
~xrep_t ();
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 1c6af9d..ac6ba4d 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -22,12 +22,17 @@
#include "xreq.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (false)
{
options.type = XS_XREQ;
+ options.sp_pattern = SP_REQREP;
+ options.sp_version = 2;
+ options.sp_role = SP_REQREP_REQ;
+ options.sp_complement = SP_REQREP_REP;
// TODO: Uncomment the following line when XREQ will become true XREQ
// rather than generic dealer socket.
@@ -46,6 +51,34 @@ xs::xreq_t::~xreq_t ()
prefetched_msg.close ();
}
+int xs::xreq_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index c4f9baf..9fe6ebb 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -46,6 +46,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
index c16d900..9d949ce 100644
--- a/src/xrespondent.cpp
+++ b/src/xrespondent.cpp
@@ -35,6 +35,10 @@ xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
next_peer_id (generate_random ())
{
options.type = XS_XRESPONDENT;
+ options.sp_pattern = SP_SURVEY;
+ options.sp_version = 2;
+ options.sp_role = SP_SURVEY_RESPONDENT;
+ options.sp_complement = SP_SURVEY_SURVEYOR;
// If the connection disappears it makes no sense to read any more surveys
// from it. The responses will be unroutable anyway.
@@ -49,6 +53,34 @@ xs::xrespondent_t::~xrespondent_t ()
prefetched_msg.close ();
}
+int xs::xrespondent_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp
index eb6ac9a..4143e36 100644
--- a/src/xrespondent.hpp
+++ b/src/xrespondent.hpp
@@ -47,6 +47,7 @@ namespace xs
~xrespondent_t ();
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 7272a8e..9255ceb 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -29,6 +29,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
+ options.sp_pattern = SP_PUBSUB;
+ options.sp_version = 4;
+ options.sp_role = SP_PUBSUB_SUB;
+ options.sp_complement = SP_PUBSUB_PUB;
// When socket is being closed down we don't want to wait till pending
// subscription commands are sent to the wire.
@@ -42,6 +46,42 @@ xs::xsub_t::~xsub_t ()
{
}
+int xs::xsub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ switch (version) {
+ case 1:
+ options.legacy_protocol = true;
+ options.sp_version = 1;
+ break;
+ case 3:
+ options.legacy_protocol = false;
+ options.sp_version = 3;
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+
+ return 0;
+}
+
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -49,7 +89,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_protocol () == 1)
+ if (pipe_->get_sp_version () == 1)
return;
dist.attach (pipe_);
@@ -75,14 +115,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- if (pipe_->get_protocol () != 1)
+ if (pipe_->get_sp_version () != 1)
dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
// In 0MQ/2.1 protocol there is no subscription forwarding.
- if (pipe_->get_protocol () == 1)
+ if (pipe_->get_sp_version () == 1)
return;
// Send all the cached subscriptions to the new upstream peer.
@@ -104,13 +144,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
- if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
+ if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
// Process the subscription.
- if (cmd == XS_CMD_SUBSCRIBE) {
+ if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.insert (
std::make_pair (std::make_pair (filter_id,
blob_t (data + 4, size - 4)), 0)).first;
@@ -118,7 +158,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
if (it->second == 1)
return dist.send_to_all (msg_, flags_);
}
- else if (cmd == XS_CMD_UNSUBSCRIBE) {
+ else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.find (
std::make_pair (filter_id, blob_t (data + 4, size - 4)));
if (it != subscriptions.end ()) {
@@ -162,7 +202,8 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
int rc = msg.init_size (size_ + 4);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE :
+ SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 0cf81b5..986f753 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -47,6 +47,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp
index 985b6d8..7016550 100644
--- a/src/xsurveyor.cpp
+++ b/src/xsurveyor.cpp
@@ -21,11 +21,16 @@
#include "xsurveyor.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSURVEYOR;
+ options.sp_pattern = SP_SURVEY;
+ options.sp_version = 2;
+ options.sp_role = SP_SURVEY_SURVEYOR;
+ options.sp_complement = SP_SURVEY_RESPONDENT;
// When the XSURVEYOR socket is close it makes no sense to send any pending
// surveys. The responses will be unroutable anyway.
@@ -36,6 +41,34 @@ xs::xsurveyor_t::~xsurveyor_t ()
{
}
+int xs::xsurveyor_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp
index b0c3f9c..e312667 100644
--- a/src/xsurveyor.hpp
+++ b/src/xsurveyor.hpp
@@ -45,6 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp
index ecf271b..3323ef0 100644
--- a/tests/libzmq21.cpp
+++ b/tests/libzmq21.cpp
@@ -53,8 +53,9 @@ int XS_TEST_MAIN ()
assert (ctx);
void *pub = xs_socket (ctx, XS_PUB);
assert (pub);
- int protocol = 1;
- int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol));
+ int pattern_version = 1;
+ int rc = xs_setsockopt (pub, XS_PATTERN_VERSION, &pattern_version,
+ sizeof (pattern_version));
assert (rc == 0);
rc = xs_bind (pub, "tcp://127.0.0.1:5560");
assert (rc != -1);
@@ -99,8 +100,9 @@ int XS_TEST_MAIN ()
assert (ctx);
void *sub = xs_socket (ctx, XS_SUB);
assert (sub);
- protocol = 1;
- rc = xs_setsockopt (sub, XS_PROTOCOL, &protocol, sizeof (protocol));
+ pattern_version = 1;
+ rc = xs_setsockopt (sub, XS_PATTERN_VERSION, &pattern_version,
+ sizeof (pattern_version));
assert (rc == 0);
rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0);
assert (rc == 0);
diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp
index f3e0f96..f12f5aa 100644
--- a/tests/wireformat.cpp
+++ b/tests/wireformat.cpp
@@ -49,7 +49,11 @@ int XS_TEST_MAIN ()
assert (push);
// Bind the peer and get the message.
- int rc = xs_bind (pull, "tcp://127.0.0.1:5560");
+ int service_id = 0x1234;
+ int rc = xs_setsockopt (pull, XS_SERVICE_ID, &service_id,
+ sizeof service_id);
+ assert (rc == 0);
+ rc = xs_bind (pull, "tcp://127.0.0.1:5560");
assert (rc != -1);
rc = xs_bind (push, "tcp://127.0.0.1:5561");
assert (rc != -1);
@@ -71,10 +75,10 @@ int XS_TEST_MAIN ()
assert (rc == 0);
// Let's send some data and check if it arrived
- rc = send (rpush, "\x04\0abc", 5, 0);
- assert (rc == 5);
+ rc = send (rpush, "\0SP\0\x12\x34\x04\x31\x04\0abc", 13, 0);
+ assert (rc == 13);
unsigned char buf [3];
- unsigned char buf2 [3];
+ unsigned char buf2 [8];
rc = xs_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
assert (!memcmp (buf, "abc", 3));
@@ -83,8 +87,11 @@ int XS_TEST_MAIN ()
rc = xs_send (push, buf, sizeof (buf), 0);
assert (rc == 3);
rc = recv (rpull, (char*) buf2, sizeof (buf2), 0);
- assert (rc == 3);
- assert (!memcmp (buf2, "\x04\0abc", 3));
+ assert (rc == 8);
+ assert (!memcmp (buf2, "\0SP\0\0\0\x04\x31", 8));
+ rc = recv (rpull, (char*) buf2, 5, 0);
+ assert (rc == 5);
+ assert (!memcmp (buf2, "\x04\0abc", 5));
#if defined XS_HAVE_WINDOWS
rc = closesocket (rpush);