diff options
| -rw-r--r-- | include/xs/xs.h | 3 | ||||
| -rw-r--r-- | src/options.cpp | 42 | ||||
| -rw-r--r-- | src/options.hpp | 12 | ||||
| -rw-r--r-- | src/pair.cpp | 33 | ||||
| -rw-r--r-- | src/pair.hpp | 1 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 31 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 4 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 39 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 15 | ||||
| -rw-r--r-- | src/pipe.cpp | 14 | ||||
| -rw-r--r-- | src/pipe.hpp | 16 | ||||
| -rw-r--r-- | src/pull.cpp | 33 | ||||
| -rw-r--r-- | src/pull.hpp | 1 | ||||
| -rw-r--r-- | src/push.cpp | 33 | ||||
| -rw-r--r-- | src/push.hpp | 1 | ||||
| -rw-r--r-- | src/session_base.cpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 4 | ||||
| -rw-r--r-- | src/stream_engine.cpp | 58 | ||||
| -rw-r--r-- | src/stream_engine.hpp | 17 | ||||
| -rw-r--r-- | src/sub.cpp | 7 | ||||
| -rw-r--r-- | src/wire.hpp | 45 | ||||
| -rw-r--r-- | src/xpub.cpp | 49 | ||||
| -rw-r--r-- | src/xpub.hpp | 1 | ||||
| -rw-r--r-- | src/xrep.cpp | 32 | ||||
| -rw-r--r-- | src/xrep.hpp | 1 | ||||
| -rw-r--r-- | src/xreq.cpp | 33 | ||||
| -rw-r--r-- | src/xreq.hpp | 1 | ||||
| -rw-r--r-- | src/xrespondent.cpp | 32 | ||||
| -rw-r--r-- | src/xrespondent.hpp | 1 | ||||
| -rw-r--r-- | src/xsub.cpp | 55 | ||||
| -rw-r--r-- | src/xsub.hpp | 1 | ||||
| -rw-r--r-- | src/xsurveyor.cpp | 33 | ||||
| -rw-r--r-- | src/xsurveyor.hpp | 1 | ||||
| -rw-r--r-- | tests/libzmq21.cpp | 10 | ||||
| -rw-r--r-- | tests/wireformat.cpp | 19 | 
35 files changed, 83 insertions, 597 deletions
| diff --git a/include/xs/xs.h b/include/xs/xs.h index 120bd1f..74d5f34 100644 --- a/include/xs/xs.h +++ b/include/xs/xs.h @@ -206,9 +206,8 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,  #define XS_SNDTIMEO 28  #define XS_IPV4ONLY 31  #define XS_KEEPALIVE 32 -#define XS_PATTERN_VERSION 33 +#define XS_PROTOCOL 33  #define XS_SURVEY_TIMEOUT 35 -#define XS_SERVICE_ID 36  /*  Message options                                                           */  #define XS_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index 26fa62c..62c03a2 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -48,12 +48,7 @@ xs::options_t::options_t () :      sndtimeo (-1),      ipv4only (1),      keepalive (0), -    legacy_protocol (false), -    sp_service (0), -    sp_pattern (-1), -    sp_version (-1), -    sp_role (-1), -    sp_complement (-1), +    protocol (0),      filter (XS_FILTER_PREFIX),      survey_timeout (-1),      delay_on_close (true), @@ -241,29 +236,29 @@ int xs::options_t::setsockopt (int option_, const void *optval_,              return 0;          } -    case XS_FILTER: -        if (optvallen_ != sizeof (int)) { -            errno = EINVAL; -            return -1; -        } -        filter = *((int*) optval_); -        return 0; - -    case XS_SERVICE_ID: +    case XS_PROTOCOL:          {              if (optvallen_ != sizeof (int)) {                  errno = EINVAL;                  return -1;              }              int val = *((int*) optval_); -            if (val < 0 || val > 0xffff) { +            if (val < 0) {                  errno = EINVAL;                  return -1;              } -            sp_service = val; +            protocol = val;              return 0;          } +    case XS_FILTER: +        if (optvallen_ != sizeof (int)) { +            errno = EINVAL; +            return -1; +        } +        filter = *((int*) optval_); +        return 0; +      case XS_SURVEY_TIMEOUT:          if (type != XS_SURVEYOR) {              errno = ENOTSUP; @@ -457,21 +452,12 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)          *optvallen_ = sizeof (int);          return 0; -    case XS_PATTERN_VERSION: -        if (*optvallen_ < sizeof (int)) { -            errno = EINVAL; -            return -1; -        } -        *((int*) optval_) = sp_version; -        *optvallen_ = sizeof (int); -        return 0; - -    case XS_SERVICE_ID: +    case XS_PROTOCOL:          if (*optvallen_ < sizeof (int)) {              errno = EINVAL;              return -1;          } -        *((int*) optval_) = sp_service; +        *((int*) optval_) = protocol;          *optvallen_ = sizeof (int);          return 0; diff --git a/src/options.hpp b/src/options.hpp index 9070c0f..805f793 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -92,16 +92,8 @@ namespace xs          //  If 1, keepalives are to be sent periodically.          int keepalive; -        //  If true, the legacy non-SP wire protocol is in use. -        bool legacy_protocol; - -        //  SP protocol service id, pattern, version, role and complementary -        //  role. -        int sp_service; -        int sp_pattern; -        int sp_version; -        int sp_role; -        int sp_complement; +        //  Version of wire protocol to use. +        int protocol;          //  Filter ID to be used with subscriptions and unsubscriptions.          int filter; diff --git a/src/pair.cpp b/src/pair.cpp index bfb48d3..531bfc5 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -23,17 +23,12 @@  #include "err.hpp"  #include "pipe.hpp"  #include "msg.hpp" -#include "wire.hpp"  xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_),      pipe (NULL)  {      options.type = XS_PAIR; -    options.sp_pattern = SP_PAIR; -    options.sp_role = SP_PAIR_PAIR; -    options.sp_version = 3; -    options.sp_complement = SP_PAIR_PAIR;  }  xs::pair_t::~pair_t () @@ -41,34 +36,6 @@ xs::pair_t::~pair_t ()      xs_assert (!pipe);  } -int xs::pair_t::xsetsockopt (int option_, const void *optval_, -    size_t optvallen_) -{ -    if (option_ != XS_PATTERN_VERSION) { -        errno = EINVAL; -        return -1; -    } - -    if (optvallen_ != sizeof (int)) { -        errno = EINVAL; -        return -1; -    } - -    if (!optval_) { -        errno = EFAULT; -        return -1; -    } - -    int version = *(int *) optval_; -    if (version != 2) { -        errno = EINVAL; -        return -1; -    } - -    options.sp_version = version; -    return 0; -} -  void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_ != NULL); diff --git a/src/pair.hpp b/src/pair.hpp index 6a5f167..07ed6c5 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -42,7 +42,6 @@ namespace xs          ~pair_t ();          //  Overloads of functions from socket_base_t. -        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xsend (xs::msg_t *msg_, int flags_);          int xrecv (xs::msg_t *msg_, int flags_); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 9aa1233..371c657 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -47,11 +47,6 @@ xs::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,      pending_bytes (0),      rx_timer (NULL)  { -    //  If not using a legacy protocol, fill in desired protocol header. -    if (!options.legacy_protocol) { -        sp_get_header (desired_header, options.sp_service, options.sp_pattern, -            options.sp_version, options.sp_complement); -    }  }  xs::pgm_receiver_t::~pgm_receiver_t () @@ -178,7 +173,7 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)          data = (unsigned char*) tmp;          //  No data to process. This may happen if the packet received is -        //  neither RDATA nor ODATA. +        //  neither ODATA nor ODATA.          if (received == 0) {              if (errno == ENOMEM || errno == EBUSY) {                  const long timeout = pgm_socket.get_rx_timeout (); @@ -205,32 +200,18 @@ void xs::pgm_receiver_t::in_event (fd_t fd_)              break;          } -        //  Check protocol header. -        if (unlikely (!options.legacy_protocol)) { -            if (received < (ssize_t) sizeof desired_header) -                //  Ignore malformed datagram. -                continue; -            if (memcmp (data, desired_header, sizeof desired_header) != 0) -                //  Ignore datagram with incorrect protocol header. -                continue; -            data += sizeof desired_header; -            received -= sizeof desired_header; +        //  New peer. Add it to the list of know but unjoint peers. +        if (it == peers.end ()) { +            peer_info_t peer_info = {false, NULL}; +            it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;          }          //  Read the offset of the fist message in the current packet. -        if (received < (ssize_t) sizeof (uint16_t)) -            //  Ignore malformed datagram. -            continue; +        xs_assert ((size_t) received >= sizeof (uint16_t));          uint16_t offset = get_uint16 (data);          data += sizeof (uint16_t);          received -= sizeof (uint16_t); -        //  New peer. Add it to the list of known but disjoint peers. -        if (it == peers.end ()) { -            peer_info_t peer_info = {false, NULL}; -            it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; -        } -          //  Join the stream if needed.          if (!it->second.joined) { diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index c44594a..38c7a7b 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -39,7 +39,6 @@  #include "options.hpp"  #include "decoder.hpp"  #include "pgm_socket.hpp" -#include "wire.hpp"  namespace xs  { @@ -126,9 +125,6 @@ namespace xs          //  Receive timer, if active, otherwise NULL.          handle_t rx_timer; -        //  Desired protocol header. -        sp_header_t desired_header; -          pgm_receiver_t (const pgm_receiver_t&);          const pgm_receiver_t &operator = (const pgm_receiver_t&);      }; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index d8d77b9..9c2a961 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -60,25 +60,6 @@ int xs::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)      out_buffer_size = pgm_socket.get_max_tsdu_size ();      out_buffer = (unsigned char*) malloc (out_buffer_size);      alloc_assert (out_buffer); -    encode_buffer = out_buffer; -    encode_buffer_size = out_buffer_size; -    header_size = 0; - -    //  If not using a legacy protocol, fill in our datagram header and reserve -    //  space for it in the datagram. -    if (!options.legacy_protocol) { -        sp_get_header (out_buffer, options.sp_service, options.sp_pattern, -            options.sp_version, options.sp_role); -        encode_buffer += SP_HEADER_LENGTH; -        encode_buffer_size -= SP_HEADER_LENGTH; -        header_size += SP_HEADER_LENGTH; -    } - -    //  Reserve space in the datagram for the offset of the first message. -    offset_p = encode_buffer; -    encode_buffer += sizeof (uint16_t); -    encode_buffer_size -= sizeof (uint16_t); -    header_size += sizeof (uint16_t);      return rc;  } @@ -175,25 +156,27 @@ void xs::pgm_sender_t::in_event (fd_t fd_)  void xs::pgm_sender_t::out_event (fd_t fd_)  { -    //  POLLOUT event from send socket. If write buffer is empty (which means -    //  that the last write succeeded), try to read new data from the encoder. +    //  POLLOUT event from send socket. If write buffer is empty,  +    //  try to read new data from the encoder.      if (write_size == 0) { -        //  Pass our own buffer to the get_data () function to prevent it from -        //  returning its own buffer. +        //  First two bytes (sizeof uint16_t) are used to store message  +        //  offset in following steps. Note that by passing our buffer to +        //  the get data function we prevent it from returning its own buffer. +        unsigned char *bf = out_buffer + sizeof (uint16_t); +        size_t bfsz = out_buffer_size - sizeof (uint16_t);          int offset = -1; -        size_t data_size = encode_buffer_size; -        encoder.get_data (&encode_buffer, &data_size, &offset); +        encoder.get_data (&bf, &bfsz, &offset);          //  If there are no data to write stop polling for output. -        if (!data_size) { +        if (!bfsz) {              reset_pollout (handle);              return;          }          //  Put offset information in the buffer. -        write_size = header_size + data_size; -        put_uint16 (offset_p, offset == -1 ? 0xffff : (uint16_t) offset); +        write_size = bfsz + sizeof (uint16_t); +        put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);      }      if (tx_timer) { diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 5a808b6..89f81bf 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -84,21 +84,12 @@ namespace xs          handle_t rdata_notify_handle;          handle_t pending_notify_handle; -        //  Output buffer and size for pgm_socket. +        //  Output buffer from pgm_socket.          unsigned char *out_buffer; +         +        //  Output buffer size.          size_t out_buffer_size; -        //  Size of header in each datagram. -        size_t header_size; - -        //  Encoder buffer and size, adjusted from output buffer by size of -        //  datagram header(s). -        unsigned char *encode_buffer; -        size_t encode_buffer_size; - -        //  Position of offset to first message in output buffer. -        unsigned char *offset_p; -          //  Number of bytes in the buffer to be written to the socket.          //  If zero, there are no data to be sent.          size_t write_size; diff --git a/src/pipe.cpp b/src/pipe.cpp index c14642f..0a15cc0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,7 +27,7 @@  #include "err.hpp"  int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], -    int hwms_ [2], bool delays_ [2], int sp_version_) +    int hwms_ [2], bool delays_ [2], int protocol_)  {      //   Creates two pipe objects. These objects are connected by two ypipes,      //   each to pass messages in one direction. @@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],      alloc_assert (upipe2);      pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, -        hwms_ [1], hwms_ [0], delays_ [0], sp_version_); +        hwms_ [1], hwms_ [0], delays_ [0], protocol_);      alloc_assert (pipes_ [0]);      pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, -        hwms_ [0], hwms_ [1], delays_ [1], sp_version_); +        hwms_ [0], hwms_ [1], delays_ [1], protocol_);      alloc_assert (pipes_ [1]);      pipes_ [0]->set_peer (pipes_ [1]); @@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],  }  xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, -      int inhwm_, int outhwm_, bool delay_, int sp_version_) : +      int inhwm_, int outhwm_, bool delay_, int protocol_) :      object_t (parent_),      inpipe (inpipe_),      outpipe (outpipe_), @@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,      sink (NULL),      state (active),      delay (delay_), -    sp_version (sp_version_) +    protocol (protocol_)  {  } @@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_)      }  } -int xs::pipe_t::get_sp_version () +int xs::pipe_t::get_protocol ()  { -    return sp_version; +    return protocol;  }  bool xs::pipe_t::is_delimiter (msg_t &msg_) diff --git a/src/pipe.hpp b/src/pipe.hpp index dbc6b7e..c298154 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,7 +44,7 @@ namespace xs      //  pipe receives all the pending messages before terminating, otherwise it      //  terminates straight away.      int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], -        int hwms_ [2], bool delays_ [2], int sp_version_); +        int hwms_ [2], bool delays_ [2], int protocol_);      struct i_pipe_events      { @@ -69,7 +69,7 @@ namespace xs          //  This allows pipepair to create pipe objects.          friend int pipepair (xs::object_t *parents_ [2],              xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], -            int sp_version_); +            int protocol_);      public: @@ -111,8 +111,8 @@ namespace xs          //  before actual shutdown.          void terminate (bool delay_); -        //  Returns the SP pattern version in use on this pipe. -        int get_sp_version (); +        //  Returns the ID of the protocol associated with the pipe. +        int get_protocol ();      private: @@ -132,7 +132,7 @@ namespace xs          //  Constructor is private. Pipe can only be created using          //  pipepair function.          pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, -            int inhwm_, int outhwm_, bool delay_, int sp_version_); +            int inhwm_, int outhwm_, bool delay_, int protocol_);          //  Pipepair uses this function to let us know about          //  the peer pipe object. @@ -192,9 +192,9 @@ namespace xs          //  asks us to.          bool delay; -        //  SP pattern version in use on this pipe. This value is used by the -        //  pattern classes using the pipe, not the pipe itself. -        int sp_version; +        //  ID of the protocol to use. This value is not used by the pipe +        //  itself, rather it's used by the users of the pipe. +        int protocol;          //  Identity of the writer. Used uniquely by the reader side.          blob_t identity; diff --git a/src/pull.cpp b/src/pull.cpp index 0cedc39..8ae8208 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -23,50 +23,17 @@  #include "err.hpp"  #include "msg.hpp"  #include "pipe.hpp" -#include "wire.hpp"  xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_)  {      options.type = XS_PULL; -    options.sp_pattern = SP_PIPELINE; -    options.sp_version = 3; -    options.sp_role = SP_PIPELINE_PULL; -    options.sp_complement = SP_PIPELINE_PUSH;  }  xs::pull_t::~pull_t ()  {  } -int xs::pull_t::xsetsockopt (int option_, const void *optval_, -    size_t optvallen_) -{ -    if (option_ != XS_PATTERN_VERSION) { -        errno = EINVAL; -        return -1; -    } - -    if (optvallen_ != sizeof (int)) { -        errno = EINVAL; -        return -1; -    } - -    if (!optval_) { -        errno = EFAULT; -        return -1; -    } - -    int version = *(int *) optval_; -    if (version != 2) { -        errno = EINVAL; -        return -1; -    } - -    options.sp_version = version; -    return 0; -} -  void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index 5453bbd..04da465 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -45,7 +45,6 @@ namespace xs      protected:          //  Overloads of functions from socket_base_t. -        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xrecv (xs::msg_t *msg_, int flags_);          bool xhas_in (); diff --git a/src/push.cpp b/src/push.cpp index 729c97a..59f508f 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -23,50 +23,17 @@  #include "pipe.hpp"  #include "err.hpp"  #include "msg.hpp" -#include "wire.hpp"  xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_)  {      options.type = XS_PUSH; -    options.sp_pattern = SP_PIPELINE; -    options.sp_version = 3; -    options.sp_role = SP_PIPELINE_PUSH; -    options.sp_complement = SP_PIPELINE_PULL;  }  xs::push_t::~push_t ()  {  } -int xs::push_t::xsetsockopt (int option_, const void *optval_, -    size_t optvallen_) -{ -    if (option_ != XS_PATTERN_VERSION) { -        errno = EINVAL; -        return -1; -    } - -    if (optvallen_ != sizeof (int)) { -        errno = EINVAL; -        return -1; -    } - -    if (!optval_) { -        errno = EFAULT; -        return -1; -    } - -    int version = *(int *) optval_; -    if (version != 2) { -        errno = EINVAL; -        return -1; -    } - -    options.sp_version = version; -    return 0; -} -  void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); diff --git a/src/push.hpp b/src/push.hpp index ffe16cc..a112f31 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -45,7 +45,6 @@ namespace xs      protected:          //  Overloads of functions from socket_base_t. -        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xsend (xs::msg_t *msg_, int flags_);          bool xhas_out (); diff --git a/src/session_base.cpp b/src/session_base.cpp index 81f7347..49fdce1 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -300,7 +300,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)          pipe_t *pipes [2] = {NULL, NULL};          int hwms [2] = {options.rcvhwm, options.sndhwm};          bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; -        int rc = pipepair (parents, pipes, hwms, delays, options.sp_version); +        int rc = pipepair (parents, pipes, hwms, delays, options.protocol);          errno_assert (rc == 0);          //  Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b424a5e..9a8103e 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -451,7 +451,7 @@ int xs::socket_base_t::connect (const char *addr_)          pipe_t *ppair [2] = {NULL, NULL};          int hwms [2] = {sndhwm, rcvhwm};          bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; -        rc = pipepair (parents, ppair, hwms, delays, options.sp_version); +        rc = pipepair (parents, ppair, hwms, delays, options.protocol);          errno_assert (rc == 0);          //  Attach local end of the pipe to this socket object. @@ -535,7 +535,7 @@ int xs::socket_base_t::connect (const char *addr_)      pipe_t *ppair [2] = {NULL, NULL};      int hwms [2] = {options.sndhwm, options.rcvhwm};      bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; -    rc = pipepair (parents, ppair, hwms, delays, options.sp_version); +    rc = pipepair (parents, ppair, hwms, delays, options.protocol);      errno_assert (rc == 0);      // PGM does not support subscription forwarding; ask for all data to be diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index b0f9f20..dc6d345 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -53,21 +53,8 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :      session (NULL),      leftover_session (NULL),      options (options_), -    plugged (false), -    header_pos (in_header), -    header_remaining (sizeof in_header), -    header_received (false), -    header_sent (false) +    plugged (false)  { -    //  Fill in outgoing SP protocol header and the complementary (desired) -    //  header. -    if (!options.legacy_protocol) { -        sp_get_header (out_header, options.sp_service, options.sp_pattern, -            options.sp_version, options.sp_role); -        sp_get_header (desired_header, options.sp_service, options.sp_pattern, -            options.sp_version, options.sp_complement); -    } -      //  Get the socket into non-blocking mode.      unblock_socket (s); @@ -168,35 +155,6 @@ void xs::stream_engine_t::in_event (fd_t fd_)  {      bool disconnection = false; -    //  If we have not yet received the full protocol header... -    if (unlikely (!options.legacy_protocol && !header_received)) { - -        //  Read remaining header bytes. -        int hbytes = read (header_pos, header_remaining); - -        //  Check whether the peer has closed the connection. -        if (hbytes == -1) { -            error (); -            return; -        } - -        header_remaining -= hbytes; -        header_pos += hbytes; - -        //  If we did not read the whole header, poll for more. -        if (header_remaining) -            return; - -        //  If the protocol headers do not match, close the connection. -        if (memcmp (in_header, desired_header, sizeof in_header) != 0) { -            error (); -            return; -        } - -        //  Done with protocol header; proceed to read data. -        header_received = true; -    } -      //  If there's no data to process in the buffer...      if (!insize) { @@ -252,20 +210,6 @@ void xs::stream_engine_t::out_event (fd_t fd_)  {      bool more_data = true; -    //  If protocol header was not yet sent... -    if (unlikely (!options.legacy_protocol && !header_sent)) { -        int hbytes = write (out_header, sizeof out_header); - -        //  It should always be possible to write the full protocol header to a -        //  freshly connected TCP socket. Therefore, if we get an error or -        //  partial write here the peer has disconnected. -        if (hbytes != sizeof out_header) { -            error (); -            return; -        } -        header_sent = true; -    } -      //  If write buffer is empty, try to read new data from the encoder.      if (!outsize) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 70c9ddb..46a6080 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -30,7 +30,6 @@  #include "encoder.hpp"  #include "decoder.hpp"  #include "options.hpp" -#include "wire.hpp"  namespace xs  { @@ -99,22 +98,6 @@ namespace xs          bool plugged; -        //  Outgoing protocol header. -        sp_header_t out_header; - -        //  Desired protocol header. -        sp_header_t desired_header; - -        //  Incoming protocol header. -        sp_header_t in_header; - -        unsigned char *header_pos; -        size_t header_remaining; - -        //  Protocol header has been received/sent. -        bool header_received; -        bool header_sent; -          stream_engine_t (const stream_engine_t&);          const stream_engine_t &operator = (const stream_engine_t&);      }; diff --git a/src/sub.cpp b/src/sub.cpp index 79cb63c..d29ae8d 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -48,7 +48,8 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  {      if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) { -        return xsub_t::xsetsockopt (option_, optval_, optvallen_); +        errno = EINVAL; +        return -1;      }      if (optvallen_ > 0 && !optval_) { @@ -201,7 +202,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)      int rc = msg.init_size (size_ + 4);      errno_assert (rc == 0);      unsigned char *data = (unsigned char*) msg.data (); -    put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE); +    put_uint16 (data, XS_CMD_SUBSCRIBE);      put_uint16 (data + 2, options.filter);      memcpy (data + 4, data_, size_); @@ -224,7 +225,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)      int rc = msg.init_size (size_ + 4);      errno_assert (rc == 0);      unsigned char *data = (unsigned char*) msg.data (); -    put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE); +    put_uint16 (data, XS_CMD_UNSUBSCRIBE);      put_uint16 (data + 2, options.filter);      memcpy (data + 4, data_, size_); diff --git a/src/wire.hpp b/src/wire.hpp index 014021b..f840fce 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -24,52 +24,11 @@  #include "stdint.hpp"  < | 
