summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/decoder.cpp2
-rw-r--r--src/options.cpp25
-rw-r--r--src/options.hpp3
-rw-r--r--src/pipe.cpp19
-rw-r--r--src/pipe.hpp14
-rw-r--r--src/session_base.cpp2
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/xpub.cpp8
-rw-r--r--src/xsub.cpp18
9 files changed, 74 insertions, 21 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp
index 23546f1..dc8e54e 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -126,7 +126,7 @@ bool xs::decoder_t::eight_byte_size_ready ()
bool xs::decoder_t::flags_ready ()
{
// Store the flags from the wire into the message structure.
- in_progress.set_flags (tmpbuf [0]);
+ in_progress.set_flags (tmpbuf [0] & 0x01);
next_step (in_progress.data (), in_progress.size (),
&decoder_t::message_ready);
diff --git a/src/options.cpp b/src/options.cpp
index 07d3752..c9cbaae 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -46,6 +46,7 @@ xs::options_t::options_t () :
sndtimeo (-1),
ipv4only (1),
keepalive (0),
+ protocol (0),
delay_on_close (true),
delay_on_disconnect (true),
filter (false),
@@ -232,6 +233,21 @@ int xs::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
+ case XS_PROTOCOL:
+ {
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ int val = *((int*) optval_);
+ if (val < 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ protocol = val;
+ return 0;
+ }
+
}
errno = EINVAL;
@@ -413,6 +429,15 @@ int xs::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case XS_PROTOCOL:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = protocol;
+ *optvallen_ = sizeof (int);
+ return 0;
+
}
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 3e47336..c1e4dda 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -92,6 +92,9 @@ namespace xs
// If 1, keepalives are to be sent periodically.
int keepalive;
+ // Version of wire protocol to use.
+ int protocol;
+
// If true, session reads all the pending messages from the pipe and
// sends them to the network when socket is closed.
bool delay_on_close;
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 76df1b0..51ecedc 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -27,7 +27,7 @@
#include "err.hpp"
int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2])
+ int hwms_ [2], bool delays_ [2], int protocol_)
{
// Creates two pipe objects. These objects are connected by two ypipes,
// each to pass messages in one direction.
@@ -38,10 +38,10 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
alloc_assert (upipe2);
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
- hwms_ [1], hwms_ [0], delays_ [0]);
+ hwms_ [1], hwms_ [0], delays_ [0], protocol_);
alloc_assert (pipes_ [0]);
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
- hwms_ [0], hwms_ [1], delays_ [1]);
+ hwms_ [0], hwms_ [1], delays_ [1], protocol_);
alloc_assert (pipes_ [1]);
pipes_ [0]->set_peer (pipes_ [1]);
@@ -51,7 +51,7 @@ int xs::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
}
xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_) :
+ int inhwm_, int outhwm_, bool delay_, int protocol_) :
object_t (parent_),
inpipe (inpipe_),
outpipe (outpipe_),
@@ -65,7 +65,8 @@ xs::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL),
sink (NULL),
state (active),
- delay (delay_)
+ delay (delay_),
+ protocol (protocol_)
{
}
@@ -375,7 +376,8 @@ void xs::pipe_t::terminate (bool delay_)
rollback ();
// Push delimiter into the outbound pipe. Note that watermarks are not
- // checked thus the delimiter can be written even though the pipe is full.
+ // checked thus the delimiter can be written even though the pipe
+ // is full.
msg_t msg;
msg.init_delimiter ();
outpipe->write (msg, false);
@@ -383,6 +385,11 @@ void xs::pipe_t::terminate (bool delay_)
}
}
+int xs::pipe_t::get_protocol ()
+{
+ return protocol;
+}
+
bool xs::pipe_t::is_delimiter (msg_t &msg_)
{
return msg_.is_delimiter ();
diff --git a/src/pipe.hpp b/src/pipe.hpp
index bce2c04..c298154 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -44,7 +44,7 @@ namespace xs
// pipe receives all the pending messages before terminating, otherwise it
// terminates straight away.
int pipepair (xs::object_t *parents_ [2], xs::pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2]);
+ int hwms_ [2], bool delays_ [2], int protocol_);
struct i_pipe_events
{
@@ -68,7 +68,8 @@ namespace xs
{
// This allows pipepair to create pipe objects.
friend int pipepair (xs::object_t *parents_ [2],
- xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
+ xs::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
+ int protocol_);
public:
@@ -110,6 +111,9 @@ namespace xs
// before actual shutdown.
void terminate (bool delay_);
+ // Returns the ID of the protocol associated with the pipe.
+ int get_protocol ();
+
private:
// Type of the underlying lock-free pipe.
@@ -128,7 +132,7 @@ namespace xs
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_);
+ int inhwm_, int outhwm_, bool delay_, int protocol_);
// Pipepair uses this function to let us know about
// the peer pipe object.
@@ -188,6 +192,10 @@ namespace xs
// asks us to.
bool delay;
+ // ID of the protocol to use. This value is not used by the pipe
+ // itself, rather it's used by the users of the pipe.
+ int protocol;
+
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
diff --git a/src/session_base.cpp b/src/session_base.cpp
index b752f9f..0c9428b 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -278,7 +278,7 @@ void xs::session_base_t::process_attach (i_engine *engine_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
- int rc = pipepair (parents, pipes, hwms, delays);
+ int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Plug the local end of the pipe.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 29961ab..9d33348 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -410,7 +410,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- int rc = pipepair (parents, pipes, hwms, delays);
+ int rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
@@ -452,7 +452,7 @@ int xs::socket_base_t::connect (const char *addr_)
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
- rc = pipepair (parents, pipes, hwms, delays);
+ rc = pipepair (parents, pipes, hwms, delays, options.protocol);
errno_assert (rc == 0);
// PGM does not support subscription forwarding; ask for all data to be
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 29dd079..255c063 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -42,9 +42,11 @@ void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
xs_assert (pipe_);
dist.attach (pipe_);
- // If icanhasall_ is specified, the caller would like to subscribe
- // to all data on this pipe, implicitly.
- if (icanhasall_)
+ // If icanhasall_ is specified, the caller would like to subscribe
+ // to all data on this pipe, implicitly. Also, if we are using
+ // 0MQ/2.1-style protocol, there's no subscription forwarding. Thus,
+ // we need to subscribe for all messages automatically.
+ if (icanhasall_ || pipe_->get_protocol () == 1)
subscriptions.add (NULL, 0, pipe_);
// The pipe is active when attached. Let's read the subscriptions from
diff --git a/src/xsub.cpp b/src/xsub.cpp
index add5ba9..af6789f 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -52,7 +52,11 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
fq.attach (pipe_);
- dist.attach (pipe_);
+
+ // Pipes with 0MQ/2.1-style protocol are not eligible for accepting
+ // subscriptions.
+ if (pipe_->get_protocol () != 1)
+ dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
subscriptions.apply (send_subscription, pipe_);
@@ -72,14 +76,18 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- dist.terminated (pipe_);
+ if (pipe_->get_protocol () != 1)
+ dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
- // Send all the cached subscriptions to the hiccuped pipe.
- subscriptions.apply (send_subscription, pipe_);
- pipe_->flush ();
+ if (pipe_->get_protocol () != 1) {
+
+ // Send all the cached subscriptions to the hiccuped pipe.
+ subscriptions.apply (send_subscription, pipe_);
+ pipe_->flush ();
+ }
}
int xs::xsub_t::xsend (msg_t *msg_, int flags_)