diff options
author | Martin Lucina <martin@lucina.net> | 2012-05-20 10:40:36 +0200 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-05-20 10:40:36 +0200 |
commit | 2601b302685c48e6aa8f507d200f148397811fae (patch) | |
tree | 5103e3fab82394585df96be9892600ecfd55accc | |
parent | 1d76284dee8e9b0735a26ee98a3edcd9f5208f09 (diff) |
[WIP] UDP supportudp
Signed-off-by: Martin Lucina <martin@lucina.net>
-rwxr-xr-x | .gitignore | 1 | ||||
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/session_base.cpp | 38 | ||||
-rw-r--r-- | src/socket_base.cpp | 6 | ||||
-rw-r--r-- | src/udp_receiver.cpp | 211 | ||||
-rw-r--r-- | src/udp_receiver.hpp | 101 | ||||
-rw-r--r-- | src/udp_sender.cpp | 197 | ||||
-rw-r--r-- | src/udp_sender.hpp | 97 | ||||
-rw-r--r-- | tests/Makefile.am | 2 | ||||
-rw-r--r-- | tests/pubsub_udp.cpp | 90 |
10 files changed, 744 insertions, 3 deletions
@@ -33,6 +33,7 @@ tests/pair_tcp tests/reqrep_inproc tests/reqrep_ipc tests/reqrep_tcp +tests/pubsub_udp tests/shutdown_stress tests/hwm tests/timeo diff --git a/src/Makefile.am b/src/Makefile.am index 033ba52..33647b5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,6 +77,8 @@ libxs_la_SOURCES = \ tcp_connecter.hpp \ tcp_listener.hpp \ thread.hpp \ + udp_receiver.hpp \ + udp_sender.hpp \ upoll.hpp \ windows.hpp \ wire.hpp \ @@ -136,6 +138,8 @@ libxs_la_SOURCES = \ tcp_connecter.cpp \ tcp_listener.cpp \ thread.cpp \ + udp_receiver.cpp \ + udp_sender.cpp \ upoll.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/session_base.cpp b/src/session_base.cpp index da9ee8e..f8a1cd1 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -30,6 +30,8 @@ #include "ipc_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" +#include "udp_sender.hpp" +#include "udp_receiver.hpp" #include "req.hpp" #include "xreq.hpp" @@ -484,6 +486,42 @@ void xs::session_base_t::start_connecting (bool wait_) } #endif + // UDP support. + if (protocol == "udp") { + + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with UDP anyway. + if (options.type == XS_PUB || options.type == XS_XPUB) { + + // UDP sender. + udp_sender_t *udp_sender = new (std::nothrow) udp_sender_t ( + io_thread, options); + alloc_assert (udp_sender); + + int rc = udp_sender->init (address.c_str ()); + xs_assert (rc == 0); + + send_attach (this, udp_sender); + } + else if (options.type == XS_SUB || options.type == XS_XSUB) { + + // UDP receiver. + udp_receiver_t *udp_receiver = new (std::nothrow) udp_receiver_t ( + io_thread, options); + alloc_assert (udp_receiver); + + int rc = udp_receiver->init (address.c_str ()); + xs_assert (rc == 0); + + send_attach (this, udp_receiver); + } + else + xs_assert (false); + + return; + } + xs_assert (false); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 960bd2c..2442e72 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -208,7 +208,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm") { + protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "udp") { errno = EPROTONOSUPPORT; return -1; } @@ -234,7 +234,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_) // Check whether socket type and transport protocol match. // Specifically, multicast protocols can't be combined with // bi-directional messaging patterns (socket types). - if ((protocol_ == "pgm" || protocol_ == "epgm") && + if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "udp") && options.type != XS_PUB && options.type != XS_SUB && options.type != XS_XPUB && options.type != XS_XSUB) { errno = ENOCOMPATPROTO; @@ -359,7 +359,7 @@ int xs::socket_base_t::bind (const char *addr_) return 0; } - if (protocol == "pgm" || protocol == "epgm") { + if (protocol == "pgm" || protocol == "epgm" || protocol == "udp") { // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. diff --git a/src/udp_receiver.cpp b/src/udp_receiver.cpp new file mode 100644 index 0000000..9cd3c4c --- /dev/null +++ b/src/udp_receiver.cpp @@ -0,0 +1,211 @@ +/* + Copyright (c) 2012 Martin Lucina <martin@lucina.net> + + This file is part of Crossroads I/O. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or (at your + option) any later version. + + Crossroads I/O is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#include <new> + +#include "udp_receiver.hpp" +#include "session_base.hpp" +#include "stdint.hpp" +#include "wire.hpp" +#include "err.hpp" +#include "ip.hpp" + +xs::udp_receiver_t::udp_receiver_t (class io_thread_t *parent_, + const options_t &options_) : + io_object_t (parent_), + options (options_), + pending_bytes (0), + pending_p (NULL), + session (NULL), + last_seq_no (0) +{ + decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); + alloc_assert (decoder); +} + +xs::udp_receiver_t::~udp_receiver_t () +{ + delete decoder; +} + +int xs::udp_receiver_t::init (const char *address_) +{ + int rc; + + rc = address_resolve_tcp (&address, address_, false, options.ipv4only); + if (rc != 0) + return rc; + + // Create a unconnected UDP socket, bind it to the requested address + // and make it non-blocking. + socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (socket == -1) + return -1; + rc = ::bind (socket, (const sockaddr *)&address, address_size (&address)); + if (rc != 0) + return -1; + unblock_socket (socket); + + return 0; +} + +void xs::udp_receiver_t::plug (io_thread_t *io_thread_, + session_base_t *session_) +{ + // Start polling. + socket_handle = add_fd (socket); + set_pollin (socket_handle); + + session = session_; + decoder->set_session (session); + + // If there are any subscriptions already queued in the session, drop them. + drop_subscriptions (); +} + +void xs::udp_receiver_t::unplug () +{ + rm_fd (socket_handle); + + session = NULL; +} + +void xs::udp_receiver_t::terminate () +{ + unplug (); + delete this; +} + +// Called when our pipe is reactivated (has more data for us). +void xs::udp_receiver_t::activate_out () +{ + drop_subscriptions (); +} + +// Called when our pipe is reactivated (able to accept more data). +void xs::udp_receiver_t::activate_in () +{ + // Process any pending data. + if (pending_bytes > 0) { + ssize_t processed_bytes = \ + decoder->process_buffer (pending_p, pending_bytes); + // Flush any messages produced by the decoder to the pipe. + session->flush (); + if (processed_bytes < pending_bytes) { + // Some data (still) could not be written to the pipe. + pending_bytes -= processed_bytes; + pending_p += processed_bytes; + // Try again later. + return; + } + // Done with unprocessed data. + pending_bytes = 0; + } + + // Reactivate polling. + set_pollin (socket_handle); + + // Read any data that might have showed up on the socket in the mean time. + in_event (retired_fd); +} + +// Called when POLLIN is fired on the socket. +void xs::udp_receiver_t::in_event (fd_t fd_) +{ + // Receive a packet. + ssize_t recv_bytes = recv (socket, data, sizeof data, 0); + // At the moment, go back to polling on EAGAIN and assert on any + // other error. + if ((recv_bytes < 0) && errno == EAGAIN) + return; + assert (recv_bytes > 0); + + // Parse UDP packet header. + unsigned char *data_p = data; + uint32_t seq_no = get_uint32 (data_p); + uint16_t offset = get_uint16 (data_p + 4); + data_p += udp_header_size; + recv_bytes -= udp_header_size; + + // If this is our first packet, join the message stream. + if (last_seq_no == 0) { + if (offset == 0xffff) + return; + else { + data_p += offset; + recv_bytes -= offset; + } + } + // Otherwise, decide based on the sequence number. + else { + // If this packet is in sequence, process the whole packet. + if ((last_seq_no + 1) == seq_no) + ; + // Otherwise, if it is an old packet, drop it. + else if (seq_no <= last_seq_no) + return; + // Otherwise we have packet loss, rejoin the message stream. + else { + if (offset == 0xffff) + return; + else { + data_p += offset; + recv_bytes -= offset; + + // Re-create decoder to clear state. + delete decoder; + decoder = NULL; + decoder = new (std::nothrow) decoder_t (in_batch_size, + options.maxmsgsize); + alloc_assert (decoder); + decoder->set_session (session); + } + } + } + // If we get here, we will process this packet and it becomes our + // last seen sequence number. + last_seq_no = seq_no; + + // Decode data and push it to our pipe. + ssize_t processed_bytes = decoder->process_buffer (data_p, recv_bytes); + if (processed_bytes < recv_bytes) { + // Some data could not be written to the pipe. Save it for later. + pending_bytes = recv_bytes - processed_bytes; + pending_p = data_p + processed_bytes; + // Stop polling. We will be restarted by a call to activate_in (). + reset_pollin (socket_handle); + } + + // Flush any messages produced by the decoder to the pipe. + session->flush (); +} + +void xs::udp_receiver_t::out_event (fd_t fd_) +{ + assert (false); +} + +void xs::udp_receiver_t::drop_subscriptions () +{ + msg_t msg; + while (session->read (&msg)) + msg.close (); +} diff --git a/src/udp_receiver.hpp b/src/udp_receiver.hpp new file mode 100644 index 0000000..114df24 --- /dev/null +++ b/src/udp_receiver.hpp @@ -0,0 +1,101 @@ +/* + Copyright (c) 2012 Martin Lucina <martin@lucina.net> + + This file is part of Crossroads I/O. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or (at your + option) any later version. + + Crossroads I/O is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_UDP_RECEIVER_HPP_INCLUDED__ +#define __XS_UDP_RECEIVER_HPP_INCLUDED__ + +#include "platform.hpp" + +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "decoder.hpp" +#include "address.hpp" + +namespace xs +{ + + class io_thread_t; + class session_base_t; + + class udp_receiver_t : public io_object_t, public i_engine + { + + public: + + udp_receiver_t (xs::io_thread_t *parent_, const options_t &options_); + ~udp_receiver_t (); + + int init (const char *address_); + + // i_engine interface implementation. + void plug (xs::io_thread_t *io_thread_, + xs::session_base_t *session_); + void unplug (); + void terminate (); + void activate_in (); + void activate_out (); + + // i_poll_events interface implementation. + void in_event (fd_t fd_); + void out_event (fd_t fd_); + + private: + + // We don't support forwarding subscriptions upstream (yet). + void drop_subscriptions (); + + // Underlying UDP socket. + fd_t socket; + handle_t socket_handle; + + // Socket address. + address_t address; + + // Socket options. + options_t options; + + // Decoder for this socket. + decoder_t *decoder; + + // Amount of unprocessed data waiting in decoder, if any. + ssize_t pending_bytes; + + // Pointer to unprocessed data in buffer, if any. + unsigned char *pending_p; + + // Buffer for data to process. + unsigned char data [pgm_max_tpdu]; + + // Associated session. + xs::session_base_t *session; + + // UDP header size. + static const size_t udp_header_size = 6; + + // Last sequence number seen, 0 if none. + uint32_t last_seq_no; + + udp_receiver_t (const udp_receiver_t&); + const udp_receiver_t &operator = (const udp_receiver_t&); + }; + +} + +#endif diff --git a/src/udp_sender.cpp b/src/udp_sender.cpp new file mode 100644 index 0000000..ddcffbb --- /dev/null +++ b/src/udp_sender.cpp @@ -0,0 +1,197 @@ +/* + Copyright (c) 2012 Martin Lucina <martin@lucina.net> + + This file is part of Crossroads I/O. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or (at your + option) any later version. + + Crossroads I/O is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#include <stdlib.h> + +#include "io_thread.hpp" +#include "udp_sender.hpp" +#include "session_base.hpp" +#include "err.hpp" +#include "wire.hpp" +#include "stdint.hpp" +#include "ip.hpp" + +xs::udp_sender_t::udp_sender_t (io_thread_t *parent_, + const options_t &options_) : + io_object_t (parent_), + options (options_), + encoder (0), + seq_no (1), + backoff_timer (NULL) +{ +} + +xs::udp_sender_t::~udp_sender_t () +{ + if (backoff_timer) { + rm_timer (backoff_timer); + } +} + +int xs::udp_sender_t::init (const char *address_) +{ + int rc; + + rc = address_resolve_tcp (&address, address_, false, options.ipv4only); + if (rc != 0) + return rc; + + // Create a connected UDP socket for a single peer, and make + // it non-blocking. + socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); + if (socket == -1) + return -1; + rc = ::connect (socket, (const sockaddr *)&address, + address_size (&address)); + if (rc != 0) + return -1; + unblock_socket (socket); + + return 0; +} + +void xs::udp_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) +{ + session = session_; + encoder.set_session (session); + + // Start polling. + socket_handle = add_fd (socket); + set_pollout (socket_handle); + + // UDP cannot pass subscriptions upstream; for now just fake subscribing + // to all messages. + msg_t msg; + msg.init_size (1); + *(unsigned char*) msg.data () = 1; + int rc = session_->write (&msg); + errno_assert (rc == 0); + session_->flush (); +} + +void xs::udp_sender_t::unplug () +{ + rm_fd (socket_handle); + + session = NULL; +} + +void xs::udp_sender_t::terminate () +{ + unplug (); + delete this; +} + +// Called when our pipe is reactivated (has more data for us). +void xs::udp_sender_t::activate_out () +{ + // Reactivate polling. + set_pollout (socket_handle); + // Try and send more data. + out_event (retired_fd); +} + +void xs::udp_sender_t::activate_in () +{ + xs_assert (false); +} + +// Called when POLLERR is fired on the socket. +void xs::udp_sender_t::in_event (fd_t fd_) +{ + // Get the actual error code from the socket. + int err = 0; + socklen_t len = sizeof (err); + int rc = getsockopt (socket, SOL_SOCKET, SO_ERROR, (char *) &err, &len); + errno_assert (rc == 0); + + // If we got ECONNREFUSED, there is no one on the other end. + // Stop sending for XS_RECONNECT_IVL milliseconds. + if (err == ECONNREFUSED) { + backoff_timer = add_timer (options.reconnect_ivl); + // Polling will be re-started by timer_event (). + reset_pollout (socket_handle); + return; + } + + // Any other error is a bug, for now. + errno = err; + errno_assert (false); +} + +// Called when POLLOUT is fired on the socket. +void xs::udp_sender_t::out_event (fd_t fd_) +{ + unsigned char *data_p = data; + size_t size = pgm_max_tpdu - udp_header_size; + int offset = 0; + + // Get one packet of data from the encoder. + encoder.get_data (&data_p, &size, &offset); + // If there is no data in the pipe + if (size == 0) { + // Stop polling, we will be re-started by activate_out (). + reset_pollout (socket_handle); + return; + } + assert ((size + udp_header_size) <= pgm_max_tpdu); + + // Prepare UDP header (seqno, offset). + unsigned char udp_header [udp_header_size]; + put_uint32 (udp_header, seq_no); + put_uint16 (udp_header + 4, (uint16_t) offset); + + // Send out the message, ensuring the whole message was sent. + struct iovec iov[2] = { + { udp_header, udp_header_size }, + { data_p, size } + }; + ssize_t write_bytes = writev (socket, iov, 2); + + // If we got ECONNREFUSED, there is no one on the other end. + // Drop the packet and back off for XS_RECONNECT_IVL milliseconds. + if (write_bytes == -1 && errno == ECONNREFUSED) { + backoff_timer = add_timer (options.reconnect_ivl); + // Polling will be re-started by timer_event (). + reset_pollout (socket_handle); + // Increase sequence number to ensure other end is notified of + // dropped packet. + ++seq_no; + return; + } + errno_assert (write_bytes > 0); + assert ((size_t)write_bytes == (udp_header_size + size)); + + // Packet was sent successfully, increase sequence number. + ++seq_no; +} + +// Called when our backoff timer expires. +void xs::udp_sender_t::timer_event (handle_t handle_) +{ + xs_assert (handle_ == backoff_timer); + backoff_timer = NULL; + + // Re-start polling. + set_pollout (socket_handle); + + out_event (retired_fd); +} diff --git a/src/udp_sender.hpp b/src/udp_sender.hpp new file mode 100644 index 0000000..e7c2472 --- /dev/null +++ b/src/udp_sender.hpp @@ -0,0 +1,97 @@ +/* + Copyright (c) 2012 Martin Lucina <martin@lucina.net> + + This file is part of Crossroads I/O. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or (at your + option) any later version. + + Crossroads I/O is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_UDP_SENDER_HPP_INCLUDED__ +#define __XS_UDP_SENDER_HPP_INCLUDED__ + +#include "platform.hpp" + +#include "stdint.hpp" +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "address.hpp" +#include "encoder.hpp" + +namespace xs +{ + + class io_thread_t; + class session_base_t; + + class udp_sender_t : public io_object_t, public i_engine + { + + public: + + udp_sender_t (xs::io_thread_t *parent_, const options_t &options_); + ~udp_sender_t (); + + int init (const char *address_); + + // i_engine interface implementation. + void plug (xs::io_thread_t *io_thread_, + xs::session_base_t *session_); + void unplug (); + void terminate (); + void activate_in (); + void activate_out (); + + // i_poll_events interface implementation. + void in_event (fd_t fd_); + void out_event (fd_t fd_); + void timer_event (handle_t handle_); + + private: + + // Underlying UDP socket. + fd_t socket; + handle_t socket_handle; + + // Socket address. + address_t address; + + // Socket options. + options_t options; + + // Encoder for this socket. + encoder_t encoder; + + // Associated session. + xs::session_base_t *session; + + // UDP packet header size. + static const size_t udp_header_size = 6; + + // Packet buffer for outgoing packets. + unsigned char data [pgm_max_tpdu]; + + // Sequence number for outgoing packets. + uint32_t seq_no; + + // Backoff timer if receiver is not present. + handle_t backoff_timer; + + udp_sender_t (const udp_sender_t&); + const udp_sender_t &operator = (const udp_sender_t&); + }; + +} + +#endif diff --git a/tests/Makefile.am b/tests/Makefile.am index 27ca5dd..52e8e1c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -7,6 +7,7 @@ noinst_PROGRAMS = pair_inproc \ pair_tcp \ reqrep_inproc \ reqrep_tcp \ + pubsub_udp \ hwm \ reqrep_device \ sub_forward \ @@ -32,6 +33,7 @@ pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp reqrep_inproc_SOURCES = reqrep_inproc.cpp testutil.hpp reqrep_tcp_SOURCES = reqrep_tcp.cpp testutil.hpp +pubsub_udp_SOURCES = pubsub_udp.cpp testutil.hpp hwm_SOURCES = hwm.cpp reqrep_device_SOURCES = reqrep_device.cpp sub_forward_SOURCES = sub_forward.cpp diff --git a/tests/pubsub_udp.cpp b/tests/pubsub_udp.cpp new file mode 100644 index 0000000..e981457 --- /dev/null +++ b/tests/pubsub_udp.cpp @@ -0,0 +1,90 @@ +/* + Copyright (c) 2012 Martin Lucina <martin@lucina.net> + + This file is part of Crossroads I/O. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or (at your + option) any later version. + + Crossroads I/O is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ + fprintf (stderr, "pubsub_udp test running...\n"); + + void *ctx = xs_init (); + assert (ctx); + + void *pub = xs_socket (ctx, XS_PUB); + assert (pub); + int rc = xs_bind (pub, "udp://127.0.0.1:5555"); + assert (rc != -1); + + void *sub = xs_socket (ctx, XS_SUB); + assert (sub); + rc = xs_connect (sub, "udp://127.0.0.1:5555"); + assert (rc != -1); + rc = xs_setsockopt (sub, XS_SUBSCRIBE, "", 0); + assert (rc == 0); + + const char *content = "12345678ABCDEFGH12345678abcdefgh"; + + // TODO: Due to a core bug, the first message on a PUB/SUB socket + // is always lost. For now just send a dummy message. + rc = xs_send (pub, "", 0, 0); + assert (rc == 0); + + // Send a message with two identical parts. + rc = xs_send (pub, content, 32, XS_SNDMORE); + assert (rc == 32); + rc = xs_send (pub, content, 32, 0); + assert (rc == 32); + + // Receive the first part. + char rcvbuf [32]; + int rcvmore = 0; + size_t rcvmore_sz = sizeof rcvmore; + rc = xs_recv (sub, rcvbuf, 32, 0); + assert (rc == 32); + rc = xs_getsockopt (sub, XS_RCVMORE, &rcvmore, &rcvmore_sz); + assert (rc == 0); + + // There must be one more part to receive. + assert (rcvmore); + // And the content must match what was sent. + assert (memcmp (rcvbuf, content, 32) == 0); + + // Receive the second part. + rc = xs_recv (sub, rcvbuf, 32, 0); + assert (rc == 32); + rcvmore_sz = sizeof rcvmore; + rc = xs_getsockopt (sub, XS_RCVMORE, &rcvmore, &rcvmore_sz); + assert (rc == 0); + + // There must not be another part. + assert (!rcvmore); + // And the content must match what was sent. + assert (memcmp (rcvbuf, content, 32) == 0); + + rc = xs_close (pub); + assert (rc == 0); + + rc = xs_close (sub); + assert (rc == 0); + + rc = xs_term (ctx); + assert (rc == 0); + + return 0 ; +} |