diff options
-rw-r--r-- | include/zmq.h | 3 | ||||
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/msg.hpp | 1 | ||||
-rw-r--r-- | src/socket_base.cpp | 53 | ||||
-rw-r--r-- | src/socket_base.hpp | 7 |
5 files changed, 46 insertions, 20 deletions
diff --git a/include/zmq.h b/include/zmq.h index 2385040..2ba560e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -158,6 +158,7 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_PUSH 8 #define ZMQ_XPUB 9 #define ZMQ_XSUB 10 +#define ZMQ_GENERIC 13 /* Socket options. */ #define ZMQ_AFFINITY 4 @@ -182,11 +183,13 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_RCVTIMEO 27 #define ZMQ_SNDTIMEO 28 #define ZMQ_RCVLABEL 29 +#define ZMQ_RCVCMD 30 /* Send/recv options. */ #define ZMQ_DONTWAIT 1 #define ZMQ_SNDMORE 2 #define ZMQ_SNDLABEL 4 +#define ZMQ_SNDCMD 8 ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/Makefile.am b/src/Makefile.am index 5049ad8..971ebeb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libzmq_la_SOURCES = \ err.hpp \ fd.hpp \ fq.hpp \ + generic.hpp \ io_object.hpp \ io_thread.hpp \ ip.hpp \ @@ -88,6 +89,7 @@ libzmq_la_SOURCES = \ epoll.cpp \ err.cpp \ fq.cpp \ + generic.cpp \ io_object.cpp \ io_thread.cpp \ ip.cpp \ diff --git a/src/msg.hpp b/src/msg.hpp index 2111bf1..1363e78 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -48,6 +48,7 @@ namespace zmq enum { label = 1, + command = 2, shared = 64, more = 128 }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index bc58e8b..2513f81 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -58,6 +58,7 @@ #include "xrep.hpp" #include "xpub.hpp" #include "xsub.hpp" +#include "generic.hpp" bool zmq::socket_base_t::check_tag () { @@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_); break; + case ZMQ_GENERIC: + s = new (std::nothrow) generic_t (parent_, tid_); + break; default: errno = EINVAL; return NULL; @@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : last_tsc (0), ticks (0), rcvlabel (false), + rcvcmd (false), rcvmore (false) { } @@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return 0; } + if (option_ == ZMQ_RCVCMD) { + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = rcvcmd ? 1 : 0; + *optvallen_ = sizeof (int); + return 0; + } + if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; @@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) if (unlikely (rc != 0)) return -1; - // At this point we impose the LABEL & MORE flags on the message. + // At this point we impose the flags on the message. if (flags_ & ZMQ_SNDLABEL) msg_->set_flags (msg_t::label); if (flags_ & ZMQ_SNDMORE) msg_->set_flags (msg_t::more); + if (flags_ & ZMQ_SNDCMD) + msg_->set_flags (msg_t::command); // Try to send the message. rc = xsend (msg_, flags_); @@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); - rcvmore = msg_->flags () & msg_t::more ? true : false; - if (rcvmore) - msg_->reset_flags (msg_t::more); + extract_flags (msg_); return 0; } @@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) rc = xrecv (msg_, flags_); if (rc < 0) return rc; - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); - rcvmore = msg_->flags () & msg_t::more ? true : false; - if (rcvmore) - msg_->reset_flags (msg_t::more); + extract_flags (msg_); return 0; } @@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) } } - // Extract LABEL & MORE flags from the message. - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); - rcvmore = msg_->flags () & msg_t::more ? true : false; - if (rcvmore) - msg_->reset_flags (msg_t::more); + extract_flags (msg_); return 0; } @@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) unregister_term_ack (); } +void zmq::socket_base_t::extract_flags (msg_t *msg_) +{ + rcvlabel = msg_->flags () & msg_t::label; + if (rcvlabel) + msg_->reset_flags (msg_t::label); + rcvmore = msg_->flags () & msg_t::more ? true : false; + if (rcvmore) + msg_->reset_flags (msg_t::more); + rcvcmd = msg_->flags () & msg_t::command ? true : false; + if (rcvcmd) + msg_->reset_flags (msg_t::command); +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp index fb60bbe..c7c86e7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -128,6 +128,10 @@ namespace zmq // handlers explicitly. If required, it will deallocate the socket. void check_destroy (); + // Moves the flags from the message to local variables, + // to be later retrieved by getsockopt. + void extract_flags (msg_t *msg_); + // Used to check whether the object is a socket. uint32_t tag; @@ -182,6 +186,9 @@ namespace zmq // True if the last message received had LABEL flag set. bool rcvlabel; + // True if the last message received had COMMAND flag set. + bool rcvcmd; + // True if the last message received had MORE flag set. bool rcvmore; |