diff options
author | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:25 +0100 |
---|---|---|
committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:25 +0100 |
commit | 5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 (patch) | |
tree | df7b144c5325fd8b3c88c49b456fafc24249abe6 /src | |
parent | a15854bd92db69fcd0b4444fe1b8fe3610a7acf6 (diff) |
Imported Upstream version 2.0.9.dfsgupstream/2.0.9.dfsg
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 12 | ||||
-rw-r--r-- | src/Makefile.in | 59 | ||||
-rw-r--r-- | src/app_thread.cpp | 12 | ||||
-rw-r--r-- | src/ctx.cpp | 1 | ||||
-rw-r--r-- | src/decoder.hpp | 4 | ||||
-rw-r--r-- | src/encoder.hpp | 4 | ||||
-rw-r--r-- | src/forwarder.cpp | 26 | ||||
-rw-r--r-- | src/i_poll_events.hpp | 2 | ||||
-rw-r--r-- | src/ip.cpp | 7 | ||||
-rw-r--r-- | src/msg_store.cpp | 307 | ||||
-rw-r--r-- | src/msg_store.hpp | 114 | ||||
-rw-r--r-- | src/pair.cpp | 3 | ||||
-rw-r--r-- | src/pipe.cpp | 146 | ||||
-rw-r--r-- | src/pipe.hpp | 32 | ||||
-rw-r--r-- | src/platform.hpp.in | 3 | ||||
-rw-r--r-- | src/poll.cpp | 23 | ||||
-rw-r--r-- | src/prefix_tree.cpp | 12 | ||||
-rw-r--r-- | src/prefix_tree.hpp | 2 | ||||
-rw-r--r-- | src/pull.cpp (renamed from src/upstream.cpp) | 28 | ||||
-rw-r--r-- | src/pull.hpp (renamed from src/upstream.hpp) | 14 | ||||
-rw-r--r-- | src/push.cpp (renamed from src/downstream.cpp) | 28 | ||||
-rw-r--r-- | src/push.hpp (renamed from src/downstream.hpp) | 14 | ||||
-rw-r--r-- | src/queue.cpp | 43 | ||||
-rw-r--r-- | src/select.cpp | 15 | ||||
-rw-r--r-- | src/session.cpp | 4 | ||||
-rw-r--r-- | src/signaler.cpp | 26 | ||||
-rw-r--r-- | src/socket_base.cpp | 29 | ||||
-rw-r--r-- | src/streamer.cpp | 26 | ||||
-rw-r--r-- | src/tcp_connecter.cpp | 15 | ||||
-rw-r--r-- | src/tcp_listener.cpp | 2 | ||||
-rw-r--r-- | src/tcp_socket.cpp | 3 | ||||
-rw-r--r-- | src/uuid.cpp | 4 | ||||
-rw-r--r-- | src/xrep.cpp | 29 | ||||
-rw-r--r-- | src/xrep.hpp | 6 | ||||
-rw-r--r-- | src/xreq.cpp | 27 | ||||
-rw-r--r-- | src/xreq.hpp | 3 | ||||
-rw-r--r-- | src/yarray_item.hpp | 4 | ||||
-rw-r--r-- | src/ypipe.hpp | 17 | ||||
-rw-r--r-- | src/zmq.cpp | 113 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 7 |
40 files changed, 1002 insertions, 224 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index fa97ca3..19a80d0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,7 +3,7 @@ lib_LTLIBRARIES = libzmq.la pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h if BUILD_PGM pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ @@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ + msg_store.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -104,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -124,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ip.cpp \ kqueue.cpp \ lb.cpp \ + msg_store.cpp \ object.cpp \ options.cpp \ owned.cpp \ @@ -158,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/Makefile.in b/src/Makefile.in index bc3f00f..1bb65a4 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -61,11 +61,12 @@ libLTLIBRARIES_INSTALL = $(INSTALL) LTLIBRARIES = $(lib_LTLIBRARIES) libzmq_la_LIBADD = am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \ - libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-downstream.lo \ + libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-push.lo \ libzmq_la-epoll.lo libzmq_la-err.lo libzmq_la-forwarder.lo \ libzmq_la-fq.lo libzmq_la-io_object.lo libzmq_la-io_thread.lo \ libzmq_la-ip.lo libzmq_la-kqueue.lo libzmq_la-lb.lo \ - libzmq_la-object.lo libzmq_la-options.lo libzmq_la-owned.lo \ + libzmq_la-msg_store.lo libzmq_la-object.lo \ + libzmq_la-options.lo libzmq_la-owned.lo \ libzmq_la-pgm_receiver.lo libzmq_la-pgm_sender.lo \ libzmq_la-pgm_socket.lo libzmq_la-pair.lo \ libzmq_la-prefix_tree.lo libzmq_la-pipe.lo libzmq_la-poll.lo \ @@ -74,9 +75,9 @@ am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \ libzmq_la-signaler.lo libzmq_la-socket_base.lo \ libzmq_la-streamer.lo libzmq_la-sub.lo \ libzmq_la-tcp_connecter.lo libzmq_la-tcp_listener.lo \ - libzmq_la-tcp_socket.lo libzmq_la-thread.lo \ - libzmq_la-upstream.lo libzmq_la-uuid.lo libzmq_la-xrep.lo \ - libzmq_la-xreq.lo libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \ + libzmq_la-tcp_socket.lo libzmq_la-thread.lo libzmq_la-pull.lo \ + libzmq_la-uuid.lo libzmq_la-xrep.lo libzmq_la-xreq.lo \ + libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \ libzmq_la-zmq_decoder.lo libzmq_la-zmq_encoder.lo \ libzmq_la-zmq_engine.lo libzmq_la-zmq_init.lo \ libzmq_la-zmq_listener.lo @@ -271,7 +272,7 @@ top_srcdir = @top_srcdir@ lib_LTLIBRARIES = libzmq.la pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h @BUILD_PGM_TRUE@pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ @BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \ @BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \ @@ -316,7 +317,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -334,6 +335,7 @@ libzmq_la_SOURCES = app_thread.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ + msg_store.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -362,7 +364,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -382,7 +384,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -392,6 +394,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ip.cpp \ kqueue.cpp \ lb.cpp \ + msg_store.cpp \ object.cpp \ options.cpp \ owned.cpp \ @@ -416,7 +419,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ @@ -497,9 +500,9 @@ $(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps) exit 1;; \ esac; \ done; \ - echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu src/Makefile'; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign src/Makefile'; \ cd $(top_srcdir) && \ - $(AUTOMAKE) --gnu src/Makefile + $(AUTOMAKE) --foreign src/Makefile .PRECIOUS: Makefile Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status @case '$?' in \ @@ -579,7 +582,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-command.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-ctx.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-devpoll.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-downstream.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-epoll.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-err.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-forwarder.Plo@am__quote@ @@ -600,6 +602,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-lb.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-log.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-md5.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-msg_store.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-nametoindex.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-net.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-object.Plo@am__quote@ @@ -615,6 +618,8 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-poll.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-prefix_tree.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pub.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pull.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-push.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-queue.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-rate_control.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-receiver.Plo@am__quote@ @@ -641,7 +646,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-transport.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-tsi.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-txwi.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-upstream.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-uuid.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-version.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-wsastrerror.Plo@am__quote@ @@ -956,12 +960,12 @@ libzmq_la-devpoll.lo: devpoll.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-devpoll.lo `test -f 'devpoll.cpp' || echo '$(srcdir)/'`devpoll.cpp -libzmq_la-downstream.lo: downstream.cpp -@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-downstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-downstream.Tpo -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp -@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-downstream.Tpo $(DEPDIR)/libzmq_la-downstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='downstream.cpp' object='libzmq_la-downstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-push.lo: push.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-push.lo -MD -MP -MF $(DEPDIR)/libzmq_la-push.Tpo -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-push.Tpo $(DEPDIR)/libzmq_la-push.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='push.cpp' object='libzmq_la-push.lo' libtool=yes @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp libzmq_la-epoll.lo: epoll.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-epoll.lo -MD -MP -MF $(DEPDIR)/libzmq_la-epoll.Tpo -c -o libzmq_la-epoll.lo `test -f 'epoll.cpp' || echo '$(srcdir)/'`epoll.cpp @@ -1026,6 +1030,13 @@ libzmq_la-lb.lo: lb.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-lb.lo `test -f 'lb.cpp' || echo '$(srcdir)/'`lb.cpp +libzmq_la-msg_store.lo: msg_store.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-msg_store.lo -MD -MP -MF $(DEPDIR)/libzmq_la-msg_store.Tpo -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-msg_store.Tpo $(DEPDIR)/libzmq_la-msg_store.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='msg_store.cpp' object='libzmq_la-msg_store.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp + libzmq_la-object.lo: object.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-object.lo -MD -MP -MF $(DEPDIR)/libzmq_la-object.Tpo -c -o libzmq_la-object.lo `test -f 'object.cpp' || echo '$(srcdir)/'`object.cpp @am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-object.Tpo $(DEPDIR)/libzmq_la-object.Plo @@ -1194,12 +1205,12 @@ libzmq_la-thread.lo: thread.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-thread.lo `test -f 'thread.cpp' || echo '$(srcdir)/'`thread.cpp -libzmq_la-upstream.lo: upstream.cpp -@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-upstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-upstream.Tpo -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp -@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-upstream.Tpo $(DEPDIR)/libzmq_la-upstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='upstream.cpp' object='libzmq_la-upstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-pull.lo: pull.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-pull.lo -MD -MP -MF $(DEPDIR)/libzmq_la-pull.Tpo -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-pull.Tpo $(DEPDIR)/libzmq_la-pull.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='pull.cpp' object='libzmq_la-pull.lo' libtool=yes @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp libzmq_la-uuid.lo: uuid.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-uuid.lo -MD -MP -MF $(DEPDIR)/libzmq_la-uuid.Tpo -c -o libzmq_la-uuid.lo `test -f 'uuid.cpp' || echo '$(srcdir)/'`uuid.cpp diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbf034c..ac59464 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,8 +46,8 @@ #include "rep.hpp" #include "xreq.hpp" #include "xrep.hpp" -#include "upstream.hpp" -#include "downstream.hpp" +#include "pull.hpp" +#include "push.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_XREP: s = new (std::nothrow) xrep_t (this); break; - case ZMQ_UPSTREAM: - s = new (std::nothrow) upstream_t (this); + case ZMQ_PULL: + s = new (std::nothrow) pull_t (this); break; - case ZMQ_DOWNSTREAM: - s = new (std::nothrow) downstream_t (this); + case ZMQ_PUSH: + s = new (std::nothrow) push_t (this); break; default: if (sockets.empty ()) diff --git a/src/ctx.cpp b/src/ctx.cpp index f0e177d..397f692 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -151,6 +151,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) // Create the new application thread proxy object. app_thread_info_t info; + memset (&info, 0, sizeof (info)); info.associated = false; info.app_thread = new (std::nothrow) app_thread_t (this, io_threads.size () + app_threads.size ()); diff --git a/src/decoder.hpp b/src/decoder.hpp index 1662bda..f05f651 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -55,7 +55,9 @@ namespace zmq zmq_assert (buf); } - inline ~decoder_t () + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~decoder_t () { free (buf); } diff --git a/src/encoder.hpp b/src/encoder.hpp index 10fe912..0d5b6ba 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -50,7 +50,9 @@ namespace zmq zmq_assert (buf); } - inline ~encoder_t () + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~encoder_t () { free (buf); } diff --git a/src/forwarder.cpp b/src/forwarder.cpp index 5aab8f2..d1f324e 100644 --- a/src/forwarder.cpp +++ b/src/forwarder.cpp @@ -21,6 +21,7 @@ #include "forwarder.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,9 +30,30 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { - insocket_->recv (&msg, 0); - outsocket_->send (&msg, 0); + rc = insocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } } return 0; diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index 8b85f7a..6d474b2 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -28,7 +28,7 @@ namespace zmq struct i_poll_events { - virtual ~i_poll_events () {}; + virtual ~i_poll_events () {} // Called by I/O thread when file descriptor is ready for reading. virtual void in_event () = 0; @@ -289,11 +289,8 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_, // doesn't really matter, since it's not included in the addr-output. req.ai_socktype = SOCK_STREAM; - // Avoid named services due to unclear socktype, and don't pick IPv4 - // addresses if we don't have a local IPv4 address configured. - // If this is failing for you on a host with only IPv6 connectivity, - // please contribute proper IPv6 support for all functions in this file. - req.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG; + // Avoid named services due to unclear socktype. + req.ai_flags = AI_NUMERICSERV; // Resolve host name. Some of the error info is lost in case of error, // however, there's no way to report EAI errors via errno. diff --git a/src/msg_store.cpp b/src/msg_store.cpp new file mode 100644 index 0000000..aaf6dbe --- /dev/null +++ b/src/msg_store.cpp @@ -0,0 +1,307 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#include <io.h> +#else +#include <unistd.h> +#endif + +#include "../include/zmq.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <string.h> +#include <sstream> +#include <algorithm> + +#include "atomic_counter.hpp" +#include "msg_store.hpp" +#include "err.hpp" + +zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : + fd (-1), + filesize (filesize_), + file_pos (0), + write_pos (0), + read_pos (0), + block_size (block_size_), + write_buf_start_addr (0) +{ + zmq_assert (filesize > 0); + zmq_assert (block_size > 0); + + buf1 = new (std::nothrow) char [block_size]; + zmq_assert (buf1); + + buf2 = new (std::nothrow) char [block_size]; + zmq_assert (buf2); + + read_buf = write_buf = buf1; +} + +zmq::msg_store_t::~msg_store_t () +{ + delete [] buf1; + delete [] buf2; + + if (fd == -1) + return; + +#ifdef ZMQ_HAVE_WINDOWS + int rc = _close (fd); +#else + int rc = close (fd); +#endif + errno_assert (rc == 0); + +#ifdef ZMQ_HAVE_WINDOWS + rc = _unlink (filename.c_str ()); +#else + rc = unlink (filename.c_str ()); +#endif + errno_assert (rc == 0); +} + +int zmq::msg_store_t::init () +{ + static zmq::atomic_counter_t seqnum (0); + + // Get process ID. +#ifdef ZMQ_HAVE_WINDOWS + int pid = GetCurrentThreadId (); +#else + pid_t pid = getpid (); +#endif + + std::ostringstream outs; + outs << "zmq_" << pid << '_' << seqnum.get () << ".swap"; + filename = outs.str (); + + seqnum.add (1); + + // Open the backing file. +#ifdef ZMQ_HAVE_WINDOWS + fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600); +#else + fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600); +#endif + if (fd == -1) + return -1; + +#ifdef ZMQ_HAVE_LINUX + // Enable more aggresive read-ahead optimization. + posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL); +#endif + return 0; +} + +bool zmq::msg_store_t::store (zmq_msg_t *msg_) +{ + size_t msg_size = zmq_msg_size (msg_); + + // Check buffer space availability. + // NOTE: We always keep one byte open. + if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) + return false; + + // Don't store the ZMQ_MSG_SHARED flag. + uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED; + + // Write message length, flags, and message body. + copy_to_file (&msg_size, sizeof msg_size); + copy_to_file (&msg_flags, sizeof msg_flags); + copy_to_file (zmq_msg_data (msg_), msg_size); + + zmq_msg_close (msg_); + + return true; +} + +void zmq::msg_store_t::fetch (zmq_msg_t *msg_) +{ + // There must be at least one message available. + zmq_assert (read_pos != write_pos); + + // Retrieve the message size. + size_t msg_size; + copy_from_file (&msg_size, sizeof msg_size); + + // Initialize the message. + zmq_msg_init_size (msg_, msg_size); + + // Retrieve the message flags. + copy_from_file (&msg_->flags, sizeof msg_->flags); + + // Retrieve the message payload. + copy_from_file (zmq_msg_data (msg_), msg_size); +} + +void zmq::msg_store_t::commit () +{ + commit_pos = write_pos; +} + +void zmq::msg_store_t::rollback () +{ + if (commit_pos == write_pos || read_pos == write_pos) + return; + + if (write_pos > read_pos) + zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos); + else + zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos); + + if (commit_pos / block_size == read_pos / block_size) { + write_buf_start_addr = commit_pos % block_size; + write_buf = read_buf; + } + else if (commit_pos / block_size != write_pos / block_size) { + write_buf_start_addr = commit_pos % block_size; + fill_buf (write_buf, write_buf_start_addr); + } + write_pos = commit_pos; +} + +bool zmq::msg_store_t::empty () +{ + return read_pos == write_pos; +} + +bool zmq::msg_store_t::full () +{ + return buffer_space () == 1; +} + +void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) +{ + char *dest_ptr = (char *) buffer_; + size_t chunk_size, remainder = count_; + + while (remainder > 0) { + chunk_size = std::min (remainder, + std::min ((size_t) (filesize - read_pos), + (size_t) (block_size - read_pos % block_size))); + + memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size); + dest_ptr += chunk_size; + + read_pos = (read_pos + chunk_size) % filesize; + if (read_pos % block_size == 0) { + if (read_pos / block_size == write_pos / block_size) + read_buf = write_buf; + else + fill_buf (read_buf, read_pos); + } + remainder -= chunk_size; + } +} + +void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) +{ + char *source_ptr = (char *) buffer_; + size_t chunk_size, remainder = count_; + + |