summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Lucina <mato@kotelna.sk>2010-09-08 15:25:45 +0200
committerMartin Lucina <martin@lucina.net>2012-01-23 08:53:27 +0100
commit90d73cba9cd1d1724f38ed82fc0eefb1781c9c20 (patch)
tree1760872164a93384d1adb90db9c8d41777dbb2a7 /src
parentcf026feae205bfeb7e007f6afd0e8d7b283865c8 (diff)
parent5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 (diff)
Imported Debian patch 2.0.9.dfsg-1debian/2.0.9.dfsg-1
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am12
-rw-r--r--src/Makefile.in59
-rw-r--r--src/app_thread.cpp12
-rw-r--r--src/ctx.cpp1
-rw-r--r--src/decoder.hpp4
-rw-r--r--src/encoder.hpp4
-rw-r--r--src/forwarder.cpp26
-rw-r--r--src/i_poll_events.hpp2
-rw-r--r--src/ip.cpp7
-rw-r--r--src/msg_store.cpp307
-rw-r--r--src/msg_store.hpp114
-rw-r--r--src/pair.cpp3
-rw-r--r--src/pipe.cpp146
-rw-r--r--src/pipe.hpp32
-rw-r--r--src/platform.hpp.in3
-rw-r--r--src/poll.cpp23
-rw-r--r--src/prefix_tree.cpp12
-rw-r--r--src/prefix_tree.hpp2
-rw-r--r--src/pull.cpp (renamed from src/upstream.cpp)28
-rw-r--r--src/pull.hpp (renamed from src/upstream.hpp)14
-rw-r--r--src/push.cpp (renamed from src/downstream.cpp)28
-rw-r--r--src/push.hpp (renamed from src/downstream.hpp)14
-rw-r--r--src/queue.cpp43
-rw-r--r--src/select.cpp15
-rw-r--r--src/session.cpp4
-rw-r--r--src/signaler.cpp26
-rw-r--r--src/socket_base.cpp29
-rw-r--r--src/streamer.cpp26
-rw-r--r--src/tcp_connecter.cpp15
-rw-r--r--src/tcp_listener.cpp2
-rw-r--r--src/tcp_socket.cpp3
-rw-r--r--src/uuid.cpp4
-rw-r--r--src/xrep.cpp29
-rw-r--r--src/xrep.hpp6
-rw-r--r--src/xreq.cpp27
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/yarray_item.hpp4
-rw-r--r--src/ypipe.hpp17
-rw-r--r--src/zmq.cpp113
-rw-r--r--src/zmq_decoder.cpp7
40 files changed, 1002 insertions, 224 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index fa97ca3..19a80d0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -3,7 +3,7 @@ lib_LTLIBRARIES = libzmq.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libzmq.pc
-include_HEADERS = ../include/zmq.h ../include/zmq.hpp
+include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h
if BUILD_PGM
pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
@@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- downstream.hpp \
+ push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \
lb.hpp \
likely.hpp \
msg_content.hpp \
+ msg_store.hpp \
mutex.hpp \
object.hpp \
options.hpp \
@@ -104,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- upstream.hpp \
+ pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -124,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
- downstream.cpp \
+ push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ip.cpp \
kqueue.cpp \
lb.cpp \
+ msg_store.cpp \
object.cpp \
options.cpp \
owned.cpp \
@@ -158,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- upstream.cpp \
+ pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/Makefile.in b/src/Makefile.in
index bc3f00f..1bb65a4 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -61,11 +61,12 @@ libLTLIBRARIES_INSTALL = $(INSTALL)
LTLIBRARIES = $(lib_LTLIBRARIES)
libzmq_la_LIBADD =
am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \
- libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-downstream.lo \
+ libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-push.lo \
libzmq_la-epoll.lo libzmq_la-err.lo libzmq_la-forwarder.lo \
libzmq_la-fq.lo libzmq_la-io_object.lo libzmq_la-io_thread.lo \
libzmq_la-ip.lo libzmq_la-kqueue.lo libzmq_la-lb.lo \
- libzmq_la-object.lo libzmq_la-options.lo libzmq_la-owned.lo \
+ libzmq_la-msg_store.lo libzmq_la-object.lo \
+ libzmq_la-options.lo libzmq_la-owned.lo \
libzmq_la-pgm_receiver.lo libzmq_la-pgm_sender.lo \
libzmq_la-pgm_socket.lo libzmq_la-pair.lo \
libzmq_la-prefix_tree.lo libzmq_la-pipe.lo libzmq_la-poll.lo \
@@ -74,9 +75,9 @@ am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \
libzmq_la-signaler.lo libzmq_la-socket_base.lo \
libzmq_la-streamer.lo libzmq_la-sub.lo \
libzmq_la-tcp_connecter.lo libzmq_la-tcp_listener.lo \
- libzmq_la-tcp_socket.lo libzmq_la-thread.lo \
- libzmq_la-upstream.lo libzmq_la-uuid.lo libzmq_la-xrep.lo \
- libzmq_la-xreq.lo libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \
+ libzmq_la-tcp_socket.lo libzmq_la-thread.lo libzmq_la-pull.lo \
+ libzmq_la-uuid.lo libzmq_la-xrep.lo libzmq_la-xreq.lo \
+ libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \
libzmq_la-zmq_decoder.lo libzmq_la-zmq_encoder.lo \
libzmq_la-zmq_engine.lo libzmq_la-zmq_init.lo \
libzmq_la-zmq_listener.lo
@@ -271,7 +272,7 @@ top_srcdir = @top_srcdir@
lib_LTLIBRARIES = libzmq.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libzmq.pc
-include_HEADERS = ../include/zmq.h ../include/zmq.hpp
+include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h
@BUILD_PGM_TRUE@pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
@BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \
@BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \
@@ -316,7 +317,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- downstream.hpp \
+ push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -334,6 +335,7 @@ libzmq_la_SOURCES = app_thread.hpp \
lb.hpp \
likely.hpp \
msg_content.hpp \
+ msg_store.hpp \
mutex.hpp \
object.hpp \
options.hpp \
@@ -362,7 +364,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- upstream.hpp \
+ pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -382,7 +384,7 @@ libzmq_la_SOURCES = app_thread.hpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
- downstream.cpp \
+ push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -392,6 +394,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ip.cpp \
kqueue.cpp \
lb.cpp \
+ msg_store.cpp \
object.cpp \
options.cpp \
owned.cpp \
@@ -416,7 +419,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- upstream.cpp \
+ pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
@@ -497,9 +500,9 @@ $(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps)
exit 1;; \
esac; \
done; \
- echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu src/Makefile'; \
+ echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign src/Makefile'; \
cd $(top_srcdir) && \
- $(AUTOMAKE) --gnu src/Makefile
+ $(AUTOMAKE) --foreign src/Makefile
.PRECIOUS: Makefile
Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
@case '$?' in \
@@ -579,7 +582,6 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-command.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-ctx.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-devpoll.Plo@am__quote@
-@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-downstream.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-epoll.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-err.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-forwarder.Plo@am__quote@
@@ -600,6 +602,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-lb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-log.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-md5.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-msg_store.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-nametoindex.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-net.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-object.Plo@am__quote@
@@ -615,6 +618,8 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-poll.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-prefix_tree.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pub.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pull.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-push.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-queue.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-rate_control.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-receiver.Plo@am__quote@
@@ -641,7 +646,6 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-transport.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-tsi.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-txwi.Plo@am__quote@
-@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-upstream.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-uuid.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-version.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-wsastrerror.Plo@am__quote@
@@ -956,12 +960,12 @@ libzmq_la-devpoll.lo: devpoll.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-devpoll.lo `test -f 'devpoll.cpp' || echo '$(srcdir)/'`devpoll.cpp
-libzmq_la-downstream.lo: downstream.cpp
-@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-downstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-downstream.Tpo -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp
-@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-downstream.Tpo $(DEPDIR)/libzmq_la-downstream.Plo
-@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='downstream.cpp' object='libzmq_la-downstream.lo' libtool=yes @AMDEPBACKSLASH@
+libzmq_la-push.lo: push.cpp
+@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-push.lo -MD -MP -MF $(DEPDIR)/libzmq_la-push.Tpo -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp
+@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-push.Tpo $(DEPDIR)/libzmq_la-push.Plo
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='push.cpp' object='libzmq_la-push.lo' libtool=yes @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp
+@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp
libzmq_la-epoll.lo: epoll.cpp
@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-epoll.lo -MD -MP -MF $(DEPDIR)/libzmq_la-epoll.Tpo -c -o libzmq_la-epoll.lo `test -f 'epoll.cpp' || echo '$(srcdir)/'`epoll.cpp
@@ -1026,6 +1030,13 @@ libzmq_la-lb.lo: lb.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-lb.lo `test -f 'lb.cpp' || echo '$(srcdir)/'`lb.cpp
+libzmq_la-msg_store.lo: msg_store.cpp
+@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-msg_store.lo -MD -MP -MF $(DEPDIR)/libzmq_la-msg_store.Tpo -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp
+@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-msg_store.Tpo $(DEPDIR)/libzmq_la-msg_store.Plo
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='msg_store.cpp' object='libzmq_la-msg_store.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp
+
libzmq_la-object.lo: object.cpp
@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-object.lo -MD -MP -MF $(DEPDIR)/libzmq_la-object.Tpo -c -o libzmq_la-object.lo `test -f 'object.cpp' || echo '$(srcdir)/'`object.cpp
@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-object.Tpo $(DEPDIR)/libzmq_la-object.Plo
@@ -1194,12 +1205,12 @@ libzmq_la-thread.lo: thread.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-thread.lo `test -f 'thread.cpp' || echo '$(srcdir)/'`thread.cpp
-libzmq_la-upstream.lo: upstream.cpp
-@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-upstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-upstream.Tpo -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp
-@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-upstream.Tpo $(DEPDIR)/libzmq_la-upstream.Plo
-@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='upstream.cpp' object='libzmq_la-upstream.lo' libtool=yes @AMDEPBACKSLASH@
+libzmq_la-pull.lo: pull.cpp
+@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-pull.lo -MD -MP -MF $(DEPDIR)/libzmq_la-pull.Tpo -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp
+@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-pull.Tpo $(DEPDIR)/libzmq_la-pull.Plo
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='pull.cpp' object='libzmq_la-pull.lo' libtool=yes @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp
+@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp
libzmq_la-uuid.lo: uuid.cpp
@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-uuid.lo -MD -MP -MF $(DEPDIR)/libzmq_la-uuid.Tpo -c -o libzmq_la-uuid.lo `test -f 'uuid.cpp' || echo '$(srcdir)/'`uuid.cpp
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbf034c..ac59464 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -46,8 +46,8 @@
#include "rep.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
-#include "upstream.hpp"
-#include "downstream.hpp"
+#include "pull.hpp"
+#include "push.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_XREP:
s = new (std::nothrow) xrep_t (this);
break;
- case ZMQ_UPSTREAM:
- s = new (std::nothrow) upstream_t (this);
+ case ZMQ_PULL:
+ s = new (std::nothrow) pull_t (this);
break;
- case ZMQ_DOWNSTREAM:
- s = new (std::nothrow) downstream_t (this);
+ case ZMQ_PUSH:
+ s = new (std::nothrow) push_t (this);
break;
default:
if (sockets.empty ())
diff --git a/src/ctx.cpp b/src/ctx.cpp
index f0e177d..397f692 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -151,6 +151,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
// Create the new application thread proxy object.
app_thread_info_t info;
+ memset (&info, 0, sizeof (info));
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this,
io_threads.size () + app_threads.size ());
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 1662bda..f05f651 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -55,7 +55,9 @@ namespace zmq
zmq_assert (buf);
}
- inline ~decoder_t ()
+ // The destructor doesn't have to be virtual. It is mad virtual
+ // just to keep ICC and code checking tools from complaining.
+ inline virtual ~decoder_t ()
{
free (buf);
}
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 10fe912..0d5b6ba 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -50,7 +50,9 @@ namespace zmq
zmq_assert (buf);
}
- inline ~encoder_t ()
+ // The destructor doesn't have to be virtual. It is mad virtual
+ // just to keep ICC and code checking tools from complaining.
+ inline virtual ~encoder_t ()
{
free (buf);
}
diff --git a/src/forwarder.cpp b/src/forwarder.cpp
index 5aab8f2..d1f324e 100644
--- a/src/forwarder.cpp
+++ b/src/forwarder.cpp
@@ -21,6 +21,7 @@
#include "forwarder.hpp"
#include "socket_base.hpp"
+#include "likely.hpp"
#include "err.hpp"
int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)
@@ -29,9 +30,30 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
+ int64_t more;
+ size_t more_sz = sizeof (more);
+
while (true) {
- insocket_->recv (&msg, 0);
- outsocket_->send (&msg, 0);
+ rc = insocket_->recv (&msg, 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
+
+ rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
+
+ rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
}
return 0;
diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp
index 8b85f7a..6d474b2 100644
--- a/src/i_poll_events.hpp
+++ b/src/i_poll_events.hpp
@@ -28,7 +28,7 @@ namespace zmq
struct i_poll_events
{
- virtual ~i_poll_events () {};
+ virtual ~i_poll_events () {}
// Called by I/O thread when file descriptor is ready for reading.
virtual void in_event () = 0;
diff --git a/src/ip.cpp b/src/ip.cpp
index 79d90da..f491008 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -289,11 +289,8 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_,
// doesn't really matter, since it's not included in the addr-output.
req.ai_socktype = SOCK_STREAM;
- // Avoid named services due to unclear socktype, and don't pick IPv4
- // addresses if we don't have a local IPv4 address configured.
- // If this is failing for you on a host with only IPv6 connectivity,
- // please contribute proper IPv6 support for all functions in this file.
- req.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG;
+ // Avoid named services due to unclear socktype.
+ req.ai_flags = AI_NUMERICSERV;
// Resolve host name. Some of the error info is lost in case of error,
// however, there's no way to report EAI errors via errno.
diff --git a/src/msg_store.cpp b/src/msg_store.cpp
new file mode 100644
index 0000000..aaf6dbe
--- /dev/null
+++ b/src/msg_store.cpp
@@ -0,0 +1,307 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "platform.hpp"
+
+#ifdef ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#include <io.h>
+#else
+#include <unistd.h>
+#endif
+
+#include "../include/zmq.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <string.h>
+#include <sstream>
+#include <algorithm>
+
+#include "atomic_counter.hpp"
+#include "msg_store.hpp"
+#include "err.hpp"
+
+zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) :
+ fd (-1),
+ filesize (filesize_),
+ file_pos (0),
+ write_pos (0),
+ read_pos (0),
+ block_size (block_size_),
+ write_buf_start_addr (0)
+{
+ zmq_assert (filesize > 0);
+ zmq_assert (block_size > 0);
+
+ buf1 = new (std::nothrow) char [block_size];
+ zmq_assert (buf1);
+
+ buf2 = new (std::nothrow) char [block_size];
+ zmq_assert (buf2);
+
+ read_buf = write_buf = buf1;
+}
+
+zmq::msg_store_t::~msg_store_t ()
+{
+ delete [] buf1;
+ delete [] buf2;
+
+ if (fd == -1)
+ return;
+
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = _close (fd);
+#else
+ int rc = close (fd);
+#endif
+ errno_assert (rc == 0);
+
+#ifdef ZMQ_HAVE_WINDOWS
+ rc = _unlink (filename.c_str ());
+#else
+ rc = unlink (filename.c_str ());
+#endif
+ errno_assert (rc == 0);
+}
+
+int zmq::msg_store_t::init ()
+{
+ static zmq::atomic_counter_t seqnum (0);
+
+ // Get process ID.
+#ifdef ZMQ_HAVE_WINDOWS
+ int pid = GetCurrentThreadId ();
+#else
+ pid_t pid = getpid ();
+#endif
+
+ std::ostringstream outs;
+ outs << "zmq_" << pid << '_' << seqnum.get () << ".swap";
+ filename = outs.str ();
+
+ seqnum.add (1);
+
+ // Open the backing file.
+#ifdef ZMQ_HAVE_WINDOWS
+ fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600);
+#else
+ fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600);
+#endif
+ if (fd == -1)
+ return -1;
+
+#ifdef ZMQ_HAVE_LINUX
+ // Enable more aggresive read-ahead optimization.
+ posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL);
+#endif
+ return 0;
+}
+
+bool zmq::msg_store_t::store (zmq_msg_t *msg_)
+{
+ size_t msg_size = zmq_msg_size (msg_);
+
+ // Check buffer space availability.
+ // NOTE: We always keep one byte open.
+ if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
+ return false;
+
+ // Don't store the ZMQ_MSG_SHARED flag.
+ uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED;
+
+ // Write message length, flags, and message body.
+ copy_to_file (&msg_size, sizeof msg_size);
+ copy_to_file (&msg_flags, sizeof msg_flags);
+ copy_to_file (zmq_msg_data (msg_), msg_size);
+
+ zmq_msg_close (msg_);
+
+ return true;
+}
+
+void zmq::msg_store_t::fetch (zmq_msg_t *msg_)
+{
+ // There must be at least one message available.
+ zmq_assert (read_pos != write_pos);
+
+ // Retrieve the message size.
+ size_t msg_size;
+ copy_from_file (&msg_size, sizeof msg_size);
+
+ // Initialize the message.
+ zmq_msg_init_size (msg_, msg_size);
+
+ // Retrieve the message flags.
+ copy_from_file (&msg_->flags, sizeof msg_->flags);
+
+ // Retrieve the message payload.
+ copy_from_file (zmq_msg_data (msg_), msg_size);
+}
+
+void zmq::msg_store_t::commit ()
+{
+ commit_pos = write_pos;
+}
+
+void zmq::msg_store_t::rollback ()
+{
+ if (commit_pos == write_pos || read_pos == write_pos)
+ return;
+
+ if (write_pos > read_pos)
+ zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos);
+ else
+ zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos);
+
+ if (commit_pos / block_size == read_pos / block_size) {
+ write_buf_start_addr = commit_pos % block_size;
+ write_buf = read_buf;
+ }
+ else if (commit_pos / block_size != write_pos / block_size) {
+ write_buf_start_addr = commit_pos % block_size;
+ fill_buf (write_buf, write_buf_start_addr);
+ }
+ write_pos = commit_pos;
+}
+
+bool zmq::msg_store_t::empty ()
+{
+ return read_pos == write_pos;
+}
+
+bool zmq::msg_store_t::full ()
+{
+ return buffer_space () == 1;
+}
+
+void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_)
+{
+ char *dest_ptr = (char *) buffer_;
+ size_t chunk_size, remainder = count_;
+
+ while (remainder > 0) {
+ chunk_size = std::min (remainder,
+ std::min ((size_t) (filesize - read_pos),
+ (size_t) (block_size - read_pos % block_size)));
+
+ memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size);
+ dest_ptr += chunk_size;
+
+ read_pos = (read_pos + chunk_size) % filesize;
+ if (read_pos % block_size == 0) {
+ if (read_pos / block_size == write_pos / block_size)
+ read_buf = w