summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-05-20 10:40:36 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-06-01 11:14:30 +0200
commit9ba8f9a503d69b891fae38628e0038f49ed5b8a4 (patch)
tree363d40593300c9665e7973680579d4c2b7647536 /src
parent58d54740785eb2c7208a01afb5bd9736e5808069 (diff)
UDP support
Signed-off-by: Martin Lucina <martin@lucina.net>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/session_base.cpp38
-rw-r--r--src/socket_base.cpp6
-rw-r--r--src/udp_receiver.cpp211
-rw-r--r--src/udp_receiver.hpp101
-rw-r--r--src/udp_sender.cpp197
-rw-r--r--src/udp_sender.hpp97
7 files changed, 651 insertions, 3 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 1daee23..9ed62b1 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -78,6 +78,8 @@ libxs_la_SOURCES = \
tcp_listener.hpp \
thread.hpp \
topic_filter.hpp \
+ udp_receiver.hpp \
+ udp_sender.hpp \
upoll.hpp \
windows.hpp \
wire.hpp \
@@ -138,6 +140,8 @@ libxs_la_SOURCES = \
tcp_listener.cpp \
thread.cpp \
topic_filter.cpp \
+ udp_receiver.cpp \
+ udp_sender.cpp \
upoll.cpp \
xpub.cpp \
xrep.cpp \
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 12d2e8d..81f7347 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -30,6 +30,8 @@
#include "ipc_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
+#include "udp_sender.hpp"
+#include "udp_receiver.hpp"
#include "req.hpp"
#include "xreq.hpp"
@@ -484,6 +486,42 @@ void xs::session_base_t::start_connecting (bool wait_)
}
#endif
+ // UDP support.
+ if (protocol == "udp") {
+
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with UDP anyway.
+ if (options.type == XS_PUB || options.type == XS_XPUB) {
+
+ // UDP sender.
+ udp_sender_t *udp_sender = new (std::nothrow) udp_sender_t (
+ io_thread, options);
+ alloc_assert (udp_sender);
+
+ int rc = udp_sender->init (address.c_str ());
+ xs_assert (rc == 0);
+
+ send_attach (this, udp_sender);
+ }
+ else if (options.type == XS_SUB || options.type == XS_XSUB) {
+
+ // UDP receiver.
+ udp_receiver_t *udp_receiver = new (std::nothrow) udp_receiver_t (
+ io_thread, options);
+ alloc_assert (udp_receiver);
+
+ int rc = udp_receiver->init (address.c_str ());
+ xs_assert (rc == 0);
+
+ send_attach (this, udp_receiver);
+ }
+ else
+ xs_assert (false);
+
+ return;
+ }
+
xs_assert (false);
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 9de8735..389bfbc 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -208,7 +208,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
- protocol_ != "pgm" && protocol_ != "epgm") {
+ protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "udp") {
errno = EPROTONOSUPPORT;
return -1;
}
@@ -234,7 +234,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)
// Check whether socket type and transport protocol match.
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
- if ((protocol_ == "pgm" || protocol_ == "epgm") &&
+ if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "udp") &&
options.type != XS_PUB && options.type != XS_SUB &&
options.type != XS_XPUB && options.type != XS_XSUB) {
errno = ENOCOMPATPROTO;
@@ -359,7 +359,7 @@ int xs::socket_base_t::bind (const char *addr_)
return 0;
}
- if (protocol == "pgm" || protocol == "epgm") {
+ if (protocol == "pgm" || protocol == "epgm" || protocol == "udp") {
// For convenience's sake, bind can be used interchageable with
// connect for PGM and EPGM transports.
diff --git a/src/udp_receiver.cpp b/src/udp_receiver.cpp
new file mode 100644
index 0000000..9cd3c4c
--- /dev/null
+++ b/src/udp_receiver.cpp
@@ -0,0 +1,211 @@
+/*
+ Copyright (c) 2012 Martin Lucina <martin@lucina.net>
+
+ This file is part of Crossroads I/O.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or (at your
+ option) any later version.
+
+ Crossroads I/O is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "platform.hpp"
+
+#include <new>
+
+#include "udp_receiver.hpp"
+#include "session_base.hpp"
+#include "stdint.hpp"
+#include "wire.hpp"
+#include "err.hpp"
+#include "ip.hpp"
+
+xs::udp_receiver_t::udp_receiver_t (class io_thread_t *parent_,
+ const options_t &options_) :
+ io_object_t (parent_),
+ options (options_),
+ pending_bytes (0),
+ pending_p (NULL),
+ session (NULL),
+ last_seq_no (0)
+{
+ decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize);
+ alloc_assert (decoder);
+}
+
+xs::udp_receiver_t::~udp_receiver_t ()
+{
+ delete decoder;
+}
+
+int xs::udp_receiver_t::init (const char *address_)
+{
+ int rc;
+
+ rc = address_resolve_tcp (&address, address_, false, options.ipv4only);
+ if (rc != 0)
+ return rc;
+
+ // Create a unconnected UDP socket, bind it to the requested address
+ // and make it non-blocking.
+ socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP);
+ if (socket == -1)
+ return -1;
+ rc = ::bind (socket, (const sockaddr *)&address, address_size (&address));
+ if (rc != 0)
+ return -1;
+ unblock_socket (socket);
+
+ return 0;
+}
+
+void xs::udp_receiver_t::plug (io_thread_t *io_thread_,
+ session_base_t *session_)
+{
+ // Start polling.
+ socket_handle = add_fd (socket);
+ set_pollin (socket_handle);
+
+ session = session_;
+ decoder->set_session (session);
+
+ // If there are any subscriptions already queued in the session, drop them.
+ drop_subscriptions ();
+}
+
+void xs::udp_receiver_t::unplug ()
+{
+ rm_fd (socket_handle);
+
+ session = NULL;
+}
+
+void xs::udp_receiver_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
+// Called when our pipe is reactivated (has more data for us).
+void xs::udp_receiver_t::activate_out ()
+{
+ drop_subscriptions ();
+}
+
+// Called when our pipe is reactivated (able to accept more data).
+void xs::udp_receiver_t::activate_in ()
+{
+ // Process any pending data.
+ if (pending_bytes > 0) {
+ ssize_t processed_bytes = \
+ decoder->process_buffer (pending_p, pending_bytes);
+ // Flush any messages produced by the decoder to the pipe.
+ session->flush ();
+ if (processed_bytes < pending_bytes) {
+ // Some data (still) could not be written to the pipe.
+ pending_bytes -= processed_bytes;
+ pending_p += processed_bytes;
+ // Try again later.
+ return;
+ }
+ // Done with unprocessed data.
+ pending_bytes = 0;
+ }
+
+ // Reactivate polling.
+ set_pollin (socket_handle);
+
+ // Read any data that might have showed up on the socket in the mean time.
+ in_event (retired_fd);
+}
+
+// Called when POLLIN is fired on the socket.
+void xs::udp_receiver_t::in_event (fd_t fd_)
+{
+ // Receive a packet.
+ ssize_t recv_bytes = recv (socket, data, sizeof data, 0);
+ // At the moment, go back to polling on EAGAIN and assert on any
+ // other error.
+ if ((recv_bytes < 0) && errno == EAGAIN)
+ return;
+ assert (recv_bytes > 0);
+
+ // Parse UDP packet header.
+ unsigned char *data_p = data;
+ uint32_t seq_no = get_uint32 (data_p);
+ uint16_t offset = get_uint16 (data_p + 4);
+ data_p += udp_header_size;
+ recv_bytes -= udp_header_size;
+
+ // If this is our first packet, join the message stream.
+ if (last_seq_no == 0) {
+ if (offset == 0xffff)
+ return;
+ else {
+ data_p += offset;
+ recv_bytes -= offset;
+ }
+ }
+ // Otherwise, decide based on the sequence number.
+ else {
+ // If this packet is in sequence, process the whole packet.
+ if ((last_seq_no + 1) == seq_no)
+ ;
+ // Otherwise, if it is an old packet, drop it.
+ else if (seq_no <= last_seq_no)
+ return;
+ // Otherwise we have packet loss, rejoin the message stream.
+ else {
+ if (offset == 0xffff)
+ return;
+ else {
+ data_p += offset;
+ recv_bytes -= offset;
+
+ // Re-create decoder to clear state.
+ delete decoder;
+ decoder = NULL;
+ decoder = new (std::nothrow) decoder_t (in_batch_size,
+ options.maxmsgsize);
+ alloc_assert (decoder);
+ decoder->set_session (session);
+ }
+ }
+ }
+ // If we get here, we will process this packet and it becomes our
+ // last seen sequence number.
+ last_seq_no = seq_no;
+
+ // Decode data and push it to our pipe.
+ ssize_t processed_bytes = decoder->process_buffer (data_p, recv_bytes);
+ if (processed_bytes < recv_bytes) {
+ // Some data could not be written to the pipe. Save it for later.
+ pending_bytes = recv_bytes - processed_bytes;
+ pending_p = data_p + processed_bytes;
+ // Stop polling. We will be restarted by a call to activate_in ().
+ reset_pollin (socket_handle);
+ }
+
+ // Flush any messages produced by the decoder to the pipe.
+ session->flush ();
+}
+
+void xs::udp_receiver_t::out_event (fd_t fd_)
+{
+ assert (false);
+}
+
+void xs::udp_receiver_t::drop_subscriptions ()
+{
+ msg_t msg;
+ while (session->read (&msg))
+ msg.close ();
+}
diff --git a/src/udp_receiver.hpp b/src/udp_receiver.hpp
new file mode 100644
index 0000000..114df24
--- /dev/null
+++ b/src/udp_receiver.hpp
@@ -0,0 +1,101 @@
+/*
+ Copyright (c) 2012 Martin Lucina <martin@lucina.net>
+
+ This file is part of Crossroads I/O.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or (at your
+ option) any later version.
+
+ Crossroads I/O is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_UDP_RECEIVER_HPP_INCLUDED__
+#define __XS_UDP_RECEIVER_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#include "io_object.hpp"
+#include "i_engine.hpp"
+#include "options.hpp"
+#include "decoder.hpp"
+#include "address.hpp"
+
+namespace xs
+{
+
+ class io_thread_t;
+ class session_base_t;
+
+ class udp_receiver_t : public io_object_t, public i_engine
+ {
+
+ public:
+
+ udp_receiver_t (xs::io_thread_t *parent_, const options_t &options_);
+ ~udp_receiver_t ();
+
+ int init (const char *address_);
+
+ // i_engine interface implementation.
+ void plug (xs::io_thread_t *io_thread_,
+ xs::session_base_t *session_);
+ void unplug ();
+ void terminate ();
+ void activate_in ();
+ void activate_out ();
+
+ // i_poll_events interface implementation.
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
+
+ private:
+
+ // We don't support forwarding subscriptions upstream (yet).
+ void drop_subscriptions ();
+
+ // Underlying UDP socket.
+ fd_t socket;
+ handle_t socket_handle;
+
+ // Socket address.
+ address_t address;
+
+ // Socket options.
+ options_t options;
+
+ // Decoder for this socket.
+ decoder_t *decoder;
+
+ // Amount of unprocessed data waiting in decoder, if any.
+ ssize_t pending_bytes;
+
+ // Pointer to unprocessed data in buffer, if any.
+ unsigned char *pending_p;
+
+ // Buffer for data to process.
+ unsigned char data [pgm_max_tpdu];
+
+ // Associated session.
+ xs::session_base_t *session;
+
+ // UDP header size.
+ static const size_t udp_header_size = 6;
+
+ // Last sequence number seen, 0 if none.
+ uint32_t last_seq_no;
+
+ udp_receiver_t (const udp_receiver_t&);
+ const udp_receiver_t &operator = (const udp_receiver_t&);
+ };
+
+}
+
+#endif
diff --git a/src/udp_sender.cpp b/src/udp_sender.cpp
new file mode 100644
index 0000000..ddcffbb
--- /dev/null
+++ b/src/udp_sender.cpp
@@ -0,0 +1,197 @@
+/*
+ Copyright (c) 2012 Martin Lucina <martin@lucina.net>
+
+ This file is part of Crossroads I/O.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or (at your
+ option) any later version.
+
+ Crossroads I/O is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "platform.hpp"
+
+#include <stdlib.h>
+
+#include "io_thread.hpp"
+#include "udp_sender.hpp"
+#include "session_base.hpp"
+#include "err.hpp"
+#include "wire.hpp"
+#include "stdint.hpp"
+#include "ip.hpp"
+
+xs::udp_sender_t::udp_sender_t (io_thread_t *parent_,
+ const options_t &options_) :
+ io_object_t (parent_),
+ options (options_),
+ encoder (0),
+ seq_no (1),
+ backoff_timer (NULL)
+{
+}
+
+xs::udp_sender_t::~udp_sender_t ()
+{
+ if (backoff_timer) {
+ rm_timer (backoff_timer);
+ }
+}
+
+int xs::udp_sender_t::init (const char *address_)
+{
+ int rc;
+
+ rc = address_resolve_tcp (&address, address_, false, options.ipv4only);
+ if (rc != 0)
+ return rc;
+
+ // Create a connected UDP socket for a single peer, and make
+ // it non-blocking.
+ socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP);
+ if (socket == -1)
+ return -1;
+ rc = ::connect (socket, (const sockaddr *)&address,
+ address_size (&address));
+ if (rc != 0)
+ return -1;
+ unblock_socket (socket);
+
+ return 0;
+}
+
+void xs::udp_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
+{
+ session = session_;
+ encoder.set_session (session);
+
+ // Start polling.
+ socket_handle = add_fd (socket);
+ set_pollout (socket_handle);
+
+ // UDP cannot pass subscriptions upstream; for now just fake subscribing
+ // to all messages.
+ msg_t msg;
+ msg.init_size (1);
+ *(unsigned char*) msg.data () = 1;
+ int rc = session_->write (&msg);
+ errno_assert (rc == 0);
+ session_->flush ();
+}
+
+void xs::udp_sender_t::unplug ()
+{
+ rm_fd (socket_handle);
+
+ session = NULL;
+}
+
+void xs::udp_sender_t::terminate ()
+{
+ unplug ();
+ delete this;
+}
+
+// Called when our pipe is reactivated (has more data for us).
+void xs::udp_sender_t::activate_out ()
+{
+ // Reactivate polling.
+ set_pollout (socket_handle);
+ // Try and send more data.
+ out_event (retired_fd);
+}
+
+void xs::udp_sender_t::activate_in ()
+{
+ xs_assert (false);
+}
+
+// Called when POLLERR is fired on the socket.
+void xs::udp_sender_t::in_event (fd_t fd_)
+{
+ // Get the actual error code from the socket.
+ int err = 0;
+ socklen_t len = sizeof (err);
+ int rc = getsockopt (socket, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
+ errno_assert (rc == 0);
+
+ // If we got ECONNREFUSED, there is no one on the other end.
+ // Stop sending for XS_RECONNECT_IVL milliseconds.
+ if (err == ECONNREFUSED) {
+ backoff_timer = add_timer (options.reconnect_ivl);
+ // Polling will be re-started by timer_event ().
+ reset_pollout (socket_handle);
+ return;
+ }
+
+ // Any other error is a bug, for now.
+ errno = err;
+ errno_assert (false);
+}
+
+// Called when POLLOUT is fired on the socket.
+void xs::udp_sender_t::out_event (fd_t fd_)
+{
+ unsigned char *data_p = data;
+ size_t size = pgm_max_tpdu - udp_header_size;
+ int offset = 0;
+
+ // Get one packet of data from the encoder.
+ encoder.get_data (&data_p, &size, &offset);
+ // If there is no data in the pipe
+ if (size == 0) {
+ // Stop polling, we will be re-started by activate_out ().
+ reset_pollout (socket_handle);
+ return;
+ }
+ assert ((size + udp_header_size) <= pgm_max_tpdu);
+
+ // Prepare UDP header (seqno, offset).
+ unsigned char udp_header [udp_header_size];
+ put_uint32 (udp_header, seq_no);
+ put_uint16 (udp_header + 4, (uint16_t) offset);
+
+ // Send out the message, ensuring the whole message was sent.
+ struct iovec iov[2] = {
+ { udp_header, udp_header_size },
+ { data_p, size }
+ };
+ ssize_t write_bytes = writev (socket, iov, 2);
+
+ // If we got ECONNREFUSED, there is no one on the other end.
+ // Drop the packet and back off for XS_RECONNECT_IVL milliseconds.
+ if (write_bytes == -1 && errno == ECONNREFUSED) {
+ backoff_timer = add_timer (options.reconnect_ivl);
+ // Polling will be re-started by timer_event ().
+ reset_pollout (socket_handle);
+ // Increase sequence number to ensure other end is notified of
+ // dropped packet.
+ ++seq_no;
+ return;
+ }
+ errno_assert (write_bytes > 0);
+ assert ((size_t)write_bytes == (udp_header_size + size));
+
+ // Packet was sent successfully, increase sequence number.
+ ++seq_no;
+}
+
+// Called when our backoff timer expires.
+void xs::udp_sender_t::timer_event (handle_t handle_)
+{
+ xs_assert (handle_ == backoff_timer);
+ backoff_timer = NULL;
+
+ // Re-start polling.
+ set_pollout (socket_handle);
+
+ out_event (retired_fd);
+}
diff --git a/src/udp_sender.hpp b/src/udp_sender.hpp
new file mode 100644
index 0000000..e7c2472
--- /dev/null
+++ b/src/udp_sender.hpp
@@ -0,0 +1,97 @@
+/*
+ Copyright (c) 2012 Martin Lucina <martin@lucina.net>
+
+ This file is part of Crossroads I/O.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or (at your
+ option) any later version.
+
+ Crossroads I/O is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_UDP_SENDER_HPP_INCLUDED__
+#define __XS_UDP_SENDER_HPP_INCLUDED__
+
+#include "platform.hpp"
+
+#include "stdint.hpp"
+#include "io_object.hpp"
+#include "i_engine.hpp"
+#include "options.hpp"
+#include "address.hpp"
+#include "encoder.hpp"
+
+namespace xs
+{
+
+ class io_thread_t;
+ class session_base_t;
+
+ class udp_sender_t : public io_object_t, public i_engine
+ {
+
+ public:
+
+ udp_sender_t (xs::io_thread_t *parent_, const options_t &options_);
+ ~udp_sender_t ();
+
+ int init (const char *address_);
+
+ // i_engine interface implementation.
+ void plug (xs::io_thread_t *io_thread_,
+ xs::session_base_t *session_);
+ void unplug ();
+ void terminate ();
+ void activate_in ();
+ void activate_out ();
+
+ // i_poll_events interface implementation.
+ void in_event (fd_t fd_);
+ void out_event (fd_t fd_);
+ void timer_event (handle_t handle_);
+
+ private:
+
+ // Underlying UDP socket.
+ fd_t socket;
+ handle_t socket_handle;
+
+ // Socket address.
+ address_t address;
+
+ // Socket options.
+ options_t options;
+
+ // Encoder for this socket.
+ encoder_t encoder;
+
+ // Associated session.
+ xs::session_base_t *session;
+
+ // UDP packet header size.
+ static const size_t udp_header_size = 6;
+
+ // Packet buffer for outgoing packets.
+ unsigned char data [pgm_max_tpdu];
+
+ // Sequence number for outgoing packets.
+ uint32_t seq_no;
+
+ // Backoff timer if receiver is not present.
+ handle_t backoff_timer;
+
+ udp_sender_t (const udp_sender_t&);
+ const udp_sender_t &operator = (const udp_sender_t&);
+ };
+
+}
+
+#endif