diff options
| author | Martin Lucina <martin@lucina.net> | 2012-04-27 14:18:08 +0200 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2012-04-29 09:02:14 +0200 | 
| commit | 512f3a604924fec9d89e2b4bfd6f73aa66309fa7 (patch) | |
| tree | 563b8ec0bb0babc8093f39d2ed52d2ae308335ba /src | |
| parent | a84a77a4861c8fc1b0b6d3ec0931e83395cb34b5 (diff) | |
Implement protocol versioning (except PGM)
Implements SP protocol versioning, legacy protocol support, and the
following pattern protocol versions:
PAIR:     v2
PUBSUB:   v1 (legacy), v3
REQREP:   v1
PIPELINE: v2
SURVEY:   v1
Engine support is only for stream_engine_t at this stage.
Signed-off-by: Martin Lucina <martin@lucina.net>
Diffstat (limited to 'src')
| -rw-r--r-- | src/options.cpp | 23 | ||||
| -rw-r--r-- | src/options.hpp | 10 | ||||
| -rw-r--r-- | src/pair.cpp | 33 | ||||
| -rw-r--r-- | src/pair.hpp | 1 | ||||
| -rw-r--r-- | src/pipe.cpp | 14 | ||||
| -rw-r--r-- | src/pipe.hpp | 16 | ||||
| -rw-r--r-- | src/pull.cpp | 33 | ||||
| -rw-r--r-- | src/pull.hpp | 1 | ||||
| -rw-r--r-- | src/push.cpp | 33 | ||||
| -rw-r--r-- | src/push.hpp | 1 | ||||
| -rw-r--r-- | src/session_base.cpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 4 | ||||
| -rw-r--r-- | src/stream_engine.cpp | 58 | ||||
| -rw-r--r-- | src/stream_engine.hpp | 17 | ||||
| -rw-r--r-- | src/sub.cpp | 7 | ||||
| -rw-r--r-- | src/wire.hpp | 43 | ||||
| -rw-r--r-- | src/xpub.cpp | 49 | ||||
| -rw-r--r-- | src/xpub.hpp | 1 | ||||
| -rw-r--r-- | src/xrep.cpp | 32 | ||||
| -rw-r--r-- | src/xrep.hpp | 1 | ||||
| -rw-r--r-- | src/xreq.cpp | 33 | ||||
| -rw-r--r-- | src/xreq.hpp | 1 | ||||
| -rw-r--r-- | src/xrespondent.cpp | 32 | ||||
| -rw-r--r-- | src/xrespondent.hpp | 1 | ||||
| -rw-r--r-- | src/xsub.cpp | 55 | ||||
| -rw-r--r-- | src/xsub.hpp | 1 | ||||
| -rw-r--r-- | src/xsurveyor.cpp | 33 | ||||
| -rw-r--r-- | src/xsurveyor.hpp | 1 | 
28 files changed, 481 insertions, 55 deletions
| diff --git a/src/options.cpp b/src/options.cpp index f7bbdc4..d362c18 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -48,7 +48,11 @@ xs::options_t::options_t () :      sndtimeo (-1),      ipv4only (1),      keepalive (0), -    protocol (0), +    legacy_protocol (false), +    sp_pattern (-1), +    sp_version (-1), +    sp_role (-1), +    sp_complement (-1),      filter (XS_FILTER_PREFIX),      survey_timeout (-1),      delay_on_close (true), @@ -236,21 +240,6 @@ int xs::options_t::setsockopt (int option_, const void *optval_,              return 0;          } -    case XS_PROTOCOL: -        { -            if (optvallen_ != sizeof (int)) { -                errno = EINVAL; -                return -1; -            } -            int val = *((int*) optval_); -            if (val < 0) { -                errno = EINVAL; -                return -1; -            } -            protocol = val; -            return 0; -        } -      case XS_FILTER:          if (optvallen_ != sizeof (int)) {              errno = EINVAL; @@ -457,7 +446,7 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)              errno = EINVAL;              return -1;          } -        *((int*) optval_) = protocol; +        *((int*) optval_) = sp_version;          *optvallen_ = sizeof (int);          return 0; diff --git a/src/options.hpp b/src/options.hpp index 805f793..89d6d2a 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -92,8 +92,14 @@ namespace xs          //  If 1, keepalives are to be sent periodically.          int keepalive; -        //  Version of wire protocol to use. -        int protocol; +        //  If true, the legacy non-SP wire protocol is in use. +        bool legacy_protocol; + +        //  SP protocol pattern, version, role and complementary role. +        int sp_pattern; +        int sp_version; +        int sp_role; +        int sp_complement;          //  Filter ID to be used with subscriptions and unsubscriptions.          int filter; diff --git a/src/pair.cpp b/src/pair.cpp index b4cb0b4..d4f699e 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -23,12 +23,17 @@  #include "err.hpp"  #include "pipe.hpp"  #include "msg.hpp" +#include "wire.hpp"  xs::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_),      pipe (NULL)  {      options.type = XS_PAIR; +    options.sp_pattern = SP_PAIR; +    options.sp_role = SP_PAIR_PAIR; +    options.sp_version = 2; +    options.sp_complement = SP_PAIR_PAIR;  }  xs::pair_t::~pair_t () @@ -36,6 +41,34 @@ xs::pair_t::~pair_t ()      xs_assert (!pipe);  } +int xs::pair_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PROTOCOL) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    if (version != 2) { +        errno = EINVAL; +        return -1; +    } + +    options.sp_version = version; +    return 0; +} +  void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (!pipe); diff --git a/src/pair.hpp b/src/pair.hpp index 07ed6c5..6a5f167 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -42,6 +42,7 @@ namespace xs          ~pair_t ();          //  Overloads of functions from socket_base_t. +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xsend (xs::msg_t *msg_, int flags_);          int xrecv (xs::msg_t *msg_, int flags_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 0a15cc0..c14642f 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,7 +27,7 @@  #include "err.hpp"  int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], -    int hwms_ [2], bool delays_ [2], int protocol_) +    int hwms_ [2], bool delays_ [2], int sp_version_)  {      //   Creates two pipe objects. These objects are connected by two ypipes,      //   each to pass messages in one direction. @@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],      alloc_assert (upipe2);      pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, -        hwms_ [1], hwms_ [0], delays_ [0], protocol_); +        hwms_ [1], hwms_ [0], delays_ [0], sp_version_);      alloc_assert (pipes_ [0]);      pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, -        hwms_ [0], hwms_ [1], delays_ [1], protocol_); +        hwms_ [0], hwms_ [1], delays_ [1], sp_version_);      alloc_assert (pipes_ [1]);      pipes_ [0]->set_peer (pipes_ [1]); @@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],  }  xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, -      int inhwm_, int outhwm_, bool delay_, int protocol_) : +      int inhwm_, int outhwm_, bool delay_, int sp_version_) :      object_t (parent_),      inpipe (inpipe_),      outpipe (outpipe_), @@ -66,7 +66,7 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,      sink (NULL),      state (active),      delay (delay_), -    protocol (protocol_) +    sp_version (sp_version_)  {  } @@ -386,9 +386,9 @@ void xs::pipe_t::terminate (bool delay_)      }  } -int xs::pipe_t::get_protocol () +int xs::pipe_t::get_sp_version ()  { -    return protocol; +    return sp_version;  }  bool xs::pipe_t::is_delimiter (msg_t &msg_) diff --git a/src/pipe.hpp b/src/pipe.hpp index c298154..dbc6b7e 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,7 +44,7 @@ namespace xs      //  pipe receives all the pending messages before terminating, otherwise it      //  terminates straight away.      int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], -        int hwms_ [2], bool delays_ [2], int protocol_); +        int hwms_ [2], bool delays_ [2], int sp_version_);      struct i_pipe_events      { @@ -69,7 +69,7 @@ namespace xs          //  This allows pipepair to create pipe objects.          friend int pipepair (xs::object_t *parents_ [2],              xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], -            int protocol_); +            int sp_version_);      public: @@ -111,8 +111,8 @@ namespace xs          //  before actual shutdown.          void terminate (bool delay_); -        //  Returns the ID of the protocol associated with the pipe. -        int get_protocol (); +        //  Returns the SP pattern version in use on this pipe. +        int get_sp_version ();      private: @@ -132,7 +132,7 @@ namespace xs          //  Constructor is private. Pipe can only be created using          //  pipepair function.          pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, -            int inhwm_, int outhwm_, bool delay_, int protocol_); +            int inhwm_, int outhwm_, bool delay_, int sp_version_);          //  Pipepair uses this function to let us know about          //  the peer pipe object. @@ -192,9 +192,9 @@ namespace xs          //  asks us to.          bool delay; -        //  ID of the protocol to use. This value is not used by the pipe -        //  itself, rather it's used by the users of the pipe. -        int protocol; +        //  SP pattern version in use on this pipe. This value is used by the +        //  pattern classes using the pipe, not the pipe itself. +        int sp_version;          //  Identity of the writer. Used uniquely by the reader side.          blob_t identity; diff --git a/src/pull.cpp b/src/pull.cpp index 8ae8208..4d08edb 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -23,17 +23,50 @@  #include "err.hpp"  #include "msg.hpp"  #include "pipe.hpp" +#include "wire.hpp"  xs::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_)  {      options.type = XS_PULL; +    options.sp_pattern = SP_PIPELINE; +    options.sp_version = 2; +    options.sp_role = SP_PIPELINE_PULL; +    options.sp_complement = SP_PIPELINE_PUSH;  }  xs::pull_t::~pull_t ()  {  } +int xs::pull_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PROTOCOL) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    if (version != 2) { +        errno = EINVAL; +        return -1; +    } + +    options.sp_version = version; +    return 0; +} +  void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index 04da465..5453bbd 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -45,6 +45,7 @@ namespace xs      protected:          //  Overloads of functions from socket_base_t. +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xrecv (xs::msg_t *msg_, int flags_);          bool xhas_in (); diff --git a/src/push.cpp b/src/push.cpp index 59f508f..94f8381 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -23,17 +23,50 @@  #include "pipe.hpp"  #include "err.hpp"  #include "msg.hpp" +#include "wire.hpp"  xs::push_t::push_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_)  {      options.type = XS_PUSH; +    options.sp_pattern = SP_PIPELINE; +    options.sp_version = 2; +    options.sp_role = SP_PIPELINE_PUSH; +    options.sp_complement = SP_PIPELINE_PULL;  }  xs::push_t::~push_t ()  {  } +int xs::push_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PROTOCOL) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    if (version != 2) { +        errno = EINVAL; +        return -1; +    } + +    options.sp_version = version; +    return 0; +} +  void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); diff --git a/src/push.hpp b/src/push.hpp index a112f31..ffe16cc 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -45,6 +45,7 @@ namespace xs      protected:          //  Overloads of functions from socket_base_t. +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xsend (xs::msg_t *msg_, int flags_);          bool xhas_out (); diff --git a/src/session_base.cpp b/src/session_base.cpp index 35c4a4e..9b58b42 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -298,7 +298,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)          pipe_t *pipes [2] = {NULL, NULL};          int hwms [2] = {options.rcvhwm, options.sndhwm};          bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; -        int rc = pipepair (parents, pipes, hwms, delays, options.protocol); +        int rc = pipepair (parents, pipes, hwms, delays, options.sp_version);          errno_assert (rc == 0);          //  Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ec5c8bd..e8ae28a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -447,7 +447,7 @@ int xs::socket_base_t::connect (const char *addr_)          pipe_t *ppair [2] = {NULL, NULL};          int hwms [2] = {sndhwm, rcvhwm};          bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; -        rc = pipepair (parents, ppair, hwms, delays, options.protocol); +        rc = pipepair (parents, ppair, hwms, delays, options.sp_version);          errno_assert (rc == 0);          //  Attach local end of the pipe to this socket object. @@ -501,7 +501,7 @@ int xs::socket_base_t::connect (const char *addr_)      pipe_t *ppair [2] = {NULL, NULL};      int hwms [2] = {options.sndhwm, options.rcvhwm};      bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; -    rc = pipepair (parents, ppair, hwms, delays, options.protocol); +    rc = pipepair (parents, ppair, hwms, delays, options.sp_version);      errno_assert (rc == 0);      // PGM does not support subscription forwarding; ask for all data to be diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 15b566b..a39b410 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -53,8 +53,21 @@ xs::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :      session (NULL),      leftover_session (NULL),      options (options_), -    plugged (false) +    plugged (false), +    header_pos (in_header), +    header_remaining (sizeof in_header), +    header_received (false), +    header_sent (false)  { +    //  Fill in outgoing SP protocol header and the complementary (desired) +    //  header. +    if (!options.legacy_protocol) { +        sp_get_header (out_header, options.sp_pattern, options.sp_version, +            options.sp_role); +        sp_get_header (desired_header, options.sp_pattern, options.sp_version, +            options.sp_complement); +    } +      //  Get the socket into non-blocking mode.      unblock_socket (s); @@ -155,6 +168,35 @@ void xs::stream_engine_t::in_event (fd_t fd_)  {      bool disconnection = false; +    //  If we have not yet received the full protocol header... +    if (unlikely (!options.legacy_protocol && !header_received)) { + +        //  Read remaining header bytes. +        int hbytes = read (header_pos, header_remaining); + +        //  Check whether the peer has closed the connection. +        if (hbytes == -1) { +            error (); +            return; +        } + +        header_remaining -= hbytes; +        header_pos += hbytes; + +        //  If we did not read the whole header, poll for more. +        if (header_remaining) +            return; + +        //  If the protocol headers do not match, close the connection. +        if (memcmp (in_header, desired_header, sizeof in_header) != 0) { +            error (); +            return; +        } + +        //  Done with protocol header; proceed to read data. +        header_received = true; +    } +      //  If there's no data to process in the buffer...      if (!insize) { @@ -210,6 +252,20 @@ void xs::stream_engine_t::out_event (fd_t fd_)  {      bool more_data = true; +    //  If protocol header was not yet sent... +    if (unlikely (!options.legacy_protocol && !header_sent)) { +        int hbytes = write (out_header, sizeof out_header); + +        //  It should always be possible to write the full protocol header to a +        //  freshly connected TCP socket. Therefore, if we get an error or +        //  partial write here the peer has disconnected. +        if (hbytes != sizeof out_header) { +            error (); +            return; +        } +        header_sent = true; +    } +      //  If write buffer is empty, try to read new data from the encoder.      if (!outsize) { diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 46a6080..70c9ddb 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -30,6 +30,7 @@  #include "encoder.hpp"  #include "decoder.hpp"  #include "options.hpp" +#include "wire.hpp"  namespace xs  { @@ -98,6 +99,22 @@ namespace xs          bool plugged; +        //  Outgoing protocol header. +        sp_header_t out_header; + +        //  Desired protocol header. +        sp_header_t desired_header; + +        //  Incoming protocol header. +        sp_header_t in_header; + +        unsigned char *header_pos; +        size_t header_remaining; + +        //  Protocol header has been received/sent. +        bool header_received; +        bool header_sent; +          stream_engine_t (const stream_engine_t&);          const stream_engine_t &operator = (const stream_engine_t&);      }; diff --git a/src/sub.cpp b/src/sub.cpp index d29ae8d..79cb63c 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -48,8 +48,7 @@ int xs::sub_t::xsetsockopt (int option_, const void *optval_,      size_t optvallen_)  {      if (option_ != XS_SUBSCRIBE && option_ != XS_UNSUBSCRIBE) { -        errno = EINVAL; -        return -1; +        return xsub_t::xsetsockopt (option_, optval_, optvallen_);      }      if (optvallen_ > 0 && !optval_) { @@ -202,7 +201,7 @@ int xs::sub_t::filter_subscribed (const unsigned char *data_, size_t size_)      int rc = msg.init_size (size_ + 4);      errno_assert (rc == 0);      unsigned char *data = (unsigned char*) msg.data (); -    put_uint16 (data, XS_CMD_SUBSCRIBE); +    put_uint16 (data, SP_PUBSUB_CMD_SUBSCRIBE);      put_uint16 (data + 2, options.filter);      memcpy (data + 4, data_, size_); @@ -225,7 +224,7 @@ int xs::sub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)      int rc = msg.init_size (size_ + 4);      errno_assert (rc == 0);      unsigned char *data = (unsigned char*) msg.data (); -    put_uint16 (data, XS_CMD_UNSUBSCRIBE); +    put_uint16 (data, SP_PUBSUB_CMD_UNSUBSCRIBE);      put_uint16 (data + 2, options.filter);      memcpy (data + 4, data_, size_); diff --git a/src/wire.hpp b/src/wire.hpp index f840fce..e751818 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -24,11 +24,50 @@  #include "stdint.hpp"  //  Protocol-related constants. -#define XS_CMD_SUBSCRIBE 1 -#define XS_CMD_UNSUBSCRIBE 2 + +//  Protocol header. +#define SP_HEADER_LENGTH 8 + +//  Patterns. +#define SP_PAIR 1 +#define SP_PUBSUB 2 +#define SP_REQREP 3 +#define SP_PIPELINE 4 +#define SP_SURVEY 5 + +//  Roles. +#define SP_PAIR_PAIR 1 +#define SP_PUBSUB_PUB 1 +#define SP_PUBSUB_SUB 2 +#define SP_REQREP_REQ 1 +#define SP_REQREP_REP 2 +#define SP_PIPELINE_PUSH 1 +#define SP_PIPELINE_PULL 2 +#define SP_SURVEY_SURVEYOR 1 +#define SP_SURVEY_RESPONDENT 2 + +//  PUBSUB pattern commands. +#define SP_PUBSUB_CMD_SUBSCRIBE 1 +#define SP_PUBSUB_CMD_UNSUBSCRIBE 2  namespace xs  { +    //  Protocol header type. +    typedef unsigned char sp_header_t [SP_HEADER_LENGTH]; + +    //  Get the SP protocol header for the specified pattern, version and role. +    inline void sp_get_header (sp_header_t header_, int pattern_, int version_, +        int role_) +    { +        header_ [0] = 0; +        header_ [1] = 0; +        header_ [2] = 'S'; +        header_ [3] = 'P'; +        header_ [4] = pattern_ & 0xff; +        header_ [5] = version_ & 0xff; +        header_ [6] = role_ & 0xff; +        header_ [7] = 0; +    }      //  Helper functions to convert different integer types to/from network      //  byte order. diff --git a/src/xpub.cpp b/src/xpub.cpp index fe0b9a7..b176bf8 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -35,6 +35,10 @@ xs::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      tmp_filter_id (-1)  {      options.type = XS_XPUB; +    options.sp_pattern = SP_PUBSUB; +    options.sp_version = 3; +    options.sp_role = SP_PUBSUB_PUB; +    options.sp_complement = SP_PUBSUB_SUB;  }  xs::xpub_t::~xpub_t () @@ -44,6 +48,42 @@ xs::xpub_t::~xpub_t ()          it->type->pf_destroy ((void*) (core_t*) this, it->instance);  } +int xs::xpub_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PROTOCOL) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    switch (version) { +    case 1: +        options.legacy_protocol = true; +        options.sp_version = 1; +        break; +    case 3: +        options.legacy_protocol = false; +        options.sp_version = 3; +        break; +    default: +        errno = EINVAL; +        return -1; +    } + +    return 0; +} +  void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); @@ -53,7 +93,7 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)      //  to all data on this pipe, implicitly. Also, if we are using      //  0MQ/2.1-style protocol, there's no subscription forwarding. Thus,      //  we need to subscribe for all messages automatically. -    if (icanhasall_|| pipe_->get_protocol () == 1) { +    if (icanhasall_|| pipe_->get_sp_version () == 1) {          //  Find the prefix filter.          //  TODO: Change this to ALL filter. @@ -121,7 +161,8 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)          int filter_id = XS_FILTER_PREFIX;  #endif -        if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) { +        if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && +              cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {              sub.close ();              return;          } @@ -133,7 +174,7 @@ void xs::xpub_t::xread_activated (pipe_t *pipe_)                  break;          bool unique; -		if (cmd == XS_CMD_UNSUBSCRIBE) { +		if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {              xs_assert (it != filters.end ());              unique = it->type->pf_unsubscribe ((void*) (core_t*) this,                  it->instance, pipe_, data + 4, size - 4) ? true : false; @@ -258,7 +299,7 @@ int xs::xpub_t::filter_unsubscribed (const unsigned char *data_, size_t size_)  		//  Place the unsubscription to the queue of pending (un)sunscriptions  		//  to be retrived by the user later on.  		blob_t unsub (size_ + 4, 0); -        put_uint16 ((unsigned char*) unsub.data (), XS_CMD_UNSUBSCRIBE); +        put_uint16 ((unsigned char*) unsub.data (), SP_PUBSUB_CMD_UNSUBSCRIBE);          put_uint16 ((unsigned char*) unsub.data () + 2, tmp_filter_id);  		memcpy ((void*) (unsub.data () + 4), data_, size_); diff --git a/src/xpub.hpp b/src/xpub.hpp index c0e47ff..f9cb737 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -49,6 +49,7 @@ namespace xs          ~xpub_t ();          //  Implementations of virtual functions from socket_base_t. +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xsend (xs::msg_t *msg_, int flags_);          bool xhas_out (); diff --git a/src/xrep.cpp b/src/xrep.cpp index 007ed27..5e9fc8a 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -36,6 +36,10 @@ xs::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      next_peer_id (generate_random ())  {      options.type = XS_XREP; +    options.sp_pattern = SP_REQREP; +    options.sp_version = 1; +    options.sp_role = SP_REQREP_REP; +    options.sp_complement = SP_REQREP_REQ;      //  TODO: Uncomment the following line when XREP will become true XREP      //  rather than generic router socket. @@ -55,6 +59,34 @@ xs::xrep_t::~xrep_t ()      prefetched_msg.close ();  } +int xs::xrep_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PROTOCOL) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    if (version != 1) { +        errno = EINVAL; +        return -1; +    } + +    options.sp_version = version; +    return 0; +} +  void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); diff --git a/src/xrep.hpp b/src/xrep.hpp index 8e16a4c..71a7bf4 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -49,6 +49,7 @@ namespace xs          ~xrep_t ();          //  Overloads of functions from socket_base_t. +        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);          void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);          int xsend (msg_t *msg_, int flags_);          int xrecv (msg_t *msg_, int flags_); diff --git a/src/xreq.cpp b/src/xreq.cpp index 1c6af9d..e42c691 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -22,12 +22,17 @@  #include "xreq.hpp"  #include "err.hpp"  #include "msg.hpp" +#include "wire.hpp"  xs::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :      socket_base_t (parent_, tid_, sid_),      prefetched (false)  {      options.type = XS_XREQ; +    options.sp_pattern = SP_REQREP; +    options.sp_version = 1; +    options.sp_role = SP_REQREP_REQ; +    options.sp_complement = SP_REQREP_REP;      //  TODO: Uncomment the following line when XREQ will become true XREQ      //  rather than generic dealer socket. @@ -46,6 +51,34 @@ xs::xreq_t::~xreq_t ()      prefetched_msg.close ();  } +int xs::xreq_t::xsetsockopt (int option_, const void *optval_, +    size_t optvallen_) +{ +    if (option_ != XS_PROTOCOL) { +        errno = EINVAL; +        return -1; +    } + +    if (optvallen_ != sizeof (int)) { +        errno = EINVAL; +        return -1; +    } + +    if (!optval_) { +        errno = EFAULT; +        return -1; +    } + +    int version = *(int *) optval_; +    if (version != 1) { +        errno = EINVAL; +        return -1; +    } + +    options.sp_version = version; +    return 0; +} +  void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)  {      xs_assert (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index c4f9baf..9fe6ebb 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -46,6 +46,7 @@ namespace xs      protected:          //  Overloads of functions from socket_base_t. +        int xsetsockopt (int option_, const void *optval_, size_t | 
