summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp53
1 files changed, 33 insertions, 20 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index bc58e8b..2513f81 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -58,6 +58,7 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "generic.hpp"
bool zmq::socket_base_t::check_tag ()
{
@@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
break;
+ case ZMQ_GENERIC:
+ s = new (std::nothrow) generic_t (parent_, tid_);
+ break;
default:
errno = EINVAL;
return NULL;
@@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
last_tsc (0),
ticks (0),
rcvlabel (false),
+ rcvcmd (false),
rcvmore (false)
{
}
@@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
+ if (option_ == ZMQ_RCVCMD) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvcmd ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0))
return -1;
- // At this point we impose the LABEL & MORE flags on the message.
+ // At this point we impose the flags on the message.
if (flags_ & ZMQ_SNDLABEL)
msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
+ if (flags_ & ZMQ_SNDCMD)
+ msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
if (rc < 0)
return rc;
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
}
- // Extract LABEL & MORE flags from the message.
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
unregister_term_ack ();
}
+void zmq::socket_base_t::extract_flags (msg_t *msg_)
+{
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
+ rcvmore = msg_->flags () & msg_t::more ? true : false;
+ if (rcvmore)
+ msg_->reset_flags (msg_t::more);
+ rcvcmd = msg_->flags () & msg_t::command ? true : false;
+ if (rcvcmd)
+ msg_->reset_flags (msg_t::command);
+}