From 969522bbf55467b6f6e8113be28451d087060843 Mon Sep 17 00:00:00 2001 From: malosek Date: Wed, 16 Sep 2009 10:11:01 +0200 Subject: added OpenPGM receiver - ZMQ_SUB --- c/zmq.h | 1 + src/Makefile.am | 2 + src/options.cpp | 3 +- src/options.hpp | 7 +- src/pgm_receiver.cpp | 202 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/pgm_receiver.hpp | 98 +++++++++++++++++++++++++ src/pgm_sender.cpp | 7 +- src/pgm_sender.hpp | 2 +- src/pgm_socket.cpp | 17 +++-- src/socket_base.cpp | 75 +++++++++++++++++-- src/sub.cpp | 6 ++ 11 files changed, 395 insertions(+), 25 deletions(-) create mode 100644 src/pgm_receiver.cpp create mode 100644 src/pgm_receiver.hpp diff --git a/c/zmq.h b/c/zmq.h index d2ca20a..58c5551 100644 --- a/c/zmq.h +++ b/c/zmq.h @@ -53,6 +53,7 @@ extern "C" { #define ZMQ_UNSUBSCRIBE 7 // string #define ZMQ_RATE 8 // int64_t #define ZMQ_RECOVERY_IVL 9 // int64_t +#define ZMQ_MCAST_LOOP 10 // boolean // The operation should be performed in non-blocking mode. I.e. if it cannot // be processed immediately, error should be returned with errno set to EAGAIN. diff --git a/src/Makefile.am b/src/Makefile.am index ce88b26..398c861 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -62,6 +62,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ object.hpp \ options.hpp \ owned.hpp \ + pgm_receiver.hpp \ pgm_sender.hpp \ pgm_socket.hpp \ pipe.hpp \ @@ -104,6 +105,7 @@ libzmq_la_SOURCES = $(pgm_sources) \ object.cpp \ options.cpp \ owned.cpp \ + pgm_receiver.cpp \ pgm_sender.cpp \ pgm_socket.cpp \ pipe.cpp \ diff --git a/src/options.cpp b/src/options.cpp index a39d312..55417f5 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -25,6 +25,7 @@ zmq::options_t::options_t () : swap (0), affinity (0), rate (100), - recovery_ivl (10) + recovery_ivl (10), + use_multicast_loop (false) { } diff --git a/src/options.hpp b/src/options.hpp index 4d359e3..c1ecb57 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -37,11 +37,14 @@ namespace zmq uint64_t affinity; std::string identity; - // Maximum tranfer rate [kb/s]. + // Maximum tranfer rate [kb/s]. Default 100kb/s. uint32_t rate; - // Reliability time interval [s]. + // Reliability time interval [s]. Default 10s. uint32_t recovery_ivl; + + // Enable multicast loopback. Default disabled (false). + bool use_multicast_loop; }; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp new file mode 100644 index 0000000..f34ecf0 --- /dev/null +++ b/src/pgm_receiver.cpp @@ -0,0 +1,202 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_OPENPGM + +#include + +#include "pgm_receiver.hpp" +#include "err.hpp" +#include "stdint.hpp" +#include "wire.hpp" +#include "i_inout.hpp" + +//#define PGM_RECEIVER_DEBUG +//#define PGM_RECEIVER_DEBUG_LEVEL 1 + +// level 1 = key behaviour +// level 2 = processing flow +// level 4 = infos + +#ifndef PGM_RECEIVER_DEBUG +# define zmq_log(n, ...) while (0) +#else +# define zmq_log(n, ...) do { if ((n) <= PGM_RECEIVER_DEBUG_LEVEL) \ + { printf (__VA_ARGS__);}} while (0) +#endif + +zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, + const options_t &options_, const char *session_name_) : + io_object_t (parent_), + pgm_socket (true, options_), + options (options_), + session_name (session_name_), + joined (false), + inout (NULL) +{ +} + +zmq::pgm_receiver_t::~pgm_receiver_t () +{ +} + +int zmq::pgm_receiver_t::init (const char *network_) +{ + return pgm_socket.init (network_); +} + +void zmq::pgm_receiver_t::plug (i_inout *inout_) +{ + // Allocate 2 fds one for socket second for waiting pipe. + int socket_fd; + int waiting_pipe_fd; + + decoder.set_inout (inout_); + + // Fill socket_fd and waiting_pipe_fd from PGM transport + pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); + + // Add socket_fd into poller. + socket_handle = add_fd (socket_fd); + + // Add waiting_pipe_fd into poller. + pipe_handle = add_fd (waiting_pipe_fd); + + // Set POLLIN for both handlers. + set_pollin (pipe_handle); + set_pollin (socket_handle); + + inout = inout_; +} + +void zmq::pgm_receiver_t::unplug () +{ + rm_fd (socket_handle); + rm_fd (pipe_handle); + decoder.set_inout (NULL); + inout = NULL; +} + +void zmq::pgm_receiver_t::revive () +{ + zmq_assert (false); +} + +void zmq::pgm_receiver_t::reconnect () +{ + // Save inout ptr. + i_inout *inout_tmp = inout; + + // Unplug - plug PGM transport. + unplug (); + decoder.reset (); + plug (inout_tmp); +} + +// POLLIN event from socket or waiting_pipe. +void zmq::pgm_receiver_t::in_event () +{ + void *data_with_offset; + ssize_t nbytes = 0; + + // Read all data from pgm socket. + while ((nbytes = receive_with_offset (&data_with_offset)) > 0) { + + // Push all the data to the decoder. + decoder.write ((unsigned char*)data_with_offset, nbytes); + } + + // Flush any messages decoder may have produced to the dispatcher. + inout->flush (); + + // Data loss detected. + if (nbytes == -1) { + + // Throw message in progress from decoder + decoder.reset (); + + // PGM receive is not joined anymore. + joined = false; + + // Recreate PGM transport. + reconnect (); + } +} + +void zmq::pgm_receiver_t::out_event () +{ + zmq_assert (false); +} + +ssize_t zmq::pgm_receiver_t::receive_with_offset + (void **data_) +{ + + // Data from PGM socket. + void *rd = NULL; + unsigned char *raw_data = NULL; + + // Read data from underlying pgm_socket. + ssize_t nbytes = pgm_socket.receive ((void**) &rd); + raw_data = (unsigned char*) rd; + + // No ODATA or RDATA. + if (!nbytes) + return 0; + + // Data loss. + if (nbytes == -1) { + return -1; + } + + // Read offset of the fist message in current APDU. + uint16_t apdu_offset = get_uint16 (raw_data); + + // Shift raw_data & decrease nbytes by the first message offset + // information (sizeof uint16_t). + *data_ = raw_data + sizeof (uint16_t); + nbytes -= sizeof (uint16_t); + + // There is not beginning of the message in current APDU and we + // are not joined jet -> throwing data. + if (apdu_offset == 0xFFFF && !joined) { + *data_ = NULL; + return 0; + } + + // Now is the possibility to join the stream. + if (!joined) { + + // We have to move data to the begining of the first message. + *data_ = (unsigned char *)*data_ + apdu_offset; + nbytes -= apdu_offset; + + // Joined the stream. + joined = true; + + zmq_log (2, "joined into the stream, %s(%i)\n", + __FILE__, __LINE__); + } + + return nbytes; +} +#endif + diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp new file mode 100644 index 0000000..ce9fa1a --- /dev/null +++ b/src/pgm_receiver.hpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__ +#define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_OPENPGM + +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "zmq_decoder.hpp" +#include "pgm_socket.hpp" + +namespace zmq +{ + + class pgm_receiver_t : public io_object_t, public i_engine + { + + public: + + // Creates gm_engine. Underlying PGM connection is initialised + // using network_ parameter. + pgm_receiver_t (class io_thread_t *parent_, const options_t &options_, + const char *session_name_); + ~pgm_receiver_t (); + + int init (const char *network_); + void reconnect (); + + // i_engine interface implementation. + void plug (struct i_inout *inout_); + void unplug (); + void revive (); + + // i_poll_events interface implementation. + void in_event (); + void out_event (); + + private: + // Read exactly iov_len_ count APDUs, function returns number + // of bytes received. Note that if we did not join message stream + // before and there is not message beginning in the APDUs being + // received iov_len for such a APDUs will be 0. + ssize_t receive_with_offset (void **data_); + + // Message decoder. + zmq_decoder_t decoder; + + // PGM socket. + pgm_socket_t pgm_socket; + + // Socket options. + options_t options; + + // Name of the session associated with the connecter. + std::string session_name; + + // If receiver joined the messages stream. + bool joined; + + // Parent session. + i_inout *inout; + + // Poll handle associated with PGM socket. + handle_t socket_handle; + + // Poll handle associated with engine PGM waiting pipe. + handle_t pipe_handle; + + pgm_receiver_t (const pgm_receiver_t&); + void operator = (const pgm_receiver_t&); + }; + +} + +#endif + +#endif diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 8e7a66b..9b1cef4 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -90,11 +90,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) set_pollout (handle); inout = inout_; - - zmq_log (1, "plug: downlink_socket_fd %i, uplink_socket_fd %i, %s(%i)", - downlink_socket_fd, uplink_socket_fd, __FILE__, __LINE__); - - std::cout << std::flush; } void zmq::pgm_sender_t::unplug () @@ -185,7 +180,7 @@ void zmq::pgm_sender_t::out_event () size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, size_t size_, uint16_t offset_) { - zmq_log (1, "data_size %i, first message offset %i, %s(%i)", + zmq_log (1, "data_size %i, first message offset %i, %s(%i)\n", (int) size_, offset_, __FILE__, __LINE__); std::cout << std::flush; diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index b1d1599..80be8d4 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -77,7 +77,7 @@ namespace zmq handle_t handle; handle_t uplink_handle; - // ? + // Parent session. i_inout *inout; // Output buffer from pgm_socket. diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 5e2e764..315b43e 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -386,12 +386,14 @@ int zmq::pgm_socket_t::open_transport (void) return -1; } } - + // Enable multicast loopback. - rc = pgm_transport_set_multicast_loop (g_transport, true); - if (rc != 0) { - errno = EINVAL; - return -1; + if (options.use_multicast_loop) { + rc = pgm_transport_set_multicast_loop (g_transport, true); + if (rc != 0) { + errno = EINVAL; + return -1; + } } // Bind a transport to the specified network devices. @@ -486,6 +488,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) // Send one APDU, transmit window owned memory. size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { + iovec iov = {data_,data_len_}; ssize_t nbytes = pgm_transport_send_packetv (g_transport, &iov, 1, @@ -561,7 +564,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_) // returned. ssize_t zmq::pgm_socket_t::receive (void **raw_data_) { - // We just sent all data from pgm_transport_recvmsgv up // and have to return 0 that another engine in this thread is scheduled. if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { @@ -575,7 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) } // If we have are going first time or if we have processed all pgm_msgv_t - // structure previaously read from the pgm socket. + // structure previously read from the pgm socket. if (nbytes_rec == nbytes_processed) { // Check program flow. @@ -615,6 +617,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) } zmq_log (4, "received %i bytes\n", (int)nbytes_rec); + } zmq_assert (nbytes_rec > 0); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 900f1c5..c195e91 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -37,6 +37,7 @@ #include "err.hpp" #include "platform.hpp" #include "pgm_sender.hpp" +#include "pgm_receiver.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : object_t (parent_), @@ -156,6 +157,14 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, options.recovery_ivl = (uint32_t) *((int64_t*) optval_); return 0; + case ZMQ_MCAST_LOOP: + if (optvallen_ != sizeof (bool)) { + errno = EINVAL; + return -1; + } + options.use_multicast_loop = optval_; + return 0; + default: errno = EINVAL; return -1; @@ -164,15 +173,43 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::bind (const char *addr_) { - zmq_listener_t *listener = new zmq_listener_t ( - choose_io_thread (options.affinity), this, options); - int rc = listener->set_address (addr_); - if (rc != 0) + // Parse addr_ string. + std::string addr_type; + std::string addr_args; + + std::string addr (addr_); + std::string::size_type pos = addr.find ("://"); + + if (pos == std::string::npos) { + errno = EINVAL; return -1; + } - send_plug (listener); - send_own (this, listener); - return 0; + addr_type = addr.substr (0, pos); + addr_args = addr.substr (pos + 3); + + if (addr_type == "tcp") { + zmq_listener_t *listener = new zmq_listener_t ( + choose_io_thread (options.affinity), this, options); + int rc = listener->set_address (addr_args.c_str ()); + if (rc != 0) + return -1; + + send_plug (listener); + send_own (this, listener); + return 0; + } + +#if defined ZMQ_HAVE_OPENPGM + if (addr_type == "pgm") { + // In the case of PGM bind behaves the same like connect. + return connect (addr_); + } +#endif + + // Unknown address type. + errno = EFAULT; + return -1; } int zmq::socket_base_t::connect (const char *addr_) @@ -246,6 +283,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "pgm") { switch (type) { + + // PGM sender. case ZMQ_PUB: { pgm_sender_t *pgm_sender = @@ -266,9 +305,29 @@ int zmq::socket_base_t::connect (const char *addr_) break; } + + // PGM receiver. case ZMQ_SUB: - zmq_assert (false); + { + pgm_receiver_t *pgm_receiver = + new pgm_receiver_t (choose_io_thread (options.affinity), options, + session_name.c_str ()); + + int rc = pgm_receiver->init (addr_args.c_str ()); + if (rc != 0) { + delete pgm_receiver; + return -1; + } + + // Reserve a sequence number for following 'attach' command. + session->inc_seqnum (); + send_attach (session, pgm_receiver); + + pgm_receiver = NULL; + break; + } + default: errno = EINVAL; return -1; diff --git a/src/sub.cpp b/src/sub.cpp index 51e0c23..101c62f 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -101,6 +101,12 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) if (rc != 0 && errno == EAGAIN) return -1; + // If there is no subscription return -1/EAGAIN. + if (!all_count && prefixes.empty () && topics.empty ()) { + errno = EAGAIN; + return -1; + } + // If there is at least one "*" subscription, the message matches. if (all_count) return 0; -- cgit v1.2.3