summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-06-13 10:25:18 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-06-13 10:25:18 +0200
commit0a2f88984a4543aa69c15d8d8639180816857a6b (patch)
tree759f4d025d4cb95e010081839197045a7fde9826
parentb018208d5901873a374d08a98cf31e3bb89d12d0 (diff)
Revert "Implement SP wire protocol"
This reverts commit a34ea4d80609395150742259fd8c9caa4409e961.
-rw-r--r--include/xs/xs.h3
-rw-r--r--src/options.cpp42
-rw-r--r--src/options.hpp12
-rw-r--r--src/pair.cpp33
-rw-r--r--src/pair.hpp1
-rw-r--r--src/pgm_receiver.cpp31
-rw-r--r--src/pgm_receiver.hpp4
-rw-r--r--src/pgm_sender.cpp39
-rw-r--r--src/pgm_sender.hpp15
-rw-r--r--src/pipe.cpp14
-rw-r--r--src/pipe.hpp16
-rw-r--r--src/pull.cpp33
-rw-r--r--src/pull.hpp1
-rw-r--r--src/push.cpp33
-rw-r--r--src/push.hpp1
-rw-r--r--src/session_base.cpp2
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/stream_engine.cpp58
-rw-r--r--src/stream_engine.hpp17
-rw-r--r--src/sub.cpp7
-rw-r--r--src/wire.hpp45
-rw-r--r--src/xpub.cpp49
-rw-r--r--src/xpub.hpp1
-rw-r--r--src/xrep.cpp32
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/xreq.cpp33
-rw-r--r--src/xreq.hpp1
-rw-r--r--src/xrespondent.cpp32
-rw-r--r--src/xrespondent.hpp1
-rw-r--r--src/xsub.cpp55
-rw-r--r--src/xsub.hpp1
-rw-r--r--src/xsurveyor.cpp33
-rw-r--r--src/xsurveyor.hpp1
-rw-r--r--tests/libzmq21.cpp10
-rw-r--r--tests/wireformat.cpp19
35 files changed, 83 insertions, 597 deletions
diff --git a/include/xs/xs.h b/include/xs/xs.h
index 120bd1f..74d5f34 100644
--- a/include/xs/xs.h
+++ b/include/xs/xs.h
@@ -206,9 +206,8 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
#define XS_SNDTIMEO 28
#define XS_IPV4ONLY 31
#define XS_KEEPALIVE 32
-#define XS_PATTERN_VERSION 33
+#define XS_PROTOCOL 33
#define XS_SURVEY_TIMEOUT 35
-#define XS_SERVICE_ID 36
/* Message options */
#define XS_MORE 1
diff --git a/src/options.cpp b/src/options.cpp
index 26fa62c..62c03a2 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -48,12 +48,7 @@ xs::options_t::options_t () :
sndtimeo (-1),
ipv4only (1),
keepalive (0),
- legacy_protocol (false),
- sp_service (0),
- sp_pattern (-1),
- sp_version (-1),
- sp_role (-1),
- sp_complement (-1),
+ protocol (0),
filter (XS_FILTER_PREFIX),
survey_timeout (-1),
delay_on_close (true),
@@ -241,29 +236,29 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
- case XS_FILTER:
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- filter = *((int*) optval_);
- return 0;
-
- case XS_SERVICE_ID:
+ case XS_PROTOCOL:
{
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
int val = *((int*) optval_);
- if (val < 0 || val > 0xffff) {
+ if (val < 0) {
errno = EINVAL;
return -1;
}
- sp_service = val;
+ protocol = val;
return 0;
}
+ case XS_FILTER:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ filter = *((int*) optval_);
+ return 0;
+
case XS_SURVEY_TIMEOUT:
if (type != XS_SURVEYOR) {
errno = ENOTSUP;
@@ -457,21 +452,12 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
- case XS_PATTERN_VERSION:
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = sp_version;
- *optvallen_ = sizeof (int);
- return 0;
-
- case XS_SERVICE_ID:
+ case XS_PROTOCOL:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
- *((int*) optval_) = sp_service;
+ *((int*) optval_) = protocol;
*optvallen_ = sizeof (int);
return 0;
diff --git a/src/options.hpp b/src/options.hpp
index 9070c0f..805f793 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -92,16 +92,8 @@ namespace xs
// If 1, keepalives are to be sent periodically.
int keepalive;
- // If true, the legacy non-SP wire protocol is in use.
- bool legacy_protocol;
-
- // SP protocol service id, pattern, version, role and complementary
- // role.
- int sp_service;
- int sp_pattern;
- int sp_version;
- int sp_role;
- int sp_complement;
+ // Version of wire protocol to use.
+ int protocol;
// Filter ID to be used with subscriptions and unsubscriptions.
int filter;
diff --git a/src/pair.cpp b/src/pair.cpp
index bfb48d3..531bfc5 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -23,17 +23,12 @@
#include "err.hpp"
#include "pipe.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
pipe (NULL)
{
options.type = XS_PAIR;
- options.sp_pattern = SP_PAIR;
- options.sp_role = SP_PAIR_PAIR;
- options.sp_version = 3;
- options.sp_complement = SP_PAIR_PAIR;
}
xs::pair_t::~pair_t ()
@@ -41,34 +36,6 @@ xs::pair_t::~pair_t ()
xs_assert (!pipe);
}
-int xs::pair_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 2) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_ != NULL);
diff --git a/src/pair.hpp b/src/pair.hpp
index 6a5f167..07ed6c5 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -42,7 +42,6 @@ namespace xs
~pair_t ();
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 9aa1233..371c657 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -47,11 +47,6 @@ xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
pending_bytes (0),
rx_timer (NULL)
{
- // If not using a legacy protocol, fill in desired protocol header.
- if (!options.legacy_protocol) {
- sp_get_header (desired_header, options.sp_service, options.sp_pattern,
- options.sp_version, options.sp_complement);
- }
}
xs::pgm_receiver_t::~pgm_receiver_t ()
@@ -178,7 +173,7 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
data = (unsigned char*) tmp;
// No data to process. This may happen if the packet received is
- // neither RDATA nor ODATA.
+ // neither ODATA nor ODATA.
if (received == 0) {
if (errno == ENOMEM || errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
@@ -205,32 +200,18 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)
break;
}
- // Check protocol header.
- if (unlikely (!options.legacy_protocol)) {
- if (received < (ssize_t) sizeof desired_header)
- // Ignore malformed datagram.
- continue;
- if (memcmp (data, desired_header, sizeof desired_header) != 0)
- // Ignore datagram with incorrect protocol header.
- continue;
- data += sizeof desired_header;
- received -= sizeof desired_header;
+ // New peer. Add it to the list of know but unjoint peers.
+ if (it == peers.end ()) {
+ peer_info_t peer_info = {false, NULL};
+ it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
}
// Read the offset of the fist message in the current packet.
- if (received < (ssize_t) sizeof (uint16_t))
- // Ignore malformed datagram.
- continue;
+ xs_assert ((size_t) received >= sizeof (uint16_t));
uint16_t offset = get_uint16 (data);
data += sizeof (uint16_t);
received -= sizeof (uint16_t);
- // New peer. Add it to the list of known but disjoint peers.
- if (it == peers.end ()) {
- peer_info_t peer_info = {false, NULL};
- it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
- }
-
// Join the stream if needed.
if (!it->second.joined) {
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index c44594a..38c7a7b 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -39,7 +39,6 @@
#include "options.hpp"
#include "decoder.hpp"
#include "pgm_socket.hpp"
-#include "wire.hpp"
namespace xs
{
@@ -126,9 +125,6 @@ namespace xs
// Receive timer, if active, otherwise NULL.
handle_t rx_timer;
- // Desired protocol header.
- sp_header_t desired_header;
-
pgm_receiver_t (const pgm_receiver_t&);
const pgm_receiver_t &operator = (const pgm_receiver_t&);
};
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index d8d77b9..9c2a961 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -60,25 +60,6 @@ int xs::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
out_buffer_size = pgm_socket.get_max_tsdu_size ();
out_buffer = (unsigned char*) malloc (out_buffer_size);
alloc_assert (out_buffer);
- encode_buffer = out_buffer;
- encode_buffer_size = out_buffer_size;
- header_size = 0;
-
- // If not using a legacy protocol, fill in our datagram header and reserve
- // space for it in the datagram.
- if (!options.legacy_protocol) {
- sp_get_header (out_buffer, options.sp_service, options.sp_pattern,
- options.sp_version, options.sp_role);
- encode_buffer += SP_HEADER_LENGTH;
- encode_buffer_size -= SP_HEADER_LENGTH;
- header_size += SP_HEADER_LENGTH;
- }
-
- // Reserve space in the datagram for the offset of the first message.
- offset_p = encode_buffer;
- encode_buffer += sizeof (uint16_t);
- encode_buffer_size -= sizeof (uint16_t);
- header_size += sizeof (uint16_t);
return rc;
}
@@ -175,25 +156,27 @@ void xs::pgm_sender_t::in_event (fd_t fd_)
void xs::pgm_sender_t::out_event (fd_t fd_)
{
- // POLLOUT event from send socket. If write buffer is empty (which means
- // that the last write succeeded), try to read new data from the encoder.
+ // POLLOUT event from send socket. If write buffer is empty,
+ // try to read new data from the encoder.
if (write_size == 0) {
- // Pass our own buffer to the get_data () function to prevent it from
- // returning its own buffer.
+ // First two bytes (sizeof uint16_t) are used to store message
+ // offset in following steps. Note that by passing our buffer to
+ // the get data function we prevent it from returning its own buffer.
+ unsigned char *bf = out_buffer + sizeof (uint16_t);
+ size_t bfsz = out_buffer_size - sizeof (uint16_t);
int offset = -1;
- size_t data_size = encode_buffer_size;
- encoder.get_data (&encode_buffer, &data_size, &offset);
+ encoder.get_data (&bf, &bfsz, &offset);
// If there are no data to write stop polling for output.
- if (!data_size) {
+ if (!bfsz) {
reset_pollout (handle);
return;
}
// Put offset information in the buffer.
- write_size = header_size + data_size;
- put_uint16 (offset_p, offset == -1 ? 0xffff : (uint16_t) offset);
+ write_size = bfsz + sizeof (uint16_t);
+ put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
}
if (tx_timer) {
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 5a808b6..89f81bf 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -84,21 +84,12 @@ namespace xs
handle_t rdata_notify_handle;
handle_t pending_notify_handle;
- // Output buffer and size for pgm_socket.
+ // Output buffer from pgm_socket.
unsigned char *out_buffer;
+
+ // Output buffer size.
size_t out_buffer_size;
- // Size of header in each datagram.
- size_t header_size;
-
- // Encoder buffer and size, adjusted from output buffer by size of
- // datagram header(s).
- unsigned char *encode_buffer;
- size_t encode_buffer_size;
-
- // Position of offset to first message in output buffer.
- unsigned char *offset_p;
-
// Number of bytes in the buffer to be written to the socket.
// If zero, there are no data to be sent.
size_t write_size;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index c14642f..0a15cc0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -27,7 +27,7 @@
#include "err.hpp"
int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int sp_version_)
+ int hwms_ [2], bool delays_ [2], int protocol_)
{
// Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction.
@@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
- hwms_ [1], hwms_ [0], delays_ [0], sp_version_);
+ hwms_ [1], hwms_ [0], delays_ [0], protocol_);
alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
- hwms_ [0], hwms_ [1], delays_ [1], sp_version_);
+ hwms_ [0], hwms_ [1], delays_ [1], protocol_);
alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]);
@@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
}
xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int sp_version_) :
+ int inhwm_, int outhwm_, bool delay_, int protocol_) :
object_t (parent_),
inpipe (inpipe_),
outpipe (outpipe_),
@@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
sink (NULL),
state (active),
delay (delay_),
- sp_version (sp_version_)
+ protocol (protocol_)
{
}
@@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_)
}
}
-int xs::pipe_t::get_sp_version ()
+int xs::pipe_t::get_protocol ()
{
- return sp_version;
+ return protocol;
}
bool xs::pipe_t::is_delimiter (msg_t &msg_)
diff --git a/src/pipe.hpp b/src/pipe.hpp
index dbc6b7e..c298154 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -44,7 +44,7 @@ namespace xs
// pipe receives all the pending messages before terminating, otherwise it
// terminates straight away.
int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2], int sp_version_);
+ int hwms_ [2], bool delays_ [2], int protocol_);
struct i_pipe_events
{
@@ -69,7 +69,7 @@ namespace xs
// This allows pipepair to create pipe objects.
friend int pipepair (xs::object_t *parents_ [2],
xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
- int sp_version_);
+ int protocol_);
public:
@@ -111,8 +111,8 @@ namespace xs
// before actual shutdown.
void terminate (bool delay_);
- // Returns the SP pattern version in use on this pipe.
- int get_sp_version ();
+ // Returns the ID of the protocol associated with the pipe.
+ int get_protocol ();
private:
@@ -132,7 +132,7 @@ namespace xs
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_, int sp_version_);
+ int inhwm_, int outhwm_, bool delay_, int protocol_);
// Pipepair uses this function to let us know about
// the peer pipe object.
@@ -192,9 +192,9 @@ namespace xs
// asks us to.
bool delay;
- // SP pattern version in use on this pipe. This value is used by the
- // pattern classes using the pipe, not the pipe itself.
- int sp_version;
+ // ID of the protocol to use. This value is not used by the pipe
+ // itself, rather it's used by the users of the pipe.
+ int protocol;
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
diff --git a/src/pull.cpp b/src/pull.cpp
index 0cedc39..8ae8208 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -23,50 +23,17 @@
#include "err.hpp"
#include "msg.hpp"
#include "pipe.hpp"
-#include "wire.hpp"
xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PULL;
- options.sp_pattern = SP_PIPELINE;
- options.sp_version = 3;
- options.sp_role = SP_PIPELINE_PULL;
- options.sp_complement = SP_PIPELINE_PUSH;
}
xs::pull_t::~pull_t ()
{
}
-int xs::pull_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 2) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/pull.hpp b/src/pull.hpp
index 5453bbd..04da465 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -45,7 +45,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xrecv (xs::msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/push.cpp b/src/push.cpp
index 729c97a..59f508f 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -23,50 +23,17 @@
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_PUSH;
- options.sp_pattern = SP_PIPELINE;
- options.sp_version = 3;
- options.sp_role = SP_PIPELINE_PUSH;
- options.sp_complement = SP_PIPELINE_PULL;
}
xs::push_t::~push_t ()
{
}
-int xs::push_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 2) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/push.hpp b/src/push.hpp
index ffe16cc..a112f31 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -45,7 +45,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 81f7347..49fdce1 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -300,7 +300,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
- int rc = pipepair (parents, pipes, hwms, delays, options.sp_version);
+ int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Plug the local end of the pipe.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index b424a5e..9a8103e 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -451,7 +451,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
+ rc = pipepair (parents, ppair, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
@@ -535,7 +535,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *ppair [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, ppair, hwms, delays, options.sp_version);
+ rc = pipepair (parents, ppair, hwms, delays, options.protocol);
errno_assert (rc == 0);
// PGM does not support subscription forwarding; ask for all data to be
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index b0f9f20..dc6d345 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -53,21 +53,8 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
session (NULL),
leftover_session (NULL),
options (options_),
- plugged (false),
- header_pos (in_header),
- header_remaining (sizeof in_header),
- header_received (false),
- header_sent (false)
+ plugged (false)
{
- // Fill in outgoing SP protocol header and the complementary (desired)
- // header.
- if (!options.legacy_protocol) {
- sp_get_header (out_header, options.sp_service, options.sp_pattern,
- options.sp_version, options.sp_role);
- sp_get_header (desired_header, options.sp_service, options.sp_pattern,
- options.sp_version, options.sp_complement);
- }
-
// Get the socket into non-blocking mode.
unblock_socket (s);
@@ -168,35 +155,6 @@ void xs::stream_engine_t::in_event (fd_t fd_)
{
bool disconnection = false;
- // If we have not yet received the full protocol header...
- if (unlikely (!options.legacy_protocol && !header_received)) {
-
- // Read remaining header bytes.
- int hbytes = read (header_pos, header_remaining);
-
- // Check whether the peer has closed the connection.
- if (hbytes == -1) {
- error ();
- return;
- }
-
- header_remaining -= hbytes;
- header_pos += hbytes;
-
- // If we did not read the whole header, poll for more.
- if (header_remaining)
- return;
-
- // If the protocol headers do not match, close the connection.
- if (memcmp (in_header, desired_header, sizeof in_header) != 0) {
- error ();
- return;
- }
-
- // Done with protocol header; proceed to read data.
- header_received = true;
- }
-
// If there's no data to process in the buffer...
if (!insize) {
@@ -252,20 +210,6 @@ void xs::stream_engine_t::out_event (fd_t fd_)
{
bool more_data = true;
- // If protocol header was not yet sent...
- if (unlikely (!options.legacy_protocol && !header_sent)) {
- int hbytes = write (out_header, sizeof out_header);
-
- // It should always be possible to write the full protocol header to a
- // freshly connected TCP socket. Therefore, if we get an error or
- // partial write here the peer has disconnected.
- if (hbytes != sizeof out_header) {
- error ();
- return;
- }
- header_sent = true;
- }
-
// If write buffer is empty, try to read new data from the encoder.
if (!outsize) {
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 70c9ddb..46a6080 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -30,7 +30,6 @@
#include "encoder.hpp"
#include "decoder.hpp"
#include "options.hpp"
-#include "wire.hpp"
namespace xs
{
@@ -99,22 +98,6 @@ namespace xs
bool plugged;
- // Outgoing protocol header.
- sp_header_t out_header;
-
- // Desired protocol header.
- sp_header_t desired_header;
-
- // Incoming protocol header.
- sp_header_t in_header;
-
- unsigned char *header_pos;
- size_t header_remaining;
-
- // Protocol header has been received/sent.
- bool header_received;
- bool header_sent;
-
stream_engine_t (const stream_engine_t&);
const stream_engine_t &operator = (const stream_engine_t&);
};
diff --git a/src/sub.cpp b/src/sub.cpp
index 79cb63c..d29ae8d 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -48,7 +48,8 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) {
- return xsub_t::xsetsockopt (option_, optval_, optvallen_);
+ errno = EINVAL;
+ return -1;
}
if (optvallen_ > 0 && !optval_) {
@@ -201,7 +202,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE);
+ put_uint16 (data, XS_CMD_SUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
@@ -224,7 +225,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 (data, XS_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, options.filter);
memcpy (data + 4, data_, size_);
diff --git a/src/wire.hpp b/src/wire.hpp
index 014021b..f840fce 100644
--- a/src/wire.hpp
+++ b/src/wire.hpp
@@ -24,52 +24,11 @@
#include "stdint.hpp"
// Protocol-related constants.
-
-// Protocol header.
-#define SP_HEADER_LENGTH 8
-
-// Patterns.
-#define SP_PAIR 1
-#define SP_PUBSUB 2
-#define SP_REQREP 3
-#define SP_PIPELINE 4
-#define SP_SURVEY 5
-
-// Roles.
-#define SP_PAIR_PAIR 1
-#define SP_PUBSUB_PUB 1
-#define SP_PUBSUB_SUB 2
-#define SP_REQREP_REQ 1
-#define SP_REQREP_REP 2
-#define SP_PIPELINE_PUSH 1
-#define SP_PIPELINE_PULL 2
-#define SP_SURVEY_SURVEYOR 1
-#define SP_SURVEY_RESPONDENT 2
-
-// PUBSUB pattern commands.
-#define SP_PUBSUB_CMD_SUBSCRIBE 1
-#define SP_PUBSUB_CMD_UNSUBSCRIBE 2
+#define XS_CMD_SUBSCRIBE 1
+#define XS_CMD_UNSUBSCRIBE 2
namespace xs
{
- // Protocol header type.
- typedef unsigned char sp_header_t [SP_HEADER_LENGTH];
-
- // Get the SP protocol header for the specified service, pattern, version
- // and role.
- inline void sp_get_header (sp_header_t header_, int service_, int pattern_,
- int version_, int role_)
- {
- header_ [0] = 0; // Protocol identifier
- header_ [1] = 'S'; // "
- header_ [2] = 'P'; // "
- header_ [3] = 0; // Reserved, must be zero.
- header_ [4] = (service_ >> 8) & 0xff; // Service id high byte
- header_ [5] = service_ & 0xff; // Service id low byte
- header_ [6] = pattern_ & 0xff; // Pattern
- header_ [7] = (version_ & 0xf) << 4; // Pattern version
- header_ [7] |= role_ & 0xf; // Pattern role
- }
// Helper functions to convert different integer types to/from network
// byte order.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index bc307e6..fbb45fb 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -35,10 +35,6 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
tmp_filter_id (-1)
{
options.type = XS_XPUB;
- options.sp_pattern = SP_PUBSUB;
- options.sp_version = 4;
- options.sp_role = SP_PUBSUB_PUB;
- options.sp_complement = SP_PUBSUB_SUB;
}
xs::xpub_t::~xpub_t ()
@@ -48,42 +44,6 @@ xs::xpub_t::~xpub_t ()
it->type->pf_destroy ((void*) (core_t*) this, it->instance);
}
-int xs::xpub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- switch (version) {
- case 1:
- options.legacy_protocol = true;
- options.sp_version = 1;
- break;
- case 3:
- options.legacy_protocol = false;
- options.sp_version = 3;
- break;
- default:
- errno = EINVAL;
- return -1;
- }
-
- return 0;
-}
-
void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -93,7 +53,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// to all data on this pipe, implicitly. Also, if we are using
// 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
// we need to subscribe for all messages automatically.
- if (icanhasall_|| pipe_->get_sp_version () == 1) {
+ if (icanhasall_|| pipe_->get_protocol () == 1) {
// Find the prefix filter.
// TODO: Change this to ALL filter.
@@ -161,8 +121,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
int filter_id = XS_FILTER_PREFIX;
#endif
- if (cmd != SP_PUBSUB_CMD_SUBSCRIBE &&
- cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
sub.close ();
return;
}
@@ -174,7 +133,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)
break;
bool unique;
- if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd == XS_CMD_UNSUBSCRIBE) {
xs_assert (it != filters.end ());
unique = it->type->pf_unsubscribe ((void*) (core_t*) this,
it->instance, pipe_, data + 4, size - 4) ? true : false;
@@ -299,7 +258,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)
// Place the unsubscription to the queue of pending (un)sunscriptions
// to be retrived by the user later on.
blob_t unsub (size_ + 4, 0);
- put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE);
put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);
memcpy ((void*) (unsub.data () + 4), data_, size_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 87cf6f5..c76f03c 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -49,7 +49,6 @@ namespace xs
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 0e6acb2..2cf5742 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -36,10 +36,6 @@ xs::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ())
{
options.type = XS_XREP;
- options.sp_pattern = SP_REQREP;
- options.sp_version = 2;
- options.sp_role = SP_REQREP_REP;
- options.sp_complement = SP_REQREP_REQ;
// TODO: Uncomment the following line when XREP will become true XREP
// rather than generic router socket.
@@ -59,34 +55,6 @@ xs::xrep_t::~xrep_t ()
prefetched_msg.close ();
}
-int xs::xrep_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 71a7bf4..8e16a4c 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -49,7 +49,6 @@ namespace xs
~xrep_t ();
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xreq.cpp b/src/xreq.cpp
index ac6ba4d..1c6af9d 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -22,17 +22,12 @@
#include "xreq.hpp"
#include "err.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (false)
{
options.type = XS_XREQ;
- options.sp_pattern = SP_REQREP;
- options.sp_version = 2;
- options.sp_role = SP_REQREP_REQ;
- options.sp_complement = SP_REQREP_REP;
// TODO: Uncomment the following line when XREQ will become true XREQ
// rather than generic dealer socket.
@@ -51,34 +46,6 @@ xs::xreq_t::~xreq_t ()
prefetched_msg.close ();
}
-int xs::xreq_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 9fe6ebb..c4f9baf 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -46,7 +46,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
index 9d949ce..c16d900 100644
--- a/src/xrespondent.cpp
+++ b/src/xrespondent.cpp
@@ -35,10 +35,6 @@ xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
next_peer_id (generate_random ())
{
options.type = XS_XRESPONDENT;
- options.sp_pattern = SP_SURVEY;
- options.sp_version = 2;
- options.sp_role = SP_SURVEY_RESPONDENT;
- options.sp_complement = SP_SURVEY_SURVEYOR;
// If the connection disappears it makes no sense to read any more surveys
// from it. The responses will be unroutable anyway.
@@ -53,34 +49,6 @@ xs::xrespondent_t::~xrespondent_t ()
prefetched_msg.close ();
}
-int xs::xrespondent_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp
index 4143e36..eb6ac9a 100644
--- a/src/xrespondent.hpp
+++ b/src/xrespondent.hpp
@@ -47,7 +47,6 @@ namespace xs
~xrespondent_t ();
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (msg_t *msg_, int flags_);
int xrecv (msg_t *msg_, int flags_);
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 0a9ff7e..599b947 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -29,10 +29,6 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
- options.sp_pattern = SP_PUBSUB;
- options.sp_version = 4;
- options.sp_role = SP_PUBSUB_SUB;
- options.sp_complement = SP_PUBSUB_PUB;
// When socket is being closed down we don't want to wait till pending
// subscription commands are sent to the wire.
@@ -46,42 +42,6 @@ xs::xsub_t::~xsub_t ()
{
}
-int xs::xsub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- switch (version) {
- case 1:
- options.legacy_protocol = true;
- options.sp_version = 1;
- break;
- case 3:
- options.legacy_protocol = false;
- options.sp_version = 3;
- break;
- default:
- errno = EINVAL;
- return -1;
- }
-
- return 0;
-}
-
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -89,7 +49,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_sp_version () == 1)
+ if (pipe_->get_protocol () == 1)
return;
dist.attach (pipe_);
@@ -115,14 +75,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- if (pipe_->get_sp_version () != 1)
+ if (pipe_->get_protocol () != 1)
dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
// In 0MQ/2.1 protocol there is no subscription forwarding.
- if (pipe_->get_sp_version () == 1)
+ if (pipe_->get_protocol () == 1)
return;
// Send all the cached subscriptions to the new upstream peer.
@@ -144,13 +104,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
- if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
// Process the subscription.
- if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) {
+ if (cmd == XS_CMD_SUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.insert (
std::make_pair (std::make_pair (filter_id,
blob_t (data + 4, size - 4)), 0)).first;
@@ -158,7 +118,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
if (it->second == 1)
return dist.send_to_all (msg_, flags_);
}
- else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ else if (cmd == XS_CMD_UNSUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.find (
std::make_pair (filter_id, blob_t (data + 4, size - 4)));
if (it != subscriptions.end ()) {
@@ -202,8 +162,7 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE :
- SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 986f753..0cf81b5 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -47,7 +47,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
bool xhas_out ();
diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp
index 7016550..985b6d8 100644
--- a/src/xsurveyor.cpp
+++ b/src/xsurveyor.cpp
@@ -21,16 +21,11 @@
#include "xsurveyor.hpp"
#include "err.hpp"
#include "msg.hpp"
-#include "wire.hpp"
xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSURVEYOR;
- options.sp_pattern = SP_SURVEY;
- options.sp_version = 2;
- options.sp_role = SP_SURVEY_SURVEYOR;
- options.sp_complement = SP_SURVEY_RESPONDENT;
// When the XSURVEYOR socket is close it makes no sense to send any pending
// surveys. The responses will be unroutable anyway.
@@ -41,34 +36,6 @@ xs::xsurveyor_t::~xsurveyor_t ()
{
}
-int xs::xsurveyor_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- if (version != 1) {
- errno = EINVAL;
- return -1;
- }
-
- options.sp_version = version;
- return 0;
-}
-
void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp
index e312667..b0c3f9c 100644
--- a/src/xsurveyor.hpp
+++ b/src/xsurveyor.hpp
@@ -45,7 +45,6 @@ namespace xs
protected:
// Overloads of functions from socket_base_t.
- int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
int xsend (xs::msg_t *msg_, int flags_);
int xrecv (xs::msg_t *msg_, int flags_);
diff --git a/tests/libzmq21.cpp b/tests/libzmq21.cpp
index 3323ef0..ecf271b 100644
--- a/tests/libzmq21.cpp
+++ b/tests/libzmq21.cpp
@@ -53,9 +53,8 @@ int XS_TEST_MAIN ()
assert (ctx);
void *pub = xs_socket (ctx, XS_PUB);
assert (pub);
- int pattern_version = 1;
- int rc = xs_setsockopt (pub, XS_PATTERN_VERSION, &pattern_version,
- sizeof (pattern_version));
+ int protocol = 1;
+ int rc = xs_setsockopt (pub, XS_PROTOCOL, &protocol, sizeof (protocol));
assert (rc == 0);
rc = xs_bind (pub, "tcp://127.0.0.1:5560");
assert (rc != -1);
@@ -100,9 +99,8 @@ int XS_TEST_MAIN ()
assert (ctx);
void *sub = xs_socket (ctx, XS_SUB);
assert (sub);
- pattern_version = 1;
- rc = xs_setsockopt (sub, XS_PATTERN_VERSION, &pattern_version,
- sizeof (pattern_version));
+ protocol = 1;
+ rc = xs_setsockopt (sub, XS_PROTOCOL, &protocol, sizeof (protocol));
assert (rc == 0);
rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0);
assert (rc == 0);
diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp
index f12f5aa..f3e0f96 100644
--- a/tests/wireformat.cpp
+++ b/tests/wireformat.cpp
@@ -49,11 +49,7 @@ int XS_TEST_MAIN ()
assert (push);
// Bind the peer and get the message.
- int service_id = 0x1234;
- int rc = xs_setsockopt (pull, XS_SERVICE_ID, &service_id,
- sizeof service_id);
- assert (rc == 0);
- rc = xs_bind (pull, "tcp://127.0.0.1:5560");
+ int rc = xs_bind (pull, "tcp://127.0.0.1:5560");
assert (rc != -1);
rc = xs_bind (push, "tcp://127.0.0.1:5561");
assert (rc != -1);
@@ -75,10 +71,10 @@ int XS_TEST_MAIN ()
assert (rc == 0);
// Let's send some data and check if it arrived
- rc = send (rpush, "\0SP\0\x12\x34\x04\x31\x04\0abc", 13, 0);
- assert (rc == 13);
+ rc = send (rpush, "\x04\0abc", 5, 0);
+ assert (rc == 5);
unsigned char buf [3];
- unsigned char buf2 [8];
+ unsigned char buf2 [3];
rc = xs_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3);
assert (!memcmp (buf, "abc", 3));
@@ -87,11 +83,8 @@ int XS_TEST_MAIN ()
rc = xs_send (push, buf, sizeof (buf), 0);
assert (rc == 3);
rc = recv (rpull, (char*) buf2, sizeof (buf2), 0);
- assert (rc == 8);
- assert (!memcmp (buf2, "\0SP\0\0\0\x04\x31", 8));
- rc = recv (rpull, (char*) buf2, 5, 0);
- assert (rc == 5);
- assert (!memcmp (buf2, "\x04\0abc", 5));
+ assert (rc == 3);
+ assert (!memcmp (buf2, "\x04\0abc", 3));
#if defined XS_HAVE_WINDOWS
rc = closesocket (rpush);