diff options
Diffstat (limited to 'src/udp_sender.cpp')
-rw-r--r-- | src/udp_sender.cpp | 197 |
1 files changed, 0 insertions, 197 deletions
diff --git a/src/udp_sender.cpp b/src/udp_sender.cpp deleted file mode 100644 index ddcffbb..0000000 --- a/src/udp_sender.cpp +++ /dev/null @@ -1,197 +0,0 @@ -/* - Copyright (c) 2012 Martin Lucina <martin@lucina.net> - - This file is part of Crossroads I/O. - - Crossroads I/O is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or (at your - option) any later version. - - Crossroads I/O is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "platform.hpp" - -#include <stdlib.h> - -#include "io_thread.hpp" -#include "udp_sender.hpp" -#include "session_base.hpp" -#include "err.hpp" -#include "wire.hpp" -#include "stdint.hpp" -#include "ip.hpp" - -xs::udp_sender_t::udp_sender_t (io_thread_t *parent_, - const options_t &options_) : - io_object_t (parent_), - options (options_), - encoder (0), - seq_no (1), - backoff_timer (NULL) -{ -} - -xs::udp_sender_t::~udp_sender_t () -{ - if (backoff_timer) { - rm_timer (backoff_timer); - } -} - -int xs::udp_sender_t::init (const char *address_) -{ - int rc; - - rc = address_resolve_tcp (&address, address_, false, options.ipv4only); - if (rc != 0) - return rc; - - // Create a connected UDP socket for a single peer, and make - // it non-blocking. - socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); - if (socket == -1) - return -1; - rc = ::connect (socket, (const sockaddr *)&address, - address_size (&address)); - if (rc != 0) - return -1; - unblock_socket (socket); - - return 0; -} - -void xs::udp_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) -{ - session = session_; - encoder.set_session (session); - - // Start polling. - socket_handle = add_fd (socket); - set_pollout (socket_handle); - - // UDP cannot pass subscriptions upstream; for now just fake subscribing - // to all messages. - msg_t msg; - msg.init_size (1); - *(unsigned char*) msg.data () = 1; - int rc = session_->write (&msg); - errno_assert (rc == 0); - session_->flush (); -} - -void xs::udp_sender_t::unplug () -{ - rm_fd (socket_handle); - - session = NULL; -} - -void xs::udp_sender_t::terminate () -{ - unplug (); - delete this; -} - -// Called when our pipe is reactivated (has more data for us). -void xs::udp_sender_t::activate_out () -{ - // Reactivate polling. - set_pollout (socket_handle); - // Try and send more data. - out_event (retired_fd); -} - -void xs::udp_sender_t::activate_in () -{ - xs_assert (false); -} - -// Called when POLLERR is fired on the socket. -void xs::udp_sender_t::in_event (fd_t fd_) -{ - // Get the actual error code from the socket. - int err = 0; - socklen_t len = sizeof (err); - int rc = getsockopt (socket, SOL_SOCKET, SO_ERROR, (char *) &err, &len); - errno_assert (rc == 0); - - // If we got ECONNREFUSED, there is no one on the other end. - // Stop sending for XS_RECONNECT_IVL milliseconds. - if (err == ECONNREFUSED) { - backoff_timer = add_timer (options.reconnect_ivl); - // Polling will be re-started by timer_event (). - reset_pollout (socket_handle); - return; - } - - // Any other error is a bug, for now. - errno = err; - errno_assert (false); -} - -// Called when POLLOUT is fired on the socket. -void xs::udp_sender_t::out_event (fd_t fd_) -{ - unsigned char *data_p = data; - size_t size = pgm_max_tpdu - udp_header_size; - int offset = 0; - - // Get one packet of data from the encoder. - encoder.get_data (&data_p, &size, &offset); - // If there is no data in the pipe - if (size == 0) { - // Stop polling, we will be re-started by activate_out (). - reset_pollout (socket_handle); - return; - } - assert ((size + udp_header_size) <= pgm_max_tpdu); - - // Prepare UDP header (seqno, offset). - unsigned char udp_header [udp_header_size]; - put_uint32 (udp_header, seq_no); - put_uint16 (udp_header + 4, (uint16_t) offset); - - // Send out the message, ensuring the whole message was sent. - struct iovec iov[2] = { - { udp_header, udp_header_size }, - { data_p, size } - }; - ssize_t write_bytes = writev (socket, iov, 2); - - // If we got ECONNREFUSED, there is no one on the other end. - // Drop the packet and back off for XS_RECONNECT_IVL milliseconds. - if (write_bytes == -1 && errno == ECONNREFUSED) { - backoff_timer = add_timer (options.reconnect_ivl); - // Polling will be re-started by timer_event (). - reset_pollout (socket_handle); - // Increase sequence number to ensure other end is notified of - // dropped packet. - ++seq_no; - return; - } - errno_assert (write_bytes > 0); - assert ((size_t)write_bytes == (udp_header_size + size)); - - // Packet was sent successfully, increase sequence number. - ++seq_no; -} - -// Called when our backoff timer expires. -void xs::udp_sender_t::timer_event (handle_t handle_) -{ - xs_assert (handle_ == backoff_timer); - backoff_timer = NULL; - - // Re-start polling. - set_pollout (socket_handle); - - out_event (retired_fd); -} |