diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/decoder.cpp | 2 | ||||
-rw-r--r-- | src/options.cpp | 25 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/pipe.cpp | 19 | ||||
-rw-r--r-- | src/pipe.hpp | 14 | ||||
-rw-r--r-- | src/session_base.cpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 4 | ||||
-rw-r--r-- | src/xpub.cpp | 8 | ||||
-rw-r--r-- | src/xsub.cpp | 18 |
9 files changed, 74 insertions, 21 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp index 23546f1..dc8e54e 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -126,7 +126,7 @@ bool xs::decoder_t::eight_byte_size_ready () bool xs::decoder_t::flags_ready () { // Store the flags from the wire into the message structure. - in_progress.set_flags (tmpbuf [0]); + in_progress.set_flags (tmpbuf [0] & 0x01); next_step (in_progress.data (), in_progress.size (), &decoder_t::message_ready); diff --git a/src/options.cpp b/src/options.cpp index 07d3752..c9cbaae 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -46,6 +46,7 @@ xs::options_t::options_t () : sndtimeo (-1), ipv4only (1), keepalive (0), + protocol (0), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -232,6 +233,21 @@ int xs::options_t::setsockopt (int option_, const void *optval_, return 0; } + case XS_PROTOCOL: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val < 0) { + errno = EINVAL; + return -1; + } + protocol = val; + return 0; + } + } errno = EINVAL; @@ -413,6 +429,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (int); return 0; + case XS_PROTOCOL: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = protocol; + *optvallen_ = sizeof (int); + return 0; + } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 3e47336..c1e4dda 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -92,6 +92,9 @@ namespace xs // If 1, keepalives are to be sent periodically. int keepalive; + // Version of wire protocol to use. + int protocol; + // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. bool delay_on_close; diff --git a/src/pipe.cpp b/src/pipe.cpp index 76df1b0..51ecedc 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -27,7 +27,7 @@ #include "err.hpp" int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]) + int hwms_ [2], bool delays_ [2], int protocol_) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. @@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0]); + hwms_ [1], hwms_ [0], delays_ [0], protocol_); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1]); + hwms_ [0], hwms_ [1], delays_ [1], protocol_); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_) : + int inhwm_, int outhwm_, bool delay_, int protocol_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -65,7 +65,8 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_) + delay (delay_), + protocol (protocol_) { } @@ -375,7 +376,8 @@ void xs::pipe_t::terminate (bool delay_) rollback (); // Push delimiter into the outbound pipe. Note that watermarks are not - // checked thus the delimiter can be written even though the pipe is full. + // checked thus the delimiter can be written even though the pipe + // is full. msg_t msg; msg.init_delimiter (); outpipe->write (msg, false); @@ -383,6 +385,11 @@ void xs::pipe_t::terminate (bool delay_) } } +int xs::pipe_t::get_protocol () +{ + return protocol; +} + bool xs::pipe_t::is_delimiter (msg_t &msg_) { return msg_.is_delimiter (); diff --git a/src/pipe.hpp b/src/pipe.hpp index bce2c04..c298154 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -44,7 +44,7 @@ namespace xs // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]); + int hwms_ [2], bool delays_ [2], int protocol_); struct i_pipe_events { @@ -68,7 +68,8 @@ namespace xs { // This allows pipepair to create pipe objects. friend int pipepair (xs::object_t *parents_ [2], - xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); + xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], + int protocol_); public: @@ -110,6 +111,9 @@ namespace xs // before actual shutdown. void terminate (bool delay_); + // Returns the ID of the protocol associated with the pipe. + int get_protocol (); + private: // Type of the underlying lock-free pipe. @@ -128,7 +132,7 @@ namespace xs // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_); + int inhwm_, int outhwm_, bool delay_, int protocol_); // Pipepair uses this function to let us know about // the peer pipe object. @@ -188,6 +192,10 @@ namespace xs // asks us to. bool delay; + // ID of the protocol to use. This value is not used by the pipe + // itself, rather it's used by the users of the pipe. + int protocol; + // Identity of the writer. Used uniquely by the reader side. blob_t identity; diff --git a/src/session_base.cpp b/src/session_base.cpp index b752f9f..0c9428b 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -278,7 +278,7 @@ void xs::session_base_t::process_attach (i_engine *engine_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays); + int rc = pipepair (parents, pipes, hwms, delays, options.protocol); errno_assert (rc == 0); // Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 29961ab..9d33348 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -410,7 +410,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {sndhwm, rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - int rc = pipepair (parents, pipes, hwms, delays); + int rc = pipepair (parents, pipes, hwms, delays, options.protocol); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -452,7 +452,7 @@ int xs::socket_base_t::connect (const char *addr_) pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.sndhwm, options.rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, pipes, hwms, delays); + rc = pipepair (parents, pipes, hwms, delays, options.protocol); errno_assert (rc == 0); // PGM does not support subscription forwarding; ask for all data to be diff --git a/src/xpub.cpp b/src/xpub.cpp index 29dd079..255c063 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -42,9 +42,11 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) xs_assert (pipe_); dist.attach (pipe_); - // If icanhasall_ is specified, the caller would like to subscribe - // to all data on this pipe, implicitly. - if (icanhasall_) + // If icanhasall_ is specified, the caller would like to subscribe + // to all data on this pipe, implicitly. Also, if we are using + // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus, + // we need to subscribe for all messages automatically. + if (icanhasall_ || pipe_->get_protocol () == 1) subscriptions.add (NULL, 0, pipe_); // The pipe is active when attached. Let's read the subscriptions from diff --git a/src/xsub.cpp b/src/xsub.cpp index add5ba9..af6789f 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -52,7 +52,11 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); fq.attach (pipe_); - dist.attach (pipe_); + + // Pipes with 0MQ/2.1-style protocol are not eligible for accepting + // subscriptions. + if (pipe_->get_protocol () != 1) + dist.attach (pipe_); // Send all the cached subscriptions to the new upstream peer. subscriptions.apply (send_subscription, pipe_); @@ -72,14 +76,18 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_) void xs::xsub_t::xterminated (pipe_t *pipe_) { fq.terminated (pipe_); - dist.terminated (pipe_); + if (pipe_->get_protocol () != 1) + dist.terminated (pipe_); } void xs::xsub_t::xhiccuped (pipe_t *pipe_) { - // Send all the cached subscriptions to the hiccuped pipe. - subscriptions.apply (send_subscription, pipe_); - pipe_->flush (); + if (pipe_->get_protocol () != 1) { + + // Send all the cached subscriptions to the hiccuped pipe. + subscriptions.apply (send_subscription, pipe_); + pipe_->flush (); + } } int xs::xsub_t::xsend (msg_t *msg_, int flags_) |