summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h3
-rw-r--r--src/Makefile.am2
-rw-r--r--src/msg.hpp1
-rw-r--r--src/socket_base.cpp53
-rw-r--r--src/socket_base.hpp7
5 files changed, 46 insertions, 20 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 2385040..2ba560e 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -158,6 +158,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_PUSH 8
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
+#define ZMQ_GENERIC 13
/* Socket options. */
#define ZMQ_AFFINITY 4
@@ -182,11 +183,13 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_RCVTIMEO 27
#define ZMQ_SNDTIMEO 28
#define ZMQ_RCVLABEL 29
+#define ZMQ_RCVCMD 30
/* Send/recv options. */
#define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2
#define ZMQ_SNDLABEL 4
+#define ZMQ_SNDCMD 8
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
diff --git a/src/Makefile.am b/src/Makefile.am
index 5049ad8..971ebeb 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,6 +23,7 @@ libzmq_la_SOURCES = \
err.hpp \
fd.hpp \
fq.hpp \
+ generic.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
@@ -88,6 +89,7 @@ libzmq_la_SOURCES = \
epoll.cpp \
err.cpp \
fq.cpp \
+ generic.cpp \
io_object.cpp \
io_thread.cpp \
ip.cpp \
diff --git a/src/msg.hpp b/src/msg.hpp
index 2111bf1..1363e78 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -48,6 +48,7 @@ namespace zmq
enum
{
label = 1,
+ command = 2,
shared = 64,
more = 128
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index bc58e8b..2513f81 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -58,6 +58,7 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "generic.hpp"
bool zmq::socket_base_t::check_tag ()
{
@@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
break;
+ case ZMQ_GENERIC:
+ s = new (std::nothrow) generic_t (parent_, tid_);
+ break;
default:
errno = EINVAL;
return NULL;
@@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
last_tsc (0),
ticks (0),
rcvlabel (false),
+ rcvcmd (false),
rcvmore (false)
{
}
@@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
+ if (option_ == ZMQ_RCVCMD) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvcmd ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0))
return -1;
- // At this point we impose the LABEL & MORE flags on the message.
+ // At this point we impose the flags on the message.
if (flags_ & ZMQ_SNDLABEL)
msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
+ if (flags_ & ZMQ_SNDCMD)
+ msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
if (rc < 0)
return rc;
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
}
- // Extract LABEL & MORE flags from the message.
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
unregister_term_ack ();
}
+void zmq::socket_base_t::extract_flags (msg_t *msg_)
+{
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
+ rcvmore = msg_->flags () & msg_t::more ? true : false;
+ if (rcvmore)
+ msg_->reset_flags (msg_t::more);
+ rcvcmd = msg_->flags () & msg_t::command ? true : false;
+ if (rcvcmd)
+ msg_->reset_flags (msg_t::command);
+}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index fb60bbe..c7c86e7 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -128,6 +128,10 @@ namespace zmq
// handlers explicitly. If required, it will deallocate the socket.
void check_destroy ();
+ // Moves the flags from the message to local variables,
+ // to be later retrieved by getsockopt.
+ void extract_flags (msg_t *msg_);
+
// Used to check whether the object is a socket.
uint32_t tag;
@@ -182,6 +186,9 @@ namespace zmq
// True if the last message received had LABEL flag set.
bool rcvlabel;
+ // True if the last message received had COMMAND flag set.
+ bool rcvcmd;
+
// True if the last message received had MORE flag set.
bool rcvmore;