diff options
| -rw-r--r-- | include/zmq.h | 3 | ||||
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/msg.hpp | 1 | ||||
| -rw-r--r-- | src/socket_base.cpp | 53 | ||||
| -rw-r--r-- | src/socket_base.hpp | 7 | 
5 files changed, 46 insertions, 20 deletions
| diff --git a/include/zmq.h b/include/zmq.h index 2385040..2ba560e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -158,6 +158,7 @@ ZMQ_EXPORT int zmq_term (void *context);  #define ZMQ_PUSH 8  #define ZMQ_XPUB 9  #define ZMQ_XSUB 10 +#define ZMQ_GENERIC 13  /*  Socket options.                                                           */  #define ZMQ_AFFINITY 4 @@ -182,11 +183,13 @@ ZMQ_EXPORT int zmq_term (void *context);  #define ZMQ_RCVTIMEO 27  #define ZMQ_SNDTIMEO 28  #define ZMQ_RCVLABEL 29 +#define ZMQ_RCVCMD 30  /*  Send/recv options.                                                        */  #define ZMQ_DONTWAIT 1  #define ZMQ_SNDMORE 2  #define ZMQ_SNDLABEL 4 +#define ZMQ_SNDCMD 8  ZMQ_EXPORT void *zmq_socket (void *context, int type);  ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/Makefile.am b/src/Makefile.am index 5049ad8..971ebeb 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ libzmq_la_SOURCES = \      err.hpp \      fd.hpp \      fq.hpp \ +    generic.hpp \      io_object.hpp \      io_thread.hpp \      ip.hpp \ @@ -88,6 +89,7 @@ libzmq_la_SOURCES = \      epoll.cpp \      err.cpp \      fq.cpp \ +    generic.cpp \      io_object.cpp \      io_thread.cpp \      ip.cpp \ diff --git a/src/msg.hpp b/src/msg.hpp index 2111bf1..1363e78 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -48,6 +48,7 @@ namespace zmq          enum          {              label = 1, +            command = 2,              shared = 64,              more = 128          }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index bc58e8b..2513f81 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -58,6 +58,7 @@  #include "xrep.hpp"  #include "xpub.hpp"  #include "xsub.hpp" +#include "generic.hpp"  bool zmq::socket_base_t::check_tag ()  { @@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,      case ZMQ_XSUB:          s = new (std::nothrow) xsub_t (parent_, tid_);          break; +    case ZMQ_GENERIC: +        s = new (std::nothrow) generic_t (parent_, tid_); +        break;      default:          errno = EINVAL;          return NULL; @@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :      last_tsc (0),      ticks (0),      rcvlabel (false), +    rcvcmd (false),      rcvmore (false)  {  } @@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,          return 0;      } +    if (option_ == ZMQ_RCVCMD) { +        if (*optvallen_ < sizeof (int)) { +            errno = EINVAL; +            return -1; +        } +        *((int*) optval_) = rcvcmd ? 1 : 0; +        *optvallen_ = sizeof (int); +        return 0; +    } +      if (option_ == ZMQ_RCVMORE) {          if (*optvallen_ < sizeof (int)) {              errno = EINVAL; @@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)      if (unlikely (rc != 0))          return -1; -    //  At this point we impose the LABEL & MORE flags on the message. +    //  At this point we impose the flags on the message.      if (flags_ & ZMQ_SNDLABEL)          msg_->set_flags (msg_t::label);      if (flags_ & ZMQ_SNDMORE)          msg_->set_flags (msg_t::more); +    if (flags_ & ZMQ_SNDCMD) +        msg_->set_flags (msg_t::command);      //  Try to send the message.      rc = xsend (msg_, flags_); @@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)      //  If we have the message, return immediately.      if (rc == 0) { -        rcvlabel = msg_->flags () & msg_t::label; -        if (rcvlabel) -            msg_->reset_flags (msg_t::label); -        rcvmore = msg_->flags () & msg_t::more ? true : false; -        if (rcvmore) -            msg_->reset_flags (msg_t::more); +        extract_flags (msg_);          return 0;      } @@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)          rc = xrecv (msg_, flags_);          if (rc < 0)              return rc; -        rcvlabel = msg_->flags () & msg_t::label; -        if (rcvlabel) -            msg_->reset_flags (msg_t::label); -        rcvmore = msg_->flags () & msg_t::more ? true : false; -        if (rcvmore) -            msg_->reset_flags (msg_t::more); +        extract_flags (msg_);          return 0;      } @@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)          }      } -    //  Extract LABEL & MORE flags from the message. -    rcvlabel = msg_->flags () & msg_t::label; -    if (rcvlabel) -        msg_->reset_flags (msg_t::label); -    rcvmore = msg_->flags () & msg_t::more ? true : false; -    if (rcvmore) -        msg_->reset_flags (msg_t::more); +    extract_flags (msg_);      return 0;  } @@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)          unregister_term_ack ();  } +void zmq::socket_base_t::extract_flags (msg_t *msg_) +{ +    rcvlabel = msg_->flags () & msg_t::label; +    if (rcvlabel) +        msg_->reset_flags (msg_t::label); +    rcvmore = msg_->flags () & msg_t::more ? true : false; +    if (rcvmore) +        msg_->reset_flags (msg_t::more); +    rcvcmd = msg_->flags () & msg_t::command ? true : false; +    if (rcvcmd) +        msg_->reset_flags (msg_t::command); +} diff --git a/src/socket_base.hpp b/src/socket_base.hpp index fb60bbe..c7c86e7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -128,6 +128,10 @@ namespace zmq          //  handlers explicitly. If required, it will deallocate the socket.          void check_destroy (); +        //  Moves the flags from the message to local variables, +        //  to be later retrieved by getsockopt. +        void extract_flags (msg_t *msg_); +          //  Used to check whether the object is a socket.          uint32_t tag; @@ -182,6 +186,9 @@ namespace zmq          //  True if the last message received had LABEL flag set.          bool rcvlabel; +        //  True if the last message received had COMMAND flag set. +        bool rcvcmd; +          //  True if the last message received had MORE flag set.          bool rcvmore; | 
