diff options
| -rwxr-xr-x | .gitignore | 1 | ||||
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/session_base.cpp | 38 | ||||
| -rw-r--r-- | src/socket_base.cpp | 6 | ||||
| -rw-r--r-- | src/udp_receiver.cpp | 211 | ||||
| -rw-r--r-- | src/udp_receiver.hpp | 101 | ||||
| -rw-r--r-- | src/udp_sender.cpp | 197 | ||||
| -rw-r--r-- | src/udp_sender.hpp | 97 | ||||
| -rw-r--r-- | tests/Makefile.am | 2 | ||||
| -rw-r--r-- | tests/pubsub_udp.cpp | 90 | 
10 files changed, 744 insertions, 3 deletions
| @@ -33,6 +33,7 @@ tests/pair_tcp  tests/reqrep_inproc  tests/reqrep_ipc  tests/reqrep_tcp +tests/pubsub_udp  tests/shutdown_stress  tests/hwm  tests/timeo diff --git a/src/Makefile.am b/src/Makefile.am index 1daee23..9ed62b1 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -78,6 +78,8 @@ libxs_la_SOURCES = \      tcp_listener.hpp \      thread.hpp \      topic_filter.hpp \ +    udp_receiver.hpp \ +    udp_sender.hpp \      upoll.hpp \      windows.hpp \      wire.hpp \ @@ -138,6 +140,8 @@ libxs_la_SOURCES = \      tcp_listener.cpp \      thread.cpp \      topic_filter.cpp \ +    udp_receiver.cpp \ +    udp_sender.cpp \      upoll.cpp \      xpub.cpp \      xrep.cpp \ diff --git a/src/session_base.cpp b/src/session_base.cpp index 12d2e8d..81f7347 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -30,6 +30,8 @@  #include "ipc_connecter.hpp"  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp" +#include "udp_sender.hpp" +#include "udp_receiver.hpp"  #include "req.hpp"  #include "xreq.hpp" @@ -484,6 +486,42 @@ void xs::session_base_t::start_connecting (bool wait_)      }  #endif +    //  UDP support. +    if (protocol == "udp") { + +        //  At this point we'll create message pipes to the session straight +        //  away. There's no point in delaying it as no concept of 'connect' +        //  exists with UDP anyway. +        if (options.type == XS_PUB || options.type == XS_XPUB) { + +            //  UDP sender. +            udp_sender_t *udp_sender =  new (std::nothrow) udp_sender_t ( +                io_thread, options); +            alloc_assert (udp_sender); + +            int rc = udp_sender->init (address.c_str ()); +            xs_assert (rc == 0); + +            send_attach (this, udp_sender); +        } +        else if (options.type == XS_SUB || options.type == XS_XSUB) { + +            //  UDP receiver. +            udp_receiver_t *udp_receiver =  new (std::nothrow) udp_receiver_t ( +                io_thread, options); +            alloc_assert (udp_receiver); + +            int rc = udp_receiver->init (address.c_str ()); +            xs_assert (rc == 0); + +            send_attach (this, udp_receiver); +        } +        else +            xs_assert (false); + +        return; +    } +      xs_assert (false);  } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 9de8735..389bfbc 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -208,7 +208,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)  {      //  First check out whether the protcol is something we are aware of.      if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && -          protocol_ != "pgm" && protocol_ != "epgm") { +          protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "udp") {          errno = EPROTONOSUPPORT;          return -1;      } @@ -234,7 +234,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)      //  Check whether socket type and transport protocol match.      //  Specifically, multicast protocols can't be combined with      //  bi-directional messaging patterns (socket types). -    if ((protocol_ == "pgm" || protocol_ == "epgm") && +    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "udp") &&            options.type != XS_PUB && options.type != XS_SUB &&            options.type != XS_XPUB && options.type != XS_XSUB) {          errno = ENOCOMPATPROTO; @@ -359,7 +359,7 @@ int xs::socket_base_t::bind (const char *addr_)          return 0;      } -    if (protocol == "pgm" || protocol == "epgm") { +    if (protocol == "pgm" || protocol == "epgm" || protocol == "udp") {          //  For convenience's sake, bind can be used interchageable with          //  connect for PGM and EPGM transports. diff --git a/src/udp_receiver.cpp b/src/udp_receiver.cpp new file mode 100644 index 0000000..9cd3c4c --- /dev/null +++ b/src/udp_receiver.cpp @@ -0,0 +1,211 @@ +/* +    Copyright (c) 2012 Martin Lucina <martin@lucina.net> + +    This file is part of Crossroads I/O. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or (at your +    option) any later version. + +    Crossroads I/O is distributed in the hope that it will be useful, but +    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public +    License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#include <new> + +#include "udp_receiver.hpp" +#include "session_base.hpp" +#include "stdint.hpp" +#include "wire.hpp" +#include "err.hpp" +#include "ip.hpp" + +xs::udp_receiver_t::udp_receiver_t (class io_thread_t *parent_, +      const options_t &options_) : +    io_object_t (parent_), +    options (options_), +    pending_bytes (0), +    pending_p (NULL), +    session (NULL), +    last_seq_no (0) +{ +    decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); +    alloc_assert (decoder); +} + +xs::udp_receiver_t::~udp_receiver_t () +{ +    delete decoder; +} + +int xs::udp_receiver_t::init (const char *address_) +{ +    int rc; + +    rc = address_resolve_tcp (&address, address_, false, options.ipv4only); +    if (rc != 0) +        return rc; + +    //  Create a unconnected UDP socket, bind it to the requested address +    //  and make it non-blocking. +    socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); +    if (socket == -1) +        return -1; +    rc = ::bind (socket, (const sockaddr *)&address, address_size (&address)); +    if (rc != 0) +        return -1; +    unblock_socket (socket); + +    return 0; +} + +void xs::udp_receiver_t::plug (io_thread_t *io_thread_, +    session_base_t *session_) +{ +    //  Start polling. +    socket_handle = add_fd (socket); +    set_pollin (socket_handle); + +    session = session_; +    decoder->set_session (session); + +    //  If there are any subscriptions already queued in the session, drop them. +    drop_subscriptions (); +} + +void xs::udp_receiver_t::unplug () +{ +    rm_fd (socket_handle); + +    session = NULL; +} + +void xs::udp_receiver_t::terminate () +{ +    unplug (); +    delete this; +} + +//  Called when our pipe is reactivated (has more data for us). +void xs::udp_receiver_t::activate_out () +{ +    drop_subscriptions (); +} + +//  Called when our pipe is reactivated (able to accept more data). +void xs::udp_receiver_t::activate_in () +{ +    //  Process any pending data. +    if (pending_bytes > 0) { +        ssize_t processed_bytes = \ +            decoder->process_buffer (pending_p, pending_bytes); +        //  Flush any messages produced by the decoder to the pipe. +        session->flush (); +        if (processed_bytes < pending_bytes) { +            //  Some data (still) could not be written to the pipe. +            pending_bytes -= processed_bytes; +            pending_p += processed_bytes; +            //  Try again later. +            return; +        } +        //  Done with unprocessed data. +        pending_bytes = 0; +    } + +    //  Reactivate polling. +    set_pollin (socket_handle); + +    //  Read any data that might have showed up on the socket in the mean time. +    in_event (retired_fd); +} + +//  Called when POLLIN is fired on the socket. +void xs::udp_receiver_t::in_event (fd_t fd_) +{ +    //  Receive a packet. +    ssize_t recv_bytes = recv (socket, data, sizeof data, 0); +    //  At the moment, go back to polling on EAGAIN and assert on any +    //  other error. +    if ((recv_bytes < 0) && errno == EAGAIN) +        return; +    assert (recv_bytes > 0); + +    //  Parse UDP packet header. +    unsigned char *data_p = data; +    uint32_t seq_no = get_uint32 (data_p); +    uint16_t offset = get_uint16 (data_p + 4); +    data_p += udp_header_size; +    recv_bytes -= udp_header_size; + +    //  If this is our first packet, join the message stream. +    if (last_seq_no == 0) { +        if (offset == 0xffff) +            return; +        else { +            data_p += offset; +            recv_bytes -= offset; +        } +    } +    //  Otherwise, decide based on the sequence number. +    else { +        //  If this packet is in sequence, process the whole packet. +        if ((last_seq_no + 1) == seq_no) +            ; +        //  Otherwise, if it is an old packet, drop it. +        else if (seq_no <= last_seq_no) +            return; +        //  Otherwise we have packet loss, rejoin the message stream. +        else { +            if (offset == 0xffff) +                return; +            else { +                data_p += offset; +                recv_bytes -= offset; + +                //  Re-create decoder to clear state. +                delete decoder; +                decoder = NULL; +                decoder = new (std::nothrow) decoder_t (in_batch_size, +                    options.maxmsgsize); +                alloc_assert (decoder); +                decoder->set_session (session); +            } +        } +    } +    //  If we get here, we will process this packet and it becomes our +    //  last seen sequence number. +    last_seq_no = seq_no; + +    //  Decode data and push it to our pipe. +    ssize_t processed_bytes = decoder->process_buffer (data_p, recv_bytes); +    if (processed_bytes < recv_bytes) { +        //  Some data could not be written to the pipe. Save it for later. +        pending_bytes = recv_bytes - processed_bytes; +        pending_p = data_p + processed_bytes; +        //  Stop polling. We will be restarted by a call to activate_in (). +        reset_pollin (socket_handle); +    } + +    //  Flush any messages produced by the decoder to the pipe. +    session->flush (); +} + +void xs::udp_receiver_t::out_event (fd_t fd_) +{ +    assert (false); +} + +void xs::udp_receiver_t::drop_subscriptions () +{ +    msg_t msg; +    while (session->read (&msg)) +        msg.close (); +} diff --git a/src/udp_receiver.hpp b/src/udp_receiver.hpp new file mode 100644 index 0000000..114df24 --- /dev/null +++ b/src/udp_receiver.hpp @@ -0,0 +1,101 @@ +/* +    Copyright (c) 2012 Martin Lucina <martin@lucina.net> + +    This file is part of Crossroads I/O. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or (at your +    option) any later version. + +    Crossroads I/O is distributed in the hope that it will be useful, but +    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public +    License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_UDP_RECEIVER_HPP_INCLUDED__ +#define __XS_UDP_RECEIVER_HPP_INCLUDED__ + +#include "platform.hpp" + +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "decoder.hpp" +#include "address.hpp" + +namespace xs +{ + +    class io_thread_t; +    class session_base_t; + +    class udp_receiver_t : public io_object_t, public i_engine +    { + +    public: + +        udp_receiver_t (xs::io_thread_t *parent_, const options_t &options_); +        ~udp_receiver_t (); + +        int init (const char *address_); + +        //  i_engine interface implementation. +        void plug (xs::io_thread_t *io_thread_, +            xs::session_base_t *session_); +        void unplug (); +        void terminate (); +        void activate_in (); +        void activate_out (); + +        //  i_poll_events interface implementation. +        void in_event (fd_t fd_); +        void out_event (fd_t fd_); + +    private: + +        //  We don't support forwarding subscriptions upstream (yet). +        void drop_subscriptions (); + +        //  Underlying UDP socket. +        fd_t socket; +        handle_t socket_handle; + +        //  Socket address. +        address_t address; + +        //  Socket options. +        options_t options; + +        //  Decoder for this socket. +        decoder_t *decoder; + +        //  Amount of unprocessed data waiting in decoder, if any. +        ssize_t pending_bytes; + +        //  Pointer to unprocessed data in buffer, if any. +        unsigned char *pending_p; + +        //  Buffer for data to process. +        unsigned char data [pgm_max_tpdu]; + +        //  Associated session. +        xs::session_base_t *session; + +        //  UDP header size. +        static const size_t udp_header_size = 6; + +        //  Last sequence number seen, 0 if none. +        uint32_t last_seq_no; + +        udp_receiver_t (const udp_receiver_t&); +        const udp_receiver_t &operator = (const udp_receiver_t&); +    }; + +} + +#endif diff --git a/src/udp_sender.cpp b/src/udp_sender.cpp new file mode 100644 index 0000000..ddcffbb --- /dev/null +++ b/src/udp_sender.cpp @@ -0,0 +1,197 @@ +/* +    Copyright (c) 2012 Martin Lucina <martin@lucina.net> + +    This file is part of Crossroads I/O. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or (at your +    option) any later version. + +    Crossroads I/O is distributed in the hope that it will be useful, but +    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public +    License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#include <stdlib.h> + +#include "io_thread.hpp" +#include "udp_sender.hpp" +#include "session_base.hpp" +#include "err.hpp" +#include "wire.hpp" +#include "stdint.hpp" +#include "ip.hpp" + +xs::udp_sender_t::udp_sender_t (io_thread_t *parent_, +      const options_t &options_) : +    io_object_t (parent_), +    options (options_), +    encoder (0), +    seq_no (1), +    backoff_timer (NULL) +{ +} + +xs::udp_sender_t::~udp_sender_t () +{ +    if (backoff_timer) { +        rm_timer (backoff_timer); +    } +} + +int xs::udp_sender_t::init (const char *address_) +{ +    int rc; + +    rc = address_resolve_tcp (&address, address_, false, options.ipv4only); +    if (rc != 0) +        return rc; + +    //  Create a connected UDP socket for a single peer, and make +    //  it non-blocking. +    socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); +    if (socket == -1) +        return -1; +    rc = ::connect (socket, (const sockaddr *)&address, +        address_size (&address)); +    if (rc != 0) +        return -1; +    unblock_socket (socket); + +    return 0; +} + +void xs::udp_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) +{ +    session = session_; +    encoder.set_session (session); + +    //  Start polling. +    socket_handle = add_fd (socket); +    set_pollout (socket_handle); + +    //  UDP cannot pass subscriptions upstream; for now just fake subscribing +    //  to all messages. +    msg_t msg; +    msg.init_size (1); +    *(unsigned char*) msg.data () = 1; +    int rc = session_->write (&msg); +    errno_assert (rc == 0); +    session_->flush (); +} + +void xs::udp_sender_t::unplug () +{ +    rm_fd (socket_handle); + +    session = NULL; +} + +void xs::udp_sender_t::terminate () +{ +    unplug (); +    delete this; +} + +//  Called when our pipe is reactivated (has more data for us). +void xs::udp_sender_t::activate_out () +{ +    //  Reactivate polling. +    set_pollout (socket_handle); +    //  Try and send more data. +    out_event (retired_fd); +} + +void xs::udp_sender_t::activate_in () +{ +    xs_assert (false); +} + +//  Called when POLLERR is fired on the socket. +void xs::udp_sender_t::in_event (fd_t fd_) +{ +    //  Get the actual error code from the socket. +    int err = 0; +    socklen_t len = sizeof (err); +    int rc = getsockopt (socket, SOL_SOCKET, SO_ERROR, (char *) &err, &len); +    errno_assert (rc == 0); + +    //  If we got ECONNREFUSED, there is no one on the other end. +    //  Stop sending for XS_RECONNECT_IVL milliseconds. +    if (err == ECONNREFUSED) { +        backoff_timer = add_timer (options.reconnect_ivl); +        //  Polling will be re-started by timer_event (). +        reset_pollout (socket_handle); +        return; +    } + +    //  Any other error is a bug, for now. +    errno = err; +    errno_assert (false); +} + +//  Called when POLLOUT is fired on the socket. +void xs::udp_sender_t::out_event (fd_t fd_) +{ +    unsigned char *data_p = data; +    size_t size = pgm_max_tpdu - udp_header_size; +    int offset = 0; + +    //  Get one packet of data from the encoder. +    encoder.get_data (&data_p, &size, &offset); +    //  If there is no data in the pipe +    if (size == 0) { +        //  Stop polling, we will be re-started by activate_out (). +        reset_pollout (socket_handle); +        return; +    } +    assert ((size + udp_header_size) <= pgm_max_tpdu); + +    //  Prepare UDP header (seqno, offset). +    unsigned char udp_header [udp_header_size]; +    put_uint32 (udp_header, seq_no); +    put_uint16 (udp_header + 4, (uint16_t) offset); + +    //  Send out the message, ensuring the whole message was sent. +    struct iovec iov[2] = { +        { udp_header, udp_header_size }, +        { data_p, size } +    }; +    ssize_t write_bytes = writev (socket, iov, 2); + +    //  If we got ECONNREFUSED, there is no one on the other end. +    //  Drop the packet and back off for XS_RECONNECT_IVL milliseconds. +    if (write_bytes == -1 && errno == ECONNREFUSED) { +        backoff_timer = add_timer (options.reconnect_ivl); +        //  Polling will be re-started by timer_event (). +        reset_pollout (socket_handle); +        //  Increase sequence number to ensure other end is notified of +        //  dropped packet. +        ++seq_no; +        return; +    } +    errno_assert (write_bytes > 0); +    assert ((size_t)write_bytes == (udp_header_size + size)); + +    //  Packet was sent successfully, increase sequence number. +    ++seq_no; +} + +//  Called when our backoff timer expires. +void xs::udp_sender_t::timer_event (handle_t handle_) +{ +    xs_assert (handle_ == backoff_timer); +    backoff_timer = NULL; + +    //  Re-start polling. +    set_pollout (socket_handle); + +    out_event (retired_fd); +} diff --git a/src/udp_sender.hpp b/src/udp_sender.hpp new file mode 100644 index 0000000..e7c2472 --- /dev/null +++ b/src/udp_sender.hpp @@ -0,0 +1,97 @@ +/* +    Copyright (c) 2012 Martin Lucina <martin@lucina.net> + +    This file is part of Crossroads I/O. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or (at your +    option) any later version. + +    Crossroads I/O is distributed in the hope that it will be useful, but +    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public +    License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_UDP_SENDER_HPP_INCLUDED__ +#define __XS_UDP_SENDER_HPP_INCLUDED__ + +#include "platform.hpp" + +#include "stdint.hpp" +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "address.hpp" +#include "encoder.hpp" + +namespace xs +{ + +    class io_thread_t; +    class session_base_t; + +    class udp_sender_t : public io_object_t, public i_engine +    { + +    public: + +        udp_sender_t (xs::io_thread_t *parent_, const options_t &options_); +        ~udp_sender_t (); + +        int init (const char *address_); + +        //  i_engine interface implementation. +        void plug (xs::io_thread_t *io_thread_, +            xs::session_base_t *session_); +        void unplug (); +        void terminate (); +        void activate_in (); +        void activate_out (); + +        //  i_poll_events interface implementation. +        void in_event (fd_t fd_); +        void out_event (fd_t fd_); +        void timer_event (handle_t handle_); + +    private: + +        //  Underlying UDP socket. +        fd_t socket; +        handle_t socket_handle; + +        //  Socket address. +        address_t address; + +        //  Socket options. +        options_t options; + +        //  Encoder for this socket. +        encoder_t encoder; + +        //  Associated session. +        xs::session_base_t *session; + +        //  UDP packet header size. +        static const size_t udp_header_size = 6; + +        //  Packet buffer for outgoing packets. +        unsigned char data [pgm_max_tpdu]; + +        //  Sequence number for outgoing packets. +        uint32_t seq_no; + +        //  Backoff timer if receiver is not present. +        handle_t backoff_timer; + +        udp_sender_t (const udp_sender_t&); +        const udp_sender_t &operator = (const udp_sender_t&); +    }; + +} + +#endif diff --git a/tests/Makefile.am b/tests/Makefile.am index 27ca5dd..52e8e1c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -7,6 +7,7 @@ noinst_PROGRAMS = pair_inproc \                    pair_tcp \                    reqrep_inproc \                    reqrep_tcp \ +                  pubsub_udp \                    hwm \                    reqrep_device \                    sub_forward \ @@ -32,6 +33,7 @@ pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp  pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp  reqrep_inproc_SOURCES = reqrep_inproc.cpp testutil.hpp  reqrep_tcp_SOURCES = reqrep_tcp.cpp testutil.hpp +pubsub_udp_SOURCES = pubsub_udp.cpp testutil.hpp  hwm_SOURCES = hwm.cpp  reqrep_device_SOURCES = reqrep_device.cpp  sub_forward_SOURCES = sub_forward.cpp diff --git a/tests/pubsub_udp.cpp b/tests/pubsub_udp.cpp new file mode 100644 index 0000000..e981457 --- /dev/null +++ b/tests/pubsub_udp.cpp @@ -0,0 +1,90 @@ +/* +    Copyright (c) 2012 Martin Lucina <martin@lucina.net> + +    This file is part of Crossroads I/O. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or (at your +    option) any later version. + +    Crossroads I/O is distributed in the hope that it will be useful, but +    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public +    License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ +    fprintf (stderr, "pubsub_udp test running...\n"); + +    void *ctx = xs_init (); +    assert (ctx); + +    void *pub = xs_socket (ctx, XS_PUB); +    assert (pub); +    int rc = xs_bind (pub, "udp://127.0.0.1:5555"); +    assert (rc != -1); + +    void *sub = xs_socket (ctx, XS_SUB); +    assert (sub); +    rc = xs_connect (sub, "udp://127.0.0.1:5555"); +    assert (rc != -1); +    rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); +    assert (rc == 0); +     +    const char *content = "12345678ABCDEFGH12345678abcdefgh"; + +    //  TODO: Due to a core bug, the first message on a PUB/SUB socket +    //  is always lost. For now just send a dummy message. +    rc = xs_send (pub, "", 0, 0); +    assert (rc == 0); + +    //  Send a message with two identical parts. +    rc = xs_send (pub, content, 32, XS_SNDMORE); +    assert (rc == 32); +    rc = xs_send (pub, content, 32, 0); +    assert (rc == 32); +     +    //  Receive the first part. +    char rcvbuf [32]; +    int rcvmore = 0; +    size_t rcvmore_sz = sizeof rcvmore; +    rc = xs_recv (sub, rcvbuf, 32, 0); +    assert (rc == 32); +    rc = xs_getsockopt (sub, XS_RCVMORE, &rcvmore, &rcvmore_sz); +    assert (rc == 0); + +    //  There must be one more part to receive. +    assert (rcvmore); +    //  And the content must match what was sent. +    assert (memcmp (rcvbuf, content, 32) == 0); + +    //  Receive the second part. +    rc = xs_recv (sub, rcvbuf, 32, 0); +    assert (rc == 32); +    rcvmore_sz = sizeof rcvmore; +    rc = xs_getsockopt (sub, XS_RCVMORE, &rcvmore, &rcvmore_sz); +    assert (rc == 0); + +    //  There must not be another part. +    assert (!rcvmore); +    //  And the content must match what was sent. +    assert (memcmp (rcvbuf, content, 32) == 0); + +    rc = xs_close (pub); +    assert (rc == 0); + +    rc = xs_close (sub); +    assert (rc == 0); + +    rc = xs_term (ctx); +    assert (rc == 0); + +    return 0 ; +} | 
