From 5ba1cb20fe6f6699cef1cc726718e760cd4c9af1 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Mon, 23 Jan 2012 08:53:25 +0100 Subject: Imported Upstream version 2.0.9.dfsg --- src/Makefile.am | 12 +- src/Makefile.in | 59 ++++++---- src/app_thread.cpp | 12 +- src/ctx.cpp | 1 + src/decoder.hpp | 4 +- src/downstream.cpp | 101 ----------------- src/downstream.hpp | 61 ---------- src/encoder.hpp | 4 +- src/forwarder.cpp | 26 ++++- src/i_poll_events.hpp | 2 +- src/ip.cpp | 7 +- src/msg_store.cpp | 307 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/msg_store.hpp | 114 +++++++++++++++++++ src/pair.cpp | 3 + src/pipe.cpp | 146 ++++++++++++++++++------ src/pipe.hpp | 32 ++++-- src/platform.hpp.in | 3 + src/poll.cpp | 23 ++-- src/prefix_tree.cpp | 12 +- src/prefix_tree.hpp | 2 +- src/pull.cpp | 98 ++++++++++++++++ src/pull.hpp | 62 ++++++++++ src/push.cpp | 101 +++++++++++++++++ src/push.hpp | 61 ++++++++++ src/queue.cpp | 43 +++++-- src/select.cpp | 15 ++- src/session.cpp | 4 +- src/signaler.cpp | 26 +++-- src/socket_base.cpp | 29 ++++- src/streamer.cpp | 26 ++++- src/tcp_connecter.cpp | 15 +-- src/tcp_listener.cpp | 2 +- src/tcp_socket.cpp | 3 +- src/upstream.cpp | 98 ---------------- src/upstream.hpp | 62 ---------- src/uuid.cpp | 4 +- src/xrep.cpp | 29 +++-- src/xrep.hpp | 6 + src/xreq.cpp | 27 +---- src/xreq.hpp | 3 - src/yarray_item.hpp | 4 +- src/ypipe.hpp | 17 +++ src/zmq.cpp | 113 +++++++++++++++++++ src/zmq_decoder.cpp | 7 ++ 44 files changed, 1282 insertions(+), 504 deletions(-) delete mode 100644 src/downstream.cpp delete mode 100644 src/downstream.hpp create mode 100644 src/msg_store.cpp create mode 100644 src/msg_store.hpp create mode 100644 src/pull.cpp create mode 100644 src/pull.hpp create mode 100644 src/push.cpp create mode 100644 src/push.hpp delete mode 100644 src/upstream.cpp delete mode 100644 src/upstream.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index fa97ca3..19a80d0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,7 +3,7 @@ lib_LTLIBRARIES = libzmq.la pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h if BUILD_PGM pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ @@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -76,6 +76,7 @@ libzmq_la_SOURCES = app_thread.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ + msg_store.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -104,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -124,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -134,6 +135,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ip.cpp \ kqueue.cpp \ lb.cpp \ + msg_store.cpp \ object.cpp \ options.cpp \ owned.cpp \ @@ -158,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/Makefile.in b/src/Makefile.in index bc3f00f..1bb65a4 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -61,11 +61,12 @@ libLTLIBRARIES_INSTALL = $(INSTALL) LTLIBRARIES = $(lib_LTLIBRARIES) libzmq_la_LIBADD = am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \ - libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-downstream.lo \ + libzmq_la-ctx.lo libzmq_la-devpoll.lo libzmq_la-push.lo \ libzmq_la-epoll.lo libzmq_la-err.lo libzmq_la-forwarder.lo \ libzmq_la-fq.lo libzmq_la-io_object.lo libzmq_la-io_thread.lo \ libzmq_la-ip.lo libzmq_la-kqueue.lo libzmq_la-lb.lo \ - libzmq_la-object.lo libzmq_la-options.lo libzmq_la-owned.lo \ + libzmq_la-msg_store.lo libzmq_la-object.lo \ + libzmq_la-options.lo libzmq_la-owned.lo \ libzmq_la-pgm_receiver.lo libzmq_la-pgm_sender.lo \ libzmq_la-pgm_socket.lo libzmq_la-pair.lo \ libzmq_la-prefix_tree.lo libzmq_la-pipe.lo libzmq_la-poll.lo \ @@ -74,9 +75,9 @@ am_libzmq_la_OBJECTS = libzmq_la-app_thread.lo libzmq_la-command.lo \ libzmq_la-signaler.lo libzmq_la-socket_base.lo \ libzmq_la-streamer.lo libzmq_la-sub.lo \ libzmq_la-tcp_connecter.lo libzmq_la-tcp_listener.lo \ - libzmq_la-tcp_socket.lo libzmq_la-thread.lo \ - libzmq_la-upstream.lo libzmq_la-uuid.lo libzmq_la-xrep.lo \ - libzmq_la-xreq.lo libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \ + libzmq_la-tcp_socket.lo libzmq_la-thread.lo libzmq_la-pull.lo \ + libzmq_la-uuid.lo libzmq_la-xrep.lo libzmq_la-xreq.lo \ + libzmq_la-zmq.lo libzmq_la-zmq_connecter.lo \ libzmq_la-zmq_decoder.lo libzmq_la-zmq_encoder.lo \ libzmq_la-zmq_engine.lo libzmq_la-zmq_init.lo \ libzmq_la-zmq_listener.lo @@ -271,7 +272,7 @@ top_srcdir = @top_srcdir@ lib_LTLIBRARIES = libzmq.la pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = libzmq.pc -include_HEADERS = ../include/zmq.h ../include/zmq.hpp +include_HEADERS = ../include/zmq.h ../include/zmq.hpp ../include/zmq_utils.h @BUILD_PGM_TRUE@pgm_sources = ../foreign/openpgm/@pgm_basename@/openpgm/pgm/packet.c \ @BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/time.c \ @BUILD_PGM_TRUE@ ../foreign/openpgm/@pgm_basename@/openpgm/pgm/if.c \ @@ -316,7 +317,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -334,6 +335,7 @@ libzmq_la_SOURCES = app_thread.hpp \ lb.hpp \ likely.hpp \ msg_content.hpp \ + msg_store.hpp \ mutex.hpp \ object.hpp \ options.hpp \ @@ -362,7 +364,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -382,7 +384,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -392,6 +394,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ip.cpp \ kqueue.cpp \ lb.cpp \ + msg_store.cpp \ object.cpp \ options.cpp \ owned.cpp \ @@ -416,7 +419,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ @@ -497,9 +500,9 @@ $(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps) exit 1;; \ esac; \ done; \ - echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu src/Makefile'; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --foreign src/Makefile'; \ cd $(top_srcdir) && \ - $(AUTOMAKE) --gnu src/Makefile + $(AUTOMAKE) --foreign src/Makefile .PRECIOUS: Makefile Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status @case '$?' in \ @@ -579,7 +582,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-command.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-ctx.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-devpoll.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-downstream.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-epoll.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-err.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-forwarder.Plo@am__quote@ @@ -600,6 +602,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-lb.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-log.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-md5.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-msg_store.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-nametoindex.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-net.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-object.Plo@am__quote@ @@ -615,6 +618,8 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-poll.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-prefix_tree.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pub.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-pull.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-push.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-queue.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-rate_control.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-receiver.Plo@am__quote@ @@ -641,7 +646,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-transport.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-tsi.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-txwi.Plo@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-upstream.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-uuid.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-version.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/libzmq_la-wsastrerror.Plo@am__quote@ @@ -956,12 +960,12 @@ libzmq_la-devpoll.lo: devpoll.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-devpoll.lo `test -f 'devpoll.cpp' || echo '$(srcdir)/'`devpoll.cpp -libzmq_la-downstream.lo: downstream.cpp -@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-downstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-downstream.Tpo -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp -@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-downstream.Tpo $(DEPDIR)/libzmq_la-downstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='downstream.cpp' object='libzmq_la-downstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-push.lo: push.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-push.lo -MD -MP -MF $(DEPDIR)/libzmq_la-push.Tpo -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-push.Tpo $(DEPDIR)/libzmq_la-push.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='push.cpp' object='libzmq_la-push.lo' libtool=yes @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-downstream.lo `test -f 'downstream.cpp' || echo '$(srcdir)/'`downstream.cpp +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-push.lo `test -f 'push.cpp' || echo '$(srcdir)/'`push.cpp libzmq_la-epoll.lo: epoll.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-epoll.lo -MD -MP -MF $(DEPDIR)/libzmq_la-epoll.Tpo -c -o libzmq_la-epoll.lo `test -f 'epoll.cpp' || echo '$(srcdir)/'`epoll.cpp @@ -1026,6 +1030,13 @@ libzmq_la-lb.lo: lb.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-lb.lo `test -f 'lb.cpp' || echo '$(srcdir)/'`lb.cpp +libzmq_la-msg_store.lo: msg_store.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-msg_store.lo -MD -MP -MF $(DEPDIR)/libzmq_la-msg_store.Tpo -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-msg_store.Tpo $(DEPDIR)/libzmq_la-msg_store.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='msg_store.cpp' object='libzmq_la-msg_store.lo' libtool=yes @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-msg_store.lo `test -f 'msg_store.cpp' || echo '$(srcdir)/'`msg_store.cpp + libzmq_la-object.lo: object.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-object.lo -MD -MP -MF $(DEPDIR)/libzmq_la-object.Tpo -c -o libzmq_la-object.lo `test -f 'object.cpp' || echo '$(srcdir)/'`object.cpp @am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-object.Tpo $(DEPDIR)/libzmq_la-object.Plo @@ -1194,12 +1205,12 @@ libzmq_la-thread.lo: thread.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-thread.lo `test -f 'thread.cpp' || echo '$(srcdir)/'`thread.cpp -libzmq_la-upstream.lo: upstream.cpp -@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-upstream.lo -MD -MP -MF $(DEPDIR)/libzmq_la-upstream.Tpo -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp -@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-upstream.Tpo $(DEPDIR)/libzmq_la-upstream.Plo -@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='upstream.cpp' object='libzmq_la-upstream.lo' libtool=yes @AMDEPBACKSLASH@ +libzmq_la-pull.lo: pull.cpp +@am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-pull.lo -MD -MP -MF $(DEPDIR)/libzmq_la-pull.Tpo -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp +@am__fastdepCXX_TRUE@ mv -f $(DEPDIR)/libzmq_la-pull.Tpo $(DEPDIR)/libzmq_la-pull.Plo +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ source='pull.cpp' object='libzmq_la-pull.lo' libtool=yes @AMDEPBACKSLASH@ @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ -@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-upstream.lo `test -f 'upstream.cpp' || echo '$(srcdir)/'`upstream.cpp +@am__fastdepCXX_FALSE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -c -o libzmq_la-pull.lo `test -f 'pull.cpp' || echo '$(srcdir)/'`pull.cpp libzmq_la-uuid.lo: uuid.cpp @am__fastdepCXX_TRUE@ $(LIBTOOL) --tag=CXX $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=compile $(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(libzmq_la_CXXFLAGS) $(CXXFLAGS) -MT libzmq_la-uuid.lo -MD -MP -MF $(DEPDIR)/libzmq_la-uuid.Tpo -c -o libzmq_la-uuid.lo `test -f 'uuid.cpp' || echo '$(srcdir)/'`uuid.cpp diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbf034c..ac59464 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,8 +46,8 @@ #include "rep.hpp" #include "xreq.hpp" #include "xrep.hpp" -#include "upstream.hpp" -#include "downstream.hpp" +#include "pull.hpp" +#include "push.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_XREP: s = new (std::nothrow) xrep_t (this); break; - case ZMQ_UPSTREAM: - s = new (std::nothrow) upstream_t (this); + case ZMQ_PULL: + s = new (std::nothrow) pull_t (this); break; - case ZMQ_DOWNSTREAM: - s = new (std::nothrow) downstream_t (this); + case ZMQ_PUSH: + s = new (std::nothrow) push_t (this); break; default: if (sockets.empty ()) diff --git a/src/ctx.cpp b/src/ctx.cpp index f0e177d..397f692 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -151,6 +151,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) // Create the new application thread proxy object. app_thread_info_t info; + memset (&info, 0, sizeof (info)); info.associated = false; info.app_thread = new (std::nothrow) app_thread_t (this, io_threads.size () + app_threads.size ()); diff --git a/src/decoder.hpp b/src/decoder.hpp index 1662bda..f05f651 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -55,7 +55,9 @@ namespace zmq zmq_assert (buf); } - inline ~decoder_t () + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~decoder_t () { free (buf); } diff --git a/src/downstream.cpp b/src/downstream.cpp deleted file mode 100644 index 4074a9e..0000000 --- a/src/downstream.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "downstream.hpp" -#include "err.hpp" -#include "pipe.hpp" - -zmq::downstream_t::downstream_t (class app_thread_t *parent_) : - socket_base_t (parent_) -{ - options.requires_in = false; - options.requires_out = true; -} - -zmq::downstream_t::~downstream_t () -{ -} - -void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) -{ - zmq_assert (!inpipe_ && outpipe_); - lb.attach (outpipe_); -} - -void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) -{ - zmq_assert (pipe_); - lb.detach (pipe_); -} - -void zmq::downstream_t::xkill (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xrevive (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xrevive (class writer_t *pipe_) -{ - lb.revive (pipe_); -} - -int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - // No special option for this socket type. - errno = EINVAL; - return -1; -} - -int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) -{ - return lb.send (msg_, flags_); -} - -int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - -bool zmq::downstream_t::xhas_in () -{ - return false; -} - -bool zmq::downstream_t::xhas_out () -{ - return lb.has_out (); -} - diff --git a/src/downstream.hpp b/src/downstream.hpp deleted file mode 100644 index 1306743..0000000 --- a/src/downstream.hpp +++ /dev/null @@ -1,61 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ -#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "lb.hpp" - -namespace zmq -{ - - class downstream_t : public socket_base_t - { - public: - - downstream_t (class app_thread_t *parent_); - ~downstream_t (); - - // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - - private: - - // Load balancer managing the outbound pipes. - lb_t lb; - - downstream_t (const downstream_t&); - void operator = (const downstream_t&); - }; - -} - -#endif diff --git a/src/encoder.hpp b/src/encoder.hpp index 10fe912..0d5b6ba 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -50,7 +50,9 @@ namespace zmq zmq_assert (buf); } - inline ~encoder_t () + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~encoder_t () { free (buf); } diff --git a/src/forwarder.cpp b/src/forwarder.cpp index 5aab8f2..d1f324e 100644 --- a/src/forwarder.cpp +++ b/src/forwarder.cpp @@ -21,6 +21,7 @@ #include "forwarder.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) @@ -29,9 +30,30 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); + int64_t more; + size_t more_sz = sizeof (more); + while (true) { - insocket_->recv (&msg, 0); - outsocket_->send (&msg, 0); + rc = insocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } } return 0; diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index 8b85f7a..6d474b2 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -28,7 +28,7 @@ namespace zmq struct i_poll_events { - virtual ~i_poll_events () {}; + virtual ~i_poll_events () {} // Called by I/O thread when file descriptor is ready for reading. virtual void in_event () = 0; diff --git a/src/ip.cpp b/src/ip.cpp index 79d90da..f491008 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -289,11 +289,8 @@ int zmq::resolve_ip_hostname (sockaddr_storage *addr_, socklen_t *addr_len_, // doesn't really matter, since it's not included in the addr-output. req.ai_socktype = SOCK_STREAM; - // Avoid named services due to unclear socktype, and don't pick IPv4 - // addresses if we don't have a local IPv4 address configured. - // If this is failing for you on a host with only IPv6 connectivity, - // please contribute proper IPv6 support for all functions in this file. - req.ai_flags = AI_NUMERICSERV | AI_ADDRCONFIG; + // Avoid named services due to unclear socktype. + req.ai_flags = AI_NUMERICSERV; // Resolve host name. Some of the error info is lost in case of error, // however, there's no way to report EAI errors via errno. diff --git a/src/msg_store.cpp b/src/msg_store.cpp new file mode 100644 index 0000000..aaf6dbe --- /dev/null +++ b/src/msg_store.cpp @@ -0,0 +1,307 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "platform.hpp" + +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#include +#else +#include +#endif + +#include "../include/zmq.h" + +#include +#include +#include +#include +#include +#include + +#include "atomic_counter.hpp" +#include "msg_store.hpp" +#include "err.hpp" + +zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : + fd (-1), + filesize (filesize_), + file_pos (0), + write_pos (0), + read_pos (0), + block_size (block_size_), + write_buf_start_addr (0) +{ + zmq_assert (filesize > 0); + zmq_assert (block_size > 0); + + buf1 = new (std::nothrow) char [block_size]; + zmq_assert (buf1); + + buf2 = new (std::nothrow) char [block_size]; + zmq_assert (buf2); + + read_buf = write_buf = buf1; +} + +zmq::msg_store_t::~msg_store_t () +{ + delete [] buf1; + delete [] buf2; + + if (fd == -1) + return; + +#ifdef ZMQ_HAVE_WINDOWS + int rc = _close (fd); +#else + int rc = close (fd); +#endif + errno_assert (rc == 0); + +#ifdef ZMQ_HAVE_WINDOWS + rc = _unlink (filename.c_str ()); +#else + rc = unlink (filename.c_str ()); +#endif + errno_assert (rc == 0); +} + +int zmq::msg_store_t::init () +{ + static zmq::atomic_counter_t seqnum (0); + + // Get process ID. +#ifdef ZMQ_HAVE_WINDOWS + int pid = GetCurrentThreadId (); +#else + pid_t pid = getpid (); +#endif + + std::ostringstream outs; + outs << "zmq_" << pid << '_' << seqnum.get () << ".swap"; + filename = outs.str (); + + seqnum.add (1); + + // Open the backing file. +#ifdef ZMQ_HAVE_WINDOWS + fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600); +#else + fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600); +#endif + if (fd == -1) + return -1; + +#ifdef ZMQ_HAVE_LINUX + // Enable more aggresive read-ahead optimization. + posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL); +#endif + return 0; +} + +bool zmq::msg_store_t::store (zmq_msg_t *msg_) +{ + size_t msg_size = zmq_msg_size (msg_); + + // Check buffer space availability. + // NOTE: We always keep one byte open. + if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) + return false; + + // Don't store the ZMQ_MSG_SHARED flag. + uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED; + + // Write message length, flags, and message body. + copy_to_file (&msg_size, sizeof msg_size); + copy_to_file (&msg_flags, sizeof msg_flags); + copy_to_file (zmq_msg_data (msg_), msg_size); + + zmq_msg_close (msg_); + + return true; +} + +void zmq::msg_store_t::fetch (zmq_msg_t *msg_) +{ + // There must be at least one message available. + zmq_assert (read_pos != write_pos); + + // Retrieve the message size. + size_t msg_size; + copy_from_file (&msg_size, sizeof msg_size); + + // Initialize the message. + zmq_msg_init_size (msg_, msg_size); + + // Retrieve the message flags. + copy_from_file (&msg_->flags, sizeof msg_->flags); + + // Retrieve the message payload. + copy_from_file (zmq_msg_data (msg_), msg_size); +} + +void zmq::msg_store_t::commit () +{ + commit_pos = write_pos; +} + +void zmq::msg_store_t::rollback () +{ + if (commit_pos == write_pos || read_pos == write_pos) + return; + + if (write_pos > read_pos) + zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos); + else + zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos); + + if (commit_pos / block_size == read_pos / block_size) { + write_buf_start_addr = commit_pos % block_size; + write_buf = read_buf; + } + else if (commit_pos / block_size != write_pos / block_size) { + write_buf_start_addr = commit_pos % block_size; + fill_buf (write_buf, write_buf_start_addr); + } + write_pos = commit_pos; +} + +bool zmq::msg_store_t::empty () +{ + return read_pos == write_pos; +} + +bool zmq::msg_store_t::full () +{ + return buffer_space () == 1; +} + +void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) +{ + char *dest_ptr = (char *) buffer_; + size_t chunk_size, remainder = count_; + + while (remainder > 0) { + chunk_size = std::min (remainder, + std::min ((size_t) (filesize - read_pos), + (size_t) (block_size - read_pos % block_size))); + + memcpy (dest_ptr, &read_buf [read_pos % block_size], chunk_size); + dest_ptr += chunk_size; + + read_pos = (read_pos + chunk_size) % filesize; + if (read_pos % block_size == 0) { + if (read_pos / block_size == write_pos / block_size) + read_buf = write_buf; + else + fill_buf (read_buf, read_pos); + } + remainder -= chunk_size; + } +} + +void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) +{ + char *source_ptr = (char *) buffer_; + size_t chunk_size, remainder = count_; + + while (remainder > 0) { + chunk_size = std::min (remainder, + std::min ((size_t) (filesize - write_pos), + (size_t) (block_size - write_pos % block_size))); + + memcpy (&write_buf [write_pos % block_size], source_ptr, chunk_size); + source_ptr += chunk_size; + + write_pos = (write_pos + chunk_size) % filesize; + if (write_pos % block_size == 0) { + save_write_buf (); + write_buf_start_addr = write_pos; + + if (write_buf == read_buf) { + if (read_buf == buf2) + write_buf = buf1; + else + write_buf = buf2; + } + } + remainder -= chunk_size; + } +} + +void zmq::msg_store_t::fill_buf (char *buf, int64_t pos) +{ + if (file_pos != pos) { +#ifdef ZMQ_HAVE_WINDOWS + __int64 offset = _lseeki64 (fd, pos, SEEK_SET); +#else + off_t offset = lseek (fd, (off_t) pos, SEEK_SET); +#endif + errno_assert (offset == pos); + file_pos = pos; + } + size_t octets_stored = 0; + size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos)); + + while (octets_stored < octets_total) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = _read (fd, &buf [octets_stored], octets_total - octets_stored); +#else + ssize_t rc = read (fd, &buf [octets_stored], octets_total - octets_stored); +#endif + errno_assert (rc > 0); + octets_stored += rc; + } + file_pos += octets_total; +} + +void zmq::msg_store_t::save_write_buf () +{ + if (file_pos != write_buf_start_addr) { +#ifdef ZMQ_HAVE_WINDOWS + __int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET); +#else + off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET); +#endif + errno_assert (offset == write_buf_start_addr); + file_pos = write_buf_start_addr; + } + size_t octets_stored = 0; + size_t octets_total = std::min (block_size, (size_t) (filesize - file_pos)); + + while (octets_stored < octets_total) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = _write (fd, &write_buf [octets_stored], octets_total - octets_stored); +#else + ssize_t rc = write (fd, &write_buf [octets_stored], octets_total - octets_stored); +#endif + errno_assert (rc > 0); + octets_stored += rc; + } + file_pos += octets_total; +} + +int64_t zmq::msg_store_t::buffer_space () +{ + if (write_pos < read_pos) + return read_pos - write_pos; + + return filesize - (write_pos - read_pos); +} diff --git a/src/msg_store.hpp b/src/msg_store.hpp new file mode 100644 index 0000000..765fc60 --- /dev/null +++ b/src/msg_store.hpp @@ -0,0 +1,114 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_MSG_STORE_HPP_INCLUDED__ +#define __ZMQ_MSG_STORE_HPP_INCLUDED__ + +#include "../include/zmq.h" + +#include +#include "stdint.hpp" + +namespace zmq +{ + + // This class implements a message store. Messages are retrieved from + // the store in the same order as they entered it. + + class msg_store_t + { + public: + + enum { default_block_size = 8192 }; + + // Creates message store. + msg_store_t (int64_t filesize_, size_t block_size_ = default_block_size); + + ~msg_store_t (); + + int init (); + + // Stores the message into the message store. The function + // returns false if the message store is full; true otherwise. + bool store (zmq_msg_t *msg_); + + // Fetches the oldest message from the message store. It is an error + // to call this function when the message store is empty. + void fetch (zmq_msg_t *msg_); + + void commit (); + + void rollback (); + + // Returns true if the message store is empty; false otherwise. + bool empty (); + + // Returns true if and only if the store is full. + bool full (); + + private: + + // Copies data from a memory buffer to the backing file. + // Wraps around when reaching maximum file size. + void copy_from_file (void *buffer_, size_t count_); + + // Copies data from the backing file to the memory buffer. + // Wraps around when reaching end-of-file. + void copy_to_file (const void *buffer_, size_t count_); + + // Returns the buffer space available. + int64_t buffer_space (); + + void fill_buf (char *buf, int64_t pos); + + void save_write_buf (); + + // File descriptor to the backing file. + int fd; + + // Name of the backing file. + std::string filename; + + // Maximum size of the backing file. + int64_t filesize; + + // File offset associated with the fd file descriptor. + int64_t file_pos; + + // File offset the next message will be stored at. + int64_t write_pos; + + // File offset the next message will be read from. + int64_t read_pos; + + int64_t commit_pos; + + size_t block_size; + + char *buf1; + char *buf2; + char *read_buf; + char *write_buf; + + int64_t write_buf_start_addr; + }; + +} + +#endif diff --git a/src/pair.cpp b/src/pair.cpp index 31524de..3872b28 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -115,6 +115,9 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_) zmq_msg_close (msg_); if (!alive || !inpipe || !inpipe->read (msg_)) { + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); errno = EAGAIN; return -1; } diff --git a/src/pipe.cpp b/src/pipe.cpp index 1df64e9..200beb0 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -21,20 +21,14 @@ #include "pipe.hpp" -zmq::reader_t::reader_t (object_t *parent_, - uint64_t hwm_, uint64_t lwm_) : +zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) : object_t (parent_), pipe (NULL), peer (NULL), - hwm (hwm_), lwm (lwm_), msgs_read (0), endpoint (NULL) -{ - // Adjust lwm and hwm. - if (lwm == 0 || lwm > hwm) - lwm = hwm; -} +{} zmq::reader_t::~reader_t () { @@ -50,15 +44,32 @@ void zmq::reader_t::set_pipe (pipe_t *pipe_) register_pipe (pipe); } +bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_) +{ + unsigned char *offset = 0; + + return msg_.content == (void*) (offset + ZMQ_DELIMITER); +} + bool zmq::reader_t::check_read () { // Check if there's an item in the pipe. - if (pipe->check_read ()) - return true; - // If not, deactivate the pipe. - endpoint->kill (this); - return false; + if (!pipe->check_read ()) { + endpoint->kill (this); + return false; + } + + // If the next item in the pipe is message delimiter, + // initiate its termination. + if (pipe->probe (is_delimiter)) { + if (endpoint) + endpoint->detach_inpipe (this); + term (); + return false; + } + + return true; } bool zmq::reader_t::read (zmq_msg_t *msg_) @@ -113,20 +124,28 @@ void zmq::reader_t::process_pipe_term_ack () } zmq::writer_t::writer_t (object_t *parent_, - uint64_t hwm_, uint64_t lwm_) : + uint64_t hwm_, int64_t swap_size_) : object_t (parent_), pipe (NULL), peer (NULL), hwm (hwm_), - lwm (lwm_), msgs_read (0), msgs_written (0), + msg_store (NULL), + extra_msg_flag (false), stalled (false), + pending_close (false), endpoint (NULL) { - // Adjust lwm and hwm. - if (lwm == 0 || lwm > hwm) - lwm = hwm; + if (swap_size_ > 0) { + msg_store = new (std::nothrow) msg_store_t (swap_size_); + if (msg_store != NULL) { + if (msg_store->init () < 0) { + delete msg_store; + msg_store = NULL; + } + } + } } void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) @@ -136,6 +155,10 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_) zmq::writer_t::~writer_t () { + if (extra_msg_flag) + zmq_msg_close (&extra_msg); + + delete msg_store; } void zmq::writer_t::set_pipe (pipe_t *pipe_) @@ -147,7 +170,7 @@ void zmq::writer_t::set_pipe (pipe_t *pipe_) bool zmq::writer_t::check_write () { - if (pipe_full ()) { + if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) { stalled = true; return false; } @@ -157,29 +180,45 @@ bool zmq::writer_t::check_write () bool zmq::writer_t::write (zmq_msg_t *msg_) { - if (pipe_full ()) { - stalled = true; + if (!check_write ()) return false; + + if (pipe_full ()) { + if (msg_store->store (msg_)) { + if (!(msg_->flags & ZMQ_MSG_MORE)) + msg_store->commit (); + } else { + extra_msg = *msg_; + extra_msg_flag = true; + } + } + else { + pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); + if (!(msg_->flags & ZMQ_MSG_MORE)) + msgs_written++; } - pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE); - if (!(msg_->flags & ZMQ_MSG_MORE)) - msgs_written++; return true; } void zmq::writer_t::rollback () { - zmq_msg_t msg; + if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) { + zmq_msg_close (&extra_msg); + extra_msg_flag = false; + } + if (msg_store != NULL) + msg_store->rollback (); + + zmq_msg_t msg; // Remove all incomplete messages from the pipe. while (pipe->unwrite (&msg)) { zmq_assert (msg.flags & ZMQ_MSG_MORE); zmq_msg_close (&msg); - msgs_written--; } - if (stalled && endpoint != NULL && !pipe_full()) { + if (stalled && endpoint != NULL && check_write ()) { stalled = false; endpoint->revive (this); } @@ -198,6 +237,14 @@ void zmq::writer_t::term () // Rollback any unfinished messages. rollback (); + if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag)) + write_delimiter (); + else + pending_close = true; +} + +void zmq::writer_t::write_delimiter () +{ // Push delimiter into the pipe. // Trick the compiler to belive that the tag is a valid pointer. zmq_msg_t msg; @@ -205,12 +252,47 @@ void zmq::writer_t::term () msg.content = (void*) (offset + ZMQ_DELIMITER); msg.flags = 0; pipe->write (msg, false); - pipe->flush (); + flush (); } void zmq::writer_t::process_reader_info (uint64_t msgs_read_) { + zmq_msg_t msg; + msgs_read = msgs_read_; + if (msg_store) { + + // Move messages from backing store into pipe. + while (!pipe_full () && !msg_store->empty ()) { + msg_store->fetch(&msg); + // Write message into the pipe. + pipe->write (msg, msg.flags & ZMQ_MSG_MORE); + if (!(msg.flags & ZMQ_MSG_MORE)) + msgs_written++; + } + + if (extra_msg_flag) { + if (!pipe_full ()) { + pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE); + if (!(extra_msg.flags & ZMQ_MSG_MORE)) + msgs_written++; + extra_msg_flag = false; + } + else if (msg_store->store (&extra_msg)) { + if (!(extra_msg.flags & ZMQ_MSG_MORE)) + msg_store->commit (); + extra_msg_flag = false; + } + } + + if (pending_close && msg_store->empty () && !extra_msg_flag) { + write_delimiter (); + pending_close = false; + } + + flush (); + } + if (stalled && endpoint != NULL) { stalled = false; endpoint->revive (this); @@ -233,9 +315,9 @@ bool zmq::writer_t::pipe_full () } zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_) : - reader (reader_parent_, hwm_, compute_lwm (hwm_)), - writer (writer_parent_, hwm_, compute_lwm (hwm_)) + uint64_t hwm_, int64_t swap_size_) : + reader (reader_parent_, compute_lwm (hwm_)), + writer (writer_parent_, hwm_, swap_size_) { reader.set_pipe (this); writer.set_pipe (this); @@ -276,6 +358,6 @@ uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_) if (hwm_ > max_wm_delta * 2) return hwm_ - max_wm_delta; else - return hwm_ / 2; + return (hwm_ + 1) / 2; } diff --git a/src/pipe.hpp b/src/pipe.hpp index 9f57653..ece678a 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -26,6 +26,7 @@ #include "i_endpoint.hpp" #include "yarray_item.hpp" #include "ypipe.hpp" +#include "msg_store.hpp" #include "config.hpp" #include "object.hpp" @@ -36,8 +37,7 @@ namespace zmq { public: - reader_t (class object_t *parent_, - uint64_t hwm_, uint64_t lwm_); + reader_t (class object_t *parent_, uint64_t lwm_); ~reader_t (); void set_pipe (class pipe_t *pipe_); @@ -58,14 +58,16 @@ namespace zmq void process_revive (); void process_pipe_term_ack (); + // Returns true if the message is delimiter; false otherwise. + static bool is_delimiter (zmq_msg_t &msg_); + // The underlying pipe. class pipe_t *pipe; // Pipe writer associated with the other side of the pipe. class writer_t *peer; - // High and low watermarks for in-memory storage (in bytes). - uint64_t hwm; + // Low watermark for in-memory storage (in bytes). uint64_t lwm; // Number of messages read so far. @@ -82,8 +84,7 @@ namespace zmq { public: - writer_t (class object_t *parent_, - uint64_t hwm_, uint64_t lwm_); + writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_); ~writer_t (); void set_pipe (class pipe_t *pipe_); @@ -117,15 +118,18 @@ namespace zmq // Tests whether the pipe is already full. bool pipe_full (); + // Write special message to the pipe so that the reader + // can find out we are finished. + void write_delimiter (); + // The underlying pipe. class pipe_t *pipe; // Pipe reader associated with the other side of the pipe. class reader_t *peer; - // High and low watermarks for in-memory storage (in bytes). + // High watermark for in-memory storage (in bytes). uint64_t hwm; - uint64_t lwm; // Last confirmed number of messages read from the pipe. // The actual number can be higher. @@ -134,9 +138,19 @@ namespace zmq // Number of messages we have written so far. uint64_t msgs_written; + // Pointer to backing store. If NULL, messages are always + // kept in main memory. + msg_store_t *msg_store; + + bool extra_msg_flag; + + zmq_msg_t extra_msg; + // True iff the last attempt to write a message has failed. bool stalled; + bool pending_close; + // Endpoint (either session or socket) the pipe is attached to. i_endpoint *endpoint; @@ -150,7 +164,7 @@ namespace zmq public: pipe_t (object_t *reader_parent_, object_t *writer_parent_, - uint64_t hwm_); + uint64_t hwm_, int64_t swap_size_); ~pipe_t (); reader_t reader; diff --git a/src/platform.hpp.in b/src/platform.hpp.in index a885c21..bab50ec 100644 --- a/src/platform.hpp.in +++ b/src/platform.hpp.in @@ -24,6 +24,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_INTTYPES_H +/* Define to 1 if you have the `crypto' library (-lcrypto). */ +#undef HAVE_LIBCRYPTO + /* Define to 1 if you have the `iphlpapi' library (-liphlpapi). */ #undef HAVE_LIBIPHLPAPI diff --git a/src/poll.cpp b/src/poll.cpp index 4214195..1b203db 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -165,22 +165,21 @@ void zmq::poll_t::loop () continue; } - for (pollset_t::iterator it = pollset.begin (); - it != pollset.end (); it ++) { + for (pollset_t::size_type i = 0; i != pollset.size (); i++) { - zmq_assert (!(it->revents & POLLNVAL)); - if (it->fd == retired_fd) + zmq_assert (!(pollset [i].revents & POLLNVAL)); + if (pollset [i].fd == retired_fd) continue; - if (it->revents & (POLLERR | POLLHUP)) - fd_table [it->fd].events->in_event (); - if (it->fd == retired_fd) + if (pollset [i].revents & (POLLERR | POLLHUP)) + fd_table [pollset [i].fd].events->in_event (); + if (pollset [i].fd == retired_fd) continue; - if (it->revents & POLLOUT) - fd_table [it->fd].events->out_event (); - if (it->fd == retired_fd) + if (pollset [i].revents & POLLOUT) + fd_table [pollset [i].fd].events->out_event (); + if (pollset [i].fd == retired_fd) continue; - if (it->revents & POLLIN) - fd_table [it->fd].events->in_event (); + if (pollset [i].revents & POLLIN) + fd_table [pollset [i].fd].events->in_event (); } // Clean up the pollset and update the fd_table accordingly. diff --git a/src/prefix_tree.cpp b/src/prefix_tree.cpp index 51225d6..6d4f084 100644 --- a/src/prefix_tree.cpp +++ b/src/prefix_tree.cpp @@ -42,7 +42,7 @@ zmq::prefix_tree_t::~prefix_tree_t () if (count == 1) delete next.node; else if (count > 1) { - for (unsigned char i = 0; i != count; ++i) + for (unsigned short i = 0; i != count; ++i) if (next.table [i]) delete next.table [i]; free (next.table); @@ -74,7 +74,7 @@ void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_) next.table = (prefix_tree_t**) malloc (sizeof (prefix_tree_t*) * count); zmq_assert (next.table); - for (unsigned char i = 0; i != count; ++i) + for (unsigned short i = 0; i != count; ++i) next.table [i] = 0; min = std::min (min, c); next.table [oldc - min] = oldp; @@ -82,25 +82,25 @@ void zmq::prefix_tree_t::add (unsigned char *prefix_, size_t size_) else if (min < c) { // The new character is above the current character range. - unsigned char old_count = count; + unsigned short old_count = count; count = c - min + 1; next.table = (prefix_tree_t**) realloc ((void*) next.table, sizeof (prefix_tree_t*) * count); zmq_assert (next.table); - for (unsigned char i = old_count; i != count; i++) + for (unsigned short i = old_count; i != count; i++) next.table [i] = NULL; } else { // The new character is below the current character range. - unsigned char old_count = count; + unsigned short old_count = count; count = (min + old_count) - c; next.table = (prefix_tree_t**) realloc ((void*) next.table, sizeof (prefix_tree_t*) * count); zmq_assert (next.table); memmove (next.table + min - c, next.table, old_count * sizeof (prefix_tree_t*)); - for (unsigned char i = 0; i != min - c; i++) + for (unsigned short i = 0; i != min - c; i++) next.table [i] = NULL; min = c; } diff --git a/src/prefix_tree.hpp b/src/prefix_tree.hpp index 53c7c18..bf1c4b9 100644 --- a/src/prefix_tree.hpp +++ b/src/prefix_tree.hpp @@ -42,7 +42,7 @@ namespace zmq uint32_t refcnt; unsigned char min; - unsigned char count; + unsigned short count; union { class prefix_tree_t *node; class prefix_tree_t **table; diff --git a/src/pull.cpp b/src/pull.cpp new file mode 100644 index 0000000..b2413ee --- /dev/null +++ b/src/pull.cpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "pull.hpp" +#include "err.hpp" + +zmq::pull_t::pull_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::pull_t::~pull_t () +{ +} + +void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + zmq_assert (inpipe_ && !outpipe_); + fq.attach (inpipe_); +} + +void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (pipe_); + fq.detach (pipe_); +} + +void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::pull_t::xkill (class reader_t *pipe_) +{ + fq.kill (pipe_); +} + +void zmq::pull_t::xrevive (class reader_t *pipe_) +{ + fq.revive (pipe_); +} + +void zmq::pull_t::xrevive (class writer_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::pull_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + return fq.recv (msg_, flags_); +} + +bool zmq::pull_t::xhas_in () +{ + return fq.has_in (); +} + +bool zmq::pull_t::xhas_out () +{ + return false; +} + diff --git a/src/pull.hpp b/src/pull.hpp new file mode 100644 index 0000000..7f249e9 --- /dev/null +++ b/src/pull.hpp @@ -0,0 +1,62 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PULL_HPP_INCLUDED__ +#define __ZMQ_PULL_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "fq.hpp" + +namespace zmq +{ + + class pull_t : public socket_base_t + { + public: + + pull_t (class app_thread_t *parent_); + ~pull_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Fair queueing object for inbound pipes. + fq_t fq; + + pull_t (const pull_t&); + void operator = (const pull_t&); + + }; + +} + +#endif diff --git a/src/push.cpp b/src/push.cpp new file mode 100644 index 0000000..522101f --- /dev/null +++ b/src/push.cpp @@ -0,0 +1,101 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "push.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::push_t::push_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::push_t::~push_t () +{ +} + +void zmq::push_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + zmq_assert (!inpipe_ && outpipe_); + lb.attach (outpipe_); +} + +void zmq::push_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + lb.detach (pipe_); +} + +void zmq::push_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xrevive (class writer_t *pipe_) +{ + lb.revive (pipe_); +} + +int zmq::push_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) +{ + return lb.send (msg_, flags_); +} + +int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::push_t::xhas_in () +{ + return false; +} + +bool zmq::push_t::xhas_out () +{ + return lb.has_out (); +} + diff --git a/src/push.hpp b/src/push.hpp new file mode 100644 index 0000000..b3c8d87 --- /dev/null +++ b/src/push.hpp @@ -0,0 +1,61 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PUSH_HPP_INCLUDED__ +#define __ZMQ_PUSH_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "lb.hpp" + +namespace zmq +{ + + class push_t : public socket_base_t + { + public: + + push_t (class app_thread_t *parent_); + ~push_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Load balancer managing the outbound pipes. + lb_t lb; + + push_t (const push_t&); + void operator = (const push_t&); + }; + +} + +#endif diff --git a/src/queue.cpp b/src/queue.cpp index 470ea67..36fab07 100644 --- a/src/queue.cpp +++ b/src/queue.cpp @@ -23,6 +23,7 @@ #include "queue.hpp" #include "socket_base.hpp" +#include "likely.hpp" #include "err.hpp" int zmq::queue (class socket_base_t *insocket_, @@ -49,7 +50,11 @@ int zmq::queue (class socket_base_t *insocket_, // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], 2, -1); - errno_assert (rc > 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } // The algorithm below asumes ratio of request and replies processed // under full load to be 1:1. Although processing requests replies @@ -61,14 +66,26 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = insocket_->recv (&msg, 0); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } moresz = sizeof (more); rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } if (!more) break; @@ -80,14 +97,26 @@ int zmq::queue (class socket_base_t *insocket_, while (true) { rc = outsocket_->recv (&msg, 0); - errno_assert (rc == 0); + if (unlikely (rc < 0)) { + if (errno