From 5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:25 +0100 Subject: Imported Upstream version 2.0.9.dfsg --- src/Makefile.am | 12 +- src/Makefile.in | 59 ++++++---- src/app_thread.cpp | 12 +- src/ctx.cpp | 1 + src/decoder.hpp | 4 +- src/downstream.cpp | 101 ----------------- src/downstream.hpp | 61 ---------- src/encoder.hpp | 4 +- src/forwarder.cpp | 26 ++++- src/i_poll_events.hpp | 2 +- src/ip.cpp | 7 +- src/msg_store.cpp | 307 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/msg_store.hpp | 114 +++++++++++++++++++ src/pair.cpp | 3 + src/pipe.cpp | 146 ++++++++++++++++++------ src/pipe.hpp | 32 ++++-- src/platform.hpp.in | 3 + src/poll.cpp | 23 ++-- src/prefix_tree.cpp | 12 +- src/prefix_tree.hpp | 2 +- src/pull.cpp | 98 ++++++++++++++++ src/pull.hpp | 62 ++++++++++ src/push.cpp | 101 +++++++++++++++++ src/push.hpp | 61 ++++++++++ src/queue.cpp | 43 +++++-- src/select.cpp | 15 ++- src/session.cpp | 4 +- src/signaler.cpp | 26 +++-- src/socket_base.cpp | 29 ++++- src/streamer.cpp | 26 ++++- src/tcp_connecter.cpp | 15 +-- src/tcp_listener.cpp | 2 +- src/tcp_socket.cpp | 3 +- src/upstream.cpp | 98 ---------------- src/upstream.hpp | 62 ---------- src/uuid.cpp | 4 +- src/xrep.cpp | 29 +++-- src/xrep.hpp | 6 + src/xreq.cpp | 27 +---- src/xreq.hpp | 3 - src/yarray_item.hpp | 4 +- src/ypipe.hpp | 17 +++ src/zmq.cpp | 113 +++++++++++++++++++ src/zmq_decoder.cpp | 7 ++ 44 files changed, 1282 insertions(+), 504 deletions(-) delete mode 100644 src/downstream.cpp delete mode 100644 src/downstream.hpp create mode 100644 src/msg_store.cpp create mode 100644 src/msg_store.hpp create mode 100644 src/pull.cpp create mode 100644 src/pull.hpp create mode 100644 src/push.cpp create mode 100644 src/push.hpp delete mode 100644 src/upstream.cpp delete mode 100644 src/upstream.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index fa97ca3..19a80d0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,7 +3,7 @@ lib_LTLIBRARIES = libzmq.la pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h if BUILD_PGM pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ @@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ + msg_store.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -104,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -124,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ip.cpp \ kqueue.cpp \ lb.cpp \ + msg_store.cpp \ object.cpp \ options.cpp \ owned.cpp \ @@ -158,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/Makefile.in b/src/Makefile.in index bc3f00f..1bb65a4 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -61,11 +61,12 @@ libLTLIBRARIES_INSTALL = $(INSTALL) LTLIBRARIES = $(lib_LTLIBRARIES) libzmq_la_LIBADD = am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \ - libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-downstream.lo \ + libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-push.lo \ libzmq_la-epoll.lo libzmq_la-err.lo libzmq_la-forwarder.lo \ libzmq_la-fq.lo libzmq_la-io_object.lo libzmq_la-io_thread.lo \ libzmq_la-ip.lo libzmq_la-kqueue.lo libzmq_la-lb.lo \ - libzmq_la-object.lo libzmq_la-options.lo libzmq_la-owned.lo \ + libzmq_la-msg_store.lo libzmq_la-object.lo \ + libzmq_la-options.lo libzmq_la-owned.lo \ libzmq_la-pgm_receiver.lo libzmq_la-pgm_sender.lo \ libzmq_la-pgm_socket.lo libzmq_la-pair.lo \ libzmq_la-prefix_tree.lo libzmq_la-pipe.lo libzmq_la-poll.lo \ @@ -74,9 +75,9 @@ am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \ libzmq_la-signaler.lo libzmq_la-socket_base.lo \ libzmq_la-streamer.lo libzmq_la-sub.lo \ libzmq_la-tcp_connecter.lo libzmq_la-tcp_listener.lo \ - libzmq_la-tcp_socket.lo libzmq_la-thread.lo \ - libzmq_la-upstream.lo libzmq_la-uuid.lo libzmq_la-xrep.lo \ - libzmq_la-xreq.lo libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \ + libzmq_la-tcp_socket.lo libzmq_la-thread.lo libzmq_la-pull.lo \ + libzmq_la-uuid.lo libzmq_la-xrep.lo libzmq_la-xreq.lo \ + libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \ libzmq_la-zmq_decoder.lo libzmq_la-zmq_encoder.lo \ libzmq_la-zmq_engine.lo libzmq_la-zmq_init.lo \ libzmq_la-zmq_listener.lo @@ -271,7 +272,7 @@ top_srcdir = @top_srcdir@ lib_LTLIBRARIES = libzmq.la pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h @BUILD_PGM_TRUE@pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ @BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \ @BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \ @@ -316,7 +317,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -334,6 +335,7 @@ libzmq_la_SOURCES = app_thread.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ + msg_store.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -362,7 +364,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -382,7 +384,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -392,6 +394,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ip.cpp \ kqueue.cpp \ lb.cpp \ + msg_store.cpp \ object.cpp \ options.cpp \ owned.cpp \ @@ -416,7 +419,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ @@ -497,9 +500,9 @@ $(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps) exit 1;; \ esac; \ done; \ - echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu src/Makefile'; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign src/Makefile'; \ cd $(top_srcdir) && \ - $(AUTOMAKE) --gnu src/Makefile + $(AUTOMAKE) --foreign src/Makefile .PRECIOUS: Makefile Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status @case '$?' in \ @@ -579,7 +582,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-command.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-ctx.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-devpoll.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-downstream.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-epoll.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-err.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-forwarder.Plo@am__quote@ @@ -600,6 +602,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-lb.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-log.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-md5.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-msg_store.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-nametoindex.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-net.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-object.Plo@am__quote@ @@ -615,6 +618,8 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-poll.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-prefix_tree.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pub.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pull.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-push.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-queue.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-rate_control.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-receiver.Plo@am__quote@ @@ -641,7 +646,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-transport.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-tsi.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-txwi.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-upstream.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-uuid.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-version.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-wsastrerror.Plo@am__quote@ @@ -956,12 +960,12 @@ libzmq_la-devpoll.lo: devpoll.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-devpoll.lo `test -f 'devpoll.cpp' || echo '$(srcdir)/'`devpoll.cpp -libzmq_la-downstream.lo: downstream.cpp -@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-downstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-downstream.Tpo -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp -@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-downstream.Tpo $(DEPDIR)/libzmq_la-downstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='downstream.cpp' object='libzmq_la-downstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-push.lo: push.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-push.lo -MD -MP -MF $(DEPDIR)/libzmq_la-push.Tpo -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-push.Tpo $(DEPDIR)/libzmq_la-push.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='push.cpp' object='libzmq_la-push.lo' libtool=yes @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp libzmq_la-epoll.lo: epoll.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-epoll.lo -MD -MP -MF $(DEPDIR)/libzmq_la-epoll.Tpo -c -o libzmq_la-epoll.lo `test -f 'epoll.cpp' || echo '$(srcdir)/'`epoll.cpp @@ -1026,6 +1030,13 @@ libzmq_la-lb.lo: lb.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-lb.lo `test -f 'lb.cpp' || echo '$(srcdir)/'`lb.cpp +libzmq_la-msg_store.lo: msg_store.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-msg_store.lo -MD -MP -MF $(DEPDIR)/libzmq_la-msg_store.Tpo -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-msg_store.Tpo $(DEPDIR)/libzmq_la-msg_store.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='msg_store.cpp' object='libzmq_la-msg_store.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp + libzmq_la-object.lo: object.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-object.lo -MD -MP -MF $(DEPDIR)/libzmq_la-object.Tpo -c -o libzmq_la-object.lo `test -f 'object.cpp' || echo '$(srcdir)/'`object.cpp @am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-object.Tpo $(DEPDIR)/libzmq_la-object.Plo @@ -1194,12 +1205,12 @@ libzmq_la-thread.lo: thread.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-thread.lo `test -f 'thread.cpp' || echo '$(srcdir)/'`thread.cpp -libzmq_la-upstream.lo: upstream.cpp -@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-upstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-upstream.Tpo -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp -@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-upstream.Tpo $(DEPDIR)/libzmq_la-upstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='upstream.cpp' object='libzmq_la-upstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-pull.lo: pull.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-pull.lo -MD -MP -MF $(DEPDIR)/libzmq_la-pull.Tpo -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-pull.Tpo $(DEPDIR)/libzmq_la-pull.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='pull.cpp' object='libzmq_la-pull.lo' libtool=yes @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp libzmq_la-uuid.lo: uuid.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-uuid.lo -MD -MP -MF $(DEPDIR)/libzmq_la-uuid.Tpo -c -o libzmq_la-uuid.lo `test -f 'uuid.cpp' || echo '$(srcdir)/'`uuid.cpp diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbf034c..ac59464 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,8 +46,8 @@ #include "rep.hpp" #include "xreq.hpp" #include "xrep.hpp" -#include "upstream.hpp" -#include "downstream.hpp" +#include "pull.hpp" +#include "push.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_XREP: s = new (std::nothrow) xrep_t (this); break; - case ZMQ_UPSTREAM: - s = new (std::nothrow) upstream_t (this); + case ZMQ_PULL: + s = new (std::nothrow) pull_t (this); break; - case ZMQ_DOWNSTREAM: - s = new (std::nothrow) downstream_t (this); + case ZMQ_PUSH: + s = new (std::nothrow) push_t (this); break; default: if (sockets.empty ()) diff --git a/src/ctx.cpp b/src/ctx.cpp index f0e177d..397f692 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -151,6 +151,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) // Create the new application thread proxy object. app_thread_info_t info; + memset (&info, 0, sizeof (info)); info.associated = false; info.app_thread = new (std::nothrow) app_thread_t (this, io_threads.size () + app_threads.size ()); diff --git a/src/decoder.hpp b/src/decoder.hpp index 1662bda..f05f651 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -55,7 +55,9 @@ namespace zmq zmq_assert (buf); } - inline ~decoder_t () + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~decoder_t () { free (buf); } diff --git a/src/downstream.cpp b/src/downstream.cpp deleted file mode 100644 index 4074a9e..0000000 --- a/src/downstream.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "downstream.hpp" -#include "err.hpp" -#include "pipe.hpp" - -zmq::downstream_t::downstream_t (class app_thread_t *parent_) : - socket_base_t (parent_) -{ - options.requires_in = false; - options.requires_out = true; -} - -zmq::downstream_t::~downstream_t () -{ -} - -void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) -{ - zmq_assert (!inpipe_ && outpipe_); - lb.attach (outpipe_); -} - -void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) -{ - zmq_assert (pipe_); - lb.detach (pipe_); -} - -void zmq::downstream_t::xkill (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xrevive (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xrevive (class writer_t *pipe_) -{ - lb.revive (pipe_); -} - -int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - // No special option for this socket type. - errno = EINVAL; - return -1; -} - -int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) -{ - return lb.send (msg_, flags_); -} - -int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - -bool zmq::downstream_t::xhas_in () -{ - return false; -} - -bool zmq::downstream_t::xhas_out () -{ - return lb.has_out (); -} - diff --git a/src/downstream.hpp b/src/downstream.hpp deleted file mode 100644 index 1306743..0000000 --- a/src/downstream.hpp +++ /dev/null @@ -1,61 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ -#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "lb.hpp" - -namespace zmq -{ - - class downstream_t : public socket_base_t - { - public: - - downstream_t (class app_thread_t *parent_); - ~downstream_t (); - - // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - - private: - - // Load balancer managing the outbound pipes. - lb_t lb; - - downstream_t (const downstream_t&); - void operator = (const downstream_t&); - }; - -} - -#endif diff --git a/src/encoder.hpp b/src/encoder.hpp index 10fe912..0d5b6ba 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -50,7 +50,9 @@ namespace zmq zmq_assert (buf); } - inline ~encoder_t () + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~encoder_t () { free (buf); } diff --git a/src/forwarder.cpp b/src/forwarder.cpp index 5aab8f2..d1f324e 100644 --- a/src/forwarder.cpp +++ b/src/forwarder.cpp @@ -21,6 +21,7 @@ #include "forwarder.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,9 +30,30 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { - insocket_->recv (&msg, 0); - outsocket_->send (&msg, 0); + rc = insocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } } return 0; diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index 8b85f7a..6d474b2 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -28,7 +28,7 @@ namespace zmq struct i_poll_events { - virtual ~i_poll_events () {}; + virtual ~i_poll_events () {} // Called by I/O thread when file descriptor is ready for reading. virtual void in_event () = 0; diff --git a/src/ip.cpp b/src/ip.cpp index 79d90da..f491008 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -289,11 +289,8 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_, // doesn't really matter, since it's not included in the addr-output. req.ai_socktype = SOCK_STREAM; - // Avoid named services due to unclear socktype, and don't pick IPv4 - // addresses if we don't have a local IPv4 address configured. - // If this is failing for you on a host with only IPv6 connectivity, - // please contribute proper IPv6 support for all functions in this file. - req.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG; + // Avoid named services due to unclear socktype. + req.ai_flags = AI_NUMERICSERV; // Resolve host name. Some of the error info is lost in case of error, // however, there's no way to report EAI errors via errno. diff --git a/src/msg_store.cpp b/src/msg_store.cpp new file mode 100644 index 0000000..aaf6dbe --- /dev/null +++ b/src/msg_store.cpp @@ -0,0 +1,307 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "platform.hpp" + +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#include +#else +#include +#endif + +#include "../include/zmq.h" + +#include +#include +#include +#include +#include +#include + +#include "atomic_counter.hpp" +#include "msg_store.hpp" +#include "err.hpp" + +zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : + fd (-1), + filesize (filesize_), + file_pos (0), + write_pos (0), + read_pos (0), + block_size (block_size_), + write_buf_start_addr (0) +{ + zmq_assert (filesize > 0); + zmq_assert (block_size > 0); + + buf1 = new (std::nothrow) char [block_size]; + zmq_assert (buf1); + + buf2 = new (std::nothrow) char [block_size]; + zmq_assert (buf2); + + read_buf = write_buf = buf1; +} + +zmq::msg_store_t::~msg_store_t () +{ + delete [] buf1; + delete [] buf2; + + if (fd == -1) + return; + +#ifdef ZMQ_HAVE_WINDOWS + int rc = _close (fd); +#else + int rc = close (fd); +#endif + errno_assert (rc == 0); + +#ifdef ZMQ_HAVE_WINDOWS + rc = _unlink (filename.c_str ()); +#else + rc = unlink (filename.c_str ()); +#endif + errno_assert (rc == 0); +} + +int zmq::msg_store_t::init () +{ + static zmq::atomic_counter_t seqnum (0); + + // Get process ID. +#ifdef ZMQ_HAVE_WINDOWS + int pid = GetCurrentThreadId (); +#else + pid_t pid = getpid (); +#endif + + std::ostringstream outs; + outs << "zmq_" << pid << '_' << seqnum.get () << ".swap"; + filename = outs.str (); + + seqnum.add (1); + + // Open the backing file. +#ifdef ZMQ_HAVE_WINDOWS + fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600); +#else + fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600); +#endif + if (fd == -1) + return -1; + +#ifdef ZMQ_HAVE_LINUX + // Enable more aggresive read-ahead optimization. + posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL); +#endif + return 0; +} + +bool zmq::msg_store_t::store (zmq_msg_t *msg_) +{ + size_t msg_size = zmq_msg_size (msg_); + + // Check buffer space availability. + // NOTE: We always keep one byte open. + if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) + return false; + + // Don't store the ZMQ_MSG_SHARED flag. + uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED; + + // Write message length, flags, and message body. + copy_to_file (&msg_size, sizeof msg_size); + copy_to_file (&msg_flags, sizeof msg_flags); + copy_to_file (zmq_msg_data (msg_), msg_size); + + zmq_msg_close (msg_); + + return true; +} + +void zmq::msg_store_t::fetch (zmq_msg_t *msg_) +{ + // There must be at least one message available. + zmq_assert (read_pos != write_pos); + + // Retrieve the message size. + size_t msg_size; + copy_from_file (&msg_size, sizeof msg_size); + + // Initialize the message. + zmq_msg_init_size (msg_, msg_size); + + // Retrieve the message flags. + copy_from_file (&msg_->flags, sizeof msg_->flags); + + // Retrieve the message payload. + copy_from_file (zmq_msg_data (msg_), msg_size); +} + +void zmq::msg_store_t::commit () +{ + commit_pos = write_pos; +} + +void zmq::msg_store_t::rollback () +{ + if (commit_pos == write_pos || read_pos == write_pos) + return; + + if (write_pos > read_pos) + zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos); + else + zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos); + + if (commit_pos / block_size == read_pos / block_size) { + write_buf_start_addr = commit_pos % block_size; + write_buf = read_buf; + } + else if (commit_pos / block_size != write_pos / block_size) { + write_buf_start_addr = commit_pos % block_size; + fill_buf (write_buf, write_buf_start_addr); + } + write_pos = commit_pos; +} + +bool zmq::msg_store_t::empty () +{ + return read_pos == write_pos; +} + +bool zmq::msg_store_t::full () +{ + return buffer_space () == 1; +} + +void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) +{ + char *dest_ptr = (char *) buffer_; + size_t chunk_size, remainder = count_; + + while (remainder > 0) { + chunk_size = std::min (remainder, + std::min ((size_t) (filesize - read_pos), + (size_t) (block_size - read_pos % block_size))); + + memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size); + dest_ptr += chunk_size; + + read_pos = (read_pos + chunk_size) % filesize; + if (read_pos % block_size == 0) { + if (read_pos / block_size == write_pos / block_size) + read_buf = write_buf; + else + fill_buf (read_buf, read_pos); + } + remainder -= chunk_size; + } +} + +void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) +{ + char *source_ptr = (char *) buffer_; + size_t chunk_size, remainder = count_; + + while (remainder > 0) { + chunk_size = std::min (remainder, + std::min ((size_t) (filesize - write_pos), + (size_t) (block_size - write_pos % block_size))); + + memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size); + source_ptr += chunk_size; + + write_pos = (write_pos + chunk_size) % filesize; + if (write_pos % block_size == 0) { + save_write_buf (); + write_buf_start_addr = write_pos; + + if (write_buf == read_buf) { + if (read_buf == buf2) + write_buf = buf1; + else + write_buf = buf2; + } + } + remainder -= chunk_size; + } +} + +void zmq::msg_store_t::fill_buf (char *buf, int64_t pos) +{ + if (file_pos != pos) { +#ifdef ZMQ_HAVE_WINDOWS + __int64 offset = _lseeki64 (fd, pos, SEEK_SET); +#else + off_t offset = lseek (fd, (off_t) pos, SEEK_SET); +#endif + errno_assert (offset == pos); + file_pos = pos; + } + size_t octets_stored = 0; + size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos)); + + while (octets_stored < octets_total) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored); +#else + ssize_t rc = read (fd, &buf [octets_stored], octets_total - octets_stored); +#endif + errno_assert (rc > 0); + octets_stored += rc; + } + file_pos += octets_total; +} + +void zmq::msg_store_t::save_write_buf () +{ + if (file_pos != write_buf_start_addr) { +#ifdef ZMQ_HAVE_WINDOWS + __int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET); +#else + off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET); +#endif + errno_assert (offset == write_buf_start_addr); + file_pos = write_buf_start_addr; + } + size_t octets_stored = 0; + size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos)); + + while (octets_stored < octets_total) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = _write (fd, &write_buf [octets_stored], octets_total - octets_stored); +#else + ssize_t rc = write (fd, &write_buf [octets_stored], octets_total - octets_stored); +#endif + errno_assert (rc > 0); + octets_stored += rc; + } + file_pos += octets_total; +} + +int64_t zmq::msg_store_t::buffer_space () +{ + if (write_pos < read_pos) + return read_pos - write_pos; + + return filesize - (write_pos - read_pos); +} diff --git a/src/msg_store.hpp b/src/msg_store.hpp new file mode 100644 index 0000000..765fc60 --- /dev/null +++ b/src/msg_store.hpp @@ -0,0 +1,114 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_MSG_STORE_HPP_INCLUDED__ +#define __ZMQ_MSG_STORE_HPP_INCLUDED__ + +#include "../include/zmq.h" + +#include +#include "stdint.hpp" + +namespace zmq +{ + + // This class implements a message store. Messages are retrieved from + // the store in the same order as they entered it. + + class msg_store_t + { + public: + + enum { default_block_size = 8192 }; + + // Creates message store. + msg_store_t (int64_t filesize_, size_t block_size_ = default_block_size); + + ~msg_store_t (); + + int init (); + + // Stores the message into the message store. The function + // returns false if the message store is full; true otherwise. + bool store (zmq_msg_t *msg_); + + // Fetches the oldest message from the message store. It is an error + // to call this function when the message store is empty. + void fetch (zmq_msg_t *msg_); + + void commit (); + + void rollback (); + + // Returns true if the message store is empty; false otherwise. + bool empty (); + + // Returns true if and only if the store is full. + bool full (); + + private: + + // Copies data from a memory buffer to the backing file. + // Wraps around when reaching maximum file size. + void copy_from_file (void *buffer_, size_t count_); + + // Copies data from the backing file to the memory buffer. + // Wraps around when reaching end-of-file. + void copy_to_file (const void *buffer_, size_t count_); + + // Returns the buffer space available. + int64_t buffer_space (); + + void fill_buf (char *buf, int64_t pos); + + void save_write_buf (); + + // File descriptor to the backing file. + int fd; + + // Name of the backing file. + std::string filename; + + // Maximum size of the backing file. + int64_t filesize; + + // File offset associated with the fd file descriptor. + int64_t file_pos; + + // File offset the next message will be stored at. + int64_t write_pos; + + // File offset the next message will be read from. + int64_t read_pos; + + int64_t commit_pos; + + size_t block_size; + + char *buf1; + char *buf2; + char *read_buf; + char *write_buf; + + int64_t write_buf_start_addr; + }; + +} + +#endif diff --git a/src/pair.cpp b/src/pair.cpp index 31524de..3872b28 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -115,6 +115,9 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) zmq_msg_close (msg_); if (!alive || !inpipe || !inpipe->read (msg_)) { + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); errno = EAGAIN; return -1; } diff --git a/src/pipe.cpp b/src/pipe.cpp index 1df64e9..200beb0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -21,20 +21,14 @@ #include "pipe.hpp" -zmq::reader_t::reader_t (object_t *parent_, - uint64_t hwm_, uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) : object_t (parent_), pipe (NULL), peer (NULL), - hwm (hwm_), lwm (lwm_), msgs_read (0), endpoint (NULL) -{ - // Adjust lwm and hwm. - if (lwm == 0 || lwm > hwm) - lwm = hwm; -} +{} zmq::reader_t::~reader_t () { @@ -50,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_) register_pipe (pipe); } +bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +{ + unsigned char *offset = 0; + + return msg_.content == (void*) (offset + ZMQ_DELIMITER); +} + bool zmq::reader_t::check_read () { // Check if there's an item in the pipe. - if (pipe->check_read ()) - return true; - // If not, deactivate the pipe. - endpoint->kill (this); - return false; + if (!pipe->check_read ()) { + endpoint->kill (this); + return false; + } + + // If the next item in the pipe is message delimiter, + // initiate its termination. + if (pipe->probe (is_delimiter)) { + if (endpoint) + endpoint->detach_inpipe (this); + term (); + return false; + } + + return true; } bool zmq::reader_t::read (zmq_msg_t *msg_) @@ -113,20 +124,28 @@ void zmq::reader_t::process_pipe_term_ack () } zmq::writer_t::writer_t (object_t *parent_, - uint64_t hwm_, uint64_t lwm_) : + uint64_t hwm_, int64_t swap_size_) : object_t (parent_), pipe (NULL), peer (NULL), hwm (hwm_), - lwm (lwm_), msgs_read (0), msgs_written (0), + msg_store (NULL), + extra_msg_flag (false), stalled (false), + pending_close (false), endpoint (NULL) { - // Adjust lwm and hwm. - if (lwm == 0 || lwm > hwm) - lwm = hwm; + if (swap_size_ > 0) { + msg_store = new (std::nothrow) msg_store_t (swap_size_); + if (msg_store != NULL) { + if (msg_store->init () < 0) { + delete msg_store; + msg_store = NULL; + } + } + } } void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) @@ -136,6 +155,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) zmq::writer_t::~writer_t () { + if (extra_msg_flag) + zmq_msg_close (&extra_msg); + + delete msg_store; } void zmq::writer_t::set_pipe (pipe_t *pipe_) @@ -147,7 +170,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_) bool zmq::writer_t::check_write () { - if (pipe_full ()) { + if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) { stalled = true; return false; } @@ -157,29 +180,45 @@ bool zmq::writer_t::check_write () bool zmq::writer_t::write (zmq_msg_t *msg_) { - if (pipe_full ()) { - stalled = true; + if (!check_write ()) return false; + + if (pipe_full ()) { + if (msg_store->store (msg_)) { + if (!(msg_->flags & ZMQ_MSG_MORE)) + msg_store->commit (); + } else { + extra_msg = *msg_; + extra_msg_flag = true; + } + } + else { + pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); + if (!(msg_->flags & ZMQ_MSG_MORE)) + msgs_written++; } - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); - if (!(msg_->flags & ZMQ_MSG_MORE)) - msgs_written++; return true; } void zmq::writer_t::rollback () { - zmq_msg_t msg; + if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) { + zmq_msg_close (&extra_msg); + extra_msg_flag = false; + } + if (msg_store != NULL) + msg_store->rollback (); + + zmq_msg_t msg; // Remove all incomplete messages from the pipe. while (pipe->unwrite (&msg)) { zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_msg_close (&msg); - msgs_written--; } - if (stalled && endpoint != NULL && !pipe_full()) { + if (stalled && endpoint != NULL && check_write ()) { stalled = false; endpoint->revive (this); } @@ -198,6 +237,14 @@ void zmq::writer_t::term () // Rollback any unfinished messages. rollback (); + if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag)) + write_delimiter (); + else + pending_close = true; +} + +void zmq::writer_t::write_delimiter () +{ // Push delimiter into the pipe. // Trick the compiler to belive that the tag is a valid pointer. zmq_msg_t msg; @@ -205,12 +252,47 @@ void zmq::writer_t::term () msg.content = (void*) (offset + ZMQ_DELIMITER); msg.flags = 0; pipe->write (msg, false); - pipe->flush (); + flush (); } void zmq::writer_t::process_reader_info (uint64_t msgs_read_) { + zmq_msg_t msg; + msgs_read = msgs_read_; + if (msg_store) { + + // Move messages from backing store into pipe. + while (!pipe_full () && !msg_store->empty ()) { + msg_store->fetch(&msg); + // Write message into the pipe. + pipe->write (msg, msg.flags & ZMQ_MSG_MORE); + if (!(msg.flags & ZMQ_MSG_MORE)) + msgs_written++; + } + + if (extra_msg_flag) { + if (!pipe_full ()) { + pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE); + if (!(extra_msg.flags & ZMQ_MSG_MORE)) + msgs_written++; + extra_msg_flag = false; + } + else if (msg_store->store (&extra_msg)) { + if (!(extra_msg.flags & ZMQ_MSG_MORE)) + msg_store->commit (); + extra_msg_flag = false; + } + } + + if (pending_close && msg_store->empty () && !extra_msg_flag) { + write_delimiter (); + pending_close = false; + } + + flush (); + } + if (stalled && endpoint != NULL) { stalled = false; endpoint->revive (this); @@ -233,9 +315,9 @@ bool zmq::writer_t::pipe_full () } zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_) : - reader (reader_parent_, hwm_, compute_lwm (hwm_)), - writer (writer_parent_, hwm_, compute_lwm (hwm_)) + uint64_t hwm_, int64_t swap_size_) : + reader (reader_parent_, compute_lwm (hwm_)), + writer (writer_parent_, hwm_, swap_size_) { reader.set_pipe (this); writer.set_pipe (this); @@ -276,6 +358,6 @@ uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_) if (hwm_ > max_wm_delta * 2) return hwm_ - max_wm_delta; else - return hwm_ / 2; + return (hwm_ + 1) / 2; } diff --git a/src/pipe.hpp b/src/pipe.hpp index 9f57653..ece678a 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -26,6 +26,7 @@ #include "i_endpoint.hpp" #include "yarray_item.hpp" #include "ypipe.hpp" +#include "msg_store.hpp" #include "config.hpp" #include "object.hpp" @@ -36,8 +37,7 @@ namespace zmq { public: - reader_t (class object_t *parent_, - uint64_t hwm_, uint64_t lwm_); + reader_t (class object_t *parent_, uint64_t lwm_); ~reader_t (); void set_pipe (class pipe_t *pipe_); @@ -58,14 +58,16 @@ namespace zmq void process_revive (); void process_pipe_term_ack (); + // Returns true if the message is delimiter; false otherwise. + static bool is_delimiter (zmq_msg_t &msg_); + // The underlying pipe. class pipe_t *pipe; // Pipe writer associated with the other side of the pipe. class writer_t *peer; - // High and low watermarks for in-memory storage (in bytes). - uint64_t hwm; + // Low watermark for in-memory storage (in bytes). uint64_t lwm; // Number of messages read so far. @@ -82,8 +84,7 @@ namespace zmq { public: - writer_t (class object_t *parent_, - uint64_t hwm_, uint64_t lwm_); + writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_); ~writer_t (); void set_pipe (class pipe_t *pipe_); @@ -117,15 +118,18 @@ namespace zmq // Tests whether the pipe is already full. bool pipe_full (); + // Write special message to the pipe so that the reader + // can find out we are finished. + void write_delimiter (); + // The underlying pipe. class pipe_t *pipe; // Pipe reader associated with the other side of the pipe. class reader_t *peer; - // High and low watermarks for in-memory storage (in bytes). + // High watermark for in-memory storage (in bytes). uint64_t hwm; - uint64_t lwm; // Last confirmed number of messages read from the pipe. // The actual number can be higher. @@ -134,9 +138,19 @@ namespace zmq // Number of messages we have written so far. uint64_t msgs_written; + // Pointer to backing store. If NULL, messages are always + // kept in main memory. + msg_store_t *msg_store; + + bool extra_msg_flag; + + zmq_msg_t extra_msg; + // True iff the last attempt to write a message has failed. bool stalled; + bool pending_close; + // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; @@ -150,7 +164,7 @@ namespace zmq public: pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_); + uint64_t hwm_, int64_t swap_size_); ~pipe_t (); reader_t reader; diff --git a/src/platform.hpp.in b/src/platform.hpp.in index a885c21..bab50ec 100644 --- a/src/platform.hpp.in +++ b/src/platform.hpp.in @@ -24,6 +24,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_INTTYPES_H +/* Define to 1 if you have the `crypto' library (-lcrypto). */ +#undef HAVE_LIBCRYPTO + /* Define to 1 if you have the `iphlpapi' library (-liphlpapi). */ #undef HAVE_LIBIPHLPAPI diff --git a/src/poll.cpp b/src/poll.cpp index 4214195..1b203db 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -165,22 +165,21 @@ void zmq::poll_t::loop () continue; } - for (pollset_t::iterator it = pollset.begin (); - it != pollset.end (); it ++) { + for (pollset_t::size_type i = 0; i != pollset.size (); i++) { - zmq_assert (!(it->revents & POLLNVAL)); - if (it->fd == retired_fd) + zmq_assert (!(pollset [i].revents & POLLNVAL)); + if (pollset [i].fd == retired_fd) continue; - if (it->revents & (POLLERR | POLLHUP)) - fd_table [it->fd].events->in_event (); - if (it->fd == retired_fd) + if (pollset [i].revents & (POLLERR | POLLHUP)) + fd_table [pollset [i].fd].events->in_event (); + if (pollset [i].fd == retired_fd) continue; - if (it->revents & POLLOUT) - fd_table [it->fd].events->out_event (); - if (it->fd == retired_fd) + if (pollset [i].revents & POLLOUT) + fd_table [pollset [i].fd].events->out_event (); + if (pollset [i].fd == retired_fd) continue; - if (it->revents & POLLIN) - fd_table [it->fd].events->in_event (); + if (pollset [i].revents & POLLIN) + fd_table [pollset [i].fd].events->in_event (); } // Clean up the pollset and update the fd_table accordingly. diff --git a/src/prefix_tree.cpp b/src/prefix_tree.cpp index 51225d6..6d4f084 100644 --- a/src/prefix_tree.cpp +++ b/src/prefix_tree.cpp @@ -42,7 +42,7 @@ zmq::prefix_tree_t::~prefix_tree_t () if (count == 1) delete next.node; else if (count > 1) { - for (unsigned char i = 0; i != count; ++i) + for (unsigned short i = 0; i != count; ++i) if (next.table [i]) delete next.table [i]; free (next.table); @@ -74,7 +74,7 @@ void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_) next.table = (prefix_tree_t**) malloc (sizeof (prefix_tree_t*) * count); zmq_assert (next.table); - for (unsigned char i = 0; i != count; ++i) + for (unsigned short i = 0; i != count; ++i) next.table [i] = 0; min = std::min (min, c); next.table [oldc - min] = oldp; @@ -82,25 +82,25 @@ void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_) else if (min < c) { // The new character is above the current character range. - unsigned char old_count = count; + unsigned short old_count = count; count = c - min + 1; next.table = (prefix_tree_t**) realloc ((void*) next.table, sizeof (prefix_tree_t*) * count); zmq_assert (next.table); - for (unsigned char i = old_count; i != count; i++) + for (unsigned short i = old_count; i != count; i++) next.table [i] = NULL; } else { // The new character is below the current character range. - unsigned char old_count = count; + unsigned short old_count = count; count = (min + old_count) - c; next.table = (prefix_tree_t**) realloc ((void*) next.table, sizeof (prefix_tree_t*) * count); zmq_assert (next.table); memmove (next.table + min - c, next.table, old_count * sizeof (prefix_tree_t*)); - for (unsigned char i = 0; i != min - c; i++) + for (unsigned short i = 0; i != min - c; i++) next.table [i] = NULL; min = c; } diff --git a/src/prefix_tree.hpp b/src/prefix_tree.hpp index 53c7c18..bf1c4b9 100644 --- a/src/prefix_tree.hpp +++ b/src/prefix_tree.hpp @@ -42,7 +42,7 @@ namespace zmq uint32_t refcnt; unsigned char min; - unsigned char count; + unsigned short count; union { class prefix_tree_t *node; class prefix_tree_t **table; diff --git a/src/pull.cpp b/src/pull.cpp new file mode 100644 index 0000000..b2413ee --- /dev/null +++ b/src/pull.cpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "pull.hpp" +#include "err.hpp" + +zmq::pull_t::pull_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::pull_t::~pull_t () +{ +} + +void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + zmq_assert (inpipe_ && !outpipe_); + fq.attach (inpipe_); +} + +void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (pipe_); + fq.detach (pipe_); +} + +void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::pull_t::xkill (class reader_t *pipe_) +{ + fq.kill (pipe_); +} + +void zmq::pull_t::xrevive (class reader_t *pipe_) +{ + fq.revive (pipe_); +} + +void zmq::pull_t::xrevive (class writer_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::pull_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + return fq.recv (msg_, flags_); +} + +bool zmq::pull_t::xhas_in () +{ + return fq.has_in (); +} + +bool zmq::pull_t::xhas_out () +{ + return false; +} + diff --git a/src/pull.hpp b/src/pull.hpp new file mode 100644 index 0000000..7f249e9 --- /dev/null +++ b/src/pull.hpp @@ -0,0 +1,62 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PULL_HPP_INCLUDED__ +#define __ZMQ_PULL_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "fq.hpp" + +namespace zmq +{ + + class pull_t : public socket_base_t + { + public: + + pull_t (class app_thread_t *parent_); + ~pull_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Fair queueing object for inbound pipes. + fq_t fq; + + pull_t (const pull_t&); + void operator = (const pull_t&); + + }; + +} + +#endif diff --git a/src/push.cpp b/src/push.cpp new file mode 100644 index 0000000..522101f --- /dev/null +++ b/src/push.cpp @@ -0,0 +1,101 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "push.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::push_t::push_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::push_t::~push_t () +{ +} + +void zmq::push_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + zmq_assert (!inpipe_ && outpipe_); + lb.attach (outpipe_); +} + +void zmq::push_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + lb.detach (pipe_); +} + +void zmq::push_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xrevive (class writer_t *pipe_) +{ + lb.revive (pipe_); +} + +int zmq::push_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) +{ + return lb.send (msg_, flags_); +} + +int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::push_t::xhas_in () +{ + return false; +} + +bool zmq::push_t::xhas_out () +{ + return lb.has_out (); +} + diff --git a/src/push.hpp b/src/push.hpp new file mode 100644 index 0000000..b3c8d87 --- /dev/null +++ b/src/push.hpp @@ -0,0 +1,61 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PUSH_HPP_INCLUDED__ +#define __ZMQ_PUSH_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "lb.hpp" + +namespace zmq +{ + + class push_t : public socket_base_t + { + public: + + push_t (class app_thread_t *parent_); + ~push_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Load balancer managing the outbound pipes. + lb_t lb; + + push_t (const push_t&); + void operator = (const push_t&); + }; + +} + +#endif diff --git a/src/queue.cpp b/src/queue.cpp index 470ea67..36fab07 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -23,6 +23,7 @@ #include "queue.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::queue (class socket_base_t *insocket_, @@ -49,7 +50,11 @@ int zmq::queue (class socket_base_t *insocket_, // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], 2, -1); - errno_assert (rc > 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } // The algorithm below asumes ratio of request and replies processed // under full load to be 1:1. Although processing requests replies @@ -61,14 +66,26 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = insocket_->recv (&msg, 0); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } moresz = sizeof (more); rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } if (!more) break; @@ -80,14 +97,26 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = outsocket_->recv (&msg, 0); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } moresz = sizeof (more); rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } if (!more) break; diff --git a/src/select.cpp b/src/select.cpp index be5cd47..59eb83e 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -65,6 +65,10 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) fd_entry_t entry = {fd_, events_}; fds.push_back (entry); + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + zmq_assert (fds.size () <= FD_SETSIZE); + // Start polling on errors. FD_SET (fd_, &source_set_err); @@ -217,10 +221,13 @@ void zmq::select_t::loop () // Destroy retired event sources. if (retired) { - for (fd_set_t::size_type i = 0; i < fds.size (); i ++) { - if (fds [i].fd == retired_fd) { - fds.erase (fds.begin () + i); - i --; + fd_set_t::iterator it = fds.begin(); + while (it != fds.end()) { + if (it->fd == retired_fd) { + it = fds.erase(it); + } + else { + it++; } } retired = false; diff --git a/src/session.cpp b/src/session.cpp index 3cd27fb..f798877 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -265,7 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_, writer_t *socket_writer = NULL; if (options.requires_in && !out_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm); + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap); zmq_assert (pipe); out_pipe = &pipe->writer; out_pipe->set_endpoint (this); @@ -273,7 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } if (options.requires_out && !in_pipe) { - pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm); + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap); zmq_assert (pipe); in_pipe = &pipe->reader; in_pipe->set_endpoint (this); diff --git a/src/signaler.cpp b/src/signaler.cpp index 592688b..d4a9214 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -176,12 +176,15 @@ zmq::signaler_t::~signaler_t () void zmq::signaler_t::send (const command_t &cmd_) { - ssize_t nbytes = send (w, &cmd_, sizeof (command_t), 0); + ssize_t nbytes; + do { + nbytes = ::send (w, &cmd_, sizeof (command_t), 0); + } while (nbytes == -1 && errno == EINTR); errno_assert (nbytes != -1); zmq_assert (nbytes == sizeof (command_t)); } -bool zmq::signaler_t::recv (command_t &cmd_, bool block_) +bool zmq::signaler_t::recv (command_t *cmd_, bool block_) { if (block_) { @@ -194,7 +197,10 @@ bool zmq::signaler_t::recv (command_t &cmd_, bool block_) } bool result; - ssize_t nbytes = recv (r, buffer, sizeof (command_t), 0); + ssize_t nbytes; + do { + nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); + } while (nbytes == -1 && errno == EINTR); if (nbytes == -1 && errno == EAGAIN) { result = false; } @@ -207,7 +213,7 @@ bool zmq::signaler_t::recv (command_t &cmd_, bool block_) result = true; } - if (block_) + if (block_) { // Set the reader to non-blocking mode. int flags = fcntl (r, F_GETFL, 0); @@ -249,7 +255,10 @@ void zmq::signaler_t::send (const command_t &cmd_) { // TODO: Note that send is a blocking operation. // How should we behave if the command cannot be written to the signaler? - ssize_t nbytes = ::send (w, &cmd_, sizeof (command_t), 0); + ssize_t nbytes; + do { + nbytes = ::send (w, &cmd_, sizeof (command_t), 0); + } while (nbytes == -1 && errno == EINTR); errno_assert (nbytes != -1); // This should never happen as we've already checked that command size is @@ -259,8 +268,11 @@ void zmq::signaler_t::send (const command_t &cmd_) bool zmq::signaler_t::recv (command_t *cmd_, bool block_) { - ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), - block_ ? 0 : MSG_DONTWAIT); + ssize_t nbytes; + do { + nbytes = ::recv (r, cmd_, sizeof (command_t), + block_ ? 0 : MSG_DONTWAIT); + } while (nbytes == -1 && errno == EINTR); // If there's no signal available return false. if (nbytes == -1 && errno == EAGAIN) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eddb297..c933954 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -39,6 +39,7 @@ #include "pgm_sender.hpp" #include "pgm_receiver.hpp" #include "likely.hpp" +#include "uuid.hpp" zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : object_t (parent_), @@ -194,13 +195,13 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm); + in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm); + out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap); zmq_assert (out_pipe); } @@ -233,14 +234,14 @@ int zmq::socket_base_t::connect (const char *addr_) // Create inbound pipe, if required. if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm); + in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap); zmq_assert (in_pipe); } // Create outbound pipe, if required. if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm); + out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap); zmq_assert (out_pipe); } @@ -424,7 +425,14 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) return -1; } ticks = 0; - return xrecv (msg_, flags_); + + rc = xrecv (msg_, flags_); + if (rc == 0) { + rcvmore = msg_->flags & ZMQ_MSG_MORE; + if (rcvmore) + msg_->flags &= ~ZMQ_MSG_MORE; + } + return rc; } // In blocking scenario, commands are processed over and over again until @@ -621,7 +629,16 @@ void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, inpipe_->set_endpoint (this); if (outpipe_) outpipe_->set_endpoint (this); - xattach_pipes (inpipe_, outpipe_, peer_identity_); + + // If the peer haven't specified it's identity, let's generate one. + if (peer_identity_.size ()) { + xattach_pipes (inpipe_, outpipe_, peer_identity_); + } + else { + blob_t identity (1, 0); + identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len); + xattach_pipes (inpipe_, outpipe_, identity); + } } void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) diff --git a/src/streamer.cpp b/src/streamer.cpp index 796771b..7c03365 100644 --- a/src/streamer.cpp +++ b/src/streamer.cpp @@ -21,6 +21,7 @@ #include "streamer.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,9 +30,30 @@ int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { - insocket_->recv (&msg, 0); - outsocket_->send (&msg, 0); + rc = insocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } } return 0; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 17c0257..dee71be 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -117,10 +117,10 @@ zmq::fd_t zmq::tcp_connecter_t::connect () // Assert that the error was caused by the networking problems // rather than 0MQ bug. - zmq_assert (err == WSAECONNREFUSED || err == WSAETIMEDOUT || - err == WSAECONNABORTED); - errno = err; + errno_assert (errno == WSAECONNREFUSED || errno == WSAETIMEDOUT || + errno == WSAECONNABORTED || errno == WSAEHOSTUNREACH); + return retired_fd; } @@ -291,11 +291,12 @@ zmq::fd_t zmq::tcp_connecter_t::connect () err = errno; if (err != 0) { - // Assert that the error was caused by the networking problems - // rather than 0MQ bug. - zmq_assert (err == ECONNREFUSED || err == ETIMEDOUT); - + // Assert if the error was caused by 0MQ bug. + // Networking problems are OK. No need to assert. errno = err; + errno_assert (errno == ECONNREFUSED || errno == ECONNRESET || + errno == ETIMEDOUT || errno == EHOSTUNREACH); + return retired_fd; } diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 0cb9a6e..a62bc04 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -64,7 +64,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) // Allow reusing of the address. int flag = 1; - rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, + rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*) &flag, sizeof (int)); wsa_assert (rc != SOCKET_ERROR); diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp index cc426d7..c83bba6 100644 --- a/src/tcp_socket.cpp +++ b/src/tcp_socket.cpp @@ -210,7 +210,8 @@ int zmq::tcp_socket_t::read (void *data, int size) return 0; // Signalise peer failure. - if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED)) + if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED || + errno == ETIMEDOUT || errno == EHOSTUNREACH)) return -1; errno_assert (nbytes != -1); diff --git a/src/upstream.cpp b/src/upstream.cpp deleted file mode 100644 index 1498c31..0000000 --- a/src/upstream.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "upstream.hpp" -#include "err.hpp" - -zmq::upstream_t::upstream_t (class app_thread_t *parent_) : - socket_base_t (parent_) -{ - options.requires_in = true; - options.requires_out = false; -} - -zmq::upstream_t::~upstream_t () -{ -} - -void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) -{ - zmq_assert (inpipe_ && !outpipe_); - fq.attach (inpipe_); -} - -void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) -{ - zmq_assert (pipe_); - fq.detach (pipe_); -} - -void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) -{ - // There are no outpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::upstream_t::xkill (class reader_t *pipe_) -{ - fq.kill (pipe_); -} - -void zmq::upstream_t::xrevive (class reader_t *pipe_) -{ - fq.revive (pipe_); -} - -void zmq::upstream_t::xrevive (class writer_t *pipe_) -{ - zmq_assert (false); -} - -int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - // No special options for this socket type. - errno = EINVAL; - return -1; -} - -int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - -int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) -{ - return fq.recv (msg_, flags_); -} - -bool zmq::upstream_t::xhas_in () -{ - return fq.has_in (); -} - -bool zmq::upstream_t::xhas_out () -{ - return false; -} - diff --git a/src/upstream.hpp b/src/upstream.hpp deleted file mode 100644 index 5fe42ae..0000000 --- a/src/upstream.hpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ -#define __ZMQ_UPSTREAM_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "fq.hpp" - -namespace zmq -{ - - class upstream_t : public socket_base_t - { - public: - - upstream_t (class app_thread_t *parent_); - ~upstream_t (); - - // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - - private: - - // Fair queueing object for inbound pipes. - fq_t fq; - - upstream_t (const upstream_t&); - void operator = (const upstream_t&); - - }; - -} - -#endif diff --git a/src/uuid.cpp b/src/uuid.cpp index 406bbb4..f1dddf0 100644 --- a/src/uuid.cpp +++ b/src/uuid.cpp @@ -190,8 +190,10 @@ unsigned char zmq::uuid_t::convert_byte (const char *hexa_) byte = *hexa_ - 'A' + 10; else if (*hexa_ >= 'a' && *hexa_ <= 'f') byte = *hexa_ - 'a' + 10; - else + else { zmq_assert (false); + byte = 0; + } byte *= 16; diff --git a/src/xrep.cpp b/src/xrep.cpp index 4e8d18a..5fd6cbb 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -26,6 +26,7 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_), current_in (0), + prefetched (false), more_in (false), current_out (NULL), more_out (false) @@ -142,8 +143,11 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) if (!more_out) { zmq_assert (!current_out); - // There's no such thing as prefix with no subsequent message. - zmq_assert (msg_->flags & ZMQ_MSG_MORE); + // If we have malformed message (prefix with no subsequent message) + // then just silently drop the message. + if ((msg_->flags & ZMQ_MSG_MORE) == 0) + return 0; + more_out = true; // Find the pipe associated with the identity stored in the prefix. @@ -153,7 +157,7 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) outpipes_t::iterator it = outpipes.find (identity); if (it == outpipes.end ()) return 0; - + // Remember the outgoing pipe. current_out = it->second.writer; @@ -189,6 +193,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) // Deallocate old content of the message. zmq_msg_close (msg_); + if (prefetched) { + zmq_msg_move (msg_, &prefetched_msg); + more_in = msg_->flags & ZMQ_MSG_MORE; + prefetched = false; + return 0; + } + // If we are in the middle of reading a message, just grab next part of it. if (more_in) { zmq_assert (inpipes [current_in].active); @@ -207,21 +218,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) for (int count = inpipes.size (); count != 0; count--) { // Try to fetch new message. - bool fetched; - if (!inpipes [current_in].active) - fetched = false; - else - fetched = inpipes [current_in].reader->check_read (); + if (inpipes [current_in].active) + prefetched = inpipes [current_in].reader->read (&prefetched_msg); // If we have a message, create a prefix and return it to the caller. - if (fetched) { + if (prefetched) { int rc = zmq_msg_init_size (msg_, inpipes [current_in].identity.size ()); zmq_assert (rc == 0); memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), zmq_msg_size (msg_)); msg_->flags = ZMQ_MSG_MORE; - more_in = true; return 0; } @@ -241,7 +248,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) bool zmq::xrep_t::xhas_in () { // There are subsequent parts of the partly-read message available. - if (more_in) + if (prefetched || more_in) return true; // Note that messing with current doesn't break the fairness of fair diff --git a/src/xrep.hpp b/src/xrep.hpp index 940d288..da1b3d8 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -67,6 +67,12 @@ namespace zmq // The pipe we are currently reading from. inpipes_t::size_type current_in; + // Have we prefetched a message. + bool prefetched; + + // Holds the prefetched message. + zmq_msg_t prefetched_msg; + // If true, more incoming message parts are expected. bool more_in; diff --git a/src/xreq.cpp b/src/xreq.cpp index ab90f68..66e5cc3 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -23,8 +23,7 @@ #include "err.hpp" zmq::xreq_t::xreq_t (class app_thread_t *parent_) : - socket_base_t (parent_), - dropping (false) + socket_base_t (parent_) { options.requires_in = true; options.requires_out = true; @@ -78,25 +77,7 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) { - while (true) { - - // If we are ignoring the current message, just drop it and return. - if (dropping) { - if (!(msg_->flags & ZMQ_MSG_MORE)) - dropping = false; - int rc = zmq_msg_close (msg_); - zmq_assert (rc == 0); - rc = zmq_msg_init (msg_); - zmq_assert (rc == 0); - return 0; - } - - int rc = lb.send (msg_, flags_); - if (rc != 0 && errno == EAGAIN) - dropping = true; - else - return rc; - } + return lb.send (msg_, flags_); } int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) @@ -111,8 +92,6 @@ bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_out () { - // Socket is always ready for writing. When the queue is full, message - // will be silently dropped. - return true; + return lb.has_out (); } diff --git a/src/xreq.hpp b/src/xreq.hpp index 25a97f1..8ee0bb9 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -55,9 +55,6 @@ namespace zmq fq_t fq; lb_t lb; - // If true, curently sent message is being dropped. - bool dropping; - xreq_t (const xreq_t&); void operator = (const xreq_t&); }; diff --git a/src/yarray_item.hpp b/src/yarray_item.hpp index b6d89cc..db24dda 100644 --- a/src/yarray_item.hpp +++ b/src/yarray_item.hpp @@ -35,7 +35,9 @@ namespace zmq { } - inline ~yarray_item_t () + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~yarray_item_t () { } diff --git a/src/ypipe.hpp b/src/ypipe.hpp index df5b3d0..26f021c 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -50,6 +50,12 @@ namespace zmq c.set (&queue.back ()); } + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~ypipe_t () + { + } + // Following function (write) deliberately copies uninitialised data // when used with zmq_msg. Initialising the VSM body for // non-VSM messages won't be good for performance. @@ -156,6 +162,17 @@ namespace zmq return true; } + // Applies the function fn to the first elemenent in the pipe + // and returns the value returned by the fn. + // The pipe mustn't be empty or the function crashes. + inline bool probe (bool (*fn)(T &)) + { + bool rc = check_read (); + zmq_assert (rc); + + return (*fn) (queue.front ()); + } + protected: // Allocation-efficient queue to store pipe items. diff --git a/src/zmq.cpp b/src/zmq.cpp index c8f419a..f3ccaac 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -18,6 +18,7 @@ */ #include "../include/zmq.h" +#include "../include/zmq_utils.h" #include #include @@ -271,6 +272,11 @@ void *zmq_init (int io_threads_) int zmq_term (void *ctx_) { + if (!ctx_) { + errno = EFAULT; + return -1; + } + int rc = ((zmq::ctx_t*) ctx_)->term (); int en = errno; @@ -286,11 +292,19 @@ int zmq_term (void *ctx_) void *zmq_socket (void *ctx_, int type_) { + if (!ctx_) { + errno = EFAULT; + return NULL; + } return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_)); } int zmq_close (void *s_) { + if (!s_) { + errno = EFAULT; + return -1; + } ((zmq::socket_base_t*) s_)->close (); return 0; } @@ -298,33 +312,57 @@ int zmq_close (void *s_) int zmq_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_) { + if (!s_) { + errno = EFAULT; + return -1; + } return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, optvallen_)); } int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) { + if (!s_) { + errno = EFAULT; + return -1; + } return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_, optvallen_)); } int zmq_bind (void *s_, const char *addr_) { + if (!s_) { + errno = EFAULT; + return -1; + } return (((zmq::socket_base_t*) s_)->bind (addr_)); } int zmq_connect (void *s_, const char *addr_) { + if (!s_) { + errno = EFAULT; + return -1; + } return (((zmq::socket_base_t*) s_)->connect (addr_)); } int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) { + if (!s_) { + errno = EFAULT; + return -1; + } return (((zmq::socket_base_t*) s_)->send (msg_, flags_)); } int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) { + if (!s_) { + errno = EFAULT; + return -1; + } return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); } @@ -336,6 +374,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ defined ZMQ_HAVE_NETBSD + if (!items_) { + errno = EFAULT; + return -1; + } pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); zmq_assert (pollfds); int npollfds = 0; @@ -489,6 +531,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) zmq::fd_t maxfd = zmq::retired_fd; zmq::fd_t notify_fd = zmq::retired_fd; + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + zmq_assert (nitems_ <= FD_SETSIZE); + for (int i = 0; i != nitems_; i++) { // 0MQ sockets. @@ -647,6 +693,10 @@ int zmq_errno () int zmq_device (int device_, void *insocket_, void *outsocket_) { + if (!insocket_ || !outsocket_) { + errno = EFAULT; + return -1; + } switch (device_) { case ZMQ_FORWARDER: return zmq::forwarder ((zmq::socket_base_t*) insocket_, @@ -661,3 +711,66 @@ int zmq_device (int device_, void *insocket_, void *outsocket_) return EINVAL; } } + +//////////////////////////////////////////////////////////////////////////////// +// 0MQ utils - to be used by perf tests +//////////////////////////////////////////////////////////////////////////////// + +#if defined ZMQ_HAVE_WINDOWS + +static uint64_t now () +{ + // Get the high resolution counter's accuracy. + LARGE_INTEGER ticksPerSecond; + QueryPerformanceFrequency (&ticksPerSecond); + + // What time is it? + LARGE_INTEGER tick; + QueryPerformanceCounter (&tick); + + // Convert the tick number into the number of seconds + // since the system was started. + double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000); + return (uint64_t) (tick.QuadPart / ticks_div); +} + +void zmq_sleep (int seconds_) +{ + Sleep (seconds_ * 1000); +} + +#else + +static uint64_t now () +{ + struct timeval tv; + int rc; + + rc = gettimeofday (&tv, NULL); + assert (rc == 0); + return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec); +} + +void zmq_sleep (int seconds_) +{ + sleep (seconds_); +} + +#endif + +void *zmq_stopwatch_start () +{ + uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t)); + assert (watch); + *watch = now (); + return (void*) watch; +} + +unsigned long zmq_stopwatch_stop (void *watch_) +{ + uint64_t end = now (); + uint64_t start = *(uint64_t*) watch_; + free (watch_); + return (unsigned long) (end - start); +} + diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 8e335c9..dcf8e76 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -56,6 +56,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // TODO: Handle over-sized message decently. + // There has to be at least one byte (the flags) in the message). + zmq_assert (*tmpbuf > 0); + // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... @@ -74,6 +77,10 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // TODO: Handle over-sized message decently. + // There has to be at least one byte (the flags) in the message). + zmq_assert (size > 0); + + // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... -- cgit v1.2.3