summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--doc/Makefile.am2
-rw-r--r--doc/zmq_getmsgopt.txt85
-rw-r--r--include/zmq.h5
-rw-r--r--src/socket_base.cpp10
-rw-r--r--src/zmq.cpp19
-rw-r--r--tests/Makefile.am4
-rw-r--r--tests/test_msg_flags.cpp82
8 files changed, 200 insertions, 8 deletions
diff --git a/.gitignore b/.gitignore
index eb8da57..3bc8348 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,6 +32,7 @@ tests/test_reqrep_device
tests/test_reqrep_drop
tests/test_sub_forward
tests/test_invalid_rep
+tests/test_msg_flags
src/platform.hpp*
src/stamp-h1
perf/local_lat
diff --git a/doc/Makefile.am b/doc/Makefile.am
index dae71be..ff00c18 100644
--- a/doc/Makefile.am
+++ b/doc/Makefile.am
@@ -3,7 +3,7 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \
- zmq_sendmsg.3 zmq_recvmsg.3
+ zmq_sendmsg.3 zmq_recvmsg.3 zmq_getmsgopt.3
MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7
MAN_DOC = $(MAN1) $(MAN3) $(MAN7)
diff --git a/doc/zmq_getmsgopt.txt b/doc/zmq_getmsgopt.txt
new file mode 100644
index 0000000..a82c30c
--- /dev/null
+++ b/doc/zmq_getmsgopt.txt
@@ -0,0 +1,85 @@
+zmq_getmsgopt(3)
+================
+
+
+NAME
+----
+zmq_getmsgopt - retrieve message option
+
+
+SYNOPSIS
+--------
+*int zmq_getmsgopt (zmq_msg_t '*message', int 'option_name', void '*option_value', size_t '*option_len');*
+
+
+DESCRIPTION
+-----------
+The _zmq_getmsgopt()_ function shall retrieve the value for the option
+specified by the 'option_name' argument for the message pointed to by the
+'message' argument, and store it in the buffer pointed to by the 'option_value'
+argument. The 'option_len' argument is the size in bytes of the buffer pointed
+to by 'option_value'; upon successful completion _zmq_getsockopt()_ shall
+modify the 'option_len' argument to indicate the actual size of the option
+value stored in the buffer.
+
+The following options can be retrieved with the _zmq_getmsgopt()_ function:
+
+*ZMQ_MORE*::
+Indicates that there are more message parts to follow after the 'message'.
+
+RETURN VALUE
+------------
+The _zmq_getmsgopt()_ function shall return zero if successful. Otherwise it
+shall return `-1` and set 'errno' to one of the values defined below.
+
+
+ERRORS
+------
+*EINVAL*::
+The requested option _option_name_ is unknown, or the requested _option_size_ or
+_option_value_ is invalid, or the size of the buffer pointed to by
+_option_value_, as specified by _option_len_, is insufficient for storing the
+option value.
+
+
+EXAMPLE
+-------
+.Receiving a multi-part message
+----
+zmq_msg_t part;
+int more;
+size_t more_size = sizeof (more);
+while (true) {
+ /* Create an empty 0MQ message to hold the message part */
+ int rc = zmq_msg_init (&part);
+ assert (rc == 0);
+ /* Block until a message is available to be received from socket */
+ rc = zmq_recvmsg (socket, &part, 0);
+ assert (rc != -1);
+ rc = getmsgopt (&part, ZMQ_MORE, &more, &more_size);
+ assert (rc == 0);
+ if (more) {
+ fprintf (stderr, "more\n");
+ }
+ else {
+ fprintf (stderr, "end\n");
+ break;
+ }
+ zmq_msg_close (part);
+}
+----
+
+
+SEE ALSO
+--------
+linkzmq:zmq_msg_data[3]
+linkzmq:zmq_msg_init[3]
+linkzmq:zmq_msg_init_size[3]
+linkzmq:zmq_msg_init_data[3]
+linkzmq:zmq_msg_close[3]
+linkzmq:zmq[7]
+
+
+AUTHORS
+-------
+The 0MQ documentation was written by Chuck Remes <cremes@mac.com>.
diff --git a/include/zmq.h b/include/zmq.h
index 0263b43..ac350d3 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -139,6 +139,8 @@ ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg);
ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg);
+ZMQ_EXPORT int zmq_getmsgopt (zmq_msg_t *msg, int option, void *optval,
+ size_t *optvallen);
/******************************************************************************/
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
@@ -192,6 +194,9 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_SNDTIMEO 28
#define ZMQ_IPV4ONLY 31
+/* Message options */
+#define ZMQ_MORE 1
+
/* Send/recv options. */
#define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ced28d4..302003f 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -485,6 +485,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0))
return -1;
+ // Clear any user-visible flags that are set on the message.
+ msg_->reset_flags (msg_t::more);
+
// At this point we impose the flags on the message.
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
@@ -857,15 +860,10 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
// Test whether IDENTITY flag is valid for this socket type.
- if (unlikely (msg_->flags () & msg_t::identity)) {
+ if (unlikely (msg_->flags () & msg_t::identity))
zmq_assert (options.recv_identity);
-printf ("identity recvd\n");
- }
-
// Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
}
diff --git a/src/zmq.cpp b/src/zmq.cpp
index b06b122..84dcdd1 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -340,6 +340,25 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
return ((zmq::msg_t*) msg_)->size ();
}
+int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_,
+ size_t *optvallen_)
+{
+ switch (option_) {
+ case ZMQ_MORE:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) =
+ (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ default:
+ errno = EINVAL;
+ return -1;
+ }
+}
+
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 5f0cfc1..bbae270 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -8,7 +8,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_hwm \
test_reqrep_device \
test_sub_forward \
- test_invalid_rep
+ test_invalid_rep \
+ test_msg_flags
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
@@ -25,6 +26,7 @@ test_hwm_SOURCES = test_hwm.cpp
test_reqrep_device_SOURCES = test_reqrep_device.cpp
test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp
+test_msg_flags_SOURCES = test_msg_flags.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp
new file mode 100644
index 0000000..10fd526
--- /dev/null
+++ b/tests/test_msg_flags.cpp
@@ -0,0 +1,82 @@
+/*
+ Copyright (c) 2011 250bpm s.r.o.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+
+#include "../include/zmq.h"
+
+int main (int argc, char *argv [])
+{
+ // Create the infrastructure
+ void *ctx = zmq_init (0);
+ assert (ctx);
+ void *sb = zmq_socket (ctx, ZMQ_XREP);
+ assert (sb);
+ int rc = zmq_bind (sb, "inproc://a");
+ assert (rc == 0);
+ void *sc = zmq_socket (ctx, ZMQ_XREQ);
+ assert (sc);
+ rc = zmq_connect (sc, "inproc://a");
+ assert (rc == 0);
+
+ // Send 2-part message.
+ rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE);
+ assert (rc == 1);
+ rc = zmq_send (sc, "B", 1, 0);
+ assert (rc == 1);
+
+ // Identity comes first.
+ zmq_msg_t msg;
+ rc = zmq_msg_init (&msg);
+ assert (rc == 0);
+ rc = zmq_recvmsg (sb, &msg, 0);
+ assert (rc >= 0);
+ int more;
+ size_t more_size = sizeof (more);
+ rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size);
+ assert (rc == 0);
+ assert (more == 1);
+
+ // Then the first part of the message body.
+ rc = zmq_recvmsg (sb, &msg, 0);
+ assert (rc == 1);
+ more_size = sizeof (more);
+ rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size);
+ assert (rc == 0);
+ assert (more == 1);
+
+ // And finally, the second part of the message body.
+ rc = zmq_recvmsg (sb, &msg, 0);
+ assert (rc == 1);
+ more_size = sizeof (more);
+ rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size);
+ assert (rc == 0);
+ assert (more == 0);
+
+ // Deallocate the infrastructure.
+ rc = zmq_close (sc);
+ assert (rc == 0);
+ rc = zmq_close (sb);
+ assert (rc == 0);
+ rc = zmq_term (ctx);
+ assert (rc == 0);
+ return 0 ;
+}
+