diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2012-05-01 16:41:04 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-05-01 16:41:04 +0200 |
commit | 971cbeda3de6691b212ad18bed9e9f08b278cfe1 (patch) | |
tree | adadbdfb63dd5d95e72a9eef54c738e20d81f013 | |
parent | dce17292e1f62ca692aa36ed3eaf5bfb09e7ab69 (diff) |
Revert "Implement protocol versioning (except PGM)"
This reverts commit 512f3a604924fec9d89e2b4bfd6f73aa66309fa7.
-rw-r--r-- | src/options.cpp | 23 | ||||
-rw-r--r-- | src/options.hpp | 10 | ||||
-rw-r--r-- | src/pair.cpp | 33 | ||||
-rw-r--r-- | src/pair.hpp | 1 | ||||
-rw-r--r-- | src/pipe.cpp | 14 | ||||
-rw-r--r-- | src/pipe.hpp | 16 | ||||
-rw-r--r-- | src/pull.cpp | 33 | ||||
-rw-r--r-- | src/pull.hpp | 1 | ||||
-rw-r--r-- | src/push.cpp | 33 | ||||
-rw-r--r-- | src/push.hpp | 1 | ||||
-rw-r--r-- | src/session_base.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 4 | ||||
-rw-r--r-- | src/stream_engine.cpp | 58 | ||||
-rw-r--r-- | src/stream_engine.hpp | 17 | ||||
-rw-r--r-- | src/sub.cpp | 7 | ||||
-rw-r--r-- | src/wire.hpp | 43 | ||||
-rw-r--r-- | src/xpub.cpp | 49 | ||||
-rw-r--r-- | src/xpub.hpp | 1 | ||||
-rw-r--r-- | src/xrep.cpp | 32 | ||||
-rw-r--r-- | src/xrep.hpp | 1 | ||||
-rw-r--r-- | src/xreq.cpp | 33 | ||||
-rw-r--r-- | src/xreq.hpp | 1 | ||||
-rw-r--r-- | src/xrespondent.cpp | 32 | ||||
-rw-r--r-- | src/xrespondent.hpp | 1 | ||||
-rw-r--r-- | src/xsub.cpp | 55 | ||||
-rw-r--r-- | src/xsub.hpp | 1 | ||||
-rw-r--r-- | src/xsurveyor.cpp | 33 | ||||
-rw-r--r-- | src/xsurveyor.hpp | 1 | ||||
-rw-r--r-- | tests/wireformat.cpp | 13 |
29 files changed, 60 insertions, 489 deletions
diff --git a/src/options.cpp b/src/options.cpp index d362c18..f7bbdc4 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -48,11 +48,7 @@ xs::options_t::options_t () : sndtimeo (-1), ipv4only (1), keepalive (0), - legacy_protocol (false), - sp_pattern (-1), - sp_version (-1), - sp_role (-1), - sp_complement (-1), + protocol (0), filter (XS_FILTER_PREFIX), survey_timeout (-1), delay_on_close (true), @@ -240,6 +236,21 @@ int xs::options_t::setsockopt (int option_, const void *optval_, return 0; } + case XS_PROTOCOL: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val < 0) { + errno = EINVAL; + return -1; + } + protocol = val; + return 0; + } + case XS_FILTER: if (optvallen_ != sizeof (int)) { errno = EINVAL; @@ -446,7 +457,7 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) errno = EINVAL; return -1; } - *((int*) optval_) = sp_version; + *((int*) optval_) = protocol; *optvallen_ = sizeof (int); return 0; diff --git a/src/options.hpp b/src/options.hpp index 89d6d2a..805f793 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -92,14 +92,8 @@ namespace xs // If 1, keepalives are to be sent periodically. int keepalive; - // If true, the legacy non-SP wire protocol is in use. - bool legacy_protocol; - - // SP protocol pattern, version, role and complementary role. - int sp_pattern; - int sp_version; - int sp_role; - int sp_complement; + // Version of wire protocol to use. + int protocol; // Filter ID to be used with subscriptions and unsubscriptions. int filter; diff --git a/src/pair.cpp b/src/pair.cpp index d4f699e..b4cb0b4 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -23,17 +23,12 @@ #include "err.hpp" #include "pipe.hpp" #include "msg.hpp" -#include "wire.hpp" xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), pipe (NULL) { options.type = XS_PAIR; - options.sp_pattern = SP_PAIR; - options.sp_role = SP_PAIR_PAIR; - options.sp_version = 2; - options.sp_complement = SP_PAIR_PAIR; } xs::pair_t::~pair_t () @@ -41,34 +36,6 @@ xs::pair_t::~pair_t () xs_assert (!pipe); } -int xs::pair_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - if (version != 2) { - errno = EINVAL; - return -1; - } - - options.sp_version = version; - return 0; -} - void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (!pipe); diff --git a/src/pair.hpp b/src/pair.hpp index 6a5f167..07ed6c5 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -42,7 +42,6 @@ namespace xs ~pair_t (); // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); diff --git a/src/pipe.cpp b/src/pipe.cpp index c14642f..0a15cc0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,7 +27,7 @@ #include "err.hpp" int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2], int sp_version_) + int hwms_ [2], bool delays_ [2], int protocol_) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. @@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0], sp_version_); + hwms_ [1], hwms_ [0], delays_ [0], protocol_); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1], sp_version_); + hwms_ [0], hwms_ [1], delays_ [1], protocol_); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_, int sp_version_) : + int inhwm_, int outhwm_, bool delay_, int protocol_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, sink (NULL), state (active), delay (delay_), - sp_version (sp_version_) + protocol (protocol_) { } @@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_) } } -int xs::pipe_t::get_sp_version () +int xs::pipe_t::get_protocol () { - return sp_version; + return protocol; } bool xs::pipe_t::is_delimiter (msg_t &msg_) diff --git a/src/pipe.hpp b/src/pipe.hpp index dbc6b7e..c298154 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,7 +44,7 @@ namespace xs // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2], int sp_version_); + int hwms_ [2], bool delays_ [2], int protocol_); struct i_pipe_events { @@ -69,7 +69,7 @@ namespace xs // This allows pipepair to create pipe objects. friend int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], - int sp_version_); + int protocol_); public: @@ -111,8 +111,8 @@ namespace xs // before actual shutdown. void terminate (bool delay_); - // Returns the SP pattern version in use on this pipe. - int get_sp_version (); + // Returns the ID of the protocol associated with the pipe. + int get_protocol (); private: @@ -132,7 +132,7 @@ namespace xs // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_, int sp_version_); + int inhwm_, int outhwm_, bool delay_, int protocol_); // Pipepair uses this function to let us know about // the peer pipe object. @@ -192,9 +192,9 @@ namespace xs // asks us to. bool delay; - // SP pattern version in use on this pipe. This value is used by the - // pattern classes using the pipe, not the pipe itself. - int sp_version; + // ID of the protocol to use. This value is not used by the pipe + // itself, rather it's used by the users of the pipe. + int protocol; // Identity of the writer. Used uniquely by the reader side. blob_t identity; diff --git a/src/pull.cpp b/src/pull.cpp index 4d08edb..8ae8208 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -23,50 +23,17 @@ #include "err.hpp" #include "msg.hpp" #include "pipe.hpp" -#include "wire.hpp" xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_PULL; - options.sp_pattern = SP_PIPELINE; - options.sp_version = 2; - options.sp_role = SP_PIPELINE_PULL; - options.sp_complement = SP_PIPELINE_PUSH; } xs::pull_t::~pull_t () { } -int xs::pull_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - if (version != 2) { - errno = EINVAL; - return -1; - } - - options.sp_version = version; - return 0; -} - void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index 5453bbd..04da465 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -45,7 +45,6 @@ namespace xs protected: // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xrecv (xs::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/push.cpp b/src/push.cpp index 94f8381..59f508f 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -23,50 +23,17 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" -#include "wire.hpp" xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_PUSH; - options.sp_pattern = SP_PIPELINE; - options.sp_version = 2; - options.sp_role = SP_PIPELINE_PUSH; - options.sp_complement = SP_PIPELINE_PULL; } xs::push_t::~push_t () { } -int xs::push_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - if (version != 2) { - errno = EINVAL; - return -1; - } - - options.sp_version = version; - return 0; -} - void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/push.hpp b/src/push.hpp index ffe16cc..a112f31 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -45,7 +45,6 @@ namespace xs protected: // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); diff --git a/src/session_base.cpp b/src/session_base.cpp index 9b58b42..35c4a4e 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -298,7 +298,7 @@ void xs::session_base_t::process_attach (i_engine *engine_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays, options.sp_version); + int rc = pipepair (parents, pipes, hwms, delays, options.protocol); errno_assert (rc == 0); // Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e8ae28a..ec5c8bd 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -447,7 +447,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *ppair [2] = {NULL, NULL}; int hwms [2] = {sndhwm, rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, ppair, hwms, delays, options.sp_version); + rc = pipepair (parents, ppair, hwms, delays, options.protocol); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -501,7 +501,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *ppair [2] = {NULL, NULL}; int hwms [2] = {options.sndhwm, options.rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, ppair, hwms, delays, options.sp_version); + rc = pipepair (parents, ppair, hwms, delays, options.protocol); errno_assert (rc == 0); // PGM does not support subscription forwarding; ask for all data to be diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 85e3073..e70d1d3 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -53,21 +53,8 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : session (NULL), leftover_session (NULL), options (options_), - plugged (false), - header_pos (in_header), - header_remaining (sizeof in_header), - header_received (false), - header_sent (false) + plugged (false) { - // Fill in outgoing SP protocol header and the complementary (desired) - // header. - if (!options.legacy_protocol) { - sp_get_header (out_header, options.sp_pattern, options.sp_version, - options.sp_role); - sp_get_header (desired_header, options.sp_pattern, options.sp_version, - options.sp_complement); - } - // Get the socket into non-blocking mode. unblock_socket (s); @@ -168,35 +155,6 @@ void xs::stream_engine_t::in_event (fd_t fd_) { bool disconnection = false; - // If we have not yet received the full protocol header... - if (unlikely (!options.legacy_protocol && !header_received)) { - - // Read remaining header bytes. - int hbytes = read (header_pos, header_remaining); - - // Check whether the peer has closed the connection. - if (hbytes == -1) { - error (); - return; - } - - header_remaining -= hbytes; - header_pos += hbytes; - - // If we did not read the whole header, poll for more. - if (header_remaining) - return; - - // If the protocol headers do not match, close the connection. - if (memcmp (in_header, desired_header, sizeof in_header) != 0) { - error (); - return; - } - - // Done with protocol header; proceed to read data. - header_received = true; - } - // If there's no data to process in the buffer... if (!insize) { @@ -252,20 +210,6 @@ void xs::stream_engine_t::out_event (fd_t fd_) { bool more_data = true; - // If protocol header was not yet sent... - if (unlikely (!options.legacy_protocol && !header_sent)) { - int hbytes = write (out_header, sizeof out_header); - - // It should always be possible to write the full protocol header to a - // freshly connected TCP socket. Therefore, if we get an error or - // partial write here the peer has disconnected. - if (hbytes != sizeof out_header) { - error (); - return; - } - header_sent = true; - } - // If write buffer is empty, try to read new data from the encoder. if (!outsize) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 70c9ddb..46a6080 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -30,7 +30,6 @@ #include "encoder.hpp" #include "decoder.hpp" #include "options.hpp" -#include "wire.hpp" namespace xs { @@ -99,22 +98,6 @@ namespace xs bool plugged; - // Outgoing protocol header. - sp_header_t out_header; - - // Desired protocol header. - sp_header_t desired_header; - - // Incoming protocol header. - sp_header_t in_header; - - unsigned char *header_pos; - size_t header_remaining; - - // Protocol header has been received/sent. - bool header_received; - bool header_sent; - stream_engine_t (const stream_engine_t&); const stream_engine_t &operator = (const stream_engine_t&); }; diff --git a/src/sub.cpp b/src/sub.cpp index 79cb63c..d29ae8d 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -48,7 +48,8 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) { - return xsub_t::xsetsockopt (option_, optval_, optvallen_); + errno = EINVAL; + return -1; } if (optvallen_ > 0 && !optval_) { @@ -201,7 +202,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_) int rc = msg.init_size (size_ + 4); errno_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE); + put_uint16 (data, XS_CMD_SUBSCRIBE); put_uint16 (data + 2, options.filter); memcpy (data + 4, data_, size_); @@ -224,7 +225,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) int rc = msg.init_size (size_ + 4); errno_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE); + put_uint16 (data, XS_CMD_UNSUBSCRIBE); put_uint16 (data + 2, options.filter); memcpy (data + 4, data_, size_); diff --git a/src/wire.hpp b/src/wire.hpp index e751818..f840fce 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -24,50 +24,11 @@ #include "stdint.hpp" // Protocol-related constants. - -// Protocol header. -#define SP_HEADER_LENGTH 8 - -// Patterns. -#define SP_PAIR 1 -#define SP_PUBSUB 2 -#define SP_REQREP 3 -#define SP_PIPELINE 4 -#define SP_SURVEY 5 - -// Roles. -#define SP_PAIR_PAIR 1 -#define SP_PUBSUB_PUB 1 -#define SP_PUBSUB_SUB 2 -#define SP_REQREP_REQ 1 -#define SP_REQREP_REP 2 -#define SP_PIPELINE_PUSH 1 -#define SP_PIPELINE_PULL 2 -#define SP_SURVEY_SURVEYOR 1 -#define SP_SURVEY_RESPONDENT 2 - -// PUBSUB pattern commands. -#define SP_PUBSUB_CMD_SUBSCRIBE 1 -#define SP_PUBSUB_CMD_UNSUBSCRIBE 2 +#define XS_CMD_SUBSCRIBE 1 +#define XS_CMD_UNSUBSCRIBE 2 namespace xs { - // Protocol header type. - typedef unsigned char sp_header_t [SP_HEADER_LENGTH]; - - // Get the SP protocol header for the specified pattern, version and role. - inline void sp_get_header (sp_header_t header_, int pattern_, int version_, - int role_) - { - header_ [0] = 0; - header_ [1] = 0; - header_ [2] = 'S'; - header_ [3] = 'P'; - header_ [4] = pattern_ & 0xff; - header_ [5] = version_ & 0xff; - header_ [6] = role_ & 0xff; - header_ [7] = 0; - } // Helper functions to convert different integer types to/from network // byte order. diff --git a/src/xpub.cpp b/src/xpub.cpp index b176bf8..fe0b9a7 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -35,10 +35,6 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : tmp_filter_id (-1) { options.type = XS_XPUB; - options.sp_pattern = SP_PUBSUB; - options.sp_version = 3; - options.sp_role = SP_PUBSUB_PUB; - options.sp_complement = SP_PUBSUB_SUB; } xs::xpub_t::~xpub_t () @@ -48,42 +44,6 @@ xs::xpub_t::~xpub_t () it->type->pf_destroy ((void*) (core_t*) this, it->instance); } -int xs::xpub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - switch (version) { - case 1: - options.legacy_protocol = true; - options.sp_version = 1; - break; - case 3: - options.legacy_protocol = false; - options.sp_version = 3; - break; - default: - errno = EINVAL; - return -1; - } - - return 0; -} - void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -93,7 +53,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // to all data on this pipe, implicitly. Also, if we are using // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, // we need to subscribe for all messages automatically. - if (icanhasall_|| pipe_->get_sp_version () == 1) { + if (icanhasall_|| pipe_->get_protocol () == 1) { // Find the prefix filter. // TODO: Change this to ALL filter. @@ -161,8 +121,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) int filter_id = XS_FILTER_PREFIX; #endif - if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && - cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { + if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { sub.close (); return; } @@ -174,7 +133,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_) break; bool unique; - if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { + if (cmd == XS_CMD_UNSUBSCRIBE) { xs_assert (it != filters.end ()); unique = it->type->pf_unsubscribe ((void*) (core_t*) this, it->instance, pipe_, data + 4, size - 4) ? true : false; @@ -299,7 +258,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_) // Place the unsubscription to the queue of pending (un)sunscriptions // to be retrived by the user later on. blob_t unsub (size_ + 4, 0); - put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE); + put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id); memcpy ((void*) (unsub.data () + 4), data_, size_); diff --git a/src/xpub.hpp b/src/xpub.hpp index f9cb737..c0e47ff 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -49,7 +49,6 @@ namespace xs ~xpub_t (); // Implementations of virtual functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); diff --git a/src/xrep.cpp b/src/xrep.cpp index 5e9fc8a..007ed27 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -36,10 +36,6 @@ xs::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) : next_peer_id (generate_random ()) { options.type = XS_XREP; - options.sp_pattern = SP_REQREP; - options.sp_version = 1; - options.sp_role = SP_REQREP_REP; - options.sp_complement = SP_REQREP_REQ; // TODO: Uncomment the following line when XREP will become true XREP // rather than generic router socket. @@ -59,34 +55,6 @@ xs::xrep_t::~xrep_t () prefetched_msg.close (); } -int xs::xrep_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - if (version != 1) { - errno = EINVAL; - return -1; - } - - options.sp_version = version; - return 0; -} - void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xrep.hpp b/src/xrep.hpp index 71a7bf4..8e16a4c 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -49,7 +49,6 @@ namespace xs ~xrep_t (); // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); diff --git a/src/xreq.cpp b/src/xreq.cpp index e42c691..1c6af9d 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -22,17 +22,12 @@ #include "xreq.hpp" #include "err.hpp" #include "msg.hpp" -#include "wire.hpp" xs::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), prefetched (false) { options.type = XS_XREQ; - options.sp_pattern = SP_REQREP; - options.sp_version = 1; - options.sp_role = SP_REQREP_REQ; - options.sp_complement = SP_REQREP_REP; // TODO: Uncomment the following line when XREQ will become true XREQ // rather than generic dealer socket. @@ -51,34 +46,6 @@ xs::xreq_t::~xreq_t () prefetched_msg.close (); } -int xs::xreq_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - if (version != 1) { - errno = EINVAL; - return -1; - } - - options.sp_version = version; - return 0; -} - void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index 9fe6ebb..c4f9baf 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -46,7 +46,6 @@ namespace xs protected: // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp index bd19913..c16d900 100644 --- a/src/xrespondent.cpp +++ b/src/xrespondent.cpp @@ -35,10 +35,6 @@ xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_, next_peer_id (generate_random ()) { options.type = XS_XRESPONDENT; - options.sp_pattern = SP_SURVEY; - options.sp_version = 1; - options.sp_role = SP_SURVEY_RESPONDENT; - options.sp_complement = SP_SURVEY_SURVEYOR; // If the connection disappears it makes no sense to read any more surveys // from it. The responses will be unroutable anyway. @@ -53,34 +49,6 @@ xs::xrespondent_t::~xrespondent_t () prefetched_msg.close (); } -int xs::xrespondent_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - if (version != 1) { - errno = EINVAL; - return -1; - } - - options.sp_version = version; - return 0; -} - void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp index 4143e36..eb6ac9a 100644 --- a/src/xrespondent.hpp +++ b/src/xrespondent.hpp @@ -47,7 +47,6 @@ namespace xs ~xrespondent_t (); // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); diff --git a/src/xsub.cpp b/src/xsub.cpp index 18acfea..7272a8e 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -29,10 +29,6 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_XSUB; - options.sp_pattern = SP_PUBSUB; - options.sp_version = 3; - options.sp_role = SP_PUBSUB_SUB; - options.sp_complement = SP_PUBSUB_PUB; // When socket is being closed down we don't want to wait till pending // subscription commands are sent to the wire. @@ -46,42 +42,6 @@ xs::xsub_t::~xsub_t () { } -int xs::xsub_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - switch (version) { - case 1: - options.legacy_protocol = true; - options.sp_version = 1; - break; - case 3: - options.legacy_protocol = false; - options.sp_version = 3; - break; - default: - errno = EINVAL; - return -1; - } - - return 0; -} - void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); @@ -89,7 +49,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) // Pipes with 0MQ/2.1-style protocol are not eligible for accepting // subscriptions. - if (pipe_->get_sp_version () == 1) + if (pipe_->get_protocol () == 1) return; dist.attach (pipe_); @@ -115,14 +75,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_) void xs::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); - if (pipe_->get_sp_version () != 1) + if (pipe_->get_protocol () != 1) dist.terminated (pipe_); } void xs::xsub_t::xhiccuped (pipe_t *pipe_) { // In 0MQ/2.1 protocol there is no subscription forwarding. - if (pipe_->get_sp_version () == 1) + if (pipe_->get_protocol () == 1) return; // Send all the cached subscriptions to the new upstream peer. @@ -144,13 +104,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) } int cmd = get_uint16 (data); int filter_id = get_uint16 (data + 2); - if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) { + if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { errno = EINVAL; return -1; } // Process the subscription. - if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) { + if (cmd == XS_CMD_SUBSCRIBE) { subscriptions_t::iterator it = subscriptions.insert ( std::make_pair (std::make_pair (filter_id, blob_t (data + 4, size - 4)), 0)).first; @@ -158,7 +118,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_) if (it->second == 1) return dist.send_to_all (msg_, flags_); } - else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) { + else if (cmd == XS_CMD_UNSUBSCRIBE) { subscriptions_t::iterator it = subscriptions.find ( std::make_pair (filter_id, blob_t (data + 4, size - 4))); if (it != subscriptions.end ()) { @@ -202,8 +162,7 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_, int rc = msg.init_size (size_ + 4); xs_assert (rc == 0); unsigned char *data = (unsigned char*) msg.data (); - put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE : - SP_PUBSUB_CMD_UNSUBSCRIBE); + put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE); put_uint16 (data + 2, filter_id_); memcpy (data + 4, data_, size_); diff --git a/src/xsub.hpp b/src/xsub.hpp index fcaf61c..5bc11c4 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -47,7 +47,6 @@ namespace xs protected: // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp index d02ea09..985b6d8 100644 --- a/src/xsurveyor.cpp +++ b/src/xsurveyor.cpp @@ -21,16 +21,11 @@ #include "xsurveyor.hpp" #include "err.hpp" #include "msg.hpp" -#include "wire.hpp" xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_) { options.type = XS_XSURVEYOR; - options.sp_pattern = SP_SURVEY; - options.sp_version = 1; - options.sp_role = SP_SURVEY_SURVEYOR; - options.sp_complement = SP_SURVEY_RESPONDENT; // When the XSURVEYOR socket is close it makes no sense to send any pending // surveys. The responses will be unroutable anyway. @@ -41,34 +36,6 @@ xs::xsurveyor_t::~xsurveyor_t () { } -int xs::xsurveyor_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - if (option_ != XS_PROTOCOL) { - errno = EINVAL; - return -1; - } - - if (optvallen_ != sizeof (int)) { - errno = EINVAL; - return -1; - } - - if (!optval_) { - errno = EFAULT; - return -1; - } - - int version = *(int *) optval_; - if (version != 1) { - errno = EINVAL; - return -1; - } - - options.sp_version = version; - return 0; -} - void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp index e312667..b0c3f9c 100644 --- a/src/xsurveyor.hpp +++ b/src/xsurveyor.hpp @@ -45,7 +45,6 @@ namespace xs protected: // Overloads of functions from socket_base_t. - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); diff --git a/tests/wireformat.cpp b/tests/wireformat.cpp index 7bf21f3..f3e0f96 100644 --- a/tests/wireformat.cpp +++ b/tests/wireformat.cpp @@ -71,10 +71,10 @@ int XS_TEST_MAIN () assert (rc == 0); // Let's send some data and check if it arrived - rc = send (rpush, "\0\0SP\x04\x02\x01\0\x04\0abc", 13, 0); - assert (rc == 13); + rc = send (rpush, "\x04\0abc", 5, 0); + assert (rc == 5); unsigned char buf [3]; - unsigned char buf2 [8]; + unsigned char buf2 [3]; rc = xs_recv (pull, buf, sizeof (buf), 0); assert (rc == 3); assert (!memcmp (buf, "abc", 3)); @@ -83,11 +83,8 @@ int XS_TEST_MAIN () rc = xs_send (push, buf, sizeof (buf), 0); assert (rc == 3); rc = recv (rpull, (char*) buf2, sizeof (buf2), 0); - assert (rc == 8); - assert (!memcmp (buf2, "\0\0SP\x04\x02\x01\0", 8)); - rc = recv (rpull, (char*) buf2, 5, 0); - assert (rc == 5); - assert (!memcmp (buf2, "\x04\0abc", 5)); + assert (rc == 3); + assert (!memcmp (buf2, "\x04\0abc", 3)); #if defined XS_HAVE_WINDOWS rc = closesocket (rpush); |