summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/options.cpp23
-rw-r--r--src/options.hpp10
-rw-r--r--src/pair.cpp33
-rw-r--r--src/pair.hpp1
-rw-r--r--src/pipe.cpp14
-rw-r--r--src/pipe.hpp16
-rw-r--r--src/pull.cpp33
-rw-r--r--src/pull.hpp1
-rw-r--r--src/push.cpp33
-rw-r--r--src/push.hpp1
-rw-r--r--src/session_base.cpp2
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/stream_engine.cpp58
-rw-r--r--src/stream_engine.hpp17
-rw-r--r--src/sub.cpp7
-rw-r--r--src/wire.hpp43
-rw-r--r--src/xpub.cpp49
-rw-r--r--src/xpub.hpp1
-rw-r--r--src/xrep.cpp32
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/xreq.cpp33
-rw-r--r--src/xreq.hpp1
-rw-r--r--src/xrespondent.cpp32
-rw-r--r--src/xrespondent.hpp1
-rw-r--r--src/xsub.cpp55
-rw-r--r--src/xsub.hpp1
-rw-r--r--src/xsurveyor.cpp33
-rw-r--r--src/xsurveyor.hpp1
-rw-r--r--tests/wireformat.cpp13
29 files changed, 489 insertions, 60 deletions
diff --git a/src/options.cpp b/src/options.cpp
index f7bbdc4..d362c18 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -48,7 +48,11 @@ xs::options_t::options_t () :
sndtimeo (-1),
ipv4only (1),
keepalive (0),
- protocol (0),
+ legacy_protocol (false),
+ sp_pattern (-1),
+ sp_version (-1),
+ sp_role (-1),
+ sp_complement (-1),
filter (XS_FILTER_PREFIX),
survey_timeout (-1),
delay_on_close (true),
@@ -236,21 +240,6 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
- case XS_PROTOCOL:
- {
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- int val = *((int*) optval_);
- if (val < 0) {
- errno = EINVAL;
- return -1;
- }
- protocol = val;
- return 0;
- }
-
case XS_FILTER:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
@@ -457,7 +446,7 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
errno = EINVAL;
return -1;
}
- *((int*) optval_) = protocol;
+ *((int*) optval_) = sp_version;
*optvallen_ = sizeof (int);
return 0;
diff --git a/src/options.hpp b/src/options.hpp
index 805f793..89d6d2a 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -92,8 +92,14 @@ namespace xs
// If 1, keepalives are to be sent periodically.
int keepalive;
- // Version of wire protocol to use.
- int protocol;
+ // If true, the legacy non-SP wire protocol is in use.
+ bool legacy_protocol;
+
+ // SP protocol pattern, version, role and complementary role.
+ int sp_pattern;
+ int sp_version;
+ int sp_role;
+ int sp_complement;
// Filter ID to be used with subscriptions and unsubscriptions.
int filter;
diff --git a/src/pair.cpp b/src/pair.cpp
index b4cb0b4..d4f699e 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -23,12 +23,17 @@
#include "err.hpp"
#include "pipe.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
pipe (NULL)
{
options.type = XS_PAIR;
+ options.sp_pattern = SP_PAIR;
+ options.sp_role = SP_PAIR_PAIR;
+ options.sp_version = 2;
+ options.sp_complement = SP_PAIR_PAIR;
}
xs::pair_t::~pair_t ()
@@ -36,6 +41,34 @@ xs::pair_t::~pair_t ()
xs_assert (!pipe);
}
+int xs::pair_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 2) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (!pipe);
diff --git a/src/pair.hpp b/src/pair.hpp
index 07ed6c5..6a5f167 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -42,6 +42,7 @@ namespace xs
~pair_t ();
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 0a15cc0..c14642f 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -27,7 +27,7 @@
#include "err.hpp"
int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int protocol_)
+ int hwms_ [2], bool delays_ [2], int sp_version_)
{
// Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction.
@@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
- hwms_ [1], hwms_ [0], delays_ [0], protocol_);
+ hwms_ [1], hwms_ [0], delays_ [0], sp_version_);
alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
- hwms_ [0], hwms_ [1], delays_ [1], protocol_);
+ hwms_ [0], hwms_ [1], delays_ [1], sp_version_);
alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]);
@@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
}
xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int protocol_) :
+ int inhwm_, int outhwm_, bool delay_, int sp_version_) :
object_t (parent_),
inpipe (inpipe_),
outpipe (outpipe_),
@@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
sink (NULL),
state (active),
delay (delay_),
- protocol (protocol_)
+ sp_version (sp_version_)
{
}
@@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_)
}
}
-int xs::pipe_t::get_protocol ()
+int xs::pipe_t::get_sp_version ()
{
- return protocol;
+ return sp_version;
}
bool xs::pipe_t::is_delimiter (msg_t &msg_)
diff --git a/src/pipe.hpp b/src/pipe.hpp
index c298154..dbc6b7e 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -44,7 +44,7 @@ namespace xs
// pipe receives all the pending messages before terminating, otherwise it
// terminates straight away.
int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int protocol_);
+ int hwms_ [2], bool delays_ [2], int sp_version_);
struct i_pipe_events
{
@@ -69,7 +69,7 @@ namespace xs
// This allows pipepair to create pipe objects.
friend int pipepair (xs::object_t *parents_ [2],
xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
- int protocol_);
+ int sp_version_);
public:
@@ -111,8 +111,8 @@ namespace xs
// before actual shutdown.
void terminate (bool delay_);
- // Returns the ID of the protocol associated with the pipe.
- int get_protocol ();
+ // Returns the SP pattern version in use on this pipe.
+ int get_sp_version ();
private:
@@ -132,7 +132,7 @@ namespace xs
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int protocol_);
+ int inhwm_, int outhwm_, bool delay_, int sp_version_);
// Pipepair uses this function to let us know about
// the peer pipe object.
@@ -192,9 +192,9 @@ namespace xs
// asks us to.
bool delay;
- // ID of the protocol to use. This value is not used by the pipe
- // itself, rather it's used by the users of the pipe.
- int protocol;
+ // SP pattern version in use on this pipe. This value is used by the
+ // pattern classes using the pipe, not the pipe itself.
+ int sp_version;
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
diff --git a/src/pull.cpp b/src/pull.cpp
index 8ae8208..4d08edb 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -23,17 +23,50 @@
#include "err.hpp"
#include "msg.hpp"
#include "pipe.hpp"
+#include "wire.hpp"
xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PULL;
+ options.sp_pattern = SP_PIPELINE;
+ options.sp_version = 2;
+ options.sp_role = SP_PIPELINE_PULL;
+ options.sp_complement = SP_PIPELINE_PUSH;
}
xs::pull_t::~pull_t ()
{
}
+int xs::pull_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 2) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/pull.hpp b/src/pull.hpp
index 04da465..5453bbd 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -45,6 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/push.cpp b/src/push.cpp
index 59f508f..94f8381 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -23,17 +23,50 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PUSH;
+ options.sp_pattern = SP_PIPELINE;
+ options.sp_version = 2;
+ options.sp_role = SP_PIPELINE_PUSH;
+ options.sp_complement = SP_PIPELINE_PULL;
}
xs::push_t::~push_t ()
{
}
+int xs::push_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 2) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/push.hpp b/src/push.hpp
index a112f31..ffe16cc 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -45,6 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 35c4a4e..9b58b42 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -298,7 +298,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
- int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
+ int rc = pipepair (parents, pipes, hwms, delays, options.sp_version);
errno_assert (rc == 0);
// Plug the local end of the pipe.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ec5c8bd..e8ae28a 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -447,7 +447,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.protocol);
+ rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
@@ -501,7 +501,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.protocol);
+ rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
errno_assert (rc == 0);
// PGM does not support subscription forwarding; ask for all data to be
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 15b566b..a39b410 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -53,8 +53,21 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
session (NULL),
leftover_session (NULL),
options (options_),
- plugged (false)
+ plugged (false),
+ header_pos (in_header),
+ header_remaining (sizeof in_header),
+ header_received (false),
+ header_sent (false)
{
+ // Fill in outgoing SP protocol header and the complementary (desired)
+ // header.
+ if (!options.legacy_protocol) {
+ sp_get_header (out_header, options.sp_pattern, options.sp_version,
+ options.sp_role);
+ sp_get_header (desired_header, options.sp_pattern, options.sp_version,
+ options.sp_complement);
+ }
+
// Get the socket into non-blocking mode.
unblock_socket (s);
@@ -155,6 +168,35 @@ void xs::stream_engine_t::in_event (fd_t fd_)
{
bool disconnection = false;
+ // If we have not yet received the full protocol header...
+ if (unlikely (!options.legacy_protocol && !header_received)) {
+
+ // Read remaining header bytes.
+ int hbytes = read (header_pos, header_remaining);
+
+ // Check whether the peer has closed the connection.
+ if (hbytes == -1) {
+ error ();
+ return;
+ }
+
+ header_remaining -= hbytes;
+ header_pos += hbytes;
+
+ // If we did not read the whole header, poll for more.
+ if (header_remaining)
+ return;
+
+ // If the protocol headers do not match, close the connection.
+ if (memcmp (in_header, desired_header, sizeof in_header) != 0) {
+ error ();
+ return;
+ }
+
+ // Done with protocol header; proceed to read data.
+ header_received = true;
+ }
+
// If there's no data to process in the buffer...
if (!insize) {
@@ -210,6 +252,20 @@ void xs::stream_engine_t::out_event (fd_t fd_)
{
bool more_data = true;
+ // If protocol header was not yet sent...
+ if (unlikely (!options.legacy_protocol && !header_sent)) {
+ int hbytes = write (out_header, sizeof out_header);
+
+ // It should always be possible to write the full protocol header to a
+ // freshly connected TCP socket. Therefore, if we get an error or
+ // partial write here the peer has disconnected.
+ if (hbytes != sizeof out_header) {
+ error ();
+ return;
+ }
+ header_sent = true;
+ }
+
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 46a6080..70c9ddb 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -30,6 +30,7 @@
#include "encoder.hpp"
#include "decoder.hpp"
#include "options.hpp"
+#include "wire.hpp"
namespace xs
{
@@ -98,6 +99,22 @@ namespace xs
bool plugged;
+ // Outgoing protocol header.
+ sp_header_t out_header;
+
+ // Desired protocol header.
+ sp_header_t desired_header;
+
+ // Incoming protocol header.
+ sp_header_t in_header;
+
+ unsigned char *header_pos;
+ size_t header_remaining;
+
+ // Protocol header has been received/sent.
+ bool header_received;
+ bool header_sent;
+
stream_engine_t (const stream_engine_t&);
const stream_engine_t &operator = (const stream_engine_t&);
};
diff --git a/src/sub.cpp b/src/sub.cpp
index d29ae8d..79cb63c 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -48,8 +48,7 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) {
- errno = EINVAL;
- return -1;
+ return xsub_t::xsetsockopt (option_, optval_, optvallen_);
}
if (optvallen_ > 0 && !optval_) {
@@ -202,7 +201,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, XS_CMD_SUBSCRIBE);
+ put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
@@ -225,7 +224,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
diff --git a/src/wire.hpp b/src/wire.hpp
index f840fce..e751818 100644
--- a/src/wire.hpp
+++ b/src/wire.hpp
@@ -24,11 +24,50 @@
#include "stdint.hpp"
// Protocol-related constants.
-#define XS_CMD_SUBSCRIBE 1
-#define XS_CMD_UNSUBSCRIBE 2
+
+// Protocol header.
+#define SP_HEADER_LENGTH 8
+
+// Patterns.
+#define SP_PAIR 1
+#define SP_PUBSUB 2
+#define SP_REQREP 3
+#define SP_PIPELINE 4
+#define SP_SURVEY 5
+
+// Roles.
+#define SP_PAIR_PAIR 1
+#define SP_PUBSUB_PUB 1
+#define SP_PUBSUB_SUB 2
+#define SP_REQREP_REQ 1
+#define SP_REQREP_REP 2
+#define SP_PIPELINE_PUSH 1
+#define SP_PIPELINE_PULL 2
+#define SP_SURVEY_SURVEYOR 1
+#define SP_SURVEY_RESPONDENT 2
+
+// PUBSUB pattern commands.
+#define SP_PUBSUB_CMD_SUBSCRIBE 1
+#define SP_PUBSUB_CMD_UNSUBSCRIBE 2
namespace xs
{
+ // Protocol header type.
+ typedef unsigned char sp_header_t [SP_HEADER_LENGTH];
+
+ // Get the SP protocol header for the specified pattern, version and role.
+ inline void sp_get_header (sp_header_t header_, int pattern_, int version_,
+ int role_)
+ {
+ header_ [0] = 0;
+ header_ [1] = 0;
+ header_ [2] = 'S';
+ header_ [3] = 'P';
+ header_ [4] = pattern_ & 0xff;
+ header_ [5] = version_ & 0xff;
+ header_ [6] = role_ & 0xff;
+ header_ [7] = 0;
+ }
// Helper functions to convert different integer types to/from network
// byte order.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index fe0b9a7..b176bf8 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
tmp_filter_id (-1)
{
options.type = XS_XPUB;
+ options.sp_pattern = SP_PUBSUB;
+ options.sp_version = 3;
+ options.sp_role = SP_PUBSUB_PUB;
+ options.sp_complement = SP_PUBSUB_SUB;
}
xs::xpub_t::~xpub_t ()
@@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t ()
it->type->pf_destroy ((void*) (core_t*) this, it->instance);
}
+int xs::xpub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ switch (version) {
+ case 1:
+ options.legacy_protocol = true;
+ options.sp_version = 1;
+ break;
+ case 3:
+ options.legacy_protocol = false;
+ options.sp_version = 3;
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+
+ return 0;
+}
+
void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// to all data on this pipe, implicitly. Also, if we are using
// 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
// we need to subscribe for all messages automatically.
- if (icanhasall_|| pipe_->get_protocol () == 1) {
+ if (icanhasall_|| pipe_->get_sp_version () == 1) {
// Find the prefix filter.
// TODO: Change this to ALL filter.
@@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
int filter_id = XS_FILTER_PREFIX;
#endif
- if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
+ if (cmd != SP_PUBSUB_CMD_SUBSCRIBE &&
+ cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
sub.close ();
return;
}
@@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
break;
bool unique;
- if (cmd == XS_CMD_UNSUBSCRIBE) {
+ if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
it->instance, pipe_, data + 4, size - 4) ? true : false;
@@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
blob_t unsub (size_ + 4, 0);
- put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE);
+ put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);
memcpy ((void*) (unsub.data () + 4), data_, size_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index c0e47ff..f9cb737 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -49,6 +49,7 @@ namespace xs
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 007ed27..5e9fc8a 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -36,6 +36,10 @@ xs::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ())
{
options.type = XS_XREP;
+ options.sp_pattern = SP_REQREP;
+ options.sp_version = 1;
+ options.sp_role = SP_REQREP_REP;
+ options.sp_complement = SP_REQREP_REQ;
// TODO: Uncomment the following line when XREP will become true XREP
// rather than generic router socket.
@@ -55,6 +59,34 @@ xs::xrep_t::~xrep_t ()
prefetched_msg.close ();
}
+int xs::xrep_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 8e16a4c..71a7bf4 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -49,6 +49,7 @@ namespace xs
~xrep_t ();
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 1c6af9d..e42c691 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -22,12 +22,17 @@
#include "xreq.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (false)
{
options.type = XS_XREQ;
+ options.sp_pattern = SP_REQREP;
+ options.sp_version = 1;
+ options.sp_role = SP_REQREP_REQ;
+ options.sp_complement = SP_REQREP_REP;
// TODO: Uncomment the following line when XREQ will become true XREQ
// rather than generic dealer socket.
@@ -46,6 +51,34 @@ xs::xreq_t::~xreq_t ()
prefetched_msg.close ();
}
+int xs::xreq_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index c4f9baf..9fe6ebb 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -46,6 +46,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
index c16d900..bd19913 100644
--- a/src/xrespondent.cpp
+++ b/src/xrespondent.cpp
@@ -35,6 +35,10 @@ xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
next_peer_id (generate_random ())
{
options.type = XS_XRESPONDENT;
+ options.sp_pattern = SP_SURVEY;
+ options.sp_version = 1;
+ options.sp_role = SP_SURVEY_RESPONDENT;
+ options.sp_complement = SP_SURVEY_SURVEYOR;
// If the connection disappears it makes no sense to read any more surveys
// from it. The responses will be unroutable anyway.
@@ -49,6 +53,34 @@ xs::xrespondent_t::~xrespondent_t ()
prefetched_msg.close ();
}
+int xs::xrespondent_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp
index eb6ac9a..4143e36 100644
--- a/src/xrespondent.hpp
+++ b/src/xrespondent.hpp
@@ -47,6 +47,7 @@ namespace xs
~xrespondent_t ();
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 7272a8e..18acfea 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -29,6 +29,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
+ options.sp_pattern = SP_PUBSUB;
+ options.sp_version = 3;
+ options.sp_role = SP_PUBSUB_SUB;
+ options.sp_complement = SP_PUBSUB_PUB;
// When socket is being closed down we don't want to wait till pending
// subscription commands are sent to the wire.
@@ -42,6 +46,42 @@ xs::xsub_t::~xsub_t ()
{
}
+int xs::xsub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ switch (version) {
+ case 1:
+ options.legacy_protocol = true;
+ options.sp_version = 1;
+ break;
+ case 3:
+ options.legacy_protocol = false;
+ options.sp_version = 3;
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+
+ return 0;
+}
+
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -49,7 +89,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_protocol () == 1)
+ if (pipe_->get_sp_version () == 1)
return;
dist.attach (pipe_);
@@ -75,14 +115,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- if (pipe_->get_protocol () != 1)
+ if (pipe_->get_sp_version () != 1)
dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
// In 0MQ/2.1 protocol there is no subscription forwarding.
- if (pipe_->get_protocol () == 1)
+ if (pipe_->get_sp_version () == 1)
return;
// Send all the cached subscriptions to the new upstream peer.
@@ -104,13 +144,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
- if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
+ if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
// Process the subscription.
- if (cmd == XS_CMD_SUBSCRIBE) {
+ if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.insert (
std::make_pair (std::make_pair (filter_id,
blob_t (data + 4, size - 4)), 0)).first;
@@ -118,7 +158,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
if (it->second == 1)
return dist.send_to_all (msg_, flags_);
}
- else if (cmd == XS_CMD_UNSUBSCRIBE) {
+ else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.find (
std::make_pair (filter_id, blob_t (data + 4, size - 4)));
if (it != subscriptions.end ()) {
@@ -162,7 +202,8 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
int rc = msg.init_size (size_ + 4);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE :
+ SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 5bc11c4..fcaf61c 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -47,6 +47,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp
index 985b6d8..d02ea09 100644
--- a/src/xsurveyor.cpp
+++ b/src/xsurveyor.cpp
@@ -21,11 +21,16 @@
#include "xsurveyor.hpp"
#include "err.hpp"
#include "msg.hpp"
+#include "wire.hpp"
xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSURVEYOR;
+ options.sp_pattern = SP_SURVEY;
+ options.sp_version = 1;
+ options.sp_role = SP_SURVEY_SURVEYOR;
+ options.sp_complement = SP_SURVEY_RESPONDENT;
// When the XSURVEYOR socket is close it makes no sense to send any pending
// surveys. The responses will be unroutable anyway.
@@ -36,6 +41,34 @@ xs::xsurveyor_t::~xsurveyor_t ()
{
}
+int xs::xsurveyor_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PROTOCOL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ if (version != 1) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ options.sp_version = version;
+ return 0;
+}
+
void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp
index b0c3f9c..e312667 100644
--- a/src/xsurveyor.hpp
+++ b/src/xsurveyor.hpp
@@ -45,6 +45,7 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp
index f3e0f96..7bf21f3 100644
--- a/tests/wireformat.cpp
+++ b/tests/wireformat.cpp
@@ -71,10 +71,10 @@ int XS_TEST_MAIN ()
assert (rc == 0);
// Let's send some data and check if it arrived
- rc = send (rpush, "\x04\0abc", 5, 0);
- assert (rc == 5);
+ rc = send (rpush, "\0\0SP\x04\x02\x01\0\x04\0abc", 13, 0);
+ assert (rc == 13);
unsigned char buf [3];
- unsigned char buf2 [3];
+ unsigned char buf2 [8];
rc = xs_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
assert (!memcmp (buf, "abc", 3));
@@ -83,8 +83,11 @@ int XS_TEST_MAIN ()
rc = xs_send (push, buf, sizeof (buf), 0);
assert (rc == 3);
rc = recv (rpull, (char*) buf2, sizeof (buf2), 0);
- assert (rc == 3);
- assert (!memcmp (buf2, "\x04\0abc", 3));
+ assert (rc == 8);
+ assert (!memcmp (buf2, "\0\0SP\x04\x02\x01\0", 8));
+ rc = recv (rpull, (char*) buf2, 5, 0);
+ assert (rc == 5);
+ assert (!memcmp (buf2, "\x04\0abc", 5));
#if defined XS_HAVE_WINDOWS
rc = closesocket (rpush);