summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/options.cpp23
-rw-r--r--src/options.hpp10
-rw-r--r--src/pair.cpp33
-rw-r--r--src/pair.hpp1
-rw-r--r--src/pipe.cpp14
-rw-r--r--src/pipe.hpp16
-rw-r--r--src/pull.cpp33
-rw-r--r--src/pull.hpp1
-rw-r--r--src/push.cpp33
-rw-r--r--src/push.hpp1
-rw-r--r--src/session_base.cpp2
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/stream_engine.cpp58
-rw-r--r--src/stream_engine.hpp17
-rw-r--r--src/sub.cpp7
-rw-r--r--src/wire.hpp43
-rw-r--r--src/xpub.cpp49
-rw-r--r--src/xpub.hpp1
-rw-r--r--src/xrep.cpp32
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/xreq.cpp33
-rw-r--r--src/xreq.hpp1
-rw-r--r--src/xrespondent.cpp32
-rw-r--r--src/xrespondent.hpp1
-rw-r--r--src/xsub.cpp55
-rw-r--r--src/xsub.hpp1
-rw-r--r--src/xsurveyor.cpp33
-rw-r--r--src/xsurveyor.hpp1
-rw-r--r--tests/wireformat.cpp13
29 files changed, 60 insertions, 489 deletions
diff --git a/src/options.cpp b/src/options.cpp
index d362c18..f7bbdc4 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -48,11 +48,7 @@ xs::options_t::options_t () :
sndtimeo (-1),
ipv4only (1),
keepalive (0),
- legacy_protocol (false),
- sp_pattern (-1),
- sp_version (-1),
- sp_role (-1),
- sp_complement (-1),
+ protocol (0),
filter (XS_FILTER_PREFIX),
survey_timeout (-1),
delay_on_close (true),
@@ -240,6 +236,21 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
+ case XS_PROTOCOL:
+ {
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ int val = *((int*) optval_);
+ if (val < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ protocol = val;
+ return 0;
+ }
+
case XS_FILTER:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
@@ -446,7 +457,7 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
errno = EINVAL;
return -1;
}
- *((int*) optval_) = sp_version;
+ *((int*) optval_) = protocol;
*optvallen_ = sizeof (int);
return 0;
diff --git a/src/options.hpp b/src/options.hpp
index 89d6d2a..805f793 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -92,14 +92,8 @@ namespace xs
// If 1, keepalives are to be sent periodically.
int keepalive;
- // If true, the legacy non-SP wire protocol is in use.
- bool legacy_protocol;
-
- // SP protocol pattern, version, role and complementary role.
- int sp_pattern;
- int sp_version;
- int sp_role;
- int sp_complement;
+ // Version of wire protocol to use.
+ int protocol;
// Filter ID to be used with subscriptions and unsubscriptions.
int filter;
diff --git a/src/pair.cpp b/src/pair.cpp
index d4f699e..b4cb0b4 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -23,17 +23,12 @@
#include "err.hpp"
#include "pipe.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
pipe (NULL)
{
options.type = XS_PAIR;
- options.sp_pattern = SP_PAIR;
- options.sp_role = SP_PAIR_PAIR;
- options.sp_version = 2;
- options.sp_complement = SP_PAIR_PAIR;
}
xs::pair_t::~pair_t ()
@@ -41,34 +36,6 @@ xs::pair_t::~pair_t ()
xs_assert (!pipe);
}
-int xs::pair_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 2) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (!pipe);
diff --git a/src/pair.hpp b/src/pair.hpp
index 6a5f167..07ed6c5 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -42,7 +42,6 @@ namespace xs
~pair_t ();
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/pipe.cpp b/src/pipe.cpp
index c14642f..0a15cc0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -27,7 +27,7 @@
#include "err.hpp"
int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int sp_version_)
+ int hwms_ [2], bool delays_ [2], int protocol_)
{
// Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction.
@@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
- hwms_ [1], hwms_ [0], delays_ [0], sp_version_);
+ hwms_ [1], hwms_ [0], delays_ [0], protocol_);
alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
- hwms_ [0], hwms_ [1], delays_ [1], sp_version_);
+ hwms_ [0], hwms_ [1], delays_ [1], protocol_);
alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]);
@@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
}
xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int sp_version_) :
+ int inhwm_, int outhwm_, bool delay_, int protocol_) :
object_t (parent_),
inpipe (inpipe_),
outpipe (outpipe_),
@@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
sink (NULL),
state (active),
delay (delay_),
- sp_version (sp_version_)
+ protocol (protocol_)
{
}
@@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_)
}
}
-int xs::pipe_t::get_sp_version ()
+int xs::pipe_t::get_protocol ()
{
- return sp_version;
+ return protocol;
}
bool xs::pipe_t::is_delimiter (msg_t &msg_)
diff --git a/src/pipe.hpp b/src/pipe.hpp
index dbc6b7e..c298154 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -44,7 +44,7 @@ namespace xs
// pipe receives all the pending messages before terminating, otherwise it
// terminates straight away.
int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int sp_version_);
+ int hwms_ [2], bool delays_ [2], int protocol_);
struct i_pipe_events
{
@@ -69,7 +69,7 @@ namespace xs
// This allows pipepair to create pipe objects.
friend int pipepair (xs::object_t *parents_ [2],
xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
- int sp_version_);
+ int protocol_);
public:
@@ -111,8 +111,8 @@ namespace xs
// before actual shutdown.
void terminate (bool delay_);
- // Returns the SP pattern version in use on this pipe.
- int get_sp_version ();
+ // Returns the ID of the protocol associated with the pipe.
+ int get_protocol ();
private:
@@ -132,7 +132,7 @@ namespace xs
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int sp_version_);
+ int inhwm_, int outhwm_, bool delay_, int protocol_);
// Pipepair uses this function to let us know about
// the peer pipe object.
@@ -192,9 +192,9 @@ namespace xs
// asks us to.
bool delay;
- // SP pattern version in use on this pipe. This value is used by the
- // pattern classes using the pipe, not the pipe itself.
- int sp_version;
+ // ID of the protocol to use. This value is not used by the pipe
+ // itself, rather it's used by the users of the pipe.
+ int protocol;
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
diff --git a/src/pull.cpp b/src/pull.cpp
index 4d08edb..8ae8208 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -23,50 +23,17 @@
#include "err.hpp"
#include "msg.hpp"
#include "pipe.hpp"
-#include "wire.hpp"
xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PULL;
- options.sp_pattern = SP_PIPELINE;
- options.sp_version = 2;
- options.sp_role = SP_PIPELINE_PULL;
- options.sp_complement = SP_PIPELINE_PUSH;
}
xs::pull_t::~pull_t ()
{
}
-int xs::pull_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 2) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/pull.hpp b/src/pull.hpp
index 5453bbd..04da465 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -45,7 +45,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/push.cpp b/src/push.cpp
index 94f8381..59f508f 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -23,50 +23,17 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PUSH;
- options.sp_pattern = SP_PIPELINE;
- options.sp_version = 2;
- options.sp_role = SP_PIPELINE_PUSH;
- options.sp_complement = SP_PIPELINE_PULL;
}
xs::push_t::~push_t ()
{
}
-int xs::push_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 2) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/push.hpp b/src/push.hpp
index ffe16cc..a112f31 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -45,7 +45,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 9b58b42..35c4a4e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -298,7 +298,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
- int rc = pipepair (parents, pipes, hwms, delays, options.sp_version);
+ int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Plug the local end of the pipe.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index e8ae28a..ec5c8bd 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -447,7 +447,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
+ rc = pipepair (parents, ppair, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
@@ -501,7 +501,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
+ rc = pipepair (parents, ppair, hwms, delays, options.protocol);
errno_assert (rc == 0);
// PGM does not support subscription forwarding; ask for all data to be
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 85e3073..e70d1d3 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -53,21 +53,8 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
session (NULL),
leftover_session (NULL),
options (options_),
- plugged (false),
- header_pos (in_header),
- header_remaining (sizeof in_header),
- header_received (false),
- header_sent (false)
+ plugged (false)
{
- // Fill in outgoing SP protocol header and the complementary (desired)
- // header.
- if (!options.legacy_protocol) {
- sp_get_header (out_header, options.sp_pattern, options.sp_version,
- options.sp_role);
- sp_get_header (desired_header, options.sp_pattern, options.sp_version,
- options.sp_complement);
- }
-
// Get the socket into non-blocking mode.
unblock_socket (s);
@@ -168,35 +155,6 @@ void xs::stream_engine_t::in_event (fd_t fd_)
{
bool disconnection = false;
- // If we have not yet received the full protocol header...
- if (unlikely (!options.legacy_protocol && !header_received)) {
-
- // Read remaining header bytes.
- int hbytes = read (header_pos, header_remaining);
-
- // Check whether the peer has closed the connection.
- if (hbytes == -1) {
- error ();
- return;
- }
-
- header_remaining -= hbytes;
- header_pos += hbytes;
-
- // If we did not read the whole header, poll for more.
- if (header_remaining)
- return;
-
- // If the protocol headers do not match, close the connection.
- if (memcmp (in_header, desired_header, sizeof in_header) != 0) {
- error ();
- return;
- }
-
- // Done with protocol header; proceed to read data.
- header_received = true;
- }
-
// If there's no data to process in the buffer...
if (!insize) {
@@ -252,20 +210,6 @@ void xs::stream_engine_t::out_event (fd_t fd_)
{
bool more_data = true;
- // If protocol header was not yet sent...
- if (unlikely (!options.legacy_protocol && !header_sent)) {
- int hbytes = write (out_header, sizeof out_header);
-
- // It should always be possible to write the full protocol header to a
- // freshly connected TCP socket. Therefore, if we get an error or
- // partial write here the peer has disconnected.
- if (hbytes != sizeof out_header) {
- error ();
- return;
- }
- header_sent = true;
- }
-
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 70c9ddb..46a6080 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -30,7 +30,6 @@
#include "encoder.hpp"
#include "decoder.hpp"
#include "options.hpp"
-#include "wire.hpp"
namespace xs
{
@@ -99,22 +98,6 @@ namespace xs
bool plugged;
- // Outgoing protocol header.
- sp_header_t out_header;
-
- // Desired protocol header.
- sp_header_t desired_header;
-
- // Incoming protocol header.
- sp_header_t in_header;
-
- unsigned char *header_pos;
- size_t header_remaining;
-
- // Protocol header has been received/sent.
- bool header_received;
- bool header_sent;
-
stream_engine_t (const stream_engine_t&);
const stream_engine_t &operator = (const stream_engine_t&);
};
diff --git a/src/sub.cpp b/src/sub.cpp
index 79cb63c..d29ae8d 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -48,7 +48,8 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) {
- return xsub_t::xsetsockopt (option_, optval_, optvallen_);
+ errno = EINVAL;
+ return -1;
}
if (optvallen_ > 0 && !optval_) {
@@ -201,7 +202,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE);
+ put_uint16 (data, XS_CMD_SUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
@@ -224,7 +225,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 (data, XS_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
diff --git a/src/wire.hpp b/src/wire.hpp
index e751818..f840fce 100644
--- a/src/wire.hpp
+++ b/src/wire.hpp
@@ -24,50 +24,11 @@
#include "stdint.hpp"
// Protocol-related constants.
-
-// Protocol header.
-#define SP_HEADER_LENGTH 8
-
-// Patterns.
-#define SP_PAIR 1
-#define SP_PUBSUB 2
-#define SP_REQREP 3
-#define SP_PIPELINE 4
-#define SP_SURVEY 5
-
-// Roles.
-#define SP_PAIR_PAIR 1
-#define SP_PUBSUB_PUB 1
-#define SP_PUBSUB_SUB 2
-#define SP_REQREP_REQ 1
-#define SP_REQREP_REP 2
-#define SP_PIPELINE_PUSH 1
-#define SP_PIPELINE_PULL 2
-#define SP_SURVEY_SURVEYOR 1
-#define SP_SURVEY_RESPONDENT 2
-
-// PUBSUB pattern commands.
-#define SP_PUBSUB_CMD_SUBSCRIBE 1
-#define SP_PUBSUB_CMD_UNSUBSCRIBE 2
+#define XS_CMD_SUBSCRIBE 1
+#define XS_CMD_UNSUBSCRIBE 2
namespace xs
{
- // Protocol header type.
- typedef unsigned char sp_header_t [SP_HEADER_LENGTH];
-
- // Get the SP protocol header for the specified pattern, version and role.
- inline void sp_get_header (sp_header_t header_, int pattern_, int version_,
- int role_)
- {
- header_ [0] = 0;
- header_ [1] = 0;
- header_ [2] = 'S';
- header_ [3] = 'P';
- header_ [4] = pattern_ & 0xff;
- header_ [5] = version_ & 0xff;
- header_ [6] = role_ & 0xff;
- header_ [7] = 0;
- }
// Helper functions to convert different integer types to/from network
// byte order.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index b176bf8..fe0b9a7 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -35,10 +35,6 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
tmp_filter_id (-1)
{
options.type = XS_XPUB;
- options.sp_pattern = SP_PUBSUB;
- options.sp_version = 3;
- options.sp_role = SP_PUBSUB_PUB;
- options.sp_complement = SP_PUBSUB_SUB;
}
xs::xpub_t::~xpub_t ()
@@ -48,42 +44,6 @@ xs::xpub_t::~xpub_t ()
it->type->pf_destroy ((void*) (core_t*) this, it->instance);
}
-int xs::xpub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- switch (version) {
- case 1:
- options.legacy_protocol = true;
- options.sp_version = 1;
- break;
- case 3:
- options.legacy_protocol = false;
- options.sp_version = 3;
- break;
- default:
- errno = EINVAL;
- return -1;
- }
-
- return 0;
-}
-
void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -93,7 +53,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// to all data on this pipe, implicitly. Also, if we are using
// 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
// we need to subscribe for all messages automatically.
- if (icanhasall_|| pipe_->get_sp_version () == 1) {
+ if (icanhasall_|| pipe_->get_protocol () == 1) {
// Find the prefix filter.
// TODO: Change this to ALL filter.
@@ -161,8 +121,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
int filter_id = XS_FILTER_PREFIX;
#endif
- if (cmd != SP_PUBSUB_CMD_SUBSCRIBE &&
- cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
sub.close ();
return;
}
@@ -174,7 +133,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
break;
bool unique;
- if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd == XS_CMD_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
it->instance, pipe_, data + 4, size - 4) ? true : false;
@@ -299,7 +258,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
blob_t unsub (size_ + 4, 0);
- put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE);
put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);
memcpy ((void*) (unsub.data () + 4), data_, size_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index f9cb737..c0e47ff 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -49,7 +49,6 @@ namespace xs
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 5e9fc8a..007ed27 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -36,10 +36,6 @@ xs::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ())
{
options.type = XS_XREP;
- options.sp_pattern = SP_REQREP;
- options.sp_version = 1;
- options.sp_role = SP_REQREP_REP;
- options.sp_complement = SP_REQREP_REQ;
// TODO: Uncomment the following line when XREP will become true XREP
// rather than generic router socket.
@@ -59,34 +55,6 @@ xs::xrep_t::~xrep_t ()
prefetched_msg.close ();
}
-int xs::xrep_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 71a7bf4..8e16a4c 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -49,7 +49,6 @@ namespace xs
~xrep_t ();
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xreq.cpp b/src/xreq.cpp
index e42c691..1c6af9d 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -22,17 +22,12 @@
#include "xreq.hpp"
#include "err.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (false)
{
options.type = XS_XREQ;
- options.sp_pattern = SP_REQREP;
- options.sp_version = 1;
- options.sp_role = SP_REQREP_REQ;
- options.sp_complement = SP_REQREP_REP;
// TODO: Uncomment the following line when XREQ will become true XREQ
// rather than generic dealer socket.
@@ -51,34 +46,6 @@ xs::xreq_t::~xreq_t ()
prefetched_msg.close ();
}
-int xs::xreq_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 9fe6ebb..c4f9baf 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -46,7 +46,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
index bd19913..c16d900 100644
--- a/src/xrespondent.cpp
+++ b/src/xrespondent.cpp
@@ -35,10 +35,6 @@ xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
next_peer_id (generate_random ())
{
options.type = XS_XRESPONDENT;
- options.sp_pattern = SP_SURVEY;
- options.sp_version = 1;
- options.sp_role = SP_SURVEY_RESPONDENT;
- options.sp_complement = SP_SURVEY_SURVEYOR;
// If the connection disappears it makes no sense to read any more surveys
// from it. The responses will be unroutable anyway.
@@ -53,34 +49,6 @@ xs::xrespondent_t::~xrespondent_t ()
prefetched_msg.close ();
}
-int xs::xrespondent_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp
index 4143e36..eb6ac9a 100644
--- a/src/xrespondent.hpp
+++ b/src/xrespondent.hpp
@@ -47,7 +47,6 @@ namespace xs
~xrespondent_t ();
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 18acfea..7272a8e 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -29,10 +29,6 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
- options.sp_pattern = SP_PUBSUB;
- options.sp_version = 3;
- options.sp_role = SP_PUBSUB_SUB;
- options.sp_complement = SP_PUBSUB_PUB;
// When socket is being closed down we don't want to wait till pending
// subscription commands are sent to the wire.
@@ -46,42 +42,6 @@ xs::xsub_t::~xsub_t ()
{
}
-int xs::xsub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- switch (version) {
- case 1:
- options.legacy_protocol = true;
- options.sp_version = 1;
- break;
- case 3:
- options.legacy_protocol = false;
- options.sp_version = 3;
- break;
- default:
- errno = EINVAL;
- return -1;
- }
-
- return 0;
-}
-
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -89,7 +49,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_sp_version () == 1)
+ if (pipe_->get_protocol () == 1)
return;
dist.attach (pipe_);
@@ -115,14 +75,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- if (pipe_->get_sp_version () != 1)
+ if (pipe_->get_protocol () != 1)
dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
// In 0MQ/2.1 protocol there is no subscription forwarding.
- if (pipe_->get_sp_version () == 1)
+ if (pipe_->get_protocol () == 1)
return;
// Send all the cached subscriptions to the new upstream peer.
@@ -144,13 +104,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
- if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
// Process the subscription.
- if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) {
+ if (cmd == XS_CMD_SUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.insert (
std::make_pair (std::make_pair (filter_id,
blob_t (data + 4, size - 4)), 0)).first;
@@ -158,7 +118,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
if (it->second == 1)
return dist.send_to_all (msg_, flags_);
}
- else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ else if (cmd == XS_CMD_UNSUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.find (
std::make_pair (filter_id, blob_t (data + 4, size - 4)));
if (it != subscriptions.end ()) {
@@ -202,8 +162,7 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
int rc = msg.init_size (size_ + 4);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE :
- SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);
diff --git a/src/xsub.hpp b/src/xsub.hpp
index fcaf61c..5bc11c4 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -47,7 +47,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp
index d02ea09..985b6d8 100644
--- a/src/xsurveyor.cpp
+++ b/src/xsurveyor.cpp
@@ -21,16 +21,11 @@
#include "xsurveyor.hpp"
#include "err.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSURVEYOR;
- options.sp_pattern = SP_SURVEY;
- options.sp_version = 1;
- options.sp_role = SP_SURVEY_SURVEYOR;
- options.sp_complement = SP_SURVEY_RESPONDENT;
// When the XSURVEYOR socket is close it makes no sense to send any pending
// surveys. The responses will be unroutable anyway.
@@ -41,34 +36,6 @@ xs::xsurveyor_t::~xsurveyor_t ()
{
}
-int xs::xsurveyor_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PROTOCOL) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp
index e312667..b0c3f9c 100644
--- a/src/xsurveyor.hpp
+++ b/src/xsurveyor.hpp
@@ -45,7 +45,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp
index 7bf21f3..f3e0f96 100644
--- a/tests/wireformat.cpp
+++ b/tests/wireformat.cpp
@@ -71,10 +71,10 @@ int XS_TEST_MAIN ()
assert (rc == 0);
// Let's send some data and check if it arrived
- rc = send (rpush, "\0\0SP\x04\x02\x01\0\x04\0abc", 13, 0);
- assert (rc == 13);
+ rc = send (rpush, "\x04\0abc", 5, 0);
+ assert (rc == 5);
unsigned char buf [3];
- unsigned char buf2 [8];
+ unsigned char buf2 [3];
rc = xs_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
assert (!memcmp (buf, "abc", 3));
@@ -83,11 +83,8 @@ int XS_TEST_MAIN ()
rc = xs_send (push, buf, sizeof (buf), 0);
assert (rc == 3);
rc = recv (rpull, (char*) buf2, sizeof (buf2), 0);
- assert (rc == 8);
- assert (!memcmp (buf2, "\0\0SP\x04\x02\x01\0", 8));
- rc = recv (rpull, (char*) buf2, 5, 0);
- assert (rc == 5);
- assert (!memcmp (buf2, "\x04\0abc", 5));
+ assert (rc == 3);
+ assert (!memcmp (buf2, "\x04\0abc", 3));
#if defined XS_HAVE_WINDOWS
rc = closesocket (rpush);