diff options
| -rw-r--r-- | c/zmq.h | 1 | ||||
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/options.cpp | 3 | ||||
| -rw-r--r-- | src/options.hpp | 7 | ||||
| -rw-r--r-- | src/pgm_receiver.cpp | 207 | ||||
| -rw-r--r-- | src/pgm_receiver.hpp | 98 | ||||
| -rw-r--r-- | src/pgm_sender.cpp | 7 | ||||
| -rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
| -rw-r--r-- | src/pgm_socket.cpp | 17 | ||||
| -rw-r--r-- | src/pub.cpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 75 | ||||
| -rw-r--r-- | src/sub.cpp | 6 | 
12 files changed, 401 insertions, 26 deletions
| @@ -53,6 +53,7 @@ extern "C" {  #define ZMQ_UNSUBSCRIBE 7          //  string  #define ZMQ_RATE 8                 //  int64_t  #define ZMQ_RECOVERY_IVL 9         //  int64_t +#define ZMQ_MCAST_LOOP 10          //  boolean  //  The operation should be performed in non-blocking mode. I.e. if it cannot  //  be processed immediately, error should be returned with errno set to EAGAIN. diff --git a/src/Makefile.am b/src/Makefile.am index ce88b26..398c861 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -62,6 +62,7 @@ libzmq_la_SOURCES = $(pgm_sources) \      object.hpp \      options.hpp \      owned.hpp \ +    pgm_receiver.hpp \      pgm_sender.hpp \      pgm_socket.hpp \      pipe.hpp \ @@ -104,6 +105,7 @@ libzmq_la_SOURCES = $(pgm_sources) \      object.cpp \      options.cpp \      owned.cpp \ +    pgm_receiver.cpp \      pgm_sender.cpp \      pgm_socket.cpp \      pipe.cpp \ diff --git a/src/options.cpp b/src/options.cpp index a39d312..55417f5 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -25,6 +25,7 @@ zmq::options_t::options_t () :      swap (0),      affinity (0),      rate (100), -    recovery_ivl (10) +    recovery_ivl (10), +    use_multicast_loop (false)  {  } diff --git a/src/options.hpp b/src/options.hpp index 4d359e3..c1ecb57 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -37,11 +37,14 @@ namespace zmq          uint64_t affinity;          std::string identity; -        //  Maximum tranfer rate [kb/s]. +        //  Maximum tranfer rate [kb/s]. Default 100kb/s.          uint32_t rate; -        //  Reliability time interval [s]. +        //  Reliability time interval [s]. Default 10s.          uint32_t recovery_ivl; + +        //  Enable multicast loopback. Default disabled (false). +        bool use_multicast_loop;      };  } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp new file mode 100644 index 0000000..6ea310c --- /dev/null +++ b/src/pgm_receiver.cpp @@ -0,0 +1,207 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_OPENPGM + +#include <iostream> + +#include "pgm_receiver.hpp" +#include "err.hpp" +#include "stdint.hpp" +#include "wire.hpp" +#include "i_inout.hpp" + +//#define PGM_RECEIVER_DEBUG +//#define PGM_RECEIVER_DEBUG_LEVEL 1 + +// level 1 = key behaviour +// level 2 = processing flow +// level 4 = infos + +#ifndef PGM_RECEIVER_DEBUG +#   define zmq_log(n, ...)  while (0) +#else +#   define zmq_log(n, ...)    do { if ((n) <= PGM_RECEIVER_DEBUG_LEVEL) \ +        { printf (__VA_ARGS__);}} while (0) +#endif + +zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,  +      const options_t &options_, const char *session_name_) : +    io_object_t (parent_), +    decoder (NULL), +    pgm_socket (true, options_), +    options (options_), +    session_name (session_name_), +    joined (false), +    inout (NULL) +{ +} + +zmq::pgm_receiver_t::~pgm_receiver_t () +{ +    if (decoder) +        delete decoder; +} + +int zmq::pgm_receiver_t::init (const char *network_) +{ +    decoder = new zmq_decoder_t; +    zmq_assert (decoder); + +    return pgm_socket.init (network_); +} + +void zmq::pgm_receiver_t::plug (i_inout *inout_) +{ +    //  Allocate 2 fds one for socket second for waiting pipe. +    int socket_fd; +    int waiting_pipe_fd; + +    decoder->set_inout (inout_); + +    //  Fill socket_fd and waiting_pipe_fd from PGM transport +    pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); + +    //  Add socket_fd into poller. +    socket_handle = add_fd (socket_fd); + +    //  Add waiting_pipe_fd into poller. +    pipe_handle = add_fd (waiting_pipe_fd); + +    //  Set POLLIN for both handlers. +    set_pollin (pipe_handle); +    set_pollin (socket_handle); + +    inout = inout_; +} + +void zmq::pgm_receiver_t::unplug () +{ +    rm_fd (socket_handle); +    rm_fd (pipe_handle); +    decoder->set_inout (NULL); +    inout = NULL; +} + +void zmq::pgm_receiver_t::revive () +{ +    zmq_assert (false); +} + +void zmq::pgm_receiver_t::reconnect () +{ +    //  Save inout ptr. +    i_inout *inout_tmp = inout; + +    //  PGM receiver is not joined anymore. +    joined = false;     + +    //  Unplug - plug PGM transport. +    unplug (); +    delete decoder; +    decoder = new zmq_decoder_t; +    zmq_assert (decoder); +    plug (inout_tmp); +} + +//  POLLIN event from socket or waiting_pipe. +void zmq::pgm_receiver_t::in_event () +{ +    void *data_with_offset; +    ssize_t nbytes = 0; + +    //  Read all data from pgm socket. +    while ((nbytes = receive_with_offset (&data_with_offset)) > 0) { +         +        //  Push all the data to the decoder. +        decoder->write ((unsigned char*)data_with_offset, nbytes); +    } + +    //  Flush any messages decoder may have produced to the dispatcher. +    inout->flush (); + +    //  Data loss detected. +    if (nbytes == -1) { + +        //  Recreate PGM transport. +        reconnect (); +    } +} + +void zmq::pgm_receiver_t::out_event () +{ +    zmq_assert (false); +} + +ssize_t zmq::pgm_receiver_t::receive_with_offset  +    (void **data_) +{ + +    //  Data from PGM socket. +    void *rd = NULL; +    unsigned char *raw_data = NULL; + +    // Read data from underlying pgm_socket. +    ssize_t nbytes = pgm_socket.receive ((void**) &rd); +    raw_data = (unsigned char*) rd; + +    //  No ODATA or RDATA. +    if (!nbytes) +        return 0; + +    //  Data loss. +    if (nbytes == -1) { +        return -1; +    } + +    // Read offset of the fist message in current APDU. +    uint16_t apdu_offset = get_uint16 (raw_data); + +    // Shift raw_data & decrease nbytes by the first message offset  +    // information (sizeof uint16_t). +    *data_ = raw_data +  sizeof (uint16_t); +    nbytes -= sizeof (uint16_t); + +    //  There is not beginning of the message in current APDU and we +    //  are not joined jet -> throwing data. +    if (apdu_offset == 0xFFFF && !joined) { +        *data_ = NULL; +        return 0; +    } + +    //  Now is the possibility to join the stream. +    if (!joined) { +            +        //  We have to move data to the begining of the first message. +        *data_ = (unsigned char *)*data_ + apdu_offset; +        nbytes -= apdu_offset; + +        // Joined the stream. +        joined = true; + +        zmq_log (2, "joined into the stream, %s(%i)\n",  +            __FILE__, __LINE__); +    } + +    return nbytes; +} +#endif + diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp new file mode 100644 index 0000000..53d5340 --- /dev/null +++ b/src/pgm_receiver.hpp @@ -0,0 +1,98 @@ +/* +    Copyright (c) 2007-2009 FastMQ Inc. + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__ +#define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_OPENPGM + +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "zmq_decoder.hpp" +#include "pgm_socket.hpp" + +namespace zmq +{ + +    class pgm_receiver_t : public io_object_t, public i_engine +    { +     +    public: + +        //  Creates gm_engine. Underlying PGM connection is initialised +        //  using network_ parameter. +        pgm_receiver_t (class io_thread_t *parent_, const options_t &options_, +            const char *session_name_); +        ~pgm_receiver_t (); + +        int init (const char *network_); +        void reconnect (); + +        //  i_engine interface implementation. +        void plug (struct i_inout *inout_); +        void unplug (); +        void revive (); + +        //  i_poll_events interface implementation. +        void in_event (); +        void out_event (); + +    private: +        //  Read exactly iov_len_ count APDUs, function returns number +        //  of bytes received. Note that if we did not join message stream  +        //  before and there is not message beginning in the APDUs being  +        //  received iov_len for such a APDUs will be 0. +        ssize_t receive_with_offset (void **data_); + +        //  Message decoder. +        zmq_decoder_t *decoder; +        +        //  PGM socket. +        pgm_socket_t pgm_socket; + +        //  Socket options. +        options_t options; + +        //  Name of the session associated with the connecter. +        std::string session_name; + +        // If receiver joined the messages stream. +        bool joined; + +        //  Parent session. +        i_inout *inout; + +        //  Poll handle associated with PGM socket. +        handle_t socket_handle; + +        //  Poll handle associated with engine PGM waiting pipe. +        handle_t pipe_handle; + +        pgm_receiver_t (const pgm_receiver_t&); +        void operator = (const pgm_receiver_t&); +    }; + +} + +#endif + +#endif diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 8e7a66b..9b1cef4 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -90,11 +90,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)      set_pollout (handle);      inout = inout_; - -    zmq_log (1, "plug: downlink_socket_fd %i, uplink_socket_fd %i, %s(%i)", -        downlink_socket_fd, uplink_socket_fd, __FILE__, __LINE__); - -    std::cout << std::flush;  }  void zmq::pgm_sender_t::unplug () @@ -185,7 +180,7 @@ void zmq::pgm_sender_t::out_event ()  size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,       size_t size_, uint16_t offset_)  { -    zmq_log (1, "data_size %i, first message offset %i, %s(%i)", +    zmq_log (1, "data_size %i, first message offset %i, %s(%i)\n",          (int) size_, offset_, __FILE__, __LINE__);      std::cout << std::flush; diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index b1d1599..80be8d4 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -77,7 +77,7 @@ namespace zmq          handle_t handle;          handle_t uplink_handle; -        //  ? +        //  Parent session.          i_inout *inout;          //  Output buffer from pgm_socket. diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 5e2e764..315b43e 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -386,12 +386,14 @@ int zmq::pgm_socket_t::open_transport (void)              return -1;          }      } - +          //  Enable multicast loopback. -    rc = pgm_transport_set_multicast_loop (g_transport, true); -    if (rc != 0) { -        errno = EINVAL; -        return -1; +    if (options.use_multicast_loop) { +        rc = pgm_transport_set_multicast_loop (g_transport, true); +        if (rc != 0) { +            errno = EINVAL; +            return -1; +        }      }      //  Bind a transport to the specified network devices. @@ -486,6 +488,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)  //  Send one APDU, transmit window owned memory.  size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)  { +      iovec iov = {data_,data_len_};      ssize_t nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,  @@ -561,7 +564,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_)  //  returned.  ssize_t zmq::pgm_socket_t::receive (void **raw_data_)  { -       //  We just sent all data from pgm_transport_recvmsgv up       //  and have to return 0 that another engine in this thread is scheduled.      if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { @@ -575,7 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)      }      //  If we have are going first time or if we have processed all pgm_msgv_t -    //  structure previaously read from the pgm socket. +    //  structure previously read from the pgm socket.      if (nbytes_rec == nbytes_processed) {          //  Check program flow. @@ -615,6 +617,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)          }          zmq_log (4, "received %i bytes\n", (int)nbytes_rec); +      }      zmq_assert (nbytes_rec > 0); diff --git a/src/pub.cpp b/src/pub.cpp index d6eca01..5c6e329 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -23,7 +23,7 @@  #include "err.hpp"  zmq::pub_t::pub_t (class app_thread_t *parent_) : -    socket_base_t (parent_, ZMQ_SUB) +    socket_base_t (parent_, ZMQ_PUB)  {  } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 900f1c5..c195e91 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -37,6 +37,7 @@  #include "err.hpp"  #include "platform.hpp"  #include "pgm_sender.hpp" +#include "pgm_receiver.hpp"  zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :      object_t (parent_), @@ -156,6 +157,14 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,          options.recovery_ivl = (uint32_t) *((int64_t*) optval_);          return 0; +    case ZMQ_MCAST_LOOP: +        if (optvallen_ != sizeof (bool)) { +            errno = EINVAL; +            return -1; +        } +        options.use_multicast_loop = optval_; +        return 0; +      default:          errno = EINVAL;          return -1; @@ -164,15 +173,43 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,  int zmq::socket_base_t::bind (const char *addr_)  { -    zmq_listener_t *listener = new zmq_listener_t ( -        choose_io_thread (options.affinity), this, options); -    int rc = listener->set_address (addr_); -    if (rc != 0) +    //  Parse addr_ string. +    std::string addr_type; +    std::string addr_args; + +    std::string addr (addr_); +    std::string::size_type pos = addr.find ("://"); + +    if (pos == std::string::npos) { +        errno = EINVAL;          return -1; +    } -    send_plug (listener); -    send_own (this, listener); -    return 0; +    addr_type = addr.substr (0, pos); +    addr_args = addr.substr (pos + 3); + +    if (addr_type == "tcp") { +        zmq_listener_t *listener = new zmq_listener_t ( +            choose_io_thread (options.affinity), this, options); +        int rc = listener->set_address (addr_args.c_str ()); +        if (rc != 0) +            return -1; + +        send_plug (listener); +        send_own (this, listener); +        return 0; +    } + +#if defined ZMQ_HAVE_OPENPGM +    if (addr_type == "pgm") { +        //  In the case of PGM bind behaves the same like connect. +        return connect (addr_);  +    } +#endif + +    //  Unknown address type. +    errno = EFAULT; +    return -1;  }  int zmq::socket_base_t::connect (const char *addr_) @@ -246,6 +283,8 @@ int zmq::socket_base_t::connect (const char *addr_)      if (addr_type == "pgm") {          switch (type) { + +        //  PGM sender.          case ZMQ_PUB:          {              pgm_sender_t *pgm_sender =  @@ -266,9 +305,29 @@ int zmq::socket_base_t::connect (const char *addr_)              break;          } + +        //  PGM receiver.          case ZMQ_SUB: -            zmq_assert (false); +        { +            pgm_receiver_t *pgm_receiver =  +                new pgm_receiver_t (choose_io_thread (options.affinity), options,  +                session_name.c_str ()); + +            int rc = pgm_receiver->init (addr_args.c_str ()); +            if (rc != 0) { +                delete pgm_receiver; +                return -1; +            } + +            //  Reserve a sequence number for following 'attach' command. +            session->inc_seqnum (); +            send_attach (session, pgm_receiver); + +            pgm_receiver = NULL; +              break; +        } +          default:              errno = EINVAL;              return -1; diff --git a/src/sub.cpp b/src/sub.cpp index 51e0c23..101c62f 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -101,6 +101,12 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)          if (rc != 0 && errno == EAGAIN)              return -1; +        //  If there is no subscription return -1/EAGAIN. +        if (!all_count && prefixes.empty () && topics.empty ()) { +            errno = EAGAIN; +            return -1;  +        } +          //  If there is at least one "*" subscription, the message matches.          if (all_count)              return 0; | 
