summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-07-17 23:31:29 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-07-17 23:31:29 +0200
commitbf78e230ad4736da9fce6e0b4d1655affb8f466b (patch)
tree8119826b6e569e4cbb93fa1afc869b8fab43690d /src
parentc8e8f2a24cd339c548e06f75a3cef96454671a85 (diff)
GENERIC socket type and COMMAND flag added
GENERIC allows to use 0MQ as a dumb networking framework. It provides user with connect/disconnect notifications. Also, each inbound message is labeled by ID of the connection it originated from. Outbound messages should be labeled by the ID of the connection to send them to. To distinguish connect/disconnect notifications from common messages, COMMAND flag was introduced. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/msg.hpp1
-rw-r--r--src/socket_base.cpp53
-rw-r--r--src/socket_base.hpp7
4 files changed, 43 insertions, 20 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 5049ad8..971ebeb 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -23,6 +23,7 @@ libzmq_la_SOURCES = \
err.hpp \
fd.hpp \
fq.hpp \
+ generic.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
@@ -88,6 +89,7 @@ libzmq_la_SOURCES = \
epoll.cpp \
err.cpp \
fq.cpp \
+ generic.cpp \
io_object.cpp \
io_thread.cpp \
ip.cpp \
diff --git a/src/msg.hpp b/src/msg.hpp
index 2111bf1..1363e78 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -48,6 +48,7 @@ namespace zmq
enum
{
label = 1,
+ command = 2,
shared = 64,
more = 128
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index bc58e8b..2513f81 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -58,6 +58,7 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "generic.hpp"
bool zmq::socket_base_t::check_tag ()
{
@@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
break;
+ case ZMQ_GENERIC:
+ s = new (std::nothrow) generic_t (parent_, tid_);
+ break;
default:
errno = EINVAL;
return NULL;
@@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
last_tsc (0),
ticks (0),
rcvlabel (false),
+ rcvcmd (false),
rcvmore (false)
{
}
@@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
+ if (option_ == ZMQ_RCVCMD) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvcmd ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0))
return -1;
- // At this point we impose the LABEL & MORE flags on the message.
+ // At this point we impose the flags on the message.
if (flags_ & ZMQ_SNDLABEL)
msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
+ if (flags_ & ZMQ_SNDCMD)
+ msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
if (rc < 0)
return rc;
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
}
- // Extract LABEL & MORE flags from the message.
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
unregister_term_ack ();
}
+void zmq::socket_base_t::extract_flags (msg_t *msg_)
+{
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
+ rcvmore = msg_->flags () & msg_t::more ? true : false;
+ if (rcvmore)
+ msg_->reset_flags (msg_t::more);
+ rcvcmd = msg_->flags () & msg_t::command ? true : false;
+ if (rcvcmd)
+ msg_->reset_flags (msg_t::command);
+}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index fb60bbe..c7c86e7 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -128,6 +128,10 @@ namespace zmq
// handlers explicitly. If required, it will deallocate the socket.
void check_destroy ();
+ // Moves the flags from the message to local variables,
+ // to be later retrieved by getsockopt.
+ void extract_flags (msg_t *msg_);
+
// Used to check whether the object is a socket.
uint32_t tag;
@@ -182,6 +186,9 @@ namespace zmq
// True if the last message received had LABEL flag set.
bool rcvlabel;
+ // True if the last message received had COMMAND flag set.
+ bool rcvcmd;
+
// True if the last message received had MORE flag set.
bool rcvmore;