diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 4 | ||||
-rw-r--r-- | src/session_base.cpp | 38 | ||||
-rw-r--r-- | src/socket_base.cpp | 6 | ||||
-rw-r--r-- | src/udp_receiver.cpp | 211 | ||||
-rw-r--r-- | src/udp_receiver.hpp | 101 | ||||
-rw-r--r-- | src/udp_sender.cpp | 197 | ||||
-rw-r--r-- | src/udp_sender.hpp | 97 |
7 files changed, 3 insertions, 651 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 9ed62b1..1daee23 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -78,8 +78,6 @@ libxs_la_SOURCES = \ tcp_listener.hpp \ thread.hpp \ topic_filter.hpp \ - udp_receiver.hpp \ - udp_sender.hpp \ upoll.hpp \ windows.hpp \ wire.hpp \ @@ -140,8 +138,6 @@ libxs_la_SOURCES = \ tcp_listener.cpp \ thread.cpp \ topic_filter.cpp \ - udp_receiver.cpp \ - udp_sender.cpp \ upoll.cpp \ xpub.cpp \ xrep.cpp \ diff --git a/src/session_base.cpp b/src/session_base.cpp index 49fdce1..1ffc060 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -30,8 +30,6 @@ #include "ipc_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" -#include "udp_sender.hpp" -#include "udp_receiver.hpp" #include "req.hpp" #include "xreq.hpp" @@ -486,42 +484,6 @@ void xs::session_base_t::start_connecting (bool wait_) } #endif - // UDP support. - if (protocol == "udp") { - - // At this point we'll create message pipes to the session straight - // away. There's no point in delaying it as no concept of 'connect' - // exists with UDP anyway. - if (options.type == XS_PUB || options.type == XS_XPUB) { - - // UDP sender. - udp_sender_t *udp_sender = new (std::nothrow) udp_sender_t ( - io_thread, options); - alloc_assert (udp_sender); - - int rc = udp_sender->init (address.c_str ()); - xs_assert (rc == 0); - - send_attach (this, udp_sender); - } - else if (options.type == XS_SUB || options.type == XS_XSUB) { - - // UDP receiver. - udp_receiver_t *udp_receiver = new (std::nothrow) udp_receiver_t ( - io_thread, options); - alloc_assert (udp_receiver); - - int rc = udp_receiver->init (address.c_str ()); - xs_assert (rc == 0); - - send_attach (this, udp_receiver); - } - else - xs_assert (false); - - return; - } - xs_assert (false); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6b8588a..5aed0dc 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -208,7 +208,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "udp") { + protocol_ != "pgm" && protocol_ != "epgm") { errno = EPROTONOSUPPORT; return -1; } @@ -234,7 +234,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_) // Check whether socket type and transport protocol match. // Specifically, multicast protocols can't be combined with // bi-directional messaging patterns (socket types). - if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "udp") && + if ((protocol_ == "pgm" || protocol_ == "epgm") && options.type != XS_PUB && options.type != XS_SUB && options.type != XS_XPUB && options.type != XS_XSUB) { errno = ENOCOMPATPROTO; @@ -359,7 +359,7 @@ int xs::socket_base_t::bind (const char *addr_) return 0; } - if (protocol == "pgm" || protocol == "epgm" || protocol == "udp") { + if (protocol == "pgm" || protocol == "epgm") { // For convenience's sake, bind can be used interchageable with // connect for PGM and EPGM transports. diff --git a/src/udp_receiver.cpp b/src/udp_receiver.cpp deleted file mode 100644 index 9cd3c4c..0000000 --- a/src/udp_receiver.cpp +++ /dev/null @@ -1,211 +0,0 @@ -/* - Copyright (c) 2012 Martin Lucina <martin@lucina.net> - - This file is part of Crossroads I/O. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or (at your - option) any later version. - - Crossroads I/O is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "platform.hpp" - -#include <new> - -#include "udp_receiver.hpp" -#include "session_base.hpp" -#include "stdint.hpp" -#include "wire.hpp" -#include "err.hpp" -#include "ip.hpp" - -xs::udp_receiver_t::udp_receiver_t (class io_thread_t *parent_, - const options_t &options_) : - io_object_t (parent_), - options (options_), - pending_bytes (0), - pending_p (NULL), - session (NULL), - last_seq_no (0) -{ - decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); - alloc_assert (decoder); -} - -xs::udp_receiver_t::~udp_receiver_t () -{ - delete decoder; -} - -int xs::udp_receiver_t::init (const char *address_) -{ - int rc; - - rc = address_resolve_tcp (&address, address_, false, options.ipv4only); - if (rc != 0) - return rc; - - // Create a unconnected UDP socket, bind it to the requested address - // and make it non-blocking. - socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); - if (socket == -1) - return -1; - rc = ::bind (socket, (const sockaddr *)&address, address_size (&address)); - if (rc != 0) - return -1; - unblock_socket (socket); - - return 0; -} - -void xs::udp_receiver_t::plug (io_thread_t *io_thread_, - session_base_t *session_) -{ - // Start polling. - socket_handle = add_fd (socket); - set_pollin (socket_handle); - - session = session_; - decoder->set_session (session); - - // If there are any subscriptions already queued in the session, drop them. - drop_subscriptions (); -} - -void xs::udp_receiver_t::unplug () -{ - rm_fd (socket_handle); - - session = NULL; -} - -void xs::udp_receiver_t::terminate () -{ - unplug (); - delete this; -} - -// Called when our pipe is reactivated (has more data for us). -void xs::udp_receiver_t::activate_out () -{ - drop_subscriptions (); -} - -// Called when our pipe is reactivated (able to accept more data). -void xs::udp_receiver_t::activate_in () -{ - // Process any pending data. - if (pending_bytes > 0) { - ssize_t processed_bytes = \ - decoder->process_buffer (pending_p, pending_bytes); - // Flush any messages produced by the decoder to the pipe. - session->flush (); - if (processed_bytes < pending_bytes) { - // Some data (still) could not be written to the pipe. - pending_bytes -= processed_bytes; - pending_p += processed_bytes; - // Try again later. - return; - } - // Done with unprocessed data. - pending_bytes = 0; - } - - // Reactivate polling. - set_pollin (socket_handle); - - // Read any data that might have showed up on the socket in the mean time. - in_event (retired_fd); -} - -// Called when POLLIN is fired on the socket. -void xs::udp_receiver_t::in_event (fd_t fd_) -{ - // Receive a packet. - ssize_t recv_bytes = recv (socket, data, sizeof data, 0); - // At the moment, go back to polling on EAGAIN and assert on any - // other error. - if ((recv_bytes < 0) && errno == EAGAIN) - return; - assert (recv_bytes > 0); - - // Parse UDP packet header. - unsigned char *data_p = data; - uint32_t seq_no = get_uint32 (data_p); - uint16_t offset = get_uint16 (data_p + 4); - data_p += udp_header_size; - recv_bytes -= udp_header_size; - - // If this is our first packet, join the message stream. - if (last_seq_no == 0) { - if (offset == 0xffff) - return; - else { - data_p += offset; - recv_bytes -= offset; - } - } - // Otherwise, decide based on the sequence number. - else { - // If this packet is in sequence, process the whole packet. - if ((last_seq_no + 1) == seq_no) - ; - // Otherwise, if it is an old packet, drop it. - else if (seq_no <= last_seq_no) - return; - // Otherwise we have packet loss, rejoin the message stream. - else { - if (offset == 0xffff) - return; - else { - data_p += offset; - recv_bytes -= offset; - - // Re-create decoder to clear state. - delete decoder; - decoder = NULL; - decoder = new (std::nothrow) decoder_t (in_batch_size, - options.maxmsgsize); - alloc_assert (decoder); - decoder->set_session (session); - } - } - } - // If we get here, we will process this packet and it becomes our - // last seen sequence number. - last_seq_no = seq_no; - - // Decode data and push it to our pipe. - ssize_t processed_bytes = decoder->process_buffer (data_p, recv_bytes); - if (processed_bytes < recv_bytes) { - // Some data could not be written to the pipe. Save it for later. - pending_bytes = recv_bytes - processed_bytes; - pending_p = data_p + processed_bytes; - // Stop polling. We will be restarted by a call to activate_in (). - reset_pollin (socket_handle); - } - - // Flush any messages produced by the decoder to the pipe. - session->flush (); -} - -void xs::udp_receiver_t::out_event (fd_t fd_) -{ - assert (false); -} - -void xs::udp_receiver_t::drop_subscriptions () -{ - msg_t msg; - while (session->read (&msg)) - msg.close (); -} diff --git a/src/udp_receiver.hpp b/src/udp_receiver.hpp deleted file mode 100644 index 114df24..0000000 --- a/src/udp_receiver.hpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - Copyright (c) 2012 Martin Lucina <martin@lucina.net> - - This file is part of Crossroads I/O. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or (at your - option) any later version. - - Crossroads I/O is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __XS_UDP_RECEIVER_HPP_INCLUDED__ -#define __XS_UDP_RECEIVER_HPP_INCLUDED__ - -#include "platform.hpp" - -#include "io_object.hpp" -#include "i_engine.hpp" -#include "options.hpp" -#include "decoder.hpp" -#include "address.hpp" - -namespace xs -{ - - class io_thread_t; - class session_base_t; - - class udp_receiver_t : public io_object_t, public i_engine - { - - public: - - udp_receiver_t (xs::io_thread_t *parent_, const options_t &options_); - ~udp_receiver_t (); - - int init (const char *address_); - - // i_engine interface implementation. - void plug (xs::io_thread_t *io_thread_, - xs::session_base_t *session_); - void unplug (); - void terminate (); - void activate_in (); - void activate_out (); - - // i_poll_events interface implementation. - void in_event (fd_t fd_); - void out_event (fd_t fd_); - - private: - - // We don't support forwarding subscriptions upstream (yet). - void drop_subscriptions (); - - // Underlying UDP socket. - fd_t socket; - handle_t socket_handle; - - // Socket address. - address_t address; - - // Socket options. - options_t options; - - // Decoder for this socket. - decoder_t *decoder; - - // Amount of unprocessed data waiting in decoder, if any. - ssize_t pending_bytes; - - // Pointer to unprocessed data in buffer, if any. - unsigned char *pending_p; - - // Buffer for data to process. - unsigned char data [pgm_max_tpdu]; - - // Associated session. - xs::session_base_t *session; - - // UDP header size. - static const size_t udp_header_size = 6; - - // Last sequence number seen, 0 if none. - uint32_t last_seq_no; - - udp_receiver_t (const udp_receiver_t&); - const udp_receiver_t &operator = (const udp_receiver_t&); - }; - -} - -#endif diff --git a/src/udp_sender.cpp b/src/udp_sender.cpp deleted file mode 100644 index ddcffbb..0000000 --- a/src/udp_sender.cpp +++ /dev/null @@ -1,197 +0,0 @@ -/* - Copyright (c) 2012 Martin Lucina <martin@lucina.net> - - This file is part of Crossroads I/O. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or (at your - option) any later version. - - Crossroads I/O is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "platform.hpp" - -#include <stdlib.h> - -#include "io_thread.hpp" -#include "udp_sender.hpp" -#include "session_base.hpp" -#include "err.hpp" -#include "wire.hpp" -#include "stdint.hpp" -#include "ip.hpp" - -xs::udp_sender_t::udp_sender_t (io_thread_t *parent_, - const options_t &options_) : - io_object_t (parent_), - options (options_), - encoder (0), - seq_no (1), - backoff_timer (NULL) -{ -} - -xs::udp_sender_t::~udp_sender_t () -{ - if (backoff_timer) { - rm_timer (backoff_timer); - } -} - -int xs::udp_sender_t::init (const char *address_) -{ - int rc; - - rc = address_resolve_tcp (&address, address_, false, options.ipv4only); - if (rc != 0) - return rc; - - // Create a connected UDP socket for a single peer, and make - // it non-blocking. - socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); - if (socket == -1) - return -1; - rc = ::connect (socket, (const sockaddr *)&address, - address_size (&address)); - if (rc != 0) - return -1; - unblock_socket (socket); - - return 0; -} - -void xs::udp_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) -{ - session = session_; - encoder.set_session (session); - - // Start polling. - socket_handle = add_fd (socket); - set_pollout (socket_handle); - - // UDP cannot pass subscriptions upstream; for now just fake subscribing - // to all messages. - msg_t msg; - msg.init_size (1); - *(unsigned char*) msg.data () = 1; - int rc = session_->write (&msg); - errno_assert (rc == 0); - session_->flush (); -} - -void xs::udp_sender_t::unplug () -{ - rm_fd (socket_handle); - - session = NULL; -} - -void xs::udp_sender_t::terminate () -{ - unplug (); - delete this; -} - -// Called when our pipe is reactivated (has more data for us). -void xs::udp_sender_t::activate_out () -{ - // Reactivate polling. - set_pollout (socket_handle); - // Try and send more data. - out_event (retired_fd); -} - -void xs::udp_sender_t::activate_in () -{ - xs_assert (false); -} - -// Called when POLLERR is fired on the socket. -void xs::udp_sender_t::in_event (fd_t fd_) -{ - // Get the actual error code from the socket. - int err = 0; - socklen_t len = sizeof (err); - int rc = getsockopt (socket, SOL_SOCKET, SO_ERROR, (char *) &err, &len); - errno_assert (rc == 0); - - // If we got ECONNREFUSED, there is no one on the other end. - // Stop sending for XS_RECONNECT_IVL milliseconds. - if (err == ECONNREFUSED) { - backoff_timer = add_timer (options.reconnect_ivl); - // Polling will be re-started by timer_event (). - reset_pollout (socket_handle); - return; - } - - // Any other error is a bug, for now. - errno = err; - errno_assert (false); -} - -// Called when POLLOUT is fired on the socket. -void xs::udp_sender_t::out_event (fd_t fd_) -{ - unsigned char *data_p = data; - size_t size = pgm_max_tpdu - udp_header_size; - int offset = 0; - - // Get one packet of data from the encoder. - encoder.get_data (&data_p, &size, &offset); - // If there is no data in the pipe - if (size == 0) { - // Stop polling, we will be re-started by activate_out (). - reset_pollout (socket_handle); - return; - } - assert ((size + udp_header_size) <= pgm_max_tpdu); - - // Prepare UDP header (seqno, offset). - unsigned char udp_header [udp_header_size]; - put_uint32 (udp_header, seq_no); - put_uint16 (udp_header + 4, (uint16_t) offset); - - // Send out the message, ensuring the whole message was sent. - struct iovec iov[2] = { - { udp_header, udp_header_size }, - { data_p, size } - }; - ssize_t write_bytes = writev (socket, iov, 2); - - // If we got ECONNREFUSED, there is no one on the other end. - // Drop the packet and back off for XS_RECONNECT_IVL milliseconds. - if (write_bytes == -1 && errno == ECONNREFUSED) { - backoff_timer = add_timer (options.reconnect_ivl); - // Polling will be re-started by timer_event (). - reset_pollout (socket_handle); - // Increase sequence number to ensure other end is notified of - // dropped packet. - ++seq_no; - return; - } - errno_assert (write_bytes > 0); - assert ((size_t)write_bytes == (udp_header_size + size)); - - // Packet was sent successfully, increase sequence number. - ++seq_no; -} - -// Called when our backoff timer expires. -void xs::udp_sender_t::timer_event (handle_t handle_) -{ - xs_assert (handle_ == backoff_timer); - backoff_timer = NULL; - - // Re-start polling. - set_pollout (socket_handle); - - out_event (retired_fd); -} diff --git a/src/udp_sender.hpp b/src/udp_sender.hpp deleted file mode 100644 index e7c2472..0000000 --- a/src/udp_sender.hpp +++ /dev/null @@ -1,97 +0,0 @@ -/* - Copyright (c) 2012 Martin Lucina <martin@lucina.net> - - This file is part of Crossroads I/O. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or (at your - option) any later version. - - Crossroads I/O is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __XS_UDP_SENDER_HPP_INCLUDED__ -#define __XS_UDP_SENDER_HPP_INCLUDED__ - -#include "platform.hpp" - -#include "stdint.hpp" -#include "io_object.hpp" -#include "i_engine.hpp" -#include "options.hpp" -#include "address.hpp" -#include "encoder.hpp" - -namespace xs -{ - - class io_thread_t; - class session_base_t; - - class udp_sender_t : public io_object_t, public i_engine - { - - public: - - udp_sender_t (xs::io_thread_t *parent_, const options_t &options_); - ~udp_sender_t (); - - int init (const char *address_); - - // i_engine interface implementation. - void plug (xs::io_thread_t *io_thread_, - xs::session_base_t *session_); - void unplug (); - void terminate (); - void activate_in (); - void activate_out (); - - // i_poll_events interface implementation. - void in_event (fd_t fd_); - void out_event (fd_t fd_); - void timer_event (handle_t handle_); - - private: - - // Underlying UDP socket. - fd_t socket; - handle_t socket_handle; - - // Socket address. - address_t address; - - // Socket options. - options_t options; - - // Encoder for this socket. - encoder_t encoder; - - // Associated session. - xs::session_base_t *session; - - // UDP packet header size. - static const size_t udp_header_size = 6; - - // Packet buffer for outgoing packets. - unsigned char data [pgm_max_tpdu]; - - // Sequence number for outgoing packets. - uint32_t seq_no; - - // Backoff timer if receiver is not present. - handle_t backoff_timer; - - udp_sender_t (const udp_sender_t&); - const udp_sender_t &operator = (const udp_sender_t&); - }; - -} - -#endif |