summaryrefslogtreecommitdiff
path: root/src/xsub.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/xsub.cpp')
-rw-r--r--src/xsub.cpp55
1 files changed, 7 insertions, 48 deletions
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 0a9ff7e..599b947 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -29,10 +29,6 @@ xs::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = XS_XSUB;
- options.sp_pattern = SP_PUBSUB;
- options.sp_version = 4;
- options.sp_role = SP_PUBSUB_SUB;
- options.sp_complement = SP_PUBSUB_PUB;
// When socket is being closed down we don't want to wait till pending
// subscription commands are sent to the wire.
@@ -46,42 +42,6 @@ xs::xsub_t::~xsub_t ()
{
}
-int xs::xsub_t::xsetsockopt (int option_, const void *optval_,
- size_t optvallen_)
-{
- if (option_ != XS_PATTERN_VERSION) {
- errno = EINVAL;
- return -1;
- }
-
- if (optvallen_ != sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
-
- if (!optval_) {
- errno = EFAULT;
- return -1;
- }
-
- int version = *(int *) optval_;
- switch (version) {
- case 1:
- options.legacy_protocol = true;
- options.sp_version = 1;
- break;
- case 3:
- options.legacy_protocol = false;
- options.sp_version = 3;
- break;
- default:
- errno = EINVAL;
- return -1;
- }
-
- return 0;
-}
-
void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
xs_assert (pipe_);
@@ -89,7 +49,7 @@ void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
// Pipes with 0MQ/2.1-style protocol are not eligible for accepting
// subscriptions.
- if (pipe_->get_sp_version () == 1)
+ if (pipe_->get_protocol () == 1)
return;
dist.attach (pipe_);
@@ -115,14 +75,14 @@ void xs::xsub_t::xwrite_activated (pipe_t *pipe_)
void xs::xsub_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
- if (pipe_->get_sp_version () != 1)
+ if (pipe_->get_protocol () != 1)
dist.terminated (pipe_);
}
void xs::xsub_t::xhiccuped (pipe_t *pipe_)
{
// In 0MQ/2.1 protocol there is no subscription forwarding.
- if (pipe_->get_sp_version () == 1)
+ if (pipe_->get_protocol () == 1)
return;
// Send all the cached subscriptions to the new upstream peer.
@@ -144,13 +104,13 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
}
int cmd = get_uint16 (data);
int filter_id = get_uint16 (data + 2);
- if (cmd != SP_PUBSUB_CMD_SUBSCRIBE && cmd != SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ if (cmd != XS_CMD_SUBSCRIBE && cmd != XS_CMD_UNSUBSCRIBE) {
errno = EINVAL;
return -1;
}
// Process the subscription.
- if (cmd == SP_PUBSUB_CMD_SUBSCRIBE) {
+ if (cmd == XS_CMD_SUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.insert (
std::make_pair (std::make_pair (filter_id,
blob_t (data + 4, size - 4)), 0)).first;
@@ -158,7 +118,7 @@ int xs::xsub_t::xsend (msg_t *msg_, int flags_)
if (it->second == 1)
return dist.send_to_all (msg_, flags_);
}
- else if (cmd == SP_PUBSUB_CMD_UNSUBSCRIBE) {
+ else if (cmd == XS_CMD_UNSUBSCRIBE) {
subscriptions_t::iterator it = subscriptions.find (
std::make_pair (filter_id, blob_t (data + 4, size - 4)));
if (it != subscriptions.end ()) {
@@ -202,8 +162,7 @@ void xs::xsub_t::send_subscription (pipe_t *pipe_, bool subscribe_,
int rc = msg.init_size (size_ + 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
- put_uint16 (data, subscribe_ ? SP_PUBSUB_CMD_SUBSCRIBE :
- SP_PUBSUB_CMD_UNSUBSCRIBE);
+ put_uint16 (data, subscribe_ ? XS_CMD_SUBSCRIBE : XS_CMD_UNSUBSCRIBE);
put_uint16 (data + 2, filter_id_);
memcpy (data + 4, data_, size_);