summaryrefslogtreecommitdiff
path: root/src/xsub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r--src/xsub.cpp55
1 files changed, 48 insertions, 7 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 7272a8e..9255ceb 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -29,6 +29,10 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
+ options.sp_pattern = SP_PUBSUB;
+ options.sp_version = 4;
+ options.sp_role = SP_PUBSUB_SUB;
+ options.sp_complement = SP_PUBSUB_PUB;
// When socket is being closed down we don't want to wait till pending
// subscription commands are sent to the wire.
@@ -42,6 +46,42 @@ xs::xsub_t::~xsub_t ()
{
}
+int xs::xsub_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ if (option_ != XS_PATTERN_VERSION) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (!optval_) {
+ errno = EFAULT;
+ return -1;
+ }
+
+ int version = *(int *) optval_;
+ switch (version) {
+ case 1:
+ options.legacy_protocol = true;
+ options.sp_version = 1;
+ break;
+ case 3:
+ options.legacy_protocol = false;
+ options.sp_version = 3;
+ break;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+
+ return 0;
+}
+
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -49,7 +89,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_protocol () == 1)
+ if (pipe_->get_sp_version () == 1)
return;
dist.attach (pipe_);
@@ -75,14 +115,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- if (pipe_->get_protocol () != 1)
+ if (pipe_->get_sp_version () != 1)
dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
// In 0MQ/2.1 protocol there is no subscription forwarding.
- if (pipe_->get_protocol () == 1)
+ if (pipe_->get_sp_version () == 1)
return;
// Send all the cached subscriptions to the new upstream peer.
@@ -104,13 +144,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
- if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
+ if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
// Process the subscription.
- if (cmd == XS_CMD_SUBSCRIBE) {
+ if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.insert (
std::make_pair (std::make_pair (filter_id,
blob_t (data + 4, size - 4)), 0)).first;
@@ -118,7 +158,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
if (it->second == 1)
return dist.send_to_all (msg_, flags_);
}
- else if (cmd == XS_CMD_UNSUBSCRIBE) {
+ else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.find (
std::make_pair (filter_id, blob_t (data + 4, size - 4)));
if (it != subscriptions.end ()) {
@@ -162,7 +202,8 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
int rc = msg.init_size (size_ + 4);
xs_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
+ put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE :
+ SP_PUBSUB_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);