diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 53 |
1 files changed, 33 insertions, 20 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index bc58e8b..2513f81 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -58,6 +58,7 @@ #include "xrep.hpp" #include "xpub.hpp" #include "xsub.hpp" +#include "generic.hpp" bool zmq::socket_base_t::check_tag () { @@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_); break; + case ZMQ_GENERIC: + s = new (std::nothrow) generic_t (parent_, tid_); + break; default: errno = EINVAL; return NULL; @@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : last_tsc (0), ticks (0), rcvlabel (false), + rcvcmd (false), rcvmore (false) { } @@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return 0; } + if (option_ == ZMQ_RCVCMD) { + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = rcvcmd ? 1 : 0; + *optvallen_ = sizeof (int); + return 0; + } + if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; @@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) if (unlikely (rc != 0)) return -1; - // At this point we impose the LABEL & MORE flags on the message. + // At this point we impose the flags on the message. if (flags_ & ZMQ_SNDLABEL) msg_->set_flags (msg_t::label); if (flags_ & ZMQ_SNDMORE) msg_->set_flags (msg_t::more); + if (flags_ & ZMQ_SNDCMD) + msg_->set_flags (msg_t::command); // Try to send the message. rc = xsend (msg_, flags_); @@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); - rcvmore = msg_->flags () & msg_t::more ? true : false; - if (rcvmore) - msg_->reset_flags (msg_t::more); + extract_flags (msg_); return 0; } @@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) rc = xrecv (msg_, flags_); if (rc < 0) return rc; - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); - rcvmore = msg_->flags () & msg_t::more ? true : false; - if (rcvmore) - msg_->reset_flags (msg_t::more); + extract_flags (msg_); return 0; } @@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) } } - // Extract LABEL & MORE flags from the message. - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); - rcvmore = msg_->flags () & msg_t::more ? true : false; - if (rcvmore) - msg_->reset_flags (msg_t::more); + extract_flags (msg_); return 0; } @@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) unregister_term_ack (); } +void zmq::socket_base_t::extract_flags (msg_t *msg_) +{ + rcvlabel = msg_->flags () & msg_t::label; + if (rcvlabel) + msg_->reset_flags (msg_t::label); + rcvmore = msg_->flags () & msg_t::more ? true : false; + if (rcvmore) + msg_->reset_flags (msg_t::more); + rcvcmd = msg_->flags () & msg_t::command ? true : false; + if (rcvcmd) + msg_->reset_flags (msg_t::command); +} |