diff options
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | doc/Makefile.am | 2 | ||||
| -rw-r--r-- | doc/zmq_getmsgopt.txt | 85 | ||||
| -rw-r--r-- | include/zmq.h | 5 | ||||
| -rw-r--r-- | src/socket_base.cpp | 10 | ||||
| -rw-r--r-- | src/zmq.cpp | 19 | ||||
| -rw-r--r-- | tests/Makefile.am | 4 | ||||
| -rw-r--r-- | tests/test_msg_flags.cpp | 82 | 
8 files changed, 200 insertions, 8 deletions
| @@ -32,6 +32,7 @@ tests/test_reqrep_device  tests/test_reqrep_drop  tests/test_sub_forward  tests/test_invalid_rep +tests/test_msg_flags  src/platform.hpp*  src/stamp-h1  perf/local_lat diff --git a/doc/Makefile.am b/doc/Makefile.am index dae71be..ff00c18 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -3,7 +3,7 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \      zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \      zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \      zmq_strerror.3 zmq_term.3 zmq_version.3 zmq_getsockopt.3 zmq_errno.3 \ -    zmq_sendmsg.3 zmq_recvmsg.3 +    zmq_sendmsg.3 zmq_recvmsg.3 zmq_getmsgopt.3  MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7  MAN_DOC = $(MAN1) $(MAN3) $(MAN7) diff --git a/doc/zmq_getmsgopt.txt b/doc/zmq_getmsgopt.txt new file mode 100644 index 0000000..a82c30c --- /dev/null +++ b/doc/zmq_getmsgopt.txt @@ -0,0 +1,85 @@ +zmq_getmsgopt(3) +================ + + +NAME +---- +zmq_getmsgopt - retrieve message option + + +SYNOPSIS +-------- +*int zmq_getmsgopt (zmq_msg_t '*message', int 'option_name', void '*option_value', size_t '*option_len');* + + +DESCRIPTION +----------- +The _zmq_getmsgopt()_ function shall retrieve the value for the option +specified by the 'option_name' argument for the message pointed to by the +'message' argument, and store it in the buffer pointed to by the 'option_value' +argument. The 'option_len' argument is the size in bytes of the buffer pointed +to by 'option_value'; upon successful completion _zmq_getsockopt()_ shall +modify the 'option_len' argument to indicate the actual size of the option +value stored in the buffer. + +The following options can be retrieved with the _zmq_getmsgopt()_ function: + +*ZMQ_MORE*:: +Indicates that there are more message parts to follow after the 'message'. + +RETURN VALUE +------------ +The _zmq_getmsgopt()_ function shall return zero if successful. Otherwise it +shall return `-1` and set 'errno' to one of the values defined below. + + +ERRORS +------ +*EINVAL*:: +The requested option _option_name_ is unknown, or the requested _option_size_ or +_option_value_ is invalid, or the size of the buffer pointed to by +_option_value_, as specified by _option_len_, is insufficient for storing the +option value. + + +EXAMPLE +------- +.Receiving a multi-part message +---- +zmq_msg_t part; +int more; +size_t more_size = sizeof (more); +while (true) { +    /* Create an empty 0MQ message to hold the message part */ +    int rc = zmq_msg_init (&part); +    assert (rc == 0); +    /* Block until a message is available to be received from socket */ +    rc = zmq_recvmsg (socket, &part, 0); +    assert (rc != -1); +    rc = getmsgopt (&part, ZMQ_MORE, &more, &more_size); +    assert (rc == 0); +    if (more) { +      fprintf (stderr, "more\n"); +    } +    else { +      fprintf (stderr, "end\n"); +      break; +    } +    zmq_msg_close (part); +} +---- + + +SEE ALSO +-------- +linkzmq:zmq_msg_data[3] +linkzmq:zmq_msg_init[3] +linkzmq:zmq_msg_init_size[3] +linkzmq:zmq_msg_init_data[3] +linkzmq:zmq_msg_close[3] +linkzmq:zmq[7] + + +AUTHORS +------- +The 0MQ documentation was written by Chuck Remes <cremes@mac.com>. diff --git a/include/zmq.h b/include/zmq.h index 0263b43..ac350d3 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -139,6 +139,8 @@ ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);  ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);  ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg);  ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg); +ZMQ_EXPORT int zmq_getmsgopt (zmq_msg_t *msg, int option, void *optval, +    size_t *optvallen);  /******************************************************************************/  /*  0MQ infrastructure (a.k.a. context) initialisation & termination.         */ @@ -192,6 +194,9 @@ ZMQ_EXPORT int zmq_term (void *context);  #define ZMQ_SNDTIMEO 28  #define ZMQ_IPV4ONLY 31 +/*  Message options                                                           */ +#define ZMQ_MORE 1 +  /*  Send/recv options.                                                        */  #define ZMQ_DONTWAIT 1  #define ZMQ_SNDMORE 2 diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ced28d4..302003f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -485,6 +485,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)      if (unlikely (rc != 0))          return -1; +    //  Clear any user-visible flags that are set on the message. +    msg_->reset_flags (msg_t::more); +      //  At this point we impose the flags on the message.      if (flags_ & ZMQ_SNDMORE)          msg_->set_flags (msg_t::more); @@ -857,15 +860,10 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)  void zmq::socket_base_t::extract_flags (msg_t *msg_)  {      //  Test whether IDENTITY flag is valid for this socket type. -    if (unlikely (msg_->flags () & msg_t::identity)) { +    if (unlikely (msg_->flags () & msg_t::identity))          zmq_assert (options.recv_identity); -printf ("identity recvd\n"); -    } -      //  Remove MORE flag.      rcvmore = msg_->flags () & msg_t::more ? true : false; -    if (rcvmore) -        msg_->reset_flags (msg_t::more);  } diff --git a/src/zmq.cpp b/src/zmq.cpp index b06b122..84dcdd1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -340,6 +340,25 @@ size_t zmq_msg_size (zmq_msg_t *msg_)      return ((zmq::msg_t*) msg_)->size ();  } +int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_, +    size_t *optvallen_) +{ +    switch (option_) { +    case ZMQ_MORE: +        if (*optvallen_ < sizeof (int)) { +            errno = EINVAL; +            return -1; +        } +        *((int*) optval_) = +            (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more) ? 1 : 0; +        *optvallen_ = sizeof (int); +        return 0; +    default: +        errno = EINVAL; +        return -1; +    } +} +  int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)  {  #if defined ZMQ_POLL_BASED_ON_POLL diff --git a/tests/Makefile.am b/tests/Makefile.am index 5f0cfc1..bbae270 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -8,7 +8,8 @@ noinst_PROGRAMS = test_pair_inproc \                    test_hwm \                    test_reqrep_device \                    test_sub_forward \ -                  test_invalid_rep +                  test_invalid_rep \ +                  test_msg_flags  if !ON_MINGW  noinst_PROGRAMS += test_shutdown_stress \ @@ -25,6 +26,7 @@ test_hwm_SOURCES = test_hwm.cpp  test_reqrep_device_SOURCES = test_reqrep_device.cpp  test_sub_forward_SOURCES = test_sub_forward.cpp  test_invalid_rep_SOURCES = test_invalid_rep.cpp +test_msg_flags_SOURCES = test_msg_flags.cpp  if !ON_MINGW  test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_msg_flags.cpp b/tests/test_msg_flags.cpp new file mode 100644 index 0000000..10fd526 --- /dev/null +++ b/tests/test_msg_flags.cpp @@ -0,0 +1,82 @@ +/* +    Copyright (c) 2011 250bpm s.r.o. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <assert.h> +#include <string.h> + +#include "../include/zmq.h" + +int main (int argc, char *argv []) +{ +    //  Create the infrastructure +    void *ctx = zmq_init (0); +    assert (ctx); +    void *sb = zmq_socket (ctx, ZMQ_XREP); +    assert (sb); +    int rc = zmq_bind (sb, "inproc://a"); +    assert (rc == 0); +    void *sc = zmq_socket (ctx, ZMQ_XREQ); +    assert (sc); +    rc = zmq_connect (sc, "inproc://a"); +    assert (rc == 0); +    +    //  Send 2-part message. +    rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE); +    assert (rc == 1); +    rc = zmq_send (sc, "B", 1, 0); +    assert (rc == 1); + +    //  Identity comes first. +    zmq_msg_t msg; +    rc = zmq_msg_init (&msg); +    assert (rc == 0); +    rc = zmq_recvmsg (sb, &msg, 0); +    assert (rc >= 0); +    int more; +    size_t more_size = sizeof (more); +    rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size); +    assert (rc == 0); +    assert (more == 1); + +    //  Then the first part of the message body. +    rc = zmq_recvmsg (sb, &msg, 0); +    assert (rc == 1); +    more_size = sizeof (more); +    rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size); +    assert (rc == 0); +    assert (more == 1); + +    //  And finally, the second part of the message body. +    rc = zmq_recvmsg (sb, &msg, 0); +    assert (rc == 1); +    more_size = sizeof (more); +    rc = zmq_getmsgopt (&msg, ZMQ_MORE, &more, &more_size); +    assert (rc == 0); +    assert (more == 0); + +    //  Deallocate the infrastructure. +    rc = zmq_close (sc); +    assert (rc == 0); +    rc = zmq_close (sb); +    assert (rc == 0); +    rc = zmq_term (ctx); +    assert (rc == 0); +    return 0 ; +} + | 
