diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 4 | ||||
| -rw-r--r-- | src/session_base.cpp | 38 | ||||
| -rw-r--r-- | src/socket_base.cpp | 6 | ||||
| -rw-r--r-- | src/udp_receiver.cpp | 211 | ||||
| -rw-r--r-- | src/udp_receiver.hpp | 101 | ||||
| -rw-r--r-- | src/udp_sender.cpp | 197 | ||||
| -rw-r--r-- | src/udp_sender.hpp | 97 | 
7 files changed, 3 insertions, 651 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 9ed62b1..1daee23 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -78,8 +78,6 @@ libxs_la_SOURCES = \      tcp_listener.hpp \      thread.hpp \      topic_filter.hpp \ -    udp_receiver.hpp \ -    udp_sender.hpp \      upoll.hpp \      windows.hpp \      wire.hpp \ @@ -140,8 +138,6 @@ libxs_la_SOURCES = \      tcp_listener.cpp \      thread.cpp \      topic_filter.cpp \ -    udp_receiver.cpp \ -    udp_sender.cpp \      upoll.cpp \      xpub.cpp \      xrep.cpp \ diff --git a/src/session_base.cpp b/src/session_base.cpp index 49fdce1..1ffc060 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -30,8 +30,6 @@  #include "ipc_connecter.hpp"  #include "pgm_sender.hpp"  #include "pgm_receiver.hpp" -#include "udp_sender.hpp" -#include "udp_receiver.hpp"  #include "req.hpp"  #include "xreq.hpp" @@ -486,42 +484,6 @@ void xs::session_base_t::start_connecting (bool wait_)      }  #endif -    //  UDP support. -    if (protocol == "udp") { - -        //  At this point we'll create message pipes to the session straight -        //  away. There's no point in delaying it as no concept of 'connect' -        //  exists with UDP anyway. -        if (options.type == XS_PUB || options.type == XS_XPUB) { - -            //  UDP sender. -            udp_sender_t *udp_sender =  new (std::nothrow) udp_sender_t ( -                io_thread, options); -            alloc_assert (udp_sender); - -            int rc = udp_sender->init (address.c_str ()); -            xs_assert (rc == 0); - -            send_attach (this, udp_sender); -        } -        else if (options.type == XS_SUB || options.type == XS_XSUB) { - -            //  UDP receiver. -            udp_receiver_t *udp_receiver =  new (std::nothrow) udp_receiver_t ( -                io_thread, options); -            alloc_assert (udp_receiver); - -            int rc = udp_receiver->init (address.c_str ()); -            xs_assert (rc == 0); - -            send_attach (this, udp_receiver); -        } -        else -            xs_assert (false); - -        return; -    } -      xs_assert (false);  } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6b8588a..5aed0dc 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -208,7 +208,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)  {      //  First check out whether the protcol is something we are aware of.      if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && -          protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "udp") { +          protocol_ != "pgm" && protocol_ != "epgm") {          errno = EPROTONOSUPPORT;          return -1;      } @@ -234,7 +234,7 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)      //  Check whether socket type and transport protocol match.      //  Specifically, multicast protocols can't be combined with      //  bi-directional messaging patterns (socket types). -    if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "udp") && +    if ((protocol_ == "pgm" || protocol_ == "epgm") &&            options.type != XS_PUB && options.type != XS_SUB &&            options.type != XS_XPUB && options.type != XS_XSUB) {          errno = ENOCOMPATPROTO; @@ -359,7 +359,7 @@ int xs::socket_base_t::bind (const char *addr_)          return 0;      } -    if (protocol == "pgm" || protocol == "epgm" || protocol == "udp") { +    if (protocol == "pgm" || protocol == "epgm") {          //  For convenience's sake, bind can be used interchageable with          //  connect for PGM and EPGM transports. diff --git a/src/udp_receiver.cpp b/src/udp_receiver.cpp deleted file mode 100644 index 9cd3c4c..0000000 --- a/src/udp_receiver.cpp +++ /dev/null @@ -1,211 +0,0 @@ -/* -    Copyright (c) 2012 Martin Lucina <martin@lucina.net> - -    This file is part of Crossroads I/O. - -    Crossroads I/O is free software; you can redistribute it and/or modify it -    under the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or (at your -    option) any later version. - -    Crossroads I/O is distributed in the hope that it will be useful, but -    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY -    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public -    License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "platform.hpp" - -#include <new> - -#include "udp_receiver.hpp" -#include "session_base.hpp" -#include "stdint.hpp" -#include "wire.hpp" -#include "err.hpp" -#include "ip.hpp" - -xs::udp_receiver_t::udp_receiver_t (class io_thread_t *parent_, -      const options_t &options_) : -    io_object_t (parent_), -    options (options_), -    pending_bytes (0), -    pending_p (NULL), -    session (NULL), -    last_seq_no (0) -{ -    decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); -    alloc_assert (decoder); -} - -xs::udp_receiver_t::~udp_receiver_t () -{ -    delete decoder; -} - -int xs::udp_receiver_t::init (const char *address_) -{ -    int rc; - -    rc = address_resolve_tcp (&address, address_, false, options.ipv4only); -    if (rc != 0) -        return rc; - -    //  Create a unconnected UDP socket, bind it to the requested address -    //  and make it non-blocking. -    socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); -    if (socket == -1) -        return -1; -    rc = ::bind (socket, (const sockaddr *)&address, address_size (&address)); -    if (rc != 0) -        return -1; -    unblock_socket (socket); - -    return 0; -} - -void xs::udp_receiver_t::plug (io_thread_t *io_thread_, -    session_base_t *session_) -{ -    //  Start polling. -    socket_handle = add_fd (socket); -    set_pollin (socket_handle); - -    session = session_; -    decoder->set_session (session); - -    //  If there are any subscriptions already queued in the session, drop them. -    drop_subscriptions (); -} - -void xs::udp_receiver_t::unplug () -{ -    rm_fd (socket_handle); - -    session = NULL; -} - -void xs::udp_receiver_t::terminate () -{ -    unplug (); -    delete this; -} - -//  Called when our pipe is reactivated (has more data for us). -void xs::udp_receiver_t::activate_out () -{ -    drop_subscriptions (); -} - -//  Called when our pipe is reactivated (able to accept more data). -void xs::udp_receiver_t::activate_in () -{ -    //  Process any pending data. -    if (pending_bytes > 0) { -        ssize_t processed_bytes = \ -            decoder->process_buffer (pending_p, pending_bytes); -        //  Flush any messages produced by the decoder to the pipe. -        session->flush (); -        if (processed_bytes < pending_bytes) { -            //  Some data (still) could not be written to the pipe. -            pending_bytes -= processed_bytes; -            pending_p += processed_bytes; -            //  Try again later. -            return; -        } -        //  Done with unprocessed data. -        pending_bytes = 0; -    } - -    //  Reactivate polling. -    set_pollin (socket_handle); - -    //  Read any data that might have showed up on the socket in the mean time. -    in_event (retired_fd); -} - -//  Called when POLLIN is fired on the socket. -void xs::udp_receiver_t::in_event (fd_t fd_) -{ -    //  Receive a packet. -    ssize_t recv_bytes = recv (socket, data, sizeof data, 0); -    //  At the moment, go back to polling on EAGAIN and assert on any -    //  other error. -    if ((recv_bytes < 0) && errno == EAGAIN) -        return; -    assert (recv_bytes > 0); - -    //  Parse UDP packet header. -    unsigned char *data_p = data; -    uint32_t seq_no = get_uint32 (data_p); -    uint16_t offset = get_uint16 (data_p + 4); -    data_p += udp_header_size; -    recv_bytes -= udp_header_size; - -    //  If this is our first packet, join the message stream. -    if (last_seq_no == 0) { -        if (offset == 0xffff) -            return; -        else { -            data_p += offset; -            recv_bytes -= offset; -        } -    } -    //  Otherwise, decide based on the sequence number. -    else { -        //  If this packet is in sequence, process the whole packet. -        if ((last_seq_no + 1) == seq_no) -            ; -        //  Otherwise, if it is an old packet, drop it. -        else if (seq_no <= last_seq_no) -            return; -        //  Otherwise we have packet loss, rejoin the message stream. -        else { -            if (offset == 0xffff) -                return; -            else { -                data_p += offset; -                recv_bytes -= offset; - -                //  Re-create decoder to clear state. -                delete decoder; -                decoder = NULL; -                decoder = new (std::nothrow) decoder_t (in_batch_size, -                    options.maxmsgsize); -                alloc_assert (decoder); -                decoder->set_session (session); -            } -        } -    } -    //  If we get here, we will process this packet and it becomes our -    //  last seen sequence number. -    last_seq_no = seq_no; - -    //  Decode data and push it to our pipe. -    ssize_t processed_bytes = decoder->process_buffer (data_p, recv_bytes); -    if (processed_bytes < recv_bytes) { -        //  Some data could not be written to the pipe. Save it for later. -        pending_bytes = recv_bytes - processed_bytes; -        pending_p = data_p + processed_bytes; -        //  Stop polling. We will be restarted by a call to activate_in (). -        reset_pollin (socket_handle); -    } - -    //  Flush any messages produced by the decoder to the pipe. -    session->flush (); -} - -void xs::udp_receiver_t::out_event (fd_t fd_) -{ -    assert (false); -} - -void xs::udp_receiver_t::drop_subscriptions () -{ -    msg_t msg; -    while (session->read (&msg)) -        msg.close (); -} diff --git a/src/udp_receiver.hpp b/src/udp_receiver.hpp deleted file mode 100644 index 114df24..0000000 --- a/src/udp_receiver.hpp +++ /dev/null @@ -1,101 +0,0 @@ -/* -    Copyright (c) 2012 Martin Lucina <martin@lucina.net> - -    This file is part of Crossroads I/O. - -    Crossroads I/O is free software; you can redistribute it and/or modify it -    under the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or (at your -    option) any later version. - -    Crossroads I/O is distributed in the hope that it will be useful, but -    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY -    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public -    License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __XS_UDP_RECEIVER_HPP_INCLUDED__ -#define __XS_UDP_RECEIVER_HPP_INCLUDED__ - -#include "platform.hpp" - -#include "io_object.hpp" -#include "i_engine.hpp" -#include "options.hpp" -#include "decoder.hpp" -#include "address.hpp" - -namespace xs -{ - -    class io_thread_t; -    class session_base_t; - -    class udp_receiver_t : public io_object_t, public i_engine -    { - -    public: - -        udp_receiver_t (xs::io_thread_t *parent_, const options_t &options_); -        ~udp_receiver_t (); - -        int init (const char *address_); - -        //  i_engine interface implementation. -        void plug (xs::io_thread_t *io_thread_, -            xs::session_base_t *session_); -        void unplug (); -        void terminate (); -        void activate_in (); -        void activate_out (); - -        //  i_poll_events interface implementation. -        void in_event (fd_t fd_); -        void out_event (fd_t fd_); - -    private: - -        //  We don't support forwarding subscriptions upstream (yet). -        void drop_subscriptions (); - -        //  Underlying UDP socket. -        fd_t socket; -        handle_t socket_handle; - -        //  Socket address. -        address_t address; - -        //  Socket options. -        options_t options; - -        //  Decoder for this socket. -        decoder_t *decoder; - -        //  Amount of unprocessed data waiting in decoder, if any. -        ssize_t pending_bytes; - -        //  Pointer to unprocessed data in buffer, if any. -        unsigned char *pending_p; - -        //  Buffer for data to process. -        unsigned char data [pgm_max_tpdu]; - -        //  Associated session. -        xs::session_base_t *session; - -        //  UDP header size. -        static const size_t udp_header_size = 6; - -        //  Last sequence number seen, 0 if none. -        uint32_t last_seq_no; - -        udp_receiver_t (const udp_receiver_t&); -        const udp_receiver_t &operator = (const udp_receiver_t&); -    }; - -} - -#endif diff --git a/src/udp_sender.cpp b/src/udp_sender.cpp deleted file mode 100644 index ddcffbb..0000000 --- a/src/udp_sender.cpp +++ /dev/null @@ -1,197 +0,0 @@ -/* -    Copyright (c) 2012 Martin Lucina <martin@lucina.net> - -    This file is part of Crossroads I/O. - -    Crossroads I/O is free software; you can redistribute it and/or modify it -    under the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or (at your -    option) any later version. - -    Crossroads I/O is distributed in the hope that it will be useful, but -    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY -    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public -    License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "platform.hpp" - -#include <stdlib.h> - -#include "io_thread.hpp" -#include "udp_sender.hpp" -#include "session_base.hpp" -#include "err.hpp" -#include "wire.hpp" -#include "stdint.hpp" -#include "ip.hpp" - -xs::udp_sender_t::udp_sender_t (io_thread_t *parent_, -      const options_t &options_) : -    io_object_t (parent_), -    options (options_), -    encoder (0), -    seq_no (1), -    backoff_timer (NULL) -{ -} - -xs::udp_sender_t::~udp_sender_t () -{ -    if (backoff_timer) { -        rm_timer (backoff_timer); -    } -} - -int xs::udp_sender_t::init (const char *address_) -{ -    int rc; - -    rc = address_resolve_tcp (&address, address_, false, options.ipv4only); -    if (rc != 0) -        return rc; - -    //  Create a connected UDP socket for a single peer, and make -    //  it non-blocking. -    socket = open_socket (address.ss_family, SOCK_DGRAM, IPPROTO_UDP); -    if (socket == -1) -        return -1; -    rc = ::connect (socket, (const sockaddr *)&address, -        address_size (&address)); -    if (rc != 0) -        return -1; -    unblock_socket (socket); - -    return 0; -} - -void xs::udp_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) -{ -    session = session_; -    encoder.set_session (session); - -    //  Start polling. -    socket_handle = add_fd (socket); -    set_pollout (socket_handle); - -    //  UDP cannot pass subscriptions upstream; for now just fake subscribing -    //  to all messages. -    msg_t msg; -    msg.init_size (1); -    *(unsigned char*) msg.data () = 1; -    int rc = session_->write (&msg); -    errno_assert (rc == 0); -    session_->flush (); -} - -void xs::udp_sender_t::unplug () -{ -    rm_fd (socket_handle); - -    session = NULL; -} - -void xs::udp_sender_t::terminate () -{ -    unplug (); -    delete this; -} - -//  Called when our pipe is reactivated (has more data for us). -void xs::udp_sender_t::activate_out () -{ -    //  Reactivate polling. -    set_pollout (socket_handle); -    //  Try and send more data. -    out_event (retired_fd); -} - -void xs::udp_sender_t::activate_in () -{ -    xs_assert (false); -} - -//  Called when POLLERR is fired on the socket. -void xs::udp_sender_t::in_event (fd_t fd_) -{ -    //  Get the actual error code from the socket. -    int err = 0; -    socklen_t len = sizeof (err); -    int rc = getsockopt (socket, SOL_SOCKET, SO_ERROR, (char *) &err, &len); -    errno_assert (rc == 0); - -    //  If we got ECONNREFUSED, there is no one on the other end. -    //  Stop sending for XS_RECONNECT_IVL milliseconds. -    if (err == ECONNREFUSED) { -        backoff_timer = add_timer (options.reconnect_ivl); -        //  Polling will be re-started by timer_event (). -        reset_pollout (socket_handle); -        return; -    } - -    //  Any other error is a bug, for now. -    errno = err; -    errno_assert (false); -} - -//  Called when POLLOUT is fired on the socket. -void xs::udp_sender_t::out_event (fd_t fd_) -{ -    unsigned char *data_p = data; -    size_t size = pgm_max_tpdu - udp_header_size; -    int offset = 0; - -    //  Get one packet of data from the encoder. -    encoder.get_data (&data_p, &size, &offset); -    //  If there is no data in the pipe -    if (size == 0) { -        //  Stop polling, we will be re-started by activate_out (). -        reset_pollout (socket_handle); -        return; -    } -    assert ((size + udp_header_size) <= pgm_max_tpdu); - -    //  Prepare UDP header (seqno, offset). -    unsigned char udp_header [udp_header_size]; -    put_uint32 (udp_header, seq_no); -    put_uint16 (udp_header + 4, (uint16_t) offset); - -    //  Send out the message, ensuring the whole message was sent. -    struct iovec iov[2] = { -        { udp_header, udp_header_size }, -        { data_p, size } -    }; -    ssize_t write_bytes = writev (socket, iov, 2); - -    //  If we got ECONNREFUSED, there is no one on the other end. -    //  Drop the packet and back off for XS_RECONNECT_IVL milliseconds. -    if (write_bytes == -1 && errno == ECONNREFUSED) { -        backoff_timer = add_timer (options.reconnect_ivl); -        //  Polling will be re-started by timer_event (). -        reset_pollout (socket_handle); -        //  Increase sequence number to ensure other end is notified of -        //  dropped packet. -        ++seq_no; -        return; -    } -    errno_assert (write_bytes > 0); -    assert ((size_t)write_bytes == (udp_header_size + size)); - -    //  Packet was sent successfully, increase sequence number. -    ++seq_no; -} - -//  Called when our backoff timer expires. -void xs::udp_sender_t::timer_event (handle_t handle_) -{ -    xs_assert (handle_ == backoff_timer); -    backoff_timer = NULL; - -    //  Re-start polling. -    set_pollout (socket_handle); - -    out_event (retired_fd); -} diff --git a/src/udp_sender.hpp b/src/udp_sender.hpp deleted file mode 100644 index e7c2472..0000000 --- a/src/udp_sender.hpp +++ /dev/null @@ -1,97 +0,0 @@ -/* -    Copyright (c) 2012 Martin Lucina <martin@lucina.net> - -    This file is part of Crossroads I/O. - -    Crossroads I/O is free software; you can redistribute it and/or modify it -    under the terms of the GNU Lesser General Public License as published by -    the Free Software Foundation; either version 3 of the License, or (at your -    option) any later version. - -    Crossroads I/O is distributed in the hope that it will be useful, but -    WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY -    or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public -    License for more details. - -    You should have received a copy of the GNU Lesser General Public License -    along with this program.  If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __XS_UDP_SENDER_HPP_INCLUDED__ -#define __XS_UDP_SENDER_HPP_INCLUDED__ - -#include "platform.hpp" - -#include "stdint.hpp" -#include "io_object.hpp" -#include "i_engine.hpp" -#include "options.hpp" -#include "address.hpp" -#include "encoder.hpp" - -namespace xs -{ - -    class io_thread_t; -    class session_base_t; - -    class udp_sender_t : public io_object_t, public i_engine -    { - -    public: - -        udp_sender_t (xs::io_thread_t *parent_, const options_t &options_); -        ~udp_sender_t (); - -        int init (const char *address_); - -        //  i_engine interface implementation. -        void plug (xs::io_thread_t *io_thread_, -            xs::session_base_t *session_); -        void unplug (); -        void terminate (); -        void activate_in (); -        void activate_out (); - -        //  i_poll_events interface implementation. -        void in_event (fd_t fd_); -        void out_event (fd_t fd_); -        void timer_event (handle_t handle_); - -    private: - -        //  Underlying UDP socket. -        fd_t socket; -        handle_t socket_handle; - -        //  Socket address. -        address_t address; - -        //  Socket options. -        options_t options; - -        //  Encoder for this socket. -        encoder_t encoder; - -        //  Associated session. -        xs::session_base_t *session; - -        //  UDP packet header size. -        static const size_t udp_header_size = 6; - -        //  Packet buffer for outgoing packets. -        unsigned char data [pgm_max_tpdu]; - -        //  Sequence number for outgoing packets. -        uint32_t seq_no; - -        //  Backoff timer if receiver is not present. -        handle_t backoff_timer; - -        udp_sender_t (const udp_sender_t&); -        const udp_sender_t &operator = (const udp_sender_t&); -    }; - -} - -#endif | 
