diff options
| -rw-r--r-- | c/zmq.h | 2 | ||||
| -rw-r--r-- | perf/cpp/remote_thr.cpp | 13 | ||||
| -rw-r--r-- | src/Makefile.am | 6 | ||||
| -rw-r--r-- | src/config.hpp | 4 | ||||
| -rw-r--r-- | src/options.cpp | 4 | ||||
| -rw-r--r-- | src/options.hpp | 6 | ||||
| -rw-r--r-- | src/socket_base.cpp | 74 | 
7 files changed, 93 insertions, 16 deletions
| @@ -52,6 +52,8 @@ extern "C" {  #define ZMQ_IDENTITY 6  #define ZMQ_SUBSCRIBE 7  #define ZMQ_UNSUBSCRIBE 8 +#define ZMQ_RATE 9 +#define ZMQ_RECOVERY_IVL 10  //  The operation should be performed in non-blocking mode. I.e. if it cannot  //  be processed immediately, error should be returned with errno set to EAGAIN. diff --git a/perf/cpp/remote_thr.cpp b/perf/cpp/remote_thr.cpp index 15a4ed1..71a7b53 100644 --- a/perf/cpp/remote_thr.cpp +++ b/perf/cpp/remote_thr.cpp @@ -22,6 +22,7 @@  #include <stdlib.h>  #include <assert.h>  #include <stddef.h> +#include <stdint.h>  int main (int argc, char *argv [])  { @@ -34,9 +35,19 @@ int main (int argc, char *argv [])      size_t message_size = (size_t) atoi (argv [2]);      int message_count = atoi (argv [3]); +    //  appl threads, io threads      zmq::context_t ctx (1, 1); -    zmq::socket_t s (ctx, ZMQ_P2P); +    zmq::socket_t s (ctx, ZMQ_PUB); +     +    //  10Mb/s +    uint32_t rate = 10000; +    s.setsockopt (ZMQ_RATE, &rate, sizeof (rate)); + +    //  10s +    uint32_t recovery_ivl = 10; +    s.setsockopt (ZMQ_RECOVERY_IVL, &recovery_ivl, sizeof (recovery_ivl)); +      s.connect (connect_to);      for (int i = 0; i != message_count; i++) { diff --git a/src/Makefile.am b/src/Makefile.am index 68b34fa..f4f338e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -62,6 +62,8 @@ libzmq_la_SOURCES = $(pgm_sources) \      object.hpp \      options.hpp \      owned.hpp \ +    pgm_sender.hpp \ +    pgm_socket.hpp \      pipe.hpp \      platform.hpp \      poll.hpp \ @@ -101,6 +103,8 @@ libzmq_la_SOURCES = $(pgm_sources) \      object.cpp \      options.cpp \      owned.cpp \ +    pgm_sender.cpp \ +    pgm_socket.cpp \      pipe.cpp \      poll.cpp \      select.cpp \ @@ -122,7 +126,7 @@ libzmq_la_SOURCES = $(pgm_sources) \      zmq_listener.cpp \      zmq_listener_init.cpp -libzmq_la_LDFLAGS = -version-info @LTVER@ +libzmq_la_LDFLAGS = -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAFS@  if BUILD_PGM  libzmq_la_CXXFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/include/ -Wall @LIBZMQ_EXTRA_CXXFLAGS@ diff --git a/src/config.hpp b/src/config.hpp index 17e67b9..43a4513 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -70,8 +70,10 @@ namespace zmq          //  Maximal number of non-accepted connections that can be held by          //  TCP listener object. -        tcp_connection_backlog = 10 +        tcp_connection_backlog = 10, +        //  Maximum transport data unit size for PGM (TPDU). +        pgm_max_tpdu = 1500      };  } diff --git a/src/options.cpp b/src/options.cpp index cd07c44..804cb4f 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -24,6 +24,8 @@ zmq::options_t::options_t () :      lwm (0),      swap (0),      mask (0), -    affinity (0) +    affinity (0), +    rate (0), +    recovery_ivl (0)  {  } diff --git a/src/options.hpp b/src/options.hpp index faf21b8..9f4a264 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -37,6 +37,12 @@ namespace zmq          uint64_t mask;          uint64_t affinity;          std::string identity; + +        //  Maximum tranfer rate [kb/s]. +        uint32_t rate; + +        //  Reliability time interval [s]. +        uint32_t recovery_ivl;      };  } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a26c280..2384d80 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -17,6 +17,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <iostream> +  #include <string>  #include <algorithm> @@ -35,6 +37,7 @@  #include "uuid.hpp"  #include "pipe.hpp"  #include "err.hpp" +#include "platform.hpp"  zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :      object_t (parent_), @@ -145,6 +148,22 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,          errno = ENOTSUP;          return -1; +    case ZMQ_RATE: +        if (optvallen_ != sizeof (uint32_t)) { +            errno = EINVAL; +            return -1; +        } +        options.rate = *((int32_t*) optval_); +        return 0; +         +    case ZMQ_RECOVERY_IVL: +        if (optvallen_ != sizeof (uint32_t)) { +            errno = EINVAL; +            return -1; +        } +        options.recovery_ivl = *((int32_t*) optval_); +        return 0; +      default:          errno = EINVAL;          return -1; @@ -170,6 +189,21 @@ int zmq::socket_base_t::connect (const char *addr_)      std::string session_name ("#");      session_name += uuid_t ().to_string (); +    //  Parse addr_ string. +    std::string addr_type; +    std::string addr_args; + +    std::string addr (addr_); +    std::string::size_type pos = addr.find ("://"); + +    if (pos == std::string::npos) { +        errno = EINVAL; +        return -1; +    } + +    addr_type = addr.substr (0, pos); +    addr_args = addr.substr (pos + 3); +      //  Create the session.      io_thread_t *io_thread = choose_io_thread (options.affinity);      session_t *session = new session_t (io_thread, this, session_name.c_str (), @@ -198,20 +232,36 @@ int zmq::socket_base_t::connect (const char *addr_)      send_plug (session);      send_own (this, session); -    //  Create the connecter object. Supply it with the session name so that -    //  it can bind the new connection to the session once it is established. -    zmq_connecter_t *connecter = new zmq_connecter_t ( -        choose_io_thread (options.affinity), this, options, -        session_name.c_str ()); -    int rc = connecter->set_address (addr_); -    if (rc != 0) { -        delete connecter; -        return -1; +    if (addr_type == "tcp") { + +        //  Create the connecter object. Supply it with the session name so that +        //  it can bind the new connection to the session once it is established. +        zmq_connecter_t *connecter = new zmq_connecter_t ( +            choose_io_thread (options.affinity), this, options, +            session_name.c_str ()); +        int rc = connecter->set_address (addr_args.c_str ()); +        if (rc != 0) { +            delete connecter; +            return -1; +        } +        send_plug (connecter); +        send_own (this, connecter); + +        return 0;      } -    send_plug (connecter); -    send_own (this, connecter); -    return 0; +#if defined ZMQ_HAVE_OPENPGM +    if (addr_type == "pgm") { +         +        zmq_assert (false); + +        return 0; +    } +#endif + +    //  Unknown address type. +    errno = ENOTSUP; +    return -1;  }  int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) | 
