diff options
| author | Martin Lucina <mato@kotelna.sk> | 2010-09-08 15:25:45 +0200 | 
|---|---|---|
| committer | Martin Lucina <martin@lucina.net> | 2012-01-23 08:53:27 +0100 | 
| commit | 90d73cba9cd1d1724f38ed82fc0eefb1781c9c20 (patch) | |
| tree | 1760872164a93384d1adb90db9c8d41777dbb2a7 /src | |
| parent | cf026feae205bfeb7e007f6afd0e8d7b283865c8 (diff) | |
| parent | 5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 (diff) | |
Imported Debian patch 2.0.9.dfsg-1debian/2.0.9.dfsg-1
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 12 | ||||
| -rw-r--r-- | src/Makefile.in | 59 | ||||
| -rw-r--r-- | src/app_thread.cpp | 12 | ||||
| -rw-r--r-- | src/ctx.cpp | 1 | ||||
| -rw-r--r-- | src/decoder.hpp | 4 | ||||
| -rw-r--r-- | src/encoder.hpp | 4 | ||||
| -rw-r--r-- | src/forwarder.cpp | 26 | ||||
| -rw-r--r-- | src/i_poll_events.hpp | 2 | ||||
| -rw-r--r-- | src/ip.cpp | 7 | ||||
| -rw-r--r-- | src/msg_store.cpp | 307 | ||||
| -rw-r--r-- | src/msg_store.hpp | 114 | ||||
| -rw-r--r-- | src/pair.cpp | 3 | ||||
| -rw-r--r-- | src/pipe.cpp | 146 | ||||
| -rw-r--r-- | src/pipe.hpp | 32 | ||||
| -rw-r--r-- | src/platform.hpp.in | 3 | ||||
| -rw-r--r-- | src/poll.cpp | 23 | ||||
| -rw-r--r-- | src/prefix_tree.cpp | 12 | ||||
| -rw-r--r-- | src/prefix_tree.hpp | 2 | ||||
| -rw-r--r-- | src/pull.cpp (renamed from src/upstream.cpp) | 28 | ||||
| -rw-r--r-- | src/pull.hpp (renamed from src/upstream.hpp) | 14 | ||||
| -rw-r--r-- | src/push.cpp (renamed from src/downstream.cpp) | 28 | ||||
| -rw-r--r-- | src/push.hpp (renamed from src/downstream.hpp) | 14 | ||||
| -rw-r--r-- | src/queue.cpp | 43 | ||||
| -rw-r--r-- | src/select.cpp | 15 | ||||
| -rw-r--r-- | src/session.cpp | 4 | ||||
| -rw-r--r-- | src/signaler.cpp | 26 | ||||
| -rw-r--r-- | src/socket_base.cpp | 29 | ||||
| -rw-r--r-- | src/streamer.cpp | 26 | ||||
| -rw-r--r-- | src/tcp_connecter.cpp | 15 | ||||
| -rw-r--r-- | src/tcp_listener.cpp | 2 | ||||
| -rw-r--r-- | src/tcp_socket.cpp | 3 | ||||
| -rw-r--r-- | src/uuid.cpp | 4 | ||||
| -rw-r--r-- | src/xrep.cpp | 29 | ||||
| -rw-r--r-- | src/xrep.hpp | 6 | ||||
| -rw-r--r-- | src/xreq.cpp | 27 | ||||
| -rw-r--r-- | src/xreq.hpp | 3 | ||||
| -rw-r--r-- | src/yarray_item.hpp | 4 | ||||
| -rw-r--r-- | src/ypipe.hpp | 17 | ||||
| -rw-r--r-- | src/zmq.cpp | 113 | ||||
| -rw-r--r-- | src/zmq_decoder.cpp | 7 | 
40 files changed, 1002 insertions, 224 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index fa97ca3..19a80d0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,7 +3,7 @@ lib_LTLIBRARIES = libzmq.la  pkgconfigdir = $(libdir)/pkgconfig  pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h  if BUILD_PGM  pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ @@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \      ctx.hpp \      decoder.hpp \      devpoll.hpp \ -    downstream.hpp \ +    push.hpp \      encoder.hpp \      epoll.hpp \      err.hpp \ @@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \      lb.hpp \      likely.hpp \      msg_content.hpp \ +    msg_store.hpp \      mutex.hpp \      object.hpp \      options.hpp \ @@ -104,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \      tcp_listener.hpp \      tcp_socket.hpp \      thread.hpp \ -    upstream.hpp \ +    pull.hpp \      uuid.hpp \      windows.hpp \      wire.hpp \ @@ -124,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \      command.cpp \      ctx.cpp \      devpoll.cpp \ -    downstream.cpp \ +    push.cpp \      epoll.cpp \      err.cpp \      forwarder.cpp \ @@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \      ip.cpp \      kqueue.cpp \      lb.cpp \ +    msg_store.cpp \      object.cpp \      options.cpp \      owned.cpp \ @@ -158,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \      tcp_listener.cpp \      tcp_socket.cpp \      thread.cpp \ -    upstream.cpp \ +    pull.cpp \      uuid.cpp \      xrep.cpp \      xreq.cpp \ diff --git a/src/Makefile.in b/src/Makefile.in index bc3f00f..1bb65a4 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -61,11 +61,12 @@ libLTLIBRARIES_INSTALL = $(INSTALL)  LTLIBRARIES = $(lib_LTLIBRARIES)  libzmq_la_LIBADD =  am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \ -	libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-downstream.lo \ +	libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-push.lo \  	libzmq_la-epoll.lo libzmq_la-err.lo libzmq_la-forwarder.lo \  	libzmq_la-fq.lo libzmq_la-io_object.lo libzmq_la-io_thread.lo \  	libzmq_la-ip.lo libzmq_la-kqueue.lo libzmq_la-lb.lo \ -	libzmq_la-object.lo libzmq_la-options.lo libzmq_la-owned.lo \ +	libzmq_la-msg_store.lo libzmq_la-object.lo \ +	libzmq_la-options.lo libzmq_la-owned.lo \  	libzmq_la-pgm_receiver.lo libzmq_la-pgm_sender.lo \  	libzmq_la-pgm_socket.lo libzmq_la-pair.lo \  	libzmq_la-prefix_tree.lo libzmq_la-pipe.lo libzmq_la-poll.lo \ @@ -74,9 +75,9 @@ am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \  	libzmq_la-signaler.lo libzmq_la-socket_base.lo \  	libzmq_la-streamer.lo libzmq_la-sub.lo \  	libzmq_la-tcp_connecter.lo libzmq_la-tcp_listener.lo \ -	libzmq_la-tcp_socket.lo libzmq_la-thread.lo \ -	libzmq_la-upstream.lo libzmq_la-uuid.lo libzmq_la-xrep.lo \ -	libzmq_la-xreq.lo libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \ +	libzmq_la-tcp_socket.lo libzmq_la-thread.lo libzmq_la-pull.lo \ +	libzmq_la-uuid.lo libzmq_la-xrep.lo libzmq_la-xreq.lo \ +	libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \  	libzmq_la-zmq_decoder.lo libzmq_la-zmq_encoder.lo \  	libzmq_la-zmq_engine.lo libzmq_la-zmq_init.lo \  	libzmq_la-zmq_listener.lo @@ -271,7 +272,7 @@ top_srcdir = @top_srcdir@  lib_LTLIBRARIES = libzmq.la  pkgconfigdir = $(libdir)/pkgconfig  pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h  @BUILD_PGM_TRUE@pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \  @BUILD_PGM_TRUE@    ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \  @BUILD_PGM_TRUE@    ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \ @@ -316,7 +317,7 @@ libzmq_la_SOURCES = app_thread.hpp \      ctx.hpp \      decoder.hpp \      devpoll.hpp \ -    downstream.hpp \ +    push.hpp \      encoder.hpp \      epoll.hpp \      err.hpp \ @@ -334,6 +335,7 @@ libzmq_la_SOURCES = app_thread.hpp \      lb.hpp \      likely.hpp \      msg_content.hpp \ +    msg_store.hpp \      mutex.hpp \      object.hpp \      options.hpp \ @@ -362,7 +364,7 @@ libzmq_la_SOURCES = app_thread.hpp \      tcp_listener.hpp \      tcp_socket.hpp \      thread.hpp \ -    upstream.hpp \ +    pull.hpp \      uuid.hpp \      windows.hpp \      wire.hpp \ @@ -382,7 +384,7 @@ libzmq_la_SOURCES = app_thread.hpp \      command.cpp \      ctx.cpp \      devpoll.cpp \ -    downstream.cpp \ +    push.cpp \      epoll.cpp \      err.cpp \      forwarder.cpp \ @@ -392,6 +394,7 @@ libzmq_la_SOURCES = app_thread.hpp \      ip.cpp \      kqueue.cpp \      lb.cpp \ +    msg_store.cpp \      object.cpp \      options.cpp \      owned.cpp \ @@ -416,7 +419,7 @@ libzmq_la_SOURCES = app_thread.hpp \      tcp_listener.cpp \      tcp_socket.cpp \      thread.cpp \ -    upstream.cpp \ +    pull.cpp \      uuid.cpp \      xrep.cpp \      xreq.cpp \ @@ -497,9 +500,9 @@ $(srcdir)/Makefile.in:  $(srcdir)/Makefile.am  $(am__configure_deps)  	      exit 1;; \  	  esac; \  	done; \ -	echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu  src/Makefile'; \ +	echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign  src/Makefile'; \  	cd $(top_srcdir) && \ -	  $(AUTOMAKE) --gnu  src/Makefile +	  $(AUTOMAKE) --foreign  src/Makefile  .PRECIOUS: Makefile  Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status  	@case '$?' in \ @@ -579,7 +582,6 @@ distclean-compile:  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-command.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-ctx.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-devpoll.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-downstream.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-epoll.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-err.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-forwarder.Plo@am__quote@ @@ -600,6 +602,7 @@ distclean-compile:  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-lb.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-log.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-md5.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-msg_store.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-nametoindex.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-net.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-object.Plo@am__quote@ @@ -615,6 +618,8 @@ distclean-compile:  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-poll.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-prefix_tree.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pub.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pull.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-push.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-queue.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-rate_control.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-receiver.Plo@am__quote@ @@ -641,7 +646,6 @@ distclean-compile:  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-transport.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-tsi.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-txwi.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-upstream.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-uuid.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-version.Plo@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-wsastrerror.Plo@am__quote@ @@ -956,12 +960,12 @@ libzmq_la-devpoll.lo: devpoll.cpp  @AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@  @am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-devpoll.lo `test -f 'devpoll.cpp' || echo '$(srcdir)/'`devpoll.cpp -libzmq_la-downstream.lo: downstream.cpp -@am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-downstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-downstream.Tpo -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp -@am__fastdepCXX_TRUE@	mv -f $(DEPDIR)/libzmq_la-downstream.Tpo $(DEPDIR)/libzmq_la-downstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='downstream.cpp' object='libzmq_la-downstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-push.lo: push.cpp +@am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-push.lo -MD -MP -MF $(DEPDIR)/libzmq_la-push.Tpo -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp +@am__fastdepCXX_TRUE@	mv -f $(DEPDIR)/libzmq_la-push.Tpo $(DEPDIR)/libzmq_la-push.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='push.cpp' object='libzmq_la-push.lo' libtool=yes @AMDEPBACKSLASH@  @AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp +@am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp  libzmq_la-epoll.lo: epoll.cpp  @am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-epoll.lo -MD -MP -MF $(DEPDIR)/libzmq_la-epoll.Tpo -c -o libzmq_la-epoll.lo `test -f 'epoll.cpp' || echo '$(srcdir)/'`epoll.cpp @@ -1026,6 +1030,13 @@ libzmq_la-lb.lo: lb.cpp  @AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@  @am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-lb.lo `test -f 'lb.cpp' || echo '$(srcdir)/'`lb.cpp +libzmq_la-msg_store.lo: msg_store.cpp +@am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-msg_store.lo -MD -MP -MF $(DEPDIR)/libzmq_la-msg_store.Tpo -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp +@am__fastdepCXX_TRUE@	mv -f $(DEPDIR)/libzmq_la-msg_store.Tpo $(DEPDIR)/libzmq_la-msg_store.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='msg_store.cpp' object='libzmq_la-msg_store.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp +  libzmq_la-object.lo: object.cpp  @am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-object.lo -MD -MP -MF $(DEPDIR)/libzmq_la-object.Tpo -c -o libzmq_la-object.lo `test -f 'object.cpp' || echo '$(srcdir)/'`object.cpp  @am__fastdepCXX_TRUE@	mv -f $(DEPDIR)/libzmq_la-object.Tpo $(DEPDIR)/libzmq_la-object.Plo @@ -1194,12 +1205,12 @@ libzmq_la-thread.lo: thread.cpp  @AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@  @am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-thread.lo `test -f 'thread.cpp' || echo '$(srcdir)/'`thread.cpp -libzmq_la-upstream.lo: upstream.cpp -@am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-upstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-upstream.Tpo -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp -@am__fastdepCXX_TRUE@	mv -f $(DEPDIR)/libzmq_la-upstream.Tpo $(DEPDIR)/libzmq_la-upstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='upstream.cpp' object='libzmq_la-upstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-pull.lo: pull.cpp +@am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-pull.lo -MD -MP -MF $(DEPDIR)/libzmq_la-pull.Tpo -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp +@am__fastdepCXX_TRUE@	mv -f $(DEPDIR)/libzmq_la-pull.Tpo $(DEPDIR)/libzmq_la-pull.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	source='pull.cpp' object='libzmq_la-pull.lo' libtool=yes @AMDEPBACKSLASH@  @AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp +@am__fastdepCXX_FALSE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp  libzmq_la-uuid.lo: uuid.cpp  @am__fastdepCXX_TRUE@	$(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-uuid.lo -MD -MP -MF $(DEPDIR)/libzmq_la-uuid.Tpo -c -o libzmq_la-uuid.lo `test -f 'uuid.cpp' || echo '$(srcdir)/'`uuid.cpp diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbf034c..ac59464 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,8 +46,8 @@  #include "rep.hpp"  #include "xreq.hpp"  #include "xrep.hpp" -#include "upstream.hpp" -#include "downstream.hpp" +#include "pull.hpp" +#include "push.hpp"  //  If the RDTSC is available we use it to prevent excessive  //  polling for commands. The nice thing here is that it will work on any @@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)      case ZMQ_XREP:          s = new (std::nothrow) xrep_t (this);          break;        -    case ZMQ_UPSTREAM: -        s = new (std::nothrow) upstream_t (this); +    case ZMQ_PULL: +        s = new (std::nothrow) pull_t (this);          break; -    case ZMQ_DOWNSTREAM: -        s = new (std::nothrow) downstream_t (this); +    case ZMQ_PUSH: +        s = new (std::nothrow) push_t (this);          break;      default:          if (sockets.empty ()) diff --git a/src/ctx.cpp b/src/ctx.cpp index f0e177d..397f692 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -151,6 +151,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)              //  Create the new application thread proxy object.              app_thread_info_t info; +            memset (&info, 0, sizeof (info));              info.associated = false;              info.app_thread = new (std::nothrow) app_thread_t (this,                  io_threads.size () + app_threads.size ()); diff --git a/src/decoder.hpp b/src/decoder.hpp index 1662bda..f05f651 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -55,7 +55,9 @@ namespace zmq              zmq_assert (buf);          } -        inline ~decoder_t () +        //  The destructor doesn't have to be virtual. It is mad virtual +        //  just to keep ICC and code checking tools from complaining. +        inline virtual ~decoder_t ()          {              free (buf);          } diff --git a/src/encoder.hpp b/src/encoder.hpp index 10fe912..0d5b6ba 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -50,7 +50,9 @@ namespace zmq              zmq_assert (buf);          } -        inline ~encoder_t () +        //  The destructor doesn't have to be virtual. It is mad virtual +        //  just to keep ICC and code checking tools from complaining. +        inline virtual ~encoder_t ()          {              free (buf);          } diff --git a/src/forwarder.cpp b/src/forwarder.cpp index 5aab8f2..d1f324e 100644 --- a/src/forwarder.cpp +++ b/src/forwarder.cpp @@ -21,6 +21,7 @@  #include "forwarder.hpp"  #include "socket_base.hpp" +#include "likely.hpp"  #include "err.hpp"  int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,9 +30,30 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)      int rc = zmq_msg_init (&msg);      errno_assert (rc == 0); +    int64_t more; +    size_t more_sz = sizeof (more); +      while (true) { -        insocket_->recv (&msg, 0); -        outsocket_->send (&msg, 0); +        rc = insocket_->recv (&msg, 0); +        if (unlikely (rc < 0)) { +            if (errno == ETERM) +                return -1; +            errno_assert (false); +        } + +        rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); +        if (unlikely (rc < 0)) { +            if (errno == ETERM) +                return -1; +            errno_assert (false); +        } + +        rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); +        if (unlikely (rc < 0)) { +            if (errno == ETERM) +                return -1; +            errno_assert (false); +        }      }      return 0; diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index 8b85f7a..6d474b2 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -28,7 +28,7 @@ namespace zmq      struct i_poll_events      { -        virtual ~i_poll_events () {}; +        virtual ~i_poll_events () {}          // Called by I/O thread when file descriptor is ready for reading.          virtual void in_event () = 0; @@ -289,11 +289,8 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_,      //  doesn't really matter, since it's not included in the addr-output.      req.ai_socktype = SOCK_STREAM; -    //  Avoid named services due to unclear socktype, and don't pick IPv4 -    //  addresses if we don't have a local IPv4 address configured. -    //  If this is failing for you on a host with only IPv6 connectivity, -    //  please contribute proper IPv6 support for all functions in this file. -    req.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG; +    //  Avoid named services due to unclear socktype. +    req.ai_flags = AI_NUMERICSERV;      //  Resolve host name. Some of the error info is lost in case of error,      //  however, there's no way to report EAI errors via errno. diff --git a/src/msg_store.cpp b/src/msg_store.cpp new file mode 100644 index 0000000..aaf6dbe --- /dev/null +++ b/src/msg_store.cpp @@ -0,0 +1,307 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "platform.hpp" + +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#include <io.h> +#else +#include <unistd.h> +#endif + +#include "../include/zmq.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <string.h> +#include <sstream> +#include <algorithm> + +#include "atomic_counter.hpp" +#include "msg_store.hpp" +#include "err.hpp" + +zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : +    fd (-1), +    filesize (filesize_), +    file_pos (0), +    write_pos (0), +    read_pos (0), +    block_size (block_size_), +    write_buf_start_addr (0) +{ +    zmq_assert (filesize > 0); +    zmq_assert (block_size > 0); + +    buf1 = new (std::nothrow) char [block_size]; +    zmq_assert (buf1); + +    buf2 = new (std::nothrow) char [block_size]; +    zmq_assert (buf2); + +    read_buf = write_buf = buf1; +} + +zmq::msg_store_t::~msg_store_t () +{ +    delete [] buf1; +    delete [] buf2; + +    if (fd == -1) +        return; + +#ifdef ZMQ_HAVE_WINDOWS +    int rc = _close (fd); +#else +    int rc = close (fd); +#endif +    errno_assert (rc == 0); + +#ifdef ZMQ_HAVE_WINDOWS +    rc = _unlink (filename.c_str ()); +#else +    rc = unlink (filename.c_str ()); +#endif +    errno_assert (rc == 0); +} + +int zmq::msg_store_t::init () +{ +    static zmq::atomic_counter_t seqnum (0); + +    //  Get process ID. +#ifdef ZMQ_HAVE_WINDOWS +    int pid = GetCurrentThreadId (); +#else +    pid_t pid = getpid (); +#endif + +    std::ostringstream outs; +    outs << "zmq_" << pid << '_' << seqnum.get () << ".swap"; +    filename = outs.str (); + +    seqnum.add (1); + +    //  Open the backing file. +#ifdef ZMQ_HAVE_WINDOWS +    fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600); +#else +    fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600); +#endif +    if (fd == -1) +        return -1; + +#ifdef ZMQ_HAVE_LINUX +    //  Enable more aggresive read-ahead optimization. +    posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL); +#endif +    return 0; +} + +bool zmq::msg_store_t::store (zmq_msg_t *msg_) +{ +    size_t msg_size = zmq_msg_size (msg_); + +    //  Check buffer space availability. +    //  NOTE: We always keep one byte open. +    if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) +        return false; + +    //  Don't store the ZMQ_MSG_SHARED flag. +    uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED; + +    //  Write message length, flags, and message body. +    copy_to_file (&msg_size, sizeof msg_size); +    copy_to_file (&msg_flags, sizeof msg_flags); +    copy_to_file (zmq_msg_data (msg_), msg_size); + +    zmq_msg_close (msg_); + +    return true; +} + +void zmq::msg_store_t::fetch (zmq_msg_t *msg_) +{ +    //  There must be at least one message available. +    zmq_assert (read_pos != write_pos); + +    //  Retrieve the message size. +    size_t msg_size; +    copy_from_file (&msg_size, sizeof msg_size); + +    //  Initialize the message. +    zmq_msg_init_size (msg_, msg_size); + +    //  Retrieve the message flags. +    copy_from_file (&msg_->flags, sizeof msg_->flags); + +    //  Retrieve the message payload. +    copy_from_file (zmq_msg_data (msg_), msg_size); +} + +void zmq::msg_store_t::commit () +{ +    commit_pos = write_pos; +} + +void zmq::msg_store_t::rollback () +{ +    if (commit_pos == write_pos || read_pos == write_pos) +        return; + +    if (write_pos > read_pos) +        zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos); +    else +        zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos); + +    if (commit_pos / block_size == read_pos / block_size) { +        write_buf_start_addr = commit_pos % block_size; +        write_buf = read_buf; +    } +    else if (commit_pos / block_size != write_pos / block_size) { +        write_buf_start_addr = commit_pos % block_size; +        fill_buf (write_buf, write_buf_start_addr); +    } +    write_pos = commit_pos; +} + +bool zmq::msg_store_t::empty () +{ +    return read_pos == write_pos; +} + +bool zmq::msg_store_t::full () +{ +    return buffer_space () == 1; +} + +void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) +{ +    char *dest_ptr = (char *) buffer_; +    size_t chunk_size, remainder = count_; + +    while (remainder > 0) { +        chunk_size = std::min (remainder,  +            std::min ((size_t) (filesize - read_pos), +            (size_t) (block_size - read_pos % block_size))); + +        memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size); +        dest_ptr += chunk_size; + +        read_pos = (read_pos + chunk_size) % filesize; +        if (read_pos % block_size == 0) { +            if (read_pos / block_size == write_pos / block_size) +                read_buf = write_buf; +            else +                fill_buf (read_buf, read_pos); +        } +        remainder -= chunk_size; +    } +} + +void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) +{ +    char *source_ptr = (char *) buffer_; +    size_t chunk_size, remainder = count_; + +    while (remainder > 0) { +        chunk_size = std::min (remainder,  +            std::min ((size_t) (filesize - write_pos), +            (size_t) (block_size - write_pos % block_size))); + +        memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size); +        source_ptr += chunk_size; + +        write_pos = (write_pos + chunk_size) % filesize; +        if (write_pos % block_size == 0) { +            save_write_buf (); +            write_buf_start_addr = write_pos; + +            if (write_buf == read_buf) { +                if (read_buf == buf2) +                    write_buf = buf1; +                else +                    write_buf = buf2; +            } +        } +        remainder -= chunk_si | 
