summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-01-23 08:53:25 +0100
committerMartin Lucina <martin@lucina.net>2012-01-23 08:53:25 +0100
commit5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 (patch)
treedf7b144c5325fd8b3c88c49b456fafc24249abe6 /src
parenta15854bd92db69fcd0b4444fe1b8fe3610a7acf6 (diff)
Imported Upstream version 2.0.9.dfsgupstream/2.0.9.dfsg
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am12
-rw-r--r--src/Makefile.in59
-rw-r--r--src/app_thread.cpp12
-rw-r--r--src/ctx.cpp1
-rw-r--r--src/decoder.hpp4
-rw-r--r--src/encoder.hpp4
-rw-r--r--src/forwarder.cpp26
-rw-r--r--src/i_poll_events.hpp2
-rw-r--r--src/ip.cpp7
-rw-r--r--src/msg_store.cpp307
-rw-r--r--src/msg_store.hpp114
-rw-r--r--src/pair.cpp3
-rw-r--r--src/pipe.cpp146
-rw-r--r--src/pipe.hpp32
-rw-r--r--src/platform.hpp.in3
-rw-r--r--src/poll.cpp23
-rw-r--r--src/prefix_tree.cpp12
-rw-r--r--src/prefix_tree.hpp2
-rw-r--r--src/pull.cpp (renamed from src/upstream.cpp)28
-rw-r--r--src/pull.hpp (renamed from src/upstream.hpp)14
-rw-r--r--src/push.cpp (renamed from src/downstream.cpp)28
-rw-r--r--src/push.hpp (renamed from src/downstream.hpp)14
-rw-r--r--src/queue.cpp43
-rw-r--r--src/select.cpp15
-rw-r--r--src/session.cpp4
-rw-r--r--src/signaler.cpp26
-rw-r--r--src/socket_base.cpp29
-rw-r--r--src/streamer.cpp26
-rw-r--r--src/tcp_connecter.cpp15
-rw-r--r--src/tcp_listener.cpp2
-rw-r--r--src/tcp_socket.cpp3
-rw-r--r--src/uuid.cpp4
-rw-r--r--src/xrep.cpp29
-rw-r--r--src/xrep.hpp6
-rw-r--r--src/xreq.cpp27
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/yarray_item.hpp4
-rw-r--r--src/ypipe.hpp17
-rw-r--r--src/zmq.cpp113
-rw-r--r--src/zmq_decoder.cpp7
40 files changed, 1002 insertions, 224 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index fa97ca3..19a80d0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -3,7 +3,7 @@ lib_LTLIBRARIES = libzmq.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libzmq.pc
-include_HEADERS = ../include/zmq.h ../include/zmq.hpp
+include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h
if BUILD_PGM
pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
@@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- downstream.hpp \
+ push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \
lb.hpp \
likely.hpp \
msg_content.hpp \
+ msg_store.hpp \
mutex.hpp \
object.hpp \
options.hpp \
@@ -104,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- upstream.hpp \
+ pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -124,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
- downstream.cpp \
+ push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ip.cpp \
kqueue.cpp \
lb.cpp \
+ msg_store.cpp \
object.cpp \
options.cpp \
owned.cpp \
@@ -158,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- upstream.cpp \
+ pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/Makefile.in b/src/Makefile.in
index bc3f00f..1bb65a4 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -61,11 +61,12 @@ libLTLIBRARIES_INSTALL = $(INSTALL)
LTLIBRARIES = $(lib_LTLIBRARIES)
libzmq_la_LIBADD =
am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \
- libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-downstream.lo \
+ libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-push.lo \
libzmq_la-epoll.lo libzmq_la-err.lo libzmq_la-forwarder.lo \
libzmq_la-fq.lo libzmq_la-io_object.lo libzmq_la-io_thread.lo \
libzmq_la-ip.lo libzmq_la-kqueue.lo libzmq_la-lb.lo \
- libzmq_la-object.lo libzmq_la-options.lo libzmq_la-owned.lo \
+ libzmq_la-msg_store.lo libzmq_la-object.lo \
+ libzmq_la-options.lo libzmq_la-owned.lo \
libzmq_la-pgm_receiver.lo libzmq_la-pgm_sender.lo \
libzmq_la-pgm_socket.lo libzmq_la-pair.lo \
libzmq_la-prefix_tree.lo libzmq_la-pipe.lo libzmq_la-poll.lo \
@@ -74,9 +75,9 @@ am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \
libzmq_la-signaler.lo libzmq_la-socket_base.lo \
libzmq_la-streamer.lo libzmq_la-sub.lo \
libzmq_la-tcp_connecter.lo libzmq_la-tcp_listener.lo \
- libzmq_la-tcp_socket.lo libzmq_la-thread.lo \
- libzmq_la-upstream.lo libzmq_la-uuid.lo libzmq_la-xrep.lo \
- libzmq_la-xreq.lo libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \
+ libzmq_la-tcp_socket.lo libzmq_la-thread.lo libzmq_la-pull.lo \
+ libzmq_la-uuid.lo libzmq_la-xrep.lo libzmq_la-xreq.lo \
+ libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \
libzmq_la-zmq_decoder.lo libzmq_la-zmq_encoder.lo \
libzmq_la-zmq_engine.lo libzmq_la-zmq_init.lo \
libzmq_la-zmq_listener.lo
@@ -271,7 +272,7 @@ top_srcdir = @top_srcdir@
lib_LTLIBRARIES = libzmq.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = libzmq.pc
-include_HEADERS = ../include/zmq.h ../include/zmq.hpp
+include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h
@BUILD_PGM_TRUE@pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \
@BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \
@BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \
@@ -316,7 +317,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- downstream.hpp \
+ push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -334,6 +335,7 @@ libzmq_la_SOURCES = app_thread.hpp \
lb.hpp \
likely.hpp \
msg_content.hpp \
+ msg_store.hpp \
mutex.hpp \
object.hpp \
options.hpp \
@@ -362,7 +364,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- upstream.hpp \
+ pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -382,7 +384,7 @@ libzmq_la_SOURCES = app_thread.hpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
- downstream.cpp \
+ push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -392,6 +394,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ip.cpp \
kqueue.cpp \
lb.cpp \
+ msg_store.cpp \
object.cpp \
options.cpp \
owned.cpp \
@@ -416,7 +419,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- upstream.cpp \
+ pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
@@ -497,9 +500,9 @@ $(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps)
exit 1;; \
esac; \
done; \
- echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu src/Makefile'; \
+ echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign src/Makefile'; \
cd $(top_srcdir) && \
- $(AUTOMAKE) --gnu src/Makefile
+ $(AUTOMAKE) --foreign src/Makefile
.PRECIOUS: Makefile
Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
@case '$?' in \
@@ -579,7 +582,6 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-command.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-ctx.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-devpoll.Plo@am__quote@
-@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-downstream.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-epoll.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-err.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-forwarder.Plo@am__quote@
@@ -600,6 +602,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-lb.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-log.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-md5.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-msg_store.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-nametoindex.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-net.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-object.Plo@am__quote@
@@ -615,6 +618,8 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-poll.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-prefix_tree.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pub.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pull.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-push.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-queue.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-rate_control.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-receiver.Plo@am__quote@
@@ -641,7 +646,6 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-transport.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-tsi.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-txwi.Plo@am__quote@
-@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-upstream.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-uuid.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-version.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-wsastrerror.Plo@am__quote@
@@ -956,12 +960,12 @@ libzmq_la-devpoll.lo: devpoll.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-devpoll.lo `test -f 'devpoll.cpp' || echo '$(srcdir)/'`devpoll.cpp
-libzmq_la-downstream.lo: downstream.cpp
-@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-downstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-downstream.Tpo -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp
-@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-downstream.Tpo $(DEPDIR)/libzmq_la-downstream.Plo
-@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='downstream.cpp' object='libzmq_la-downstream.lo' libtool=yes @AMDEPBACKSLASH@
+libzmq_la-push.lo: push.cpp
+@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-push.lo -MD -MP -MF $(DEPDIR)/libzmq_la-push.Tpo -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp
+@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-push.Tpo $(DEPDIR)/libzmq_la-push.Plo
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='push.cpp' object='libzmq_la-push.lo' libtool=yes @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp
+@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp
libzmq_la-epoll.lo: epoll.cpp
@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-epoll.lo -MD -MP -MF $(DEPDIR)/libzmq_la-epoll.Tpo -c -o libzmq_la-epoll.lo `test -f 'epoll.cpp' || echo '$(srcdir)/'`epoll.cpp
@@ -1026,6 +1030,13 @@ libzmq_la-lb.lo: lb.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-lb.lo `test -f 'lb.cpp' || echo '$(srcdir)/'`lb.cpp
+libzmq_la-msg_store.lo: msg_store.cpp
+@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-msg_store.lo -MD -MP -MF $(DEPDIR)/libzmq_la-msg_store.Tpo -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp
+@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-msg_store.Tpo $(DEPDIR)/libzmq_la-msg_store.Plo
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='msg_store.cpp' object='libzmq_la-msg_store.lo' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp
+
libzmq_la-object.lo: object.cpp
@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-object.lo -MD -MP -MF $(DEPDIR)/libzmq_la-object.Tpo -c -o libzmq_la-object.lo `test -f 'object.cpp' || echo '$(srcdir)/'`object.cpp
@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-object.Tpo $(DEPDIR)/libzmq_la-object.Plo
@@ -1194,12 +1205,12 @@ libzmq_la-thread.lo: thread.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-thread.lo `test -f 'thread.cpp' || echo '$(srcdir)/'`thread.cpp
-libzmq_la-upstream.lo: upstream.cpp
-@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-upstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-upstream.Tpo -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp
-@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-upstream.Tpo $(DEPDIR)/libzmq_la-upstream.Plo
-@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='upstream.cpp' object='libzmq_la-upstream.lo' libtool=yes @AMDEPBACKSLASH@
+libzmq_la-pull.lo: pull.cpp
+@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-pull.lo -MD -MP -MF $(DEPDIR)/libzmq_la-pull.Tpo -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp
+@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-pull.Tpo $(DEPDIR)/libzmq_la-pull.Plo
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='pull.cpp' object='libzmq_la-pull.lo' libtool=yes @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp
+@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp
libzmq_la-uuid.lo: uuid.cpp
@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-uuid.lo -MD -MP -MF $(DEPDIR)/libzmq_la-uuid.Tpo -c -o libzmq_la-uuid.lo `test -f 'uuid.cpp' || echo '$(srcdir)/'`uuid.cpp
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbf034c..ac59464 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -46,8 +46,8 @@
#include "rep.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
-#include "upstream.hpp"
-#include "downstream.hpp"
+#include "pull.hpp"
+#include "push.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_XREP:
s = new (std::nothrow) xrep_t (this);
break;
- case ZMQ_UPSTREAM:
- s = new (std::nothrow) upstream_t (this);
+ case ZMQ_PULL:
+ s = new (std::nothrow) pull_t (this);
break;
- case ZMQ_DOWNSTREAM:
- s = new (std::nothrow) downstream_t (this);
+ case ZMQ_PUSH:
+ s = new (std::nothrow) push_t (this);
break;
default:
if (sockets.empty ())
diff --git a/src/ctx.cpp b/src/ctx.cpp
index f0e177d..397f692 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -151,6 +151,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
// Create the new application thread proxy object.
app_thread_info_t info;
+ memset (&info, 0, sizeof (info));
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this,
io_threads.size () + app_threads.size ());
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 1662bda..f05f651 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -55,7 +55,9 @@ namespace zmq
zmq_assert (buf);
}
- inline ~decoder_t ()
+ // The destructor doesn't have to be virtual. It is mad virtual
+ // just to keep ICC and code checking tools from complaining.
+ inline virtual ~decoder_t ()
{
free (buf);
}
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 10fe912..0d5b6ba 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -50,7 +50,9 @@ namespace zmq
zmq_assert (buf);
}
- inline ~encoder_t ()
+ // The destructor doesn't have to be virtual. It is mad virtual
+ // just to keep ICC and code checking tools from complaining.
+ inline virtual ~encoder_t ()
{
free (buf);
}
diff --git a/src/forwarder.cpp b/src/forwarder.cpp
index 5aab8f2..d1f324e 100644
--- a/src/forwarder.cpp
+++ b/src/forwarder.cpp
@@ -21,6 +21,7 @@
#include "forwarder.hpp"
#include "socket_base.hpp"
+#include "likely.hpp"
#include "err.hpp"
int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)
@@ -29,9 +30,30 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
+ int64_t more;
+ size_t more_sz = sizeof (more);
+
while (true) {
- insocket_->recv (&msg, 0);
- outsocket_->send (&msg, 0);
+ rc = insocket_->recv (&msg, 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
+
+ rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
+
+ rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
}
return 0;
diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp
index 8b85f7a..6d474b2 100644
--- a/src/i_poll_events.hpp
+++ b/src/i_poll_events.hpp
@@ -28,7 +28,7 @@ namespace zmq
struct i_poll_events
{
- virtual ~i_poll_events () {};
+ virtual ~i_poll_events () {}
// Called by I/O thread when file descriptor is ready for reading.
virtual void in_event () = 0;
diff --git a/src/ip.cpp b/src/ip.cpp
index 79d90da..f491008 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -289,11 +289,8 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_,
// doesn't really matter, since it's not included in the addr-output.
req.ai_socktype = SOCK_STREAM;
- // Avoid named services due to unclear socktype, and don't pick IPv4
- // addresses if we don't have a local IPv4 address configured.
- // If this is failing for you on a host with only IPv6 connectivity,
- // please contribute proper IPv6 support for all functions in this file.
- req.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG;
+ // Avoid named services due to unclear socktype.
+ req.ai_flags = AI_NUMERICSERV;
// Resolve host name. Some of the error info is lost in case of error,
// however, there's no way to report EAI errors via errno.
diff --git a/src/msg_store.cpp b/src/msg_store.cpp
new file mode 100644
index 0000000..aaf6dbe
--- /dev/null
+++ b/src/msg_store.cpp
@@ -0,0 +1,307 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "platform.hpp"
+
+#ifdef ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#include <io.h>
+#else
+#include <unistd.h>
+#endif
+
+#include "../include/zmq.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <string.h>
+#include <sstream>
+#include <algorithm>
+
+#include "atomic_counter.hpp"
+#include "msg_store.hpp"
+#include "err.hpp"
+
+zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) :
+ fd (-1),
+ filesize (filesize_),
+ file_pos (0),
+ write_pos (0),
+ read_pos (0),
+ block_size (block_size_),
+ write_buf_start_addr (0)
+{
+ zmq_assert (filesize > 0);
+ zmq_assert (block_size > 0);
+
+ buf1 = new (std::nothrow) char [block_size];
+ zmq_assert (buf1);
+
+ buf2 = new (std::nothrow) char [block_size];
+ zmq_assert (buf2);
+
+ read_buf = write_buf = buf1;
+}
+
+zmq::msg_store_t::~msg_store_t ()
+{
+ delete [] buf1;
+ delete [] buf2;
+
+ if (fd == -1)
+ return;
+
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = _close (fd);
+#else
+ int rc = close (fd);
+#endif
+ errno_assert (rc == 0);
+
+#ifdef ZMQ_HAVE_WINDOWS
+ rc = _unlink (filename.c_str ());
+#else
+ rc = unlink (filename.c_str ());
+#endif
+ errno_assert (rc == 0);
+}
+
+int zmq::msg_store_t::init ()
+{
+ static zmq::atomic_counter_t seqnum (0);
+
+ // Get process ID.
+#ifdef ZMQ_HAVE_WINDOWS
+ int pid = GetCurrentThreadId ();
+#else
+ pid_t pid = getpid ();
+#endif
+
+ std::ostringstream outs;
+ outs << "zmq_" << pid << '_' << seqnum.get () << ".swap";
+ filename = outs.str ();
+
+ seqnum.add (1);
+
+ // Open the backing file.
+#ifdef ZMQ_HAVE_WINDOWS
+ fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600);
+#else
+ fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600);
+#endif
+ if (fd == -1)
+ return -1;
+
+#ifdef ZMQ_HAVE_LINUX
+ // Enable more aggresive read-ahead optimization.
+ posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL);
+#endif
+ return 0;
+}
+
+bool zmq::msg_store_t::store (zmq_msg_t *msg_)
+{
+ size_t msg_size = zmq_msg_size (msg_);
+
+ // Check buffer space availability.
+ // NOTE: We always keep one byte open.
+ if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size))
+ return false;
+
+ // Don't store the ZMQ_MSG_SHARED flag.
+ uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED;
+
+ // Write message length, flags, and message body.
+ copy_to_file (&msg_size, sizeof msg_size);
+ copy_to_file (&msg_flags, sizeof msg_flags);
+ copy_to_file (zmq_msg_data (msg_), msg_size);
+
+ zmq_msg_close (msg_);
+
+ return true;
+}
+
+void zmq::msg_store_t::fetch (zmq_msg_t *msg_)
+{
+ // There must be at least one message available.
+ zmq_assert (read_pos != write_pos);
+
+ // Retrieve the message size.
+ size_t msg_size;
+ copy_from_file (&msg_size, sizeof msg_size);
+
+ // Initialize the message.
+ zmq_msg_init_size (msg_, msg_size);
+
+ // Retrieve the message flags.
+ copy_from_file (&msg_->flags, sizeof msg_->flags);
+
+ // Retrieve the message payload.
+ copy_from_file (zmq_msg_data (msg_), msg_size);
+}
+
+void zmq::msg_store_t::commit ()
+{
+ commit_pos = write_pos;
+}
+
+void zmq::msg_store_t::rollback ()
+{
+ if (commit_pos == write_pos || read_pos == write_pos)
+ return;
+
+ if (write_pos > read_pos)
+ zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos);
+ else
+ zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos);
+
+ if (commit_pos / block_size == read_pos / block_size) {
+ write_buf_start_addr = commit_pos % block_size;
+ write_buf = read_buf;
+ }
+ else if (commit_pos / block_size != write_pos / block_size) {
+ write_buf_start_addr = commit_pos % block_size;
+ fill_buf (write_buf, write_buf_start_addr);
+ }
+ write_pos = commit_pos;
+}
+
+bool zmq::msg_store_t::empty ()
+{
+ return read_pos == write_pos;
+}
+
+bool zmq::msg_store_t::full ()
+{
+ return buffer_space () == 1;
+}
+
+void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_)
+{
+ char *dest_ptr = (char *) buffer_;
+ size_t chunk_size, remainder = count_;
+
+ while (remainder > 0) {
+ chunk_size = std::min (remainder,
+ std::min ((size_t) (filesize - read_pos),
+ (size_t) (block_size - read_pos % block_size)));
+
+ memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size);
+ dest_ptr += chunk_size;
+
+ read_pos = (read_pos + chunk_size) % filesize;
+ if (read_pos % block_size == 0) {
+ if (read_pos / block_size == write_pos / block_size)
+ read_buf = write_buf;
+ else
+ fill_buf (read_buf, read_pos);
+ }
+ remainder -= chunk_size;
+ }
+}
+
+void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_)
+{
+ char *source_ptr = (char *) buffer_;
+ size_t chunk_size, remainder = count_;
+
+ while (remainder > 0) {
+ chunk_size = std::min (remainder,
+ std::min ((size_t) (filesize - write_pos),
+ (size_t) (block_size - write_pos % block_size)));
+
+ memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size);
+ source_ptr += chunk_size;
+
+ write_pos = (write_pos + chunk_size) % filesize;
+ if (write_pos % block_size == 0) {
+ save_write_buf ();
+ write_buf_start_addr = write_pos;
+
+ if (write_buf == read_buf) {
+ if (read_buf == buf2)
+ write_buf = buf1;
+ else
+ write_buf = buf2;
+ }
+ }
+ remainder -= chunk_size;
+ }
+}
+
+void zmq::msg_store_t::fill_buf (char *buf, int64_t pos)
+{
+ if (file_pos != pos) {
+#ifdef ZMQ_HAVE_WINDOWS
+ __int64 offset = _lseeki64 (fd, pos, SEEK_SET);
+#else
+ off_t offset = lseek (fd, (off_t) pos, SEEK_SET);
+#endif
+ errno_assert (offset == pos);
+ file_pos = pos;
+ }
+ size_t octets_stored = 0;
+ size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
+
+ while (octets_stored < octets_total) {
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored);
+#else
+ ssize_t rc = read (fd, &buf [octets_stored], octets_total - octets_stored);
+#endif
+ errno_assert (rc > 0);
+ octets_stored += rc;
+ }
+ file_pos += octets_total;
+}
+
+void zmq::msg_store_t::save_write_buf ()
+{
+ if (file_pos != write_buf_start_addr) {
+#ifdef ZMQ_HAVE_WINDOWS
+ __int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET);
+#else
+ off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET);
+#endif
+ errno_assert (offset == write_buf_start_addr);
+ file_pos = write_buf_start_addr;
+ }
+ size_t octets_stored = 0;
+ size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos));
+
+ while (octets_stored < octets_total) {
+#ifdef ZMQ_HAVE_WINDOWS
+ int rc = _write (fd, &write_buf [octets_stored], octets_total - octets_stored);
+#else
+ ssize_t rc = write (fd, &write_buf [octets_stored], octets_total - octets_stored);
+#endif
+ errno_assert (rc > 0);
+ octets_stored += rc;
+ }
+ file_pos += octets_total;
+}
+
+int64_t zmq::msg_store_t::buffer_space ()
+{
+ if (write_pos < read_pos)
+ return read_pos - write_pos;
+
+ return filesize - (write_pos - read_pos);
+}
diff --git a/src/msg_store.hpp b/src/msg_store.hpp
new file mode 100644
index 0000000..765fc60
--- /dev/null
+++ b/src/msg_store.hpp
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_MSG_STORE_HPP_INCLUDED__
+#define __ZMQ_MSG_STORE_HPP_INCLUDED__
+
+#include "../include/zmq.h"
+
+#include <string>
+#include "stdint.hpp"
+
+namespace zmq
+{
+
+ // This class implements a message store. Messages are retrieved from
+ // the store in the same order as they entered it.
+
+ class msg_store_t
+ {
+ public:
+
+ enum { default_block_size = 8192 };
+
+ // Creates message store.
+ msg_store_t (int64_t filesize_, size_t block_size_ = default_block_size);
+
+ ~msg_store_t ();
+
+ int init ();
+
+ // Stores the message into the message store. The function
+ // returns false if the message store is full; true otherwise.
+ bool store (zmq_msg_t *msg_);
+
+ // Fetches the oldest message from the message store. It is an error
+ // to call this function when the message store is empty.
+ void fetch (zmq_msg_t *msg_);
+
+ void commit ();
+
+ void rollback ();
+
+ // Returns true if the message store is empty; false otherwise.
+ bool empty ();
+
+ // Returns true if and only if the store is full.
+ bool full ();
+
+ private:
+
+ // Copies data from a memory buffer to the backing file.
+ // Wraps around when reaching maximum file size.
+ void copy_from_file (void *buffer_, size_t count_);
+
+ // Copies data from the backing file to the memory buffer.
+ // Wraps around when reaching end-of-file.
+ void copy_to_file (const void *buffer_, size_t count_);
+
+ // Returns the buffer space available.
+ int64_t buffer_space ();
+
+ void fill_buf (char *buf, int64_t pos);
+
+ void save_write_buf ();
+
+ // File descriptor to the backing file.
+ int fd;
+
+ // Name of the backing file.
+ std::string filename;
+
+ // Maximum size of the backing file.
+ int64_t filesize;
+
+ // File offset associated with the fd file descriptor.
+ int64_t file_pos;
+
+ // File offset the next message will be stored at.
+ int64_t write_pos;
+
+ // File offset the next message will be read from.
+ int64_t read_pos;
+
+ int64_t commit_pos;
+
+ size_t block_size;
+
+ char *buf1;
+ char *buf2;
+ char *read_buf;
+ char *write_buf;
+
+ int64_t write_buf_start_addr;
+ };
+
+}
+
+#endif
diff --git a/src/pair.cpp b/src/pair.cpp
index 31524de..3872b28 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -115,6 +115,9 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
zmq_msg_close (msg_);
if (!alive || !inpipe || !inpipe->read (msg_)) {
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 1df64e9..200beb0 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -21,20 +21,14 @@
#include "pipe.hpp"
-zmq::reader_t::reader_t (object_t *parent_,
- uint64_t hwm_, uint64_t lwm_) :
+zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
- hwm (hwm_),
lwm (lwm_),
msgs_read (0),
endpoint (NULL)
-{
- // Adjust lwm and hwm.
- if (lwm == 0 || lwm > hwm)
- lwm = hwm;
-}
+{}
zmq::reader_t::~reader_t ()
{
@@ -50,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_)
register_pipe (pipe);
}
+bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
+{
+ unsigned char *offset = 0;
+
+ return msg_.content == (void*) (offset + ZMQ_DELIMITER);
+}
+
bool zmq::reader_t::check_read ()
{
// Check if there's an item in the pipe.
- if (pipe->check_read ())
- return true;
-
// If not, deactivate the pipe.
- endpoint->kill (this);
- return false;
+ if (!pipe->check_read ()) {
+ endpoint->kill (this);
+ return false;
+ }
+
+ // If the next item in the pipe is message delimiter,
+ // initiate its termination.
+ if (pipe->probe (is_delimiter)) {
+ if (endpoint)
+ endpoint->detach_inpipe (this);
+ term ();
+ return false;
+ }
+
+ return true;
}
bool zmq::reader_t::read (zmq_msg_t *msg_)
@@ -113,20 +124,28 @@ void zmq::reader_t::process_pipe_term_ack ()
}
zmq::writer_t::writer_t (object_t *parent_,
- uint64_t hwm_, uint64_t lwm_) :
+ uint64_t hwm_, int64_t swap_size_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
hwm (hwm_),
- lwm (lwm_),
msgs_read (0),
msgs_written (0),
+ msg_store (NULL),
+ extra_msg_flag (false),
stalled (false),
+ pending_close (false),
endpoint (NULL)
{
- // Adjust lwm and hwm.
- if (lwm == 0 || lwm > hwm)
- lwm = hwm;
+ if (swap_size_ > 0) {
+ msg_store = new (std::nothrow) msg_store_t (swap_size_);
+ if (msg_store != NULL) {
+ if (msg_store->init () < 0) {
+ delete msg_store;
+ msg_store = NULL;
+ }
+ }
+ }
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
@@ -136,6 +155,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
zmq::writer_t::~writer_t ()
{
+ if (extra_msg_flag)
+ zmq_msg_close (&extra_msg);
+
+ delete msg_store;
}
void zmq::writer_t::set_pipe (pipe_t *pipe_)
@@ -147,7 +170,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_)
bool zmq::writer_t::check_write ()
{
- if (pipe_full ()) {
+ if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) {
stalled = true;
return false;
}
@@ -157,29 +180,45 @@ bool zmq::writer_t::check_write ()
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
- if (pipe_full ()) {
- stalled = true;
+ if (!check_write ())
return false;
+
+ if (pipe_full ()) {
+ if (msg_store->store (msg_)) {
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ msg_store->commit ();
+ } else {
+ extra_msg = *msg_;
+ extra_msg_flag = true;
+ }
+ }
+ else {
+ pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
+ if (!(msg_->flags & ZMQ_MSG_MORE))
+ msgs_written++;
}
- pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
- if (!(msg_->flags & ZMQ_MSG_MORE))
- msgs_written++;
return true;
}
void zmq::writer_t::rollback ()
{
- zmq_msg_t msg;
+ if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) {
+ zmq_msg_close (&extra_msg);
+ extra_msg_flag = false;
+ }
+ if (msg_store != NULL)
+ msg_store->rollback ();
+
+ zmq_msg_t msg;
// Remove all incomplete messages from the pipe.
while (pipe->unwrite (&msg)) {
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
- msgs_written--;
}
- if (stalled && endpoint != NULL && !pipe_full()) {
+ if (stalled && endpoint != NULL && check_write ()) {
stalled = false;
endpoint->revive (this);
}
@@ -198,6 +237,14 @@ void zmq::writer_t::term ()
// Rollback any unfinished messages.
rollback ();
+ if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag))
+ write_delimiter ();
+ else
+ pending_close = true;
+}
+
+void zmq::writer_t::write_delimiter ()
+{
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
zmq_msg_t msg;
@@ -205,12 +252,47 @@ void zmq::writer_t::term ()
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.flags = 0;
pipe->write (msg, false);
- pipe->flush ();
+ flush ();
}
void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
{
+ zmq_msg_t msg;
+
msgs_read = msgs_read_;
+ if (msg_store) {
+
+ // Move messages from backing store into pipe.
+ while (!pipe_full () && !msg_store->empty ()) {
+ msg_store->fetch(&msg);
+ // Write message into the pipe.
+ pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
+ if (!(msg.flags & ZMQ_MSG_MORE))
+ msgs_written++;
+ }
+
+ if (extra_msg_flag) {
+ if (!pipe_full ()) {
+ pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE);
+ if (!(extra_msg.flags & ZMQ_MSG_MORE))
+ msgs_written++;
+ extra_msg_flag = false;
+ }
+ else if (msg_store->store (&extra_msg)) {
+ if (!(extra_msg.flags & ZMQ_MSG_MORE))
+ msg_store->commit ();
+ extra_msg_flag = false;
+ }
+ }
+
+ if (pending_close && msg_store->empty () && !extra_msg_flag) {
+ write_delimiter ();
+ pending_close = false;
+ }
+
+ flush ();
+ }
+
if (stalled && endpoint != NULL) {
stalled = false;
endpoint->revive (this);
@@ -233,9 +315,9 @@ bool zmq::writer_t::pipe_full ()
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_) :
- reader (reader_parent_, hwm_, compute_lwm (hwm_)),
- writer (writer_parent_, hwm_, compute_lwm (hwm_))
+ uint64_t hwm_, int64_t swap_size_) :
+ reader (reader_parent_, compute_lwm (hwm_)),
+ writer (writer_parent_, hwm_, swap_size_)
{
reader.set_pipe (this);
writer.set_pipe (this);
@@ -276,6 +358,6 @@ uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
if (hwm_ > max_wm_delta * 2)
return hwm_ - max_wm_delta;
else
- return hwm_ / 2;
+ return (hwm_ + 1) / 2;
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 9f57653..ece678a 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -26,6 +26,7 @@
#include "i_endpoint.hpp"
#include "yarray_item.hpp"
#include "ypipe.hpp"
+#include "msg_store.hpp"
#include "config.hpp"
#include "object.hpp"
@@ -36,8 +37,7 @@ namespace zmq
{
public:
- reader_t (class object_t *parent_,
- uint64_t hwm_, uint64_t lwm_);
+ reader_t (class object_t *parent_, uint64_t lwm_);
~reader_t ();
void set_pipe (class pipe_t *pipe_);
@@ -58,14 +58,16 @@ namespace zmq
void process_revive ();
void process_pipe_term_ack ();
+ // Returns true if the message is delimiter; false otherwise.
+ static bool is_delimiter (zmq_msg_t &msg_);
+
// The underlying pipe.
class pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
class writer_t *peer;
- // High and low watermarks for in-memory storage (in bytes).
- uint64_t hwm;
+ // Low watermark for in-memory storage (in bytes).
uint64_t lwm;
// Number of messages read so far.
@@ -82,8 +84,7 @@ namespace zmq
{
public:
- writer_t (class object_t *parent_,
- uint64_t hwm_, uint64_t lwm_);
+ writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_);
~writer_t ();
void set_pipe (class pipe_t *pipe_);
@@ -117,15 +118,18 @@ namespace zmq
// Tests whether the pipe is already full.
bool pipe_full ();
+ // Write special message to the pipe so that the reader
+ // can find out we are finished.
+ void write_delimiter ();
+
// The underlying pipe.
class pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
class reader_t *peer;
- // High and low watermarks for in-memory storage (in bytes).
+ // High watermark for in-memory storage (in bytes).
uint64_t hwm;
- uint64_t lwm;
// Last confirmed number of messages read from the pipe.
// The actual number can be higher.
@@ -134,9 +138,19 @@ namespace zmq
// Number of messages we have written so far.
uint64_t msgs_written;
+ // Pointer to backing store. If NULL, messages are always
+ // kept in main memory.
+ msg_store_t *msg_store;
+
+ bool extra_msg_flag;
+
+ zmq_msg_t extra_msg;
+
// True iff the last attempt to write a message has failed.
bool stalled;
+ bool pending_close;
+
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
@@ -150,7 +164,7 @@ namespace zmq
public:
pipe_t (object_t *reader_parent_, object_t *writer_parent_,
- uint64_t hwm_);
+ uint64_t hwm_, int64_t swap_size_);
~pipe_t ();
reader_t reader;
diff --git a/src/platform.hpp.in b/src/platform.hpp.in
index a885c21..bab50ec 100644
--- a/src/platform.hpp.in
+++ b/src/platform.hpp.in
@@ -24,6 +24,9 @@
/* Define to 1 if you have the <inttypes.h> header file. */
#undef HAVE_INTTYPES_H
+/* Define to 1 if you have the `crypto' library (-lcrypto). */
+#undef HAVE_LIBCRYPTO
+
/* Define to 1 if you have the `iphlpapi' library (-liphlpapi). */
#undef HAVE_LIBIPHLPAPI
diff --git a/src/poll.cpp b/src/poll.cpp
index 4214195..1b203db 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -165,22 +165,21 @@ void zmq::poll_t::loop ()
continue;
}
- for (pollset_t::iterator it = pollset.begin ();
- it != pollset.end (); it ++) {
+ for (pollset_t::size_type i = 0; i != pollset.size (); i++) {
- zmq_assert (!(it->revents & POLLNVAL));
- if (it->fd == retired_fd)
+ zmq_assert (!(pollset [i].revents & POLLNVAL));
+ if (pollset [i].fd == retired_fd)
continue;
- if (it->revents & (POLLERR | POLLHUP))
- fd_table [it->fd].events->in_event ();
- if (it->fd == retired_fd)
+ if (pollset [i].revents & (POLLERR | POLLHUP))
+ fd_table [pollset [i].fd].events->in_event ();
+ if (pollset [i].fd == retired_fd)
continue;
- if (it->revents & POLLOUT)
- fd_table [it->fd].events->out_event ();
- if (it->fd == retired_fd)
+ if (pollset [i].revents & POLLOUT)
+ fd_table [pollset [i].fd].events->out_event ();
+ if (pollset [i].fd == retired_fd)
continue;
- if (it->revents & POLLIN)
- fd_table [it->fd].events->in_event ();
+ if (pollset [i].revents & POLLIN)
+ fd_table [pollset [i].fd].events->in_event ();
}
// Clean up the pollset and update the fd_table accordingly.
diff --git a/src/prefix_tree.cpp b/src/prefix_tree.cpp
index 51225d6..6d4f084 100644
--- a/src/prefix_tree.cpp
+++ b/src/prefix_tree.cpp
@@ -42,7 +42,7 @@ zmq::prefix_tree_t::~prefix_tree_t ()
if (count == 1)
delete next.node;
else if (count > 1) {
- for (unsigned char i = 0; i != count; ++i)
+ for (unsigned short i = 0; i != count; ++i)
if (next.table [i])
delete next.table [i];
free (next.table);
@@ -74,7 +74,7 @@ void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_)
next.table = (prefix_tree_t**)
malloc (sizeof (prefix_tree_t*) * count);
zmq_assert (next.table);
- for (unsigned char i = 0; i != count; ++i)
+ for (unsigned short i = 0; i != count; ++i)
next.table [i] = 0;
min = std::min (min, c);
next.table [oldc - min] = oldp;
@@ -82,25 +82,25 @@ void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_)
else if (min < c) {
// The new character is above the current character range.
- unsigned char old_count = count;
+ unsigned short old_count = count;
count = c - min + 1;
next.table = (prefix_tree_t**) realloc ((void*) next.table,
sizeof (prefix_tree_t*) * count);
zmq_assert (next.table);
- for (unsigned char i = old_count; i != count; i++)
+ for (unsigned short i = old_count; i != count; i++)
next.table [i] = NULL;
}
else {
// The new character is below the current character range.
- unsigned char old_count = count;
+ unsigned short old_count = count;
count = (min + old_count) - c;
next.table = (prefix_tree_t**) realloc ((void*) next.table,
sizeof (prefix_tree_t*) * count);
zmq_assert (next.table);
memmove (next.table + min - c, next.table,
old_count * sizeof (prefix_tree_t*));
- for (unsigned char i = 0; i != min - c; i++)
+ for (unsigned short i = 0; i != min - c; i++)
next.table [i] = NULL;
min = c;
}
diff --git a/src/prefix_tree.hpp b/src/prefix_tree.hpp
index 53c7c18..bf1c4b9 100644
--- a/src/prefix_tree.hpp
+++ b/src/prefix_tree.hpp
@@ -42,7 +42,7 @@ namespace zmq
uint32_t refcnt;
unsigned char min;
- unsigned char count;
+ unsigned short count;
union {
class prefix_tree_t *node;
class prefix_tree_t **table;
diff --git a/src/upstream.cpp b/src/pull.cpp
index 1498c31..b2413ee 100644
--- a/src/upstream.cpp
+++ b/src/pull.cpp
@@ -19,55 +19,55 @@
#include "../include/zmq.h"
-#include "upstream.hpp"
+#include "pull.hpp"
#include "err.hpp"
-zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
+zmq::pull_t::pull_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
options.requires_in = true;
options.requires_out = false;
}
-zmq::upstream_t::~upstream_t ()
+zmq::pull_t::~pull_t ()
{
}
-void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
+void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
}
-void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (pipe_);
fq.detach (pipe_);
}
-void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_)
{
// There are no outpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::upstream_t::xkill (class reader_t *pipe_)
+void zmq::pull_t::xkill (class reader_t *pipe_)
{
fq.kill (pipe_);
}
-void zmq::upstream_t::xrevive (class reader_t *pipe_)
+void zmq::pull_t::xrevive (class reader_t *pipe_)
{
fq.revive (pipe_);
}
-void zmq::upstream_t::xrevive (class writer_t *pipe_)
+void zmq::pull_t::xrevive (class writer_t *pipe_)
{
zmq_assert (false);
}
-int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
+int zmq::pull_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special options for this socket type.
@@ -75,23 +75,23 @@ int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
-int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
-int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
}
-bool zmq::upstream_t::xhas_in ()
+bool zmq::pull_t::xhas_in ()
{
return fq.has_in ();
}
-bool zmq::upstream_t::xhas_out ()
+bool zmq::pull_t::xhas_out ()
{
return false;
}
diff --git a/src/upstream.hpp b/src/pull.hpp
index 5fe42ae..7f249e9 100644
--- a/src/upstream.hpp
+++ b/src/pull.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
-#define __ZMQ_UPSTREAM_HPP_INCLUDED__
+#ifndef __ZMQ_PULL_HPP_INCLUDED__
+#define __ZMQ_PULL_HPP_INCLUDED__
#include "socket_base.hpp"
#include "fq.hpp"
@@ -26,12 +26,12 @@
namespace zmq
{
- class upstream_t : public socket_base_t
+ class pull_t : public socket_base_t
{
public:
- upstream_t (class app_thread_t *parent_);
- ~upstream_t ();
+ pull_t (class app_thread_t *parent_);
+ ~pull_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
@@ -52,8 +52,8 @@ namespace zmq
// Fair queueing object for inbound pipes.
fq_t fq;
- upstream_t (const upstream_t&);
- void operator = (const upstream_t&);
+ pull_t (const pull_t&);
+ void operator = (const pull_t&);
};
diff --git a/src/downstream.cpp b/src/push.cpp
index 4074a9e..522101f 100644
--- a/src/downstream.cpp
+++ b/src/push.cpp
@@ -19,58 +19,58 @@
#include "../include/zmq.h"
-#include "downstream.hpp"
+#include "push.hpp"
#include "err.hpp"
#include "pipe.hpp"
-zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
+zmq::push_t::push_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
options.requires_in = false;
options.requires_out = true;
}
-zmq::downstream_t::~downstream_t ()
+zmq::push_t::~push_t ()
{
}
-void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
+void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);
}
-void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::push_t::xdetach_inpipe (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::push_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
lb.detach (pipe_);
}
-void zmq::downstream_t::xkill (class reader_t *pipe_)
+void zmq::push_t::xkill (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::downstream_t::xrevive (class reader_t *pipe_)
+void zmq::push_t::xrevive (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::downstream_t::xrevive (class writer_t *pipe_)
+void zmq::push_t::xrevive (class writer_t *pipe_)
{
lb.revive (pipe_);
}
-int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
+int zmq::push_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special option for this socket type.
@@ -78,23 +78,23 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
-int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
}
-int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
-bool zmq::downstream_t::xhas_in ()
+bool zmq::push_t::xhas_in ()
{
return false;
}
-bool zmq::downstream_t::xhas_out ()
+bool zmq::push_t::xhas_out ()
{
return lb.has_out ();
}
diff --git a/src/downstream.hpp b/src/push.hpp
index 1306743..b3c8d87 100644
--- a/src/downstream.hpp
+++ b/src/push.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
-#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#ifndef __ZMQ_PUSH_HPP_INCLUDED__
+#define __ZMQ_PUSH_HPP_INCLUDED__
#include "socket_base.hpp"
#include "lb.hpp"
@@ -26,12 +26,12 @@
namespace zmq
{
- class downstream_t : public socket_base_t
+ class push_t : public socket_base_t
{
public:
- downstream_t (class app_thread_t *parent_);
- ~downstream_t ();
+ push_t (class app_thread_t *parent_);
+ ~push_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
@@ -52,8 +52,8 @@ namespace zmq
// Load balancer managing the outbound pipes.
lb_t lb;
- downstream_t (const downstream_t&);
- void operator = (const downstream_t&);
+ push_t (const push_t&);
+ void operator = (const push_t&);
};
}
diff --git a/src/queue.cpp b/src/queue.cpp
index 470ea67..36fab07 100644
--- a/src/queue.cpp
+++ b/src/queue.cpp
@@ -23,6 +23,7 @@
#include "queue.hpp"
#include "socket_base.hpp"
+#include "likely.hpp"
#include "err.hpp"
int zmq::queue (class socket_base_t *insocket_,
@@ -49,7 +50,11 @@ int zmq::queue (class socket_base_t *insocket_,
// Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], 2, -1);
- errno_assert (rc > 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
// The algorithm below asumes ratio of request and replies processed
// under full load to be 1:1. Although processing requests replies
@@ -61,14 +66,26 @@ int zmq::queue (class socket_base_t *insocket_,
while (true) {
rc = insocket_->recv (&msg, 0);
- errno_assert (rc == 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
moresz = sizeof (more);
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
- errno_assert (rc == 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
- errno_assert (rc == 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
if (!more)
break;
@@ -80,14 +97,26 @@ int zmq::queue (class socket_base_t *insocket_,
while (true) {
rc = outsocket_->recv (&msg, 0);
- errno_assert (rc == 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
moresz = sizeof (more);
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
- errno_assert (rc == 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
- errno_assert (rc == 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
if (!more)
break;
diff --git a/src/select.cpp b/src/select.cpp
index be5cd47..59eb83e 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -65,6 +65,10 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
fd_entry_t entry = {fd_, events_};
fds.push_back (entry);
+ // Ensure we do not attempt to select () on more than FD_SETSIZE
+ // file descriptors.
+ zmq_assert (fds.size () <= FD_SETSIZE);
+
// Start polling on errors.
FD_SET (fd_, &source_set_err);
@@ -217,10 +221,13 @@ void zmq::select_t::loop ()
// Destroy retired event sources.
if (retired) {
- for (fd_set_t::size_type i = 0; i < fds.size (); i ++) {
- if (fds [i].fd == retired_fd) {
- fds.erase (fds.begin () + i);
- i --;
+ fd_set_t::iterator it = fds.begin();
+ while (it != fds.end()) {
+ if (it->fd == retired_fd) {
+ it = fds.erase(it);
+ }
+ else {
+ it++;
}
}
retired = false;
diff --git a/src/session.cpp b/src/session.cpp
index 3cd27fb..f798877 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -265,7 +265,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm);
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap);
zmq_assert (pipe);
out_pipe = &pipe->writer;
out_pipe->set_endpoint (this);
@@ -273,7 +273,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
if (options.requires_out && !in_pipe) {
- pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm);
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap);
zmq_assert (pipe);
in_pipe = &pipe->reader;
in_pipe->set_endpoint (this);
diff --git a/src/signaler.cpp b/src/signaler.cpp
index 592688b..d4a9214 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -176,12 +176,15 @@ zmq::signaler_t::~signaler_t ()
void zmq::signaler_t::send (const command_t &cmd_)
{
- ssize_t nbytes = send (w, &cmd_, sizeof (command_t), 0);
+ ssize_t nbytes;
+ do {
+ nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
+ } while (nbytes == -1 && errno == EINTR);
errno_assert (nbytes != -1);
zmq_assert (nbytes == sizeof (command_t));
}
-bool zmq::signaler_t::recv (command_t &cmd_, bool block_)
+bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
if (block_) {
@@ -194,7 +197,10 @@ bool zmq::signaler_t::recv (command_t &cmd_, bool block_)
}
bool result;
- ssize_t nbytes = recv (r, buffer, sizeof (command_t), 0);
+ ssize_t nbytes;
+ do {
+ nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
+ } while (nbytes == -1 && errno == EINTR);
if (nbytes == -1 && errno == EAGAIN) {
result = false;
}
@@ -207,7 +213,7 @@ bool zmq::signaler_t::recv (command_t &cmd_, bool block_)
result = true;
}
- if (block_)
+ if (block_) {
// Set the reader to non-blocking mode.
int flags = fcntl (r, F_GETFL, 0);
@@ -249,7 +255,10 @@ void zmq::signaler_t::send (const command_t &cmd_)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the command cannot be written to the signaler?
- ssize_t nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
+ ssize_t nbytes;
+ do {
+ nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
+ } while (nbytes == -1 && errno == EINTR);
errno_assert (nbytes != -1);
// This should never happen as we've already checked that command size is
@@ -259,8 +268,11 @@ void zmq::signaler_t::send (const command_t &cmd_)
bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
{
- ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
- block_ ? 0 : MSG_DONTWAIT);
+ ssize_t nbytes;
+ do {
+ nbytes = ::recv (r, cmd_, sizeof (command_t),
+ block_ ? 0 : MSG_DONTWAIT);
+ } while (nbytes == -1 && errno == EINTR);
// If there's no signal available return false.
if (nbytes == -1 && errno == EAGAIN)
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eddb297..c933954 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -39,6 +39,7 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "likely.hpp"
+#include "uuid.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
@@ -194,13 +195,13 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm);
+ in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm);
+ out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap);
zmq_assert (out_pipe);
}
@@ -233,14 +234,14 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create inbound pipe, if required.
if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm);
+ in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm);
+ out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap);
zmq_assert (out_pipe);
}
@@ -424,7 +425,14 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
return -1;
}
ticks = 0;
- return xrecv (msg_, flags_);
+
+ rc = xrecv (msg_, flags_);
+ if (rc == 0) {
+ rcvmore = msg_->flags & ZMQ_MSG_MORE;
+ if (rcvmore)
+ msg_->flags &= ~ZMQ_MSG_MORE;
+ }
+ return rc;
}
// In blocking scenario, commands are processed over and over again until
@@ -621,7 +629,16 @@ void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
- xattach_pipes (inpipe_, outpipe_, peer_identity_);
+
+ // If the peer haven't specified it's identity, let's generate one.
+ if (peer_identity_.size ()) {
+ xattach_pipes (inpipe_, outpipe_, peer_identity_);
+ }
+ else {
+ blob_t identity (1, 0);
+ identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len);
+ xattach_pipes (inpipe_, outpipe_, identity);
+ }
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
diff --git a/src/streamer.cpp b/src/streamer.cpp
index 796771b..7c03365 100644
--- a/src/streamer.cpp
+++ b/src/streamer.cpp
@@ -21,6 +21,7 @@
#include "streamer.hpp"
#include "socket_base.hpp"
+#include "likely.hpp"
#include "err.hpp"
int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_)
@@ -29,9 +30,30 @@ int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_)
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
+ int64_t more;
+ size_t more_sz = sizeof (more);
+
while (true) {
- insocket_->recv (&msg, 0);
- outsocket_->send (&msg, 0);
+ rc = insocket_->recv (&msg, 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
+
+ rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
+
+ rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
+ if (unlikely (rc < 0)) {
+ if (errno == ETERM)
+ return -1;
+ errno_assert (false);
+ }
}
return 0;
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 17c0257..dee71be 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -117,10 +117,10 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
// Assert that the error was caused by the networking problems
// rather than 0MQ bug.
- zmq_assert (err == WSAECONNREFUSED || err == WSAETIMEDOUT ||
- err == WSAECONNABORTED);
-
errno = err;
+ errno_assert (errno == WSAECONNREFUSED || errno == WSAETIMEDOUT ||
+ errno == WSAECONNABORTED || errno == WSAEHOSTUNREACH);
+
return retired_fd;
}
@@ -291,11 +291,12 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
err = errno;
if (err != 0) {
- // Assert that the error was caused by the networking problems
- // rather than 0MQ bug.
- zmq_assert (err == ECONNREFUSED || err == ETIMEDOUT);
-
+ // Assert if the error was caused by 0MQ bug.
+ // Networking problems are OK. No need to assert.
errno = err;
+ errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
+ errno == ETIMEDOUT || errno == EHOSTUNREACH);
+
return retired_fd;
}
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 0cb9a6e..a62bc04 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -64,7 +64,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
// Allow reusing of the address.
int flag = 1;
- rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR,
+ rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
(const char*) &flag, sizeof (int));
wsa_assert (rc != SOCKET_ERROR);
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index cc426d7..c83bba6 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -210,7 +210,8 @@ int zmq::tcp_socket_t::read (void *data, int size)
return 0;
// Signalise peer failure.
- if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED))
+ if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED ||
+ errno == ETIMEDOUT || errno == EHOSTUNREACH))
return -1;
errno_assert (nbytes != -1);
diff --git a/src/uuid.cpp b/src/uuid.cpp
index 406bbb4..f1dddf0 100644
--- a/src/uuid.cpp
+++ b/src/uuid.cpp
@@ -190,8 +190,10 @@ unsigned char zmq::uuid_t::convert_byte (const char *hexa_)
byte = *hexa_ - 'A' + 10;
else if (*hexa_ >= 'a' && *hexa_ <= 'f')
byte = *hexa_ - 'a' + 10;
- else
+ else {
zmq_assert (false);
+ byte = 0;
+ }
byte *= 16;
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 4e8d18a..5fd6cbb 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -26,6 +26,7 @@
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_),
current_in (0),
+ prefetched (false),
more_in (false),
current_out (NULL),
more_out (false)
@@ -142,8 +143,11 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
if (!more_out) {
zmq_assert (!current_out);
- // There's no such thing as prefix with no subsequent message.
- zmq_assert (msg_->flags & ZMQ_MSG_MORE);
+ // If we have malformed message (prefix with no subsequent message)
+ // then just silently drop the message.
+ if ((msg_->flags & ZMQ_MSG_MORE) == 0)
+ return 0;
+
more_out = true;
// Find the pipe associated with the identity stored in the prefix.
@@ -153,7 +157,7 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
outpipes_t::iterator it = outpipes.find (identity);
if (it == outpipes.end ())
return 0;
-
+
// Remember the outgoing pipe.
current_out = it->second.writer;
@@ -189,6 +193,13 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
zmq_msg_close (msg_);
+ if (prefetched) {
+ zmq_msg_move (msg_, &prefetched_msg);
+ more_in = msg_->flags & ZMQ_MSG_MORE;
+ prefetched = false;
+ return 0;
+ }
+
// If we are in the middle of reading a message, just grab next part of it.
if (more_in) {
zmq_assert (inpipes [current_in].active);
@@ -207,21 +218,17 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
for (int count = inpipes.size (); count != 0; count--) {
// Try to fetch new message.
- bool fetched;
- if (!inpipes [current_in].active)
- fetched = false;
- else
- fetched = inpipes [current_in].reader->check_read ();
+ if (inpipes [current_in].active)
+ prefetched = inpipes [current_in].reader->read (&prefetched_msg);
// If we have a message, create a prefix and return it to the caller.
- if (fetched) {
+ if (prefetched) {
int rc = zmq_msg_init_size (msg_,
inpipes [current_in].identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
zmq_msg_size (msg_));
msg_->flags = ZMQ_MSG_MORE;
- more_in = true;
return 0;
}
@@ -241,7 +248,7 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::xrep_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
- if (more_in)
+ if (prefetched || more_in)
return true;
// Note that messing with current doesn't break the fairness of fair
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 940d288..da1b3d8 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -67,6 +67,12 @@ namespace zmq
// The pipe we are currently reading from.
inpipes_t::size_type current_in;
+ // Have we prefetched a message.
+ bool prefetched;
+
+ // Holds the prefetched message.
+ zmq_msg_t prefetched_msg;
+
// If true, more incoming message parts are expected.
bool more_in;
diff --git a/src/xreq.cpp b/src/xreq.cpp
index ab90f68..66e5cc3 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -23,8 +23,7 @@
#include "err.hpp"
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
- socket_base_t (parent_),
- dropping (false)
+ socket_base_t (parent_)
{
options.requires_in = true;
options.requires_out = true;
@@ -78,25 +77,7 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
{
- while (true) {
-
- // If we are ignoring the current message, just drop it and return.
- if (dropping) {
- if (!(msg_->flags & ZMQ_MSG_MORE))
- dropping = false;
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
- return 0;
- }
-
- int rc = lb.send (msg_, flags_);
- if (rc != 0 && errno == EAGAIN)
- dropping = true;
- else
- return rc;
- }
+ return lb.send (msg_, flags_);
}
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
@@ -111,8 +92,6 @@ bool zmq::xreq_t::xhas_in ()
bool zmq::xreq_t::xhas_out ()
{
- // Socket is always ready for writing. When the queue is full, message
- // will be silently dropped.
- return true;
+ return lb.has_out ();
}
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 25a97f1..8ee0bb9 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -55,9 +55,6 @@ namespace zmq
fq_t fq;
lb_t lb;
- // If true, curently sent message is being dropped.
- bool dropping;
-
xreq_t (const xreq_t&);
void operator = (const xreq_t&);
};
diff --git a/src/yarray_item.hpp b/src/yarray_item.hpp
index b6d89cc..db24dda 100644
--- a/src/yarray_item.hpp
+++ b/src/yarray_item.hpp
@@ -35,7 +35,9 @@ namespace zmq
{
}
- inline ~yarray_item_t ()
+ // The destructor doesn't have to be virtual. It is mad virtual
+ // just to keep ICC and code checking tools from complaining.
+ inline virtual ~yarray_item_t ()
{
}
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index df5b3d0..26f021c 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -50,6 +50,12 @@ namespace zmq
c.set (&queue.back ());
}
+ // The destructor doesn't have to be virtual. It is mad virtual
+ // just to keep ICC and code checking tools from complaining.
+ inline virtual ~ypipe_t ()
+ {
+ }
+
// Following function (write) deliberately copies uninitialised data
// when used with zmq_msg. Initialising the VSM body for
// non-VSM messages won't be good for performance.
@@ -156,6 +162,17 @@ namespace zmq
return true;
}
+ // Applies the function fn to the first elemenent in the pipe
+ // and returns the value returned by the fn.
+ // The pipe mustn't be empty or the function crashes.
+ inline bool probe (bool (*fn)(T &))
+ {
+ bool rc = check_read ();
+ zmq_assert (rc);
+
+ return (*fn) (queue.front ());
+ }
+
protected:
// Allocation-efficient queue to store pipe items.
diff --git a/src/zmq.cpp b/src/zmq.cpp
index c8f419a..f3ccaac 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -18,6 +18,7 @@
*/
#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
#include <string.h>
#include <errno.h>
@@ -271,6 +272,11 @@ void *zmq_init (int io_threads_)
int zmq_term (void *ctx_)
{
+ if (!ctx_) {
+ errno = EFAULT;
+ return -1;
+ }
+
int rc = ((zmq::ctx_t*) ctx_)->term ();
int en = errno;
@@ -286,11 +292,19 @@ int zmq_term (void *ctx_)
void *zmq_socket (void *ctx_, int type_)
{
+ if (!ctx_) {
+ errno = EFAULT;
+ return NULL;
+ }
return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
}
int zmq_close (void *s_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
((zmq::socket_base_t*) s_)->close ();
return 0;
}
@@ -298,33 +312,57 @@ int zmq_close (void *s_)
int zmq_setsockopt (void *s_, int option_, const void *optval_,
size_t optvallen_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
optvallen_));
}
int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
optvallen_));
}
int zmq_bind (void *s_, const char *addr_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->bind (addr_));
}
int zmq_connect (void *s_, const char *addr_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->connect (addr_));
}
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
}
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
{
+ if (!s_) {
+ errno = EFAULT;
+ return -1;
+ }
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
@@ -336,6 +374,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
defined ZMQ_HAVE_NETBSD
+ if (!items_) {
+ errno = EFAULT;
+ return -1;
+ }
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
zmq_assert (pollfds);
int npollfds = 0;
@@ -489,6 +531,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
zmq::fd_t maxfd = zmq::retired_fd;
zmq::fd_t notify_fd = zmq::retired_fd;
+ // Ensure we do not attempt to select () on more than FD_SETSIZE
+ // file descriptors.
+ zmq_assert (nitems_ <= FD_SETSIZE);
+
for (int i = 0; i != nitems_; i++) {
// 0MQ sockets.
@@ -647,6 +693,10 @@ int zmq_errno ()
int zmq_device (int device_, void *insocket_, void *outsocket_)
{
+ if (!insocket_ || !outsocket_) {
+ errno = EFAULT;
+ return -1;
+ }
switch (device_) {
case ZMQ_FORWARDER:
return zmq::forwarder ((zmq::socket_base_t*) insocket_,
@@ -661,3 +711,66 @@ int zmq_device (int device_, void *insocket_, void *outsocket_)
return EINVAL;
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+// 0MQ utils - to be used by perf tests
+////////////////////////////////////////////////////////////////////////////////
+
+#if defined ZMQ_HAVE_WINDOWS
+
+static uint64_t now ()
+{
+ // Get the high resolution counter's accuracy.
+ LARGE_INTEGER ticksPerSecond;
+ QueryPerformanceFrequency (&ticksPerSecond);
+
+ // What time is it?
+ LARGE_INTEGER tick;
+ QueryPerformanceCounter (&tick);
+
+ // Convert the tick number into the number of seconds
+ // since the system was started.
+ double ticks_div = (double) (ticksPerSecond.QuadPart / 1000000);
+ return (uint64_t) (tick.QuadPart / ticks_div);
+}
+
+void zmq_sleep (int seconds_)
+{
+ Sleep (seconds_ * 1000);
+}
+
+#else
+
+static uint64_t now ()
+{
+ struct timeval tv;
+ int rc;
+
+ rc = gettimeofday (&tv, NULL);
+ assert (rc == 0);
+ return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec);
+}
+
+void zmq_sleep (int seconds_)
+{
+ sleep (seconds_);
+}
+
+#endif
+
+void *zmq_stopwatch_start ()
+{
+ uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
+ assert (watch);
+ *watch = now ();
+ return (void*) watch;
+}
+
+unsigned long zmq_stopwatch_stop (void *watch_)
+{
+ uint64_t end = now ();
+ uint64_t start = *(uint64_t*) watch_;
+ free (watch_);
+ return (unsigned long) (end - start);
+}
+
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index 8e335c9..dcf8e76 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -56,6 +56,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// TODO: Handle over-sized message decently.
+ // There has to be at least one byte (the flags) in the message).
+ zmq_assert (*tmpbuf > 0);
+
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
@@ -74,6 +77,10 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// TODO: Handle over-sized message decently.
+ // There has to be at least one byte (the flags) in the message).
+ zmq_assert (size > 0);
+
+
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...