From 93529d8c5db599a45171942c4510f1b84ed09e6a Mon Sep 17 00:00:00 2001 From: Chuck Remes Date: Sun, 6 Nov 2011 14:03:51 +0100 Subject: Add zmq_getmsgopt to the API The new function allows to retrieve options (flags) from zmq_msg_t. Signed-off-by: Chuck Remes Renamed from zmq_msg_flags to zmq_getmsgopt Signed-off-by: Martin Sustrik --- .gitignore | 1 + doc/Makefile.am | 2 +- doc/zmq_getmsgopt.txt | 85 ++++++++++++++++++++++++++++++++++++++++++++++++ include/zmq.h | 5 +++ src/socket_base.cpp | 10 +++--- src/zmq.cpp | 19 +++++++++++ tests/Makefile.am | 4 ++- tests/test_msg_flags.cpp | 82 ++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 200 insertions(+), 8 deletions(-) create mode 100644 doc/zmq_getmsgopt.txt create mode 100644 tests/test_msg_flags.cpp diff --git a/.gitignore b/.gitignore index eb8da57..3bc8348 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ tests/test_reqrep_device tests/test_reqrep_drop tests/test_sub_forward tests/test_invalid_rep +tests/test_msg_flags src/platform.hpp* src/stamp-h1 perf/local_lat diff --git a/doc/Makefile.am b/doc/Makefile.am index dae71be..ff00c18 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -3,7 +3,7 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ - zmq_sendmsg.3 zmq_recvmsg.3 + zmq_sendmsg.3 zmq_recvmsg.3 zmq_getmsgopt.3 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 MAN_DOC = $(MAN1) $(MAN3) $(MAN7) diff --git a/doc/zmq_getmsgopt.txt b/doc/zmq_getmsgopt.txt new file mode 100644 index 0000000..a82c30c --- /dev/null +++ b/doc/zmq_getmsgopt.txt @@ -0,0 +1,85 @@ +zmq_getmsgopt(3) +================ + + +NAME +---- +zmq_getmsgopt - retrieve message option + + +SYNOPSIS +-------- +*int zmq_getmsgopt (zmq_msg_t '*message', int 'option_name', void '*option_value', size_t '*option_len');* + + +DESCRIPTION +----------- +The _zmq_getmsgopt()_ function shall retrieve the value for the option +specified by the 'option_name' argument for the message pointed to by the +'message' argument, and store it in the buffer pointed to by the 'option_value' +argument. The 'option_len' argument is the size in bytes of the buffer pointed +to by 'option_value'; upon successful completion _zmq_getsockopt()_ shall +modify the 'option_len' argument to indicate the actual size of the option +value stored in the buffer. + +The following options can be retrieved with the _zmq_getmsgopt()_ function: + +*ZMQ_MORE*:: +Indicates that there are more message parts to follow after the 'message'. + +RETURN VALUE +------------ +The _zmq_getmsgopt()_ function shall return zero if successful. Otherwise it +shall return `-1` and set 'errno' to one of the values defined below. + + +ERRORS +------ +*EINVAL*:: +The requested option _option_name_ is unknown, or the requested _option_size_ or +_option_value_ is invalid, or the size of the buffer pointed to by +_option_value_, as specified by _option_len_, is insufficient for storing the +option value. + + +EXAMPLE +------- +.Receiving a multi-part message +---- +zmq_msg_t part; +int more; +size_t more_size = sizeof (more); +while (true) { + /* Create an empty 0MQ message to hold the message part */ + int rc = zmq_msg_init (&part); + assert (rc == 0); + /* Block until a message is available to be received from socket */ + rc = zmq_recvmsg (socket, &part, 0); + assert (rc != -1); + rc = getmsgopt (&part, ZMQ_MORE, &more, &more_size); + assert (rc == 0); + if (more) { + fprintf (stderr, "more\n"); + } + else { + fprintf (stderr, "end\n"); + break; + } + zmq_msg_close (part); +} +---- + + +SEE ALSO +-------- +linkzmq:zmq_msg_data[3] +linkzmq:zmq_msg_init[3] +linkzmq:zmq_msg_init_size[3] +linkzmq:zmq_msg_init_data[3] +linkzmq:zmq_msg_close[3] +linkzmq:zmq[7] + + +AUTHORS +------- +The 0MQ documentation was written by Chuck Remes . diff --git a/include/zmq.h b/include/zmq.h index 0263b43..ac350d3 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -139,6 +139,8 @@ ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg); ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg); +ZMQ_EXPORT int zmq_getmsgopt (zmq_msg_t *msg, int option, void *optval, + size_t *optvallen); /******************************************************************************/ /* 0MQ infrastructure (a.k.a. context) initialisation & termination. */ @@ -192,6 +194,9 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_SNDTIMEO 28 #define ZMQ_IPV4ONLY 31 +/* Message options */ +#define ZMQ_MORE 1 + /* Send/recv options. */ #define ZMQ_DONTWAIT 1 #define ZMQ_SNDMORE 2 diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ced28d4..302003f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -485,6 +485,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) if (unlikely (rc != 0)) return -1; + // Clear any user-visible flags that are set on the message. + msg_->reset_flags (msg_t::more); + // At this point we impose the flags on the message. if (flags_ & ZMQ_SNDMORE) msg_->set_flags (msg_t::more); @@ -857,15 +860,10 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) void zmq::socket_base_t::extract_flags (msg_t *msg_) { // Test whether IDENTITY flag is valid for this socket type. - if (unlikely (msg_->flags () & msg_t::identity)) { + if (unlikely (msg_->flags () & msg_t::identity)) zmq_assert (options.recv_identity); -printf ("identity recvd\n"); - } - // Remove MORE flag. rcvmore = msg_->flags () & msg_t::more ? true : false; - if (rcvmore) - msg_->reset_flags (msg_t::more); } diff --git a/src/zmq.cpp b/src/zmq.cpp index b06b122..84dcdd1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -340,6 +340,25 @@ size_t zmq_msg_size (zmq_msg_t *msg_) return ((zmq::msg_t*) msg_)->size (); } +int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_, + size_t *optvallen_) +{ + switch (option_) { + case ZMQ_MORE: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = + (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more) ? 1 : 0; + *optvallen_ = sizeof (int); + return 0; + default: + errno = EINVAL; + return -1; + } +} + int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { #if defined ZMQ_POLL_BASED_ON_POLL diff --git a/tests/Makefile.am b/tests/Makefile.am index 5f0cfc1..bbae270 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -8,7 +8,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_hwm \ test_reqrep_device \ test_sub_forward \ - test_invalid_rep + test_invalid_rep \ + test_msg_flags if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -25,6 +26,7 @@ test_hwm_SOURCES = test_hwm.cpp test_reqrep_device_SOURCES = test_reqrep_device.cpp test_sub_forward_SOURCES = test_sub_forward.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp +test_msg_flags_SOURCES = test_msg_flags.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp new file mode 100644 index 0000000..10fd526 --- /dev/null +++ b/tests/test_msg_flags.cpp @@ -0,0 +1,82 @@ +/* + Copyright (c) 2011 250bpm s.r.o. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "../include/zmq.h" + +int main (int argc, char *argv []) +{ + // Create the infrastructure + void *ctx = zmq_init (0); + assert (ctx); + void *sb = zmq_socket (ctx, ZMQ_XREP); + assert (sb); + int rc = zmq_bind (sb, "inproc://a"); + assert (rc == 0); + void *sc = zmq_socket (ctx, ZMQ_XREQ); + assert (sc); + rc = zmq_connect (sc, "inproc://a"); + assert (rc == 0); + + // Send 2-part message. + rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE); + assert (rc == 1); + rc = zmq_send (sc, "B", 1, 0); + assert (rc == 1); + + // Identity comes first. + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + rc = zmq_recvmsg (sb, &msg, 0); + assert (rc >= 0); + int more; + size_t more_size = sizeof (more); + rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size); + assert (rc == 0); + assert (more == 1); + + // Then the first part of the message body. + rc = zmq_recvmsg (sb, &msg, 0); + assert (rc == 1); + more_size = sizeof (more); + rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size); + assert (rc == 0); + assert (more == 1); + + // And finally, the second part of the message body. + rc = zmq_recvmsg (sb, &msg, 0); + assert (rc == 1); + more_size = sizeof (more); + rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size); + assert (rc == 0); + assert (more == 0); + + // Deallocate the infrastructure. + rc = zmq_close (sc); + assert (rc == 0); + rc = zmq_close (sb); + assert (rc == 0); + rc = zmq_term (ctx); + assert (rc == 0); + return 0 ; +} + -- cgit v1.2.3