summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS2
-rw-r--r--Makefile.am8
-rw-r--r--bindings/c/zmq.h11
-rw-r--r--bindings/java/org/zmq/Socket.java2
-rw-r--r--bindings/python/pyzmq.cpp6
-rw-r--r--bindings/ruby/rbzmq.cpp2
-rw-r--r--builds/msvc/libzmq/libzmq.vcproj16
-rw-r--r--configure.in23
-rw-r--r--devices/Makefile.am8
-rw-r--r--devices/zmq_forwarder/zmq_forwarder.cpp7
-rw-r--r--devices/zmq_streamer/Makefile.am9
-rw-r--r--devices/zmq_streamer/zmq_streamer.cpp122
-rw-r--r--man/Makefile.am19
-rw-r--r--man/convert2pdf.sh66
-rw-r--r--man/man1/zmq_forwarder.111
-rw-r--r--man/man3/zmq_bind.348
-rw-r--r--man/man3/zmq_close.327
-rw-r--r--man/man3/zmq_connect.347
-rw-r--r--man/man3/zmq_flush.337
-rw-r--r--man/man3/zmq_init.339
-rw-r--r--man/man3/zmq_msg_close.332
-rw-r--r--man/man3/zmq_msg_copy.343
-rw-r--r--man/man3/zmq_msg_data.327
-rw-r--r--man/man3/zmq_msg_init.333
-rw-r--r--man/man3/zmq_msg_init_data.348
-rw-r--r--man/man3/zmq_msg_init_size.344
-rw-r--r--man/man3/zmq_msg_move.338
-rw-r--r--man/man3/zmq_msg_size.330
-rw-r--r--man/man3/zmq_poll.365
-rw-r--r--man/man3/zmq_recv.352
-rw-r--r--man/man3/zmq_send.364
-rw-r--r--man/man3/zmq_setsockopt.3115
-rw-r--r--man/man3/zmq_socket.377
-rw-r--r--man/man3/zmq_strerror.327
-rw-r--r--man/man3/zmq_term.325
-rw-r--r--man/man7/zmq.79
-rw-r--r--perf/c/local_thr.c3
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp14
-rw-r--r--src/command.hpp4
-rw-r--r--src/devpoll.cpp3
-rw-r--r--src/dispatcher.cpp56
-rw-r--r--src/dispatcher.hpp12
-rw-r--r--src/downstream.cpp131
-rw-r--r--src/downstream.hpp64
-rw-r--r--src/kqueue.cpp3
-rw-r--r--src/object.cpp29
-rw-r--r--src/object.hpp14
-rw-r--r--src/p2p.hpp4
-rw-r--r--src/pipe.cpp6
-rw-r--r--src/pub.hpp4
-rw-r--r--src/rep.cpp11
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.hpp4
-rw-r--r--src/session.cpp10
-rw-r--r--src/simple_semaphore.hpp60
-rw-r--r--src/socket_base.cpp72
-rw-r--r--src/socket_base.hpp16
-rw-r--r--src/sub.hpp4
-rw-r--r--src/upstream.cpp143
-rw-r--r--src/upstream.hpp69
-rw-r--r--src/zmq.cpp6
-rw-r--r--src/zmq_decoder.cpp8
-rw-r--r--src/zmq_encoder.cpp7
-rw-r--r--src/zmq_listener_init.cpp1
65 files changed, 1941 insertions, 64 deletions
diff --git a/AUTHORS b/AUTHORS
index 13f0076..5e09bd3 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -9,6 +9,7 @@ Dirk O. Kaar
Erich Heine
Frank Denis
George Neill
+Jon Dyte
Martin Hurton
Martin Lucina
Martin Sustrik
@@ -36,3 +37,4 @@ Matt Muggeridge
Paulo Henrique Silva
Peter Lemenkov
Robert Zhang
+Vitaly Mayatskikh
diff --git a/Makefile.am b/Makefile.am
index fe0f1f0..af6ba2e 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -2,8 +2,12 @@ if BUILD_PERF
DIR_PERF = perf
endif
-SUBDIRS = src $(DIR_PERF) devices bindings
-DIST_SUBDIRS = src perf devices bindings
+if INSTALL_MAN
+DIR_MAN = man
+endif
+
+SUBDIRS = src $(DIR_MAN) $(DIR_PERF) devices bindings
+DIST_SUBDIRS = src man perf devices bindings
EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm1_basename@.tar.bz2 \
$(top_srcdir)/foreign/openpgm/@pgm2_basename@ \
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index 40750ec..ff94dd4 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -63,6 +63,9 @@ extern "C" {
#ifndef EADDRNOTAVAIL
#define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6)
#endif
+#ifndef ECONNREFUSED
+#define ECONNREFUSED (ZMQ_HAUSNUMERO + 7)
+#endif
// Native 0MQ error codes.
#define EMTHREAD (ZMQ_HAUSNUMERO + 50)
@@ -180,7 +183,7 @@ ZMQ_EXPORT int zmq_term (void *context);
// Socket to send requests and receive replies. Requests are
// load-balanced among all the peers. This socket type allows
-// only an alternated sequence of send's and recv's
+// only an alternated sequence of send's and recv's.
#define ZMQ_REQ 3
// Socket to receive requests and send replies. This socket type allows
@@ -188,6 +191,12 @@ ZMQ_EXPORT int zmq_term (void *context);
// the peer that issued the last received request.
#define ZMQ_REP 4
+// Socket to receive messages from up the stream.
+#define ZMQ_UPSTREAM 5
+
+// Socket to send messages downstream.
+#define ZMQ_DOWNSTREAM 6
+
// Open a socket. 'type' is one of the socket types defined above.
//
// Errors: EINVAL - invalid socket type.
diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java
index 501bc16..396a6a0 100644
--- a/bindings/java/org/zmq/Socket.java
+++ b/bindings/java/org/zmq/Socket.java
@@ -34,6 +34,8 @@ public class Socket
public static final int SUB = 2;
public static final int REQ = 3;
public static final int REP = 4;
+ public static final int UPSTREAM = 4;
+ public static final int DOWNSTREAM = 4;
public static final int HWM = 1;
public static final int LWM = 2;
diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp
index b180bcd..26ca7ac 100644
--- a/bindings/python/pyzmq.cpp
+++ b/bindings/python/pyzmq.cpp
@@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_REP);
PyDict_SetItemString (dict, "REP", t);
Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_UPSTREAM);
+ PyDict_SetItemString (dict, "UPSTREAM", t);
+ Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_DOWNSTREAM);
+ PyDict_SetItemString (dict, "DOWNSTREAM", t);
+ Py_DECREF (t);
t = PyInt_FromLong (ZMQ_HWM);
PyDict_SetItemString (dict, "HWM", t);
Py_DECREF (t);
diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp
index 6112972..2a26ce1 100644
--- a/bindings/ruby/rbzmq.cpp
+++ b/bindings/ruby/rbzmq.cpp
@@ -275,6 +275,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));
rb_define_global_const ("REP", INT2NUM (ZMQ_REP));
+ rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM));
+ rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM));
rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL));
}
diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj
index 8927fa2..e575d67 100644
--- a/builds/msvc/libzmq/libzmq.vcproj
+++ b/builds/msvc/libzmq/libzmq.vcproj
@@ -182,6 +182,10 @@
>
</File>
<File
+ RelativePath="..\..\..\src\downstream.cpp"
+ >
+ </File>
+ <File
RelativePath="..\..\..\src\epoll.cpp"
>
</File>
@@ -290,6 +294,10 @@
>
</File>
<File
+ RelativePath="..\..\..\src\upstream.cpp"
+ >
+ </File>
+ <File
RelativePath="..\..\..\src\uuid.cpp"
>
</File>
@@ -376,6 +384,10 @@
>
</File>
<File
+ RelativePath="..\..\..\src\downstream.hpp"
+ >
+ </File>
+ <File
RelativePath="..\..\..\src\epoll.hpp"
>
</File>
@@ -532,6 +544,10 @@
>
</File>
<File
+ RelativePath="..\..\..\src\upstream.hpp"
+ >
+ </File>
+ <File
RelativePath="..\..\..\src\uuid.hpp"
>
</File>
diff --git a/configure.in b/configure.in
index 1b1db35..bd3a0f4 100644
--- a/configure.in
+++ b/configure.in
@@ -49,6 +49,10 @@ on_mingw32="no"
# Host speciffic checks
AC_CANONICAL_HOST
+# Whether or not install manual pages.
+# Note that on MinGW manpages are not installed.
+install_man="yes"
+
case "${host_os}" in
*linux*)
AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS])
@@ -134,6 +138,7 @@ case "${host_os}" in
[AC_MSG_ERROR([Could not link with Iphlpapi.dll.])])
CFLAGS="${CFLAGS} -std=c99"
on_mingw32="yes"
+ install_man="no"
;;
*)
AC_MSG_ERROR([Not supported os: $host.])
@@ -585,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then
forwarder="yes"
fi
+# streamer device
+streamer="no"
+AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer],
+ [build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no])
+
+if test "x$with_streamer" != "xno"; then
+ streamer="yes"
+fi
# Perf
perf="no"
@@ -613,10 +626,12 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes")
AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes")
AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes")
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno")
-AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
+AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
+AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes")
AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes")
+AM_CONDITIONAL(INSTALL_MAN, test "x$install_man" = "xyes")
AC_SUBST(stdint)
AC_SUBST(inttypes)
@@ -631,11 +646,12 @@ AC_FUNC_MALLOC
AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs)
-AC_OUTPUT(Makefile src/Makefile bindings/python/Makefile \
+AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \
bindings/python/setup.py bindings/ruby/Makefile \
bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \
- devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile)
+ devices/Makefile devices/zmq_forwarder/Makefile \
+ devices/zmq_streamer/Makefile bindings/Makefile)
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
@@ -670,6 +686,7 @@ AC_MSG_RESULT([ PGM: no])
fi
AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ forwarder: $forwarder])
+AC_MSG_RESULT([ streamer: $streamer])
AC_MSG_RESULT([ Performance tests: $perf])
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
diff --git a/devices/Makefile.am b/devices/Makefile.am
index 4cbad14..ab18976 100644
--- a/devices/Makefile.am
+++ b/devices/Makefile.am
@@ -2,5 +2,9 @@ if BUILD_FORWARDER
FORWARDER_DIR = zmq_forwarder
endif
-SUBDIRS = $(FORWARDER_DIR)
-DIST_SUBDIRS = zmq_forwarder
+if BUILD_STREAMER
+STREAMER_DIR = zmq_streamer
+endif
+
+SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR)
+DIST_SUBDIRS = zmq_forwarder zmq_streamer
diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp
index 32af5dd..d29ed62 100644
--- a/devices/zmq_forwarder/zmq_forwarder.cpp
+++ b/devices/zmq_forwarder/zmq_forwarder.cpp
@@ -23,7 +23,7 @@
int main (int argc, char *argv [])
{
if (argc != 2) {
- fprintf (stderr, "usage: forwarder <config-file>\n");
+ fprintf (stderr, "usage: zmq_forwarder <config-file>\n");
return 1;
}
@@ -53,8 +53,9 @@ int main (int argc, char *argv [])
// TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1);
- zmq::socket_t in_socket (ctx, ZMQ_P2P);
- zmq::socket_t out_socket (ctx, ZMQ_P2P);
+ zmq::socket_t in_socket (ctx, ZMQ_SUB);
+ in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1);
+ zmq::socket_t out_socket (ctx, ZMQ_PUB);
int n = 0;
while (true) {
diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am
new file mode 100644
index 0000000..e3681bf
--- /dev/null
+++ b/devices/zmq_streamer/Makefile.am
@@ -0,0 +1,9 @@
+INCLUDES = -I$(top_builddir)/bindings/c
+
+bin_PROGRAMS = zmq_streamer
+
+zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la
+zmq_streamer_SOURCES = zmq_streamer.cpp
+zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror
+
+
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
new file mode 100644
index 0000000..84e6569
--- /dev/null
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -0,0 +1,122 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../../bindings/cpp/zmq.hpp"
+#include "../../foreign/xmlParser/xmlParser.cpp"
+
+int main (int argc, char *argv [])
+{
+ if (argc != 2) {
+ fprintf (stderr, "usage: zmq_streamer <config-file>\n");
+ return 1;
+ }
+
+ XMLNode root = XMLNode::parseFile (argv [1]);
+ if (root.isEmpty ()) {
+ fprintf (stderr, "configuration file not found\n");
+ return 1;
+ }
+
+ if (strcmp (root.getName (), "streamer") != 0) {
+ fprintf (stderr, "root element in the configuration file should be "
+ "named 'streamer'\n");
+ return 1;
+ }
+
+ XMLNode in_node = root.getChildNode ("in");
+ if (in_node.isEmpty ()) {
+ fprintf (stderr, "'in' node is missing in the configuration file\n");
+ return 1;
+ }
+
+ XMLNode out_node = root.getChildNode ("out");
+ if (out_node.isEmpty ()) {
+ fprintf (stderr, "'out' node is missing in the configuration file\n");
+ return 1;
+ }
+
+ // TODO: make the number of I/O threads configurable.
+ zmq::context_t ctx (1, 1);
+ zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM);
+ zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM);
+
+ int n = 0;
+ while (true) {
+ XMLNode bind = in_node.getChildNode ("bind", n);
+ if (bind.isEmpty ())
+ break;
+ const char *addr = bind.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ in_socket.bind (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode connect = in_node.getChildNode ("connect", n);
+ if (connect.isEmpty ())
+ break;
+ const char *addr = connect.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ in_socket.connect (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode bind = out_node.getChildNode ("bind", n);
+ if (bind.isEmpty ())
+ break;
+ const char *addr = bind.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ out_socket.bind (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode connect = out_node.getChildNode ("connect", n);
+ if (connect.isEmpty ())
+ break;
+ const char *addr = connect.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ out_socket.connect (addr);
+ n++;
+ }
+
+ zmq::message_t msg;
+ while (true) {
+ in_socket.recv (&msg);
+ out_socket.send (msg);
+ }
+
+ return 0;
+}
diff --git a/man/Makefile.am b/man/Makefile.am
new file mode 100644
index 0000000..4f8c05a
--- /dev/null
+++ b/man/Makefile.am
@@ -0,0 +1,19 @@
+dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \
+ man3/zmq_socket.3 man3/zmq_close.3 man3/zmq_setsockopt.3 man3/zmq_bind.3 \
+ man3/zmq_connect.3 man3/zmq_send.3 man3/zmq_flush.3 man3/zmq_recv.3 \
+ man3/zmq_poll.3 man3/zmq_msg_init.3 man3/zmq_msg_init_size.3 \
+ man3/zmq_msg_close.3 man3/zmq_msg_move.3 man3/zmq_msg_copy.3 \
+ man3/zmq_msg_data.3 man3/zmq_msg_size.3 \
+ man3/zmq_strerror.3 man7/zmq.7
+
+distclean-local:
+ -rm *.pdf
+ -rm man1/*.ps
+ -rm man3/*.ps
+ -rm man7/*.ps
+
+dist-hook:
+ ./convert2pdf.sh
+ $(mkdir_p) $(top_distdir)/doc
+ cp $(top_srcdir)/man/*.pdf $(top_distdir)/doc
+
diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh
new file mode 100644
index 0000000..85bc22c
--- /dev/null
+++ b/man/convert2pdf.sh
@@ -0,0 +1,66 @@
+#!/bin/sh
+#
+# Copyright (c) 2007-2009 FastMQ Inc.
+#
+# This file is part of 0MQ.
+#
+# 0MQ is free software; you can redistribute it and/or modify it under
+# the terms of the Lesser GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# 0MQ is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Lesser GNU General Public License for more details.
+#
+# You should have received a copy of the Lesser GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+groff -man -Tps man1/zmq_forwarder.1 > man1/zmq_forwarder.1.ps
+ps2pdf man1/zmq_forwarder.1.ps zmq_forwarder.pdf
+
+groff -man -Tps man3/zmq_init.3 > man3/zmq_init.3.ps
+ps2pdf man3/zmq_init.3.ps zmq_init.pdf
+groff -man -Tps man3/zmq_term.3 > man3/zmq_term.3.ps
+ps2pdf man3/zmq_term.3.ps zmq_term.pdf
+groff -man -Tps man3/zmq_socket.3 > man3/zmq_socket.3.ps
+ps2pdf man3/zmq_socket.3.ps zmq_socket.pdf
+groff -man -Tps man3/zmq_close.3 > man3/zmq_close.3.ps
+ps2pdf man3/zmq_close.3.ps zmq_close.pdf
+groff -man -Tps man3/zmq_setsockopt.3 > man3/zmq_setsockopt.3.ps
+ps2pdf man3/zmq_setsockopt.3.ps zmq_setsockopt.pdf
+groff -man -Tps man3/zmq_bind.3 > man3/zmq_bind.3.ps
+ps2pdf man3/zmq_bind.3.ps zmq_bind.pdf
+groff -man -Tps man3/zmq_connect.3 > man3/zmq_connect.3.ps
+ps2pdf man3/zmq_connect.3.ps zmq_connect.pdf
+groff -man -Tps man3/zmq_send.3 > man3/zmq_send.3.ps
+ps2pdf man3/zmq_send.3.ps zmq_send.pdf
+groff -man -Tps man3/zmq_flush.3 > man3/zmq_flush.3.ps
+ps2pdf man3/zmq_flush.3.ps zmq_flush.pdf
+groff -man -Tps man3/zmq_recv.3 > man3/zmq_recv.3.ps
+ps2pdf man3/zmq_recv.3.ps zmq_recv.pdf
+groff -man -Tps man3/zmq_poll.3 > man3/zmq_poll.3.ps
+ps2pdf man3/zmq_poll.3.ps zmq_poll.pdf
+groff -man -Tps man3/zmq_msg_init.3 > man3/zmq_msg_init.3.ps
+ps2pdf man3/zmq_msg_init.3.ps zmq_msg_init.pdf
+groff -man -Tps man3/zmq_msg_init_size.3 > man3/zmq_msg_init_size.3.ps
+ps2pdf man3/zmq_msg_init_size.3.ps zmq_msg_init_size.pdf
+groff -man -Tps man3/zmq_msg_init_data.3 > man3/zmq_msg_init_data.3.ps
+ps2pdf man3/zmq_msg_init_data.3.ps zmq_msg_init_data.pdf
+groff -man -Tps man3/zmq_msg_close.3 > man3/zmq_msg_close.3.ps
+ps2pdf man3/zmq_msg_close.3.ps zmq_msg_close.pdf
+groff -man -Tps man3/zmq_msg_move.3 > man3/zmq_msg_move.3.ps
+ps2pdf man3/zmq_msg_move.3.ps zmq_msg_move.pdf
+groff -man -Tps man3/zmq_msg_copy.3 > man3/zmq_msg_copy.3.ps
+ps2pdf man3/zmq_msg_copy.3.ps zmq_msg_copy.pdf
+groff -man -Tps man3/zmq_msg_data.3 > man3/zmq_msg_data.3.ps
+ps2pdf man3/zmq_msg_data.3.ps zmq_msg_data.pdf
+groff -man -Tps man3/zmq_msg_size.3 > man3/zmq_msg_size.3.ps
+ps2pdf man3/zmq_msg_size.3.ps zmq_msg_size.pdf
+groff -man -Tps man3/zmq_strerror.3 > man3/zmq_strerror.3.ps
+ps2pdf man3/zmq_strerror.3.ps zmq_strerror.pdf
+
+groff -man -Tps man7/zmq.7 > man7/zmq.7.ps
+ps2pdf man7/zmq.7.ps zmq.pdf
+
diff --git a/man/man1/zmq_forwarder.1 b/man/man1/zmq_forwarder.1
new file mode 100644
index 0000000..63a0b6b
--- /dev/null
+++ b/man/man1/zmq_forwarder.1
@@ -0,0 +1,11 @@
+.TH zmq_forwarder 1 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_forwarder \- forwards the stream of PUB/SUB messages
+.SH SYNOPSIS
+.SH DESCRIPTION
+.SH OPTIONS
+.SH "SEE ALSO"
+.SH AUTHOR
+Martin Sustrik <sustrik at fastmq dot com>
+
+
diff --git a/man/man3/zmq_bind.3 b/man/man3/zmq_bind.3
new file mode 100644
index 0000000..069b966
--- /dev/null
+++ b/man/man3/zmq_bind.3
@@ -0,0 +1,48 @@
+.TH zmq_bind 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_bind \- binds the socket to the specified address
+.SH SYNOPSIS
+.B int zmq_bind (void *s, const char *addr);
+.SH DESCRIPTION
+The function binds socket
+.IR s to a particular transport. Actual semantics of the
+command depend on the underlying transport mechanism, however, in cases where
+peers connect in an asymetric manner,
+.IR zmq_bind
+should be called first,
+.IR zmq_connect
+afterwards. For actual formats of
+.IR addr
+parameter for different types of transport have a look at
+.IR zmq(7) .
+Note that single socket can be bound (and connected) to
+arbitrary number of peers using different transport mechanisms.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+.IP "\fBEPROTONOSUPPORT\fP"
+unsupported protocol.
+.IP "\fBENOCOMPATPROTO\fP"
+protocol is not compatible with the socket type.
+.IP "\fBEADDRINUSE\fP"
+the given address is already in use.
+.IP "\fBEADDRNOTAVAIL\fP"
+a nonexistent interface was requested or the requested address was not local.
+.SH EXAMPLE
+.nf
+void *s = zmq_socket (context, ZMQ_PUB);
+assert (s);
+int rc = zmq_bind (s, "inproc://my_publisher");
+assert (rc == 0);
+rc = zmq_bind (s, "tcp://eth0:5555");
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_connect (3)
+.BR zmq_socket (3)
+.BR zmq (7)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_close.3 b/man/man3/zmq_close.3
new file mode 100644
index 0000000..cc49635
--- /dev/null
+++ b/man/man3/zmq_close.3
@@ -0,0 +1,27 @@
+.TH zmq_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_close \- destroys 0MQ socket
+.SH SYNOPSIS
+.B int zmq_close (void *s);
+.SH DESCRIPTION
+Destroys 0MQ socket (one created using
+.IR zmq_socket
+function). All sockets have to be properly closed before the application
+terminates, otherwise memory leaks will occur.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+int rc = zmq_close (s);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_socket (3)
+.BR zmq_term (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_connect.3 b/man/man3/zmq_connect.3
new file mode 100644
index 0000000..8f09e20
--- /dev/null
+++ b/man/man3/zmq_connect.3
@@ -0,0 +1,47 @@
+.TH zmq_connect 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_connect \- connect the socket to the specified peer
+.SH SYNOPSIS
+.B int zmq_connect (void *s, const char *addr);
+.SH DESCRIPTION
+The function connect socket
+.IR s to the peer identified by
+.IR addr .
+Actual semantics of the command depend on the underlying transport mechanism,
+however, in cases where peers connect in an asymetric manner,
+.IR zmq_bind
+should be called first,
+.IR zmq_connect
+afterwards. For actual formats of
+.IR addr
+parameter for different types of transport have a look at
+.IR zmq(7) .
+Note that single socket can be connected (and bound) to
+arbitrary number of peers using different transport mechanisms.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+.IP "\fBEPROTONOSUPPORT\fP"
+unsupported protocol.
+.IP "\fBENOCOMPATPROTO\fP"
+protocol is not compatible with the socket type.
+.IP "\fBECONNREFUSED\fP"
+no-one listening on the remote address.
+.SH EXAMPLE
+.nf
+void *s = zmq_socket (context, ZMQ_SUB);
+assert (s);
+int rc = zmq_connect (s, "inproc://my_publisher");
+assert (rc == 0);
+rc = zmq_connect (s, "tcp://server001:5555");
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_bind (3)
+.BR zmq_socket (3)
+.BR zmq (7)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_flush.3 b/man/man3/zmq_flush.3
new file mode 100644
index 0000000..194cf6c
--- /dev/null
+++ b/man/man3/zmq_flush.3
@@ -0,0 +1,37 @@
+.TH zmq_flush 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_flush \- flushes pre-sent messages to the socket
+.SH SYNOPSIS
+.B int zmq_flush (void *s);
+.SH DESCRIPTION
+Flushes all the pre-sent messages - i.e. those that have been sent with
+ZMQ_NOFLUSH flag - to the socket. This functionality improves performance in
+cases where several messages are sent during a single business operation.
+It should not be used as a transaction - ACID properties are not guaranteed.
+Note that calling
+.IR zmq_send
+without ZMQ_NOFLUSH flag automatically flushes all previously pre-sent messages.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+.IP "\fBENOTSUP\fP"
+function isn't supported by particular socket type.
+.IP "\fBEFSM\fP"
+function cannot be called at the moment, because socket is not in the
+approprite state.
+.SH EXAMPLE
+.nf
+rc = zmq_send (s, &msg1, ZMQ_NOFLUSH);
+assert (rc == 0);
+rc = zmq_send (s, &msg2, ZMQ_NOFLUSH);
+assert (rc == 0);
+rc = zmq_flush (s);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_send (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_init.3 b/man/man3/zmq_init.3
new file mode 100644
index 0000000..317dbba
--- /dev/null
+++ b/man/man3/zmq_init.3
@@ -0,0 +1,39 @@
+.TH zmq_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_init \- initialises 0MQ context
+.SH SYNOPSIS
+.B void *zmq_init (int app_threads, int io_threads, int flags);
+.SH DESCRIPTION
+Initialises 0MQ context.
+.IR app_threads
+specifies maximal number of application threads that can own open sockets
+at the same time. At least one application thread should be defined.
+.IR io_threads
+specifies the size of thread pool to handle I/O operations. The value shouldn't
+be negative. Zero can be used in case only in-process messaging is going to be
+used, i.e. there will be no I/O traffic.
+'flags' argument is a combination of the flags defined below:
+
+.IP "\fBZMQ_POLL\fP"
+flag specifying that the sockets within this context should be pollable (see
+.IR zmq_poll
+). Pollable sockets may add a little latency to the message transfer when
+compared to non-pollable sockets.
+
+.SH RETURN VALUE
+Function returns context handle is successful. Otherwise it returns NULL and
+sets errno to one of the values below.
+.SH ERRORS
+.IP "\fBEINVAL\fP"
+there's less than one application thread allocated, or number of I/O threads
+is negative.
+.SH EXAMPLE
+.nf
+void *ctx = zmq_init (1, 1, ZMQ_POLL);
+assert (ctx);
+.fi
+.SH SEE ALSO
+.BR zmq_term (3)
+.BR zmq_socket (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_close.3 b/man/man3/zmq_msg_close.3
new file mode 100644
index 0000000..6613360
--- /dev/null
+++ b/man/man3/zmq_msg_close.3
@@ -0,0 +1,32 @@
+.TH zmq_msg_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_close \- destroys 0MQ message
+.SH SYNOPSIS
+.B int zmq_msg_close (zmq_msg_t *msg);
+.SH DESCRIPTION
+Deallocates message
+.IR msg
+including any associated buffers (unless the buffer is
+shared with another message). Not calling this function can result in
+memory leaks.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+zmq_msg_t msg;
+rc = zmq_msg_init_size (&msg, 1000000);
+assert (rc = 0);
+rc = zmq_msg_close (&msg);
+assert (rc = 0);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_init_data (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_copy.3 b/man/man3/zmq_msg_copy.3
new file mode 100644
index 0000000..2f70400
--- /dev/null
+++ b/man/man3/zmq_msg_copy.3
@@ -0,0 +1,43 @@
+.TH zmq_msg_copy 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_copy \- copies content of a message to another message
+.SH SYNOPSIS
+.B int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
+.SH DESCRIPTION
+Copy the
+.IR src
+message to
+.IR dest .
+The original content of
+.IR dest
+is orderly deallocated.
+Caution: The implementation may choose not to physically copy the data, rather
+to share the buffer between two messages. Thus avoid modifying message data
+after the message was copied. Doing so can modify multiple message instances.
+If what you need is actual hard copy, allocate new message using
+.IR zmq_msg_size
+and copy the data using
+.IR memcpy .
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+zmq_msg_t dest;
+rc = zmq_msg_init (&dest);
+assert (rc == 0);
+rc = zmq_msg_copy (&dest, &src);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_move (3)
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_init_data (3)
+.BR zmq_msg_close (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_data.3 b/man/man3/zmq_msg_data.3
new file mode 100644
index 0000000..9876378
--- /dev/null
+++ b/man/man3/zmq_msg_data.3
@@ -0,0 +1,27 @@
+.TH zmq_msg_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_data \- retrieves pointer to the message content
+.SH SYNOPSIS
+.B void *zmq_msg_data (zmq_msg_t *msg);
+.SH DESCRIPTION
+Returns pointer to message data. Always use this function to access the data,
+never use
+.IR zmq_msg_t
+members directly.
+.SH RETURN VALUE
+Pointer to the message data.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+zmq_msg_t msg;
+rc = zmq_msg_init_size (&msg, 100);
+memset (zmq_msg_data (&msg), 0, 100);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_init_data (3)
+.BR zmq_msg_close (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_init.3 b/man/man3/zmq_msg_init.3
new file mode 100644
index 0000000..a531fc1
--- /dev/null
+++ b/man/man3/zmq_msg_init.3
@@ -0,0 +1,33 @@
+.TH zmq_msg_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_init \- initialises empty 0MQ message
+.SH SYNOPSIS
+.B int zmq_msg_init (zmq_msg_t *msg);
+.SH DESCRIPTION
+Initialises 0MQ message zero bytes long. The function is most useful
+to initialise a
+.IR zmq_msg_t
+structure before receiving a message.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+zmq_msg_t msg;
+rc = zmq_msg_init (&msg);
+assert (rc == 0);
+rc = zmq_recv (s, &msg, 0);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_close (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_init_data (3)
+.BR zmq_msg_data (3)
+.BR zmq_msg_size (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_init_data.3 b/man/man3/zmq_msg_init_data.3
new file mode 100644
index 0000000..a0b14c8
--- /dev/null
+++ b/man/man3/zmq_msg_init_data.3
@@ -0,0 +1,48 @@
+.TH zmq_msg_init_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_init \- initialises 0MQ message from the given data
+.SH SYNOPSIS
+.nf
+.B typedef void (zmq_free_fn) (void *data);
+.B int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn);
+.fi
+.SH DESCRIPTION
+Initialise a message from a supplied buffer. Message isn't copied,
+instead 0MQ infrastructure takes ownership of the buffer located at address
+.IR data ,
+.IR size
+bytes long.
+Deallocation function (
+.IR ffn
+) will be called once the data are not needed anymore. Note that deallocation
+function prototype is designed so that it complies with standard C
+.IR free
+function. When using a static constant buffer,
+.IR ffn
+may be NULL to prevent subsequent deallocation.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+void *data = malloc (6);
+assert (data);
+memcpy (data, "ABCDEF", 6);
+zmq_msg_t msg;
+rc = zmq_msg_init_data (&msg, data, 6, free);
+assert (rc == 0);
+rc = zmq_send (s, &msg, 0);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_close (3)
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_data (3)
+.BR zmq_msg_size (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_init_size.3 b/man/man3/zmq_msg_init_size.3
new file mode 100644
index 0000000..ce1ec94
--- /dev/null
+++ b/man/man3/zmq_msg_init_size.3
@@ -0,0 +1,44 @@
+.TH zmq_msg_init_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_init \- initialises 0MQ message of a specified size
+.SH SYNOPSIS
+.B int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
+.SH DESCRIPTION
+Initialises 0MQ message
+.IR size
+bytes long. The implementation chooses whether it is more efficient to store
+message content on the stack (small messages) or on the heap (large messages).
+Therefore, never access message data directly via
+.IR zmq_msg_t
+members, rather use
+.IR zmq_msg_data
+and
+.IR zmq_msg_size
+functions to get message data and size. Note that the message data are not
+nullified to avoid the associated performance impact. Thus you
+should expect your message to contain bogus data after this call.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+.IP "\fBENOMEM\fP"
+memory to hold the message cannot be allocated.
+.SH EXAMPLE
+.nf
+zmq_msg_t msg;
+rc = zmq_msg_init_size (&msg, 6);
+assert (rc == 0);
+memcpy (zmq_msg_data (&msg), "ABCDEF", 6);
+rc = zmq_send (s, &msg, 0);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_close (3)
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_data (3)
+.BR zmq_msg_data (3)
+.BR zmq_msg_size (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_move.3 b/man/man3/zmq_msg_move.3
new file mode 100644
index 0000000..810e105
--- /dev/null
+++ b/man/man3/zmq_msg_move.3
@@ -0,0 +1,38 @@
+.TH zmq_msg_move 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_move \- moves content of a message to another message
+.SH SYNOPSIS
+.B int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
+.SH DESCRIPTION
+Move the content of the message from
+.IR src
+to
+.IR dest .
+The content isn't copied, just moved.
+.IR src
+becomes an empty message after the call. Original content of
+.IR dest
+message is deallocated.
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+zmq_msg_t dest;
+rc = zmq_msg_init (&dest);
+assert (rc == 0);
+rc = zmq_msg_move (&dest, &src);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_copy (3)
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_init_data (3)
+.BR zmq_msg_close (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_msg_size.3 b/man/man3/zmq_msg_size.3
new file mode 100644
index 0000000..b51d582
--- /dev/null
+++ b/man/man3/zmq_msg_size.3
@@ -0,0 +1,30 @@
+.TH zmq_msg_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_msg_size \- retrieves size of the message content
+.SH SYNOPSIS
+.B size_t zmq_msg_size (zmq_msg_t *msg);
+.SH DESCRIPTION
+Returns size of the message data. Always use this function to get the size,
+never use
+.IR zmq_msg_t
+members directly.
+.SH RETURN VALUE
+Size of the message data (bytes).
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+zmq_msg_t msg;
+rc = zmq_msg_init (&msg);
+assert (rc == 0);
+rc = zmq_recv (s, &msg, 0);
+assert (rc == 0);
+size_t msg_size = zmq_msg_size (&msg);
+.fi
+.SH SEE ALSO
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_init_data (3)
+.BR zmq_msg_close (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_poll.3 b/man/man3/zmq_poll.3
new file mode 100644
index 0000000..5ce5055
--- /dev/null
+++ b/man/man3/zmq_poll.3
@@ -0,0 +1,65 @@
+.TH zmq_poll 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_poll \- polls for events on a set of 0MQ and POSIX sockets
+.SH SYNOPSIS
+.B int zmq_poll (zmq_pollitem_t *items, int nitems);
+.SH DESCRIPTION
+Waits for the events specified by
+.IR items
+parameter. Number of items in the array is determined by
+.IR nitems
+argument. Each item in the array looks like this:
+
+.nf
+typedef struct
+{
+ void *socket;
+ int fd;
+ short events;
+ short revents;
+} zmq_pollitem_t;
+.fi
+
+0MQ socket to poll on is specified by
+.IR socket .
+In case you want to poll on standard POSIX socket, set
+.IR socket
+to NULL and fill the POSIX file descriptor to
+.IR fd .
+.IR events
+specifies which events to wait for. It's a combination of the values below.
+Once the call exits,
+.IR revent
+will be filled with events that have actually occured on the socket. The field
+will contain a combination of the following values.
+
+.IP "\fBZMQ_POLLIN\fP"
+poll for incoming messages.
+.IP "\fBZMQ_POLLOUT\fP"
+wait while message can be set socket. Poll will return if a message of at least
+one byte can be written to the socket. However, there is no guarantee that
+arbitrarily large message can be sent.
+
+.SH RETURN VALUE
+Function returns number of items signaled or -1 in the case of error.
+.SH ERRORS
+.IP "\fBEFAULT\fP"
+there's a 0MQ socket in the pollset belonging to a different application thread.
+.IP "\fBENOTSUP\fP"
+0MQ context was initialised without ZMQ_POLL flag. I/O multiplexing is disabled.
+.SH EXAMPLE
+.nf
+zmq_pollitem_t items [2];
+items [0].socket = s;
+items [0].events = POLLIN;
+items [1].socket = NULL;
+items [1].fd = my_fd;
+items [1].events = POLLIN;
+
+int rc = zmq_poll (items, 2);
+assert (rc != -1);
+.fi
+.SH SEE ALSO
+.BR zmq_socket (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_recv.3 b/man/man3/zmq_recv.3
new file mode 100644
index 0000000..d3cf2fd
--- /dev/null
+++ b/man/man3/zmq_recv.3
@@ -0,0 +1,52 @@
+.TH zmq_recv 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_recv \- retrieves a message from the socket
+.SH SYNOPSIS
+.B int zmq_recv (void *s, zmq_msg_t *msg, int flags);
+.SH DESCRIPTION
+Receive a message from the socket
+.IR s ,
+store it in
+.IR msg .
+Any content previously in
+.IR msg
+will be properly deallocated.
+.IR flags
+argument can be combination of the flags described below.
+
+.IP "\fBZMQ_NOBLOCK\fP"
+The flag specifies that the operation should be performed in
+non-blocking mode. I.e. if it cannot be processed immediately,
+error should be returned with
+.IR errno
+set to EAGAIN.
+
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+.IP "\fBEAGAIN\fP"
+it's a non-blocking receive and there's no message available at the moment.
+.IP "\fBENOTSUP\fP"
+function isn't supported by particular socket type.
+.IP "\fBEFSM\fP"
+function cannot be called at the moment, because socket is not in the
+approprite state. This error may occur with sockets that switch between
+several states (e.g. ZMQ_REQ).
+.SH EXAMPLE
+.nf
+zmq_msg_t msg;
+int rc = zmq_msg_init (&msg);
+assert (rc == 0);
+rc = zmq_recv (s, &msg, 0);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_send (3)
+.BR zmq_msg_init (3)
+.BR zmq_msg_data (3)
+.BR zmq_msg_size (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_send.3 b/man/man3/zmq_send.3
new file mode 100644
index 0000000..0ebbd0c
--- /dev/null
+++ b/man/man3/zmq_send.3
@@ -0,0 +1,64 @@
+.TH zmq_send 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_send \- sends a message
+.SH SYNOPSIS
+.B int zmq_send (void *s, zmq_msg_t *msg, int flags);
+.SH DESCRIPTION
+Send the message
+.IR msg
+to the socket
+.IR s .
+.IR flags
+argument can be combination the flags described below.
+
+.IP "\fBZMQ_NOBLOCK\fP"
+The flag specifies that the operation should be performed in
+non-blocking mode. I.e. if it cannot be processed immediately,
+error should be returned with
+.IR errno
+set to EAGAIN.
+
+.IP "\fBZMQ_NOFLUSH\fP"
+The flag specifies that
+.IR zmq_send
+should not flush the message downstream immediately. Instead, it should batch
+ZMQ_NOFLUSH messages and send them downstream only once
+.IR zmq_flush
+is invoked. This is an optimisation for cases where several messages are sent
+in a single business transaction. However, the effect is measurable only in
+extremely high-perf scenarios (million messages a second or so).
+If that's not your case, use standard flushing send instead.
+
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+.IP "\fBEAGAIN\fP"
+it's a non-blocking send and message cannot be sent at the moment.
+.IP "\fBENOTSUP\fP"
+function isn't supported by particular socket type.
+.IP "\fBEFSM\fP"
+function cannot be called at the moment, because socket is not in the
+approprite state. This error may occur with sockets that switch between
+several states (e.g. ZMQ_REQ).
+.SH EXAMPLE
+.nf
+zmq_msg_t msg;
+int rc = zmq_msg_init_size (&msg, 6);
+assert (rc == 0);
+memset (zmq_msg_data (&msg), 'A', 6);
+rc = zmq_send (s, &msg, 0);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_flush (3)
+.BR zmq_recv (3)
+.BR zmq_msg_init (3)
+.BR zmq_msg_init_size (3)
+.BR zmq_msg_init_data (3)
+.BR zmq_msg_data (3)
+.BR zmq_msg_size (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_setsockopt.3 b/man/man3/zmq_setsockopt.3
new file mode 100644
index 0000000..a79f879
--- /dev/null
+++ b/man/man3/zmq_setsockopt.3
@@ -0,0 +1,115 @@
+.TH zmq_setsockopt 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_setsockopt \- sets a specified option on a 0MQ socket
+.SH SYNOPSIS
+.B int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
+.SH DESCRIPTION
+Sets an option on the socket.
+.IR option
+argument specifies the option from the list below.
+.IR optval
+is a pointer to the value to set,
+.IR optvallen
+is the size of the value in bytes.
+
+.IP "\fBZMQ_HWM\fP"
+High watermark for the message pipes associated with the socket. The water
+mark cannot be exceeded. If the messages don't fit into the pipe emergency
+mechanisms of the particular socket type are used (block, drop etc.) If HWM
+is set to zero, there are no limits for the content of the pipe.
+Type: int64_t Unit: bytes Default: 0
+
+.IP "\fBZMQ_LWM\fP"
+Low watermark makes sense only if high watermark is defined (i.e. is non-zero).
+When the emergency state is reached when messages overflow the pipe, the
+emergency lasts till the size of the pipe decreases to low watermark.
+At that point normal state is resumed.
+Type: int64_t Unit: bytes Default: 0
+
+.IP "\fBZMQ_SWAP\fP"
+Swap allows the pipe to exceed high watermark. However, the data are written
+to the disk rather than held in the memory. Until high watermark is
+exceeded there is no disk activity involved though. The value of the option
+defines maximal size of the swap file.
+Type: int64_t Unit: bytes Default: 0
+
+.IP "\fBZMQ_AFFINITY\fP"
+Affinity defines which threads in the thread pool will be used to handle
+newly created sockets. This way you can dedicate some of the threads (CPUs)
+to a specific work. Value of 0 means no affinity. Work is distributed
+fairly among the threads in the thread pool. For non-zero values, the lowest
+bit corresponds to the thread 1, second lowest bit to the thread 2 etc.
+Thus, value of 3 means that from now on newly created sockets will handle
+I/O activity exclusively using threads no. 1 and 2.
+Type: int64_t Unit: N/A (bitmap) Default: 0
+
+.IP "\fBZMQ_IDENTITY\fP"
+Identity of the socket. Identity is important when restarting applications.
+If the socket has no identity, each run of the application is completely
+separated from other runs. However, with identity application reconnects to
+existing infrastructure left by the previous run. Thus it may receive
+messages that were sent in the meantime, it shares pipe limits with the
+previous run etc.
+Type: string Unit: N/A Default: NULL
+
+.IP "\fBZMQ_SUBSCRIBE\fP"
+Applicable only to ZMQ_SUB socket type. It establishes new message filter.
+When ZMQ_SUB socket is created all the incoming messages are filtered out.
+This option allows you to subscribe for all messages ("*"), messages with
+specific topic ("x.y.z") and/or messages with specific topic prefix
+("x.y.*"). Topic is one-byte-size-prefixed string located at
+the very beginning of the message. Multiple filters can be attached to
+a single 'sub' socket. In that case message passes if it matches at least
+one of the filters.
+Type: string Unit: N/A Default: N/A
+
+.IP "\fBZMQ_UNSUBSCRIBE\fP"
+Applicable only to ZMQ_SUB socket type. Removes existing message filter.
+The filter specified must match the string passed to ZMQ_SUBSCRIBE options
+exactly. If there were several instances of the same filter created,
+this options removes only one of them, leaving the rest in place
+and functional.
+Type: string Unit: N/A Default: N/A
+
+.IP "\fBZMQ_RATE\fP"
+This option applies only to sending side of multicast transports (pgm & udp).
+It specifies maximal outgoing data rate that an individual sender socket
+can send.
+Type: uint64_t Unit: kilobits/second Default: 100
+
+.IP "\fBZMQ_RECOVERY_IVL\fP"
+This option applies only to multicast transports (pgm & udp). It specifies
+how long can the receiver socket survive when the sender is inaccessible.
+Keep in mind that large recovery intervals at high data rates result in
+very large recovery buffers, meaning that you can easily overload your box
+by setting say 1 minute recovery interval at 1Gb/s rate (requires
+7GB in-memory buffer).
+Type: uint64_t Unit: seconds Default: 10
+
+.IP "\fBZMQ_MCAST_LOOP\fP"
+This option applies only to multicast transports (pgm & udp). Value of 1
+means that the mutlicast packets can be received on the box they were sent
+from. Setting the value to 0 disables the loopback functionality which
+can have negative impact on the performance. If possible, disable
+the loopback in production environments.
+Type: uint64_t Unit: N/A (boolean value) Default: 1
+
+.SH RETURN VALUE
+In case of success the function returns zero. Otherwise it returns -1 and
+sets
+.IR errno
+to the appropriate value.
+.SH ERRORS
+.IP "\fBEINVAL\fP"
+unknown option, a value with incorrect length or invalid value.
+.SH EXAMPLE
+.nf
+int rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "*", 1);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_socket (3)
+.BR zmq (7)
+
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_socket.3 b/man/man3/zmq_socket.3
new file mode 100644
index 0000000..a73bba5
--- /dev/null
+++ b/man/man3/zmq_socket.3
@@ -0,0 +1,77 @@
+.TH zmq_socket 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_socket \- creates 0MQ socket
+.SH SYNOPSIS
+.B void *zmq_socket (void *context, int type);
+.SH DESCRIPTION
+Open a socket within the specified
+.IR context .
+To create a context, use
+.IR zmq_init
+function.
+.IR type
+argument can be one of the values defined below. Note that each socket is owned
+by exactly one thread (the one that it was created from) and should not be used
+from any other thread.
+
+.IP "\fBZMQ_P2P\fP"
+Socket to communicate with a single peer. Allows for only a single connect or a
+single bind. There's no message routing or message filtering involved.
+
+.IP "\fBZMQ_PUB\fP"
+Socket to distribute data. Recv fuction is not implemented for this socket
+type. Messages are distributed in fanout fashion to all the peers.
+
+.IP "\fBZMQ_SUB\fP"
+Socket to subscribe for data. Send function is not implemented for this
+socket type. Initially, socket is subscribed for no messages. Use ZMQ_SUBSCRIBE
+option to specify which messages to subscribe for.
+
+.IP "\fBZMQ_REQ\fP"
+Socket to send requests and receive replies. Requests are load-balanced among
+all the peers. This socket type allows only an alternated sequence of
+send's and recv's.
+
+.IP "\fBZMQ_REP\fP"
+Socket to receive requests and send replies. This socket type allows
+only an alternated sequence of recv's and send's. Each send is routed to
+the peer that issued the last received request.
+
+.IP "\fBZMQ_UPSTREAM\fP"
+Socket to receive messages from up the stream. Messages are fair-queued
+from among all the connected peers. Send function is not implemented for
+this socket type.
+
+.IP "\fBZMQ_DOWNSTREAM\fP"
+Socket to send messages down stream. Messages are load-balanced among all the
+connected peers. Send function is not implemented for this socket type.
+
+.SH RETURN VALUE
+Function returns socket handle is successful. Otherwise it returns NULL and
+sets errno to one of the values below.
+.SH ERRORS
+.IP "\fBEINVAL\fP"
+invalid socket type.
+.IP "\fBEMTHREAD\fP"
+the number of application threads allowed to own 0MQ sockets was exceeded. See
+.IR app_threads
+parameter to
+.IR zmq_init
+function.
+.SH EXAMPLE
+.nf
+void *s = zmq_socket (context, ZMQ_PUB);
+assert (s);
+int rc = zmq_bind (s, "tcp://192.168.0.1:5555");
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_init (3)
+.BR zmq_setsockopt (3)
+.BR zmq_bind (3)
+.BR zmq_connect (3)
+.BR zmq_send (3)
+.BR zmq_flush (3)
+.BR zmq_recv (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_strerror.3 b/man/man3/zmq_strerror.3
new file mode 100644
index 0000000..343c3ed
--- /dev/null
+++ b/man/man3/zmq_strerror.3
@@ -0,0 +1,27 @@
+.TH zmq_strerror 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_strerror \- returns string describing the error number
+.SH SYNOPSIS
+.B const char *zmq_strerror (int errnum);
+.SH DESCRIPTION
+As 0MQ defines few additional (non-POSIX) error codes, standard
+.IR strerror
+isn't capable of translating those errors into human readable strings. Instead,
+.IR zmq_strerror
+should be used.
+.SH RETURN VALUE
+Returns string describing the error number.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+void *ctx = zmq_init (1, 1, 0);
+if (!ctx) {
+ printf ("error occured during zmq_init: %s\\n", zmq_strerror (errno));
+ abort ();
+}
+.fi
+.SH SEE ALSO
+.BR zmq (7)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man3/zmq_term.3 b/man/man3/zmq_term.3
new file mode 100644
index 0000000..8d822b6
--- /dev/null
+++ b/man/man3/zmq_term.3
@@ -0,0 +1,25 @@
+.TH zmq_term 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+zmq_init \- terminates 0MQ context
+.SH SYNOPSIS
+.B int zmq_term (void *context);
+.SH DESCRIPTION
+Destroys 0MQ context. However, if there are still any sockets open within
+the context,
+.IR zmq_term
+succeeds but shutdown of the context is delayed till the last socket is closed.
+.SH RETURN VALUE
+Function returns zero is successful. Otherwise it returns -1 and
+sets errno to one of the values below.
+.SH ERRORS
+No errors are defined.
+.SH EXAMPLE
+.nf
+int rc = zmq_term (context);
+assert (rc == 0);
+.fi
+.SH SEE ALSO
+.BR zmq_init (3)
+.BR zmq_close (3)
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
diff --git a/man/man7/zmq.7 b/man/man7/zmq.7
new file mode 100644
index 0000000..02257a7
--- /dev/null
+++ b/man/man7/zmq.7
@@ -0,0 +1,9 @@
+.TH zmq 7 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
+.SH NAME
+0MQ \- a lightweight messaging kernel
+.SH SYNOPSIS
+.SH DESCRIPTION
+.SH "SEE ALSO"
+.SH AUTHOR
+Martin Sustrik <sustrik at fastmq dot com>
+
diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c
index c97af11..f785e80 100644
--- a/perf/c/local_thr.c
+++ b/perf/c/local_thr.c
@@ -79,6 +79,9 @@ int main (int argc, char *argv [])
if (elapsed == 0)
elapsed = 1;
+ rc = zmq_msg_close (&msg);
+ assert (rc == 0);
+
throughput = (unsigned long)
((double) message_count / (double) elapsed * 1000000);
megabits = (double) (throughput * message_size * 8) / 1000000;
diff --git a/src/Makefile.am b/src/Makefile.am
index 91fb555..3d038b7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
+ downstream.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
+ upstream.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
+ downstream.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
@@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
+ upstream.cpp \
uuid.cpp \
ypollset.cpp \
zmq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbda335..a671822 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -40,11 +40,13 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
+#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
-#include "p2p.hpp"
+#include "upstream.hpp"
+#include "downstream.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
+ case ZMQ_P2P:
+ s = new p2p_t (this);
+ break;
case ZMQ_PUB:
s = new pub_t (this);
break;
@@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP:
s = new rep_t (this);
break;
- case ZMQ_P2P:
- s = new p2p_t (this);
+ case ZMQ_UPSTREAM:
+ s = new upstream_t (this);
+ break;
+ case ZMQ_DOWNSTREAM:
+ s = new downstream_t (this);
break;
default:
// TODO: This should be EINVAL.
diff --git a/src/command.hpp b/src/command.hpp
index 9a2e5d5..3099852 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -69,10 +69,12 @@ namespace zmq
} attach;
// Sent from session to socket to establish pipe(s) between them.
+ // If adjust_seqnum is true, caller have used inc_seqnum beforehand
+ // and thus the callee should take care of catching up.
struct {
- class owned_t *session;
class reader_t *in_pipe;
class writer_t *out_pipe;
+ bool adjust_seqnum;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index f28d55e..0ee772b 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -37,7 +37,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
-zmq::devpoll_t::devpoll_t ()
+zmq::devpoll_t::devpoll_t () :
+ stopping (false)
{
// Get limit on open files
struct rlimit rl;
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 1f6b4f0..1e41ee8 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -20,6 +20,7 @@
#include "../bindings/c/zmq.h"
#include "dispatcher.hpp"
+#include "socket_base.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
zmq_assert (erased == 1);
pipes_sync.unlock ();
}
+
+int zmq::dispatcher_t::register_endpoint (const char *addr_,
+ socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second;
+ if (!inserted) {
+ errno = EADDRINUSE;
+ endpoints_sync.unlock ();
+ return -1;
+ }
+
+ endpoints_sync.unlock ();
+ return 0;
+}
+
+void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.begin ();
+ while (it != endpoints.end ()) {
+ if (it->second == socket_) {
+ endpoints_t::iterator to_erase = it;
+ it++;
+ endpoints.erase (to_erase);
+ continue;
+ }
+ it++;
+ }
+
+ endpoints_sync.unlock ();
+}
+
+zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.find (addr_);
+ if (it == endpoints.end ()) {
+ endpoints_sync.unlock ();
+ errno = ECONNREFUSED;
+ return NULL;
+ }
+ socket_base_t *endpoint = it->second;
+
+ // Increment the command sequence number of the peer so that it won't
+ // get deallocated until "bind" command is issued by the caller.
+ endpoint->inc_seqnum ();
+
+ endpoints_sync.unlock ();
+ return endpoint;
+}
+
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index 23b6a33..8364d4d 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -97,6 +97,11 @@ namespace zmq
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
+ // Management of inproc endpoints.
+ int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ void unregister_endpoints (class socket_base_t *socket_);
+ class socket_base_t *find_endpoint (const char *addr_);
+
private:
~dispatcher_t ();
@@ -149,6 +154,13 @@ namespace zmq
// and 'terminated' flag).
mutex_t term_sync;
+ // List of inproc endpoints within this context.
+ typedef std::map <std::string, class socket_base_t*> endpoints_t;
+ endpoints_t endpoints;
+
+ // Synchronisation of access to the list of inproc endpoints.
+ mutex_t endpoints_sync;
+
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
};
diff --git a/src/downstream.cpp b/src/downstream.cpp
new file mode 100644
index 0000000..4f994e6
--- /dev/null
+++ b/src/downstream.cpp
@@ -0,0 +1,131 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "downstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ current (0)
+{
+ options.requires_in = false;
+ options.requires_out = true;
+}
+
+zmq::downstream_t::~downstream_t ()
+{
+}
+
+void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!inpipe_ && outpipe_);
+ pipes.push_back (outpipe_);
+}
+
+void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ pipes.erase (pipes.index (pipe_));
+}
+
+void zmq::downstream_t::xkill (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xrevive (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special option for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no pipes we cannot send the message.
+ if (pipes.empty ()) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Move to the next pipe (load-balancing).
+ current++;
+ if (current >= pipes.size ())
+ current = 0;
+
+ // TODO: Implement this once queue limits are in-place.
+ zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ pipes [current]->write (msg_);
+ pipes [current]->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::downstream_t::xflush ()
+{
+ // TODO: Maybe there's a point in flushing messages downstream.
+ // It may be useful in the case where number of messages in a single
+ // transaction is much greater than the number of attached pipes.
+ errno = ENOTSUP;
+ return -1;
+
+}
+
+int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::downstream_t::xhas_in ()
+{
+ return false;
+}
+
+bool zmq::downstream_t::xhas_out ()
+{
+ // TODO: Modify this code once pipe limits are in place.
+ return true;
+}
+
+
diff --git a/src/downstream.hpp b/src/downstream.hpp
new file mode 100644
index 0000000..c6a7ed8
--- /dev/null
+++ b/src/downstream.hpp
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class downstream_t : public socket_base_t
+ {
+ public:
+
+ downstream_t (class app_thread_t *parent_);
+ ~downstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // List of outbound pipes.
+ typedef yarray_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Points to the last pipe that the most recent message was sent to.
+ pipes_t::size_type current;
+
+ downstream_t (const downstream_t&);
+ void operator = (const downstream_t&);
+ };
+
+}
+
+#endif
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index f32fa36..69ad0c8 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -33,7 +33,8 @@
#include "config.hpp"
#include "i_poll_events.hpp"
-zmq::kqueue_t::kqueue_t ()
+zmq::kqueue_t::kqueue_t () :
+ stopping (false)
{
// Create event queue
kqueue_fd = kqueue ();
diff --git a/src/object.cpp b/src/object.cpp
index 1433b7b..b5d5eee 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return;
case command_t::bind:
- process_bind (cmd_.args.bind.session,
- cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ cmd_.args.bind.adjust_seqnum);
return;
case command_t::pipe_term:
@@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
dispatcher->unregister_pipe (pipe_);
}
+int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
+{
+ return dispatcher->register_endpoint (addr_, socket_);
+}
+
+void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
+{
+ return dispatcher->unregister_endpoints (socket_);
+}
+
+zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
+{
+ return dispatcher->find_endpoint (addr_);
+}
+
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return dispatcher->choose_io_thread (taskset_);
@@ -168,15 +183,15 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
send_command (cmd);
}
-void zmq::object_t::send_bind (object_t *destination_, owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::send_bind (object_t *destination_,
+ reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
- cmd.args.bind.session = session_;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ cmd.args.bind.adjust_seqnum = adjust_seqnum_;
send_command (cmd);
}
@@ -250,8 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_)
zmq_assert (false);
}
-void zmq::object_t::process_bind (owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ bool adjust_seqnum_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index 1954071..4fd0a8e 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -49,6 +49,12 @@ namespace zmq
protected:
+ // Using following function, socket is able to access global
+ // repository of inproc endpoints.
+ int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ void unregister_endpoints (class socket_base_t *socket_);
+ class socket_base_t *find_endpoint (const char *addr_);
+
// Derived object can use following functions to interact with
// global repositories. See dispatcher.hpp for function details.
int thread_slot_count ();
@@ -62,8 +68,8 @@ namespace zmq
class owned_t *object_);
void send_attach (class session_t *destination_,
struct i_engine *engine_);
- void send_bind (object_t *destination_, class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void send_bind (object_t *destination_, class reader_t *in_pipe_,
+ class writer_t *out_pipe_, bool adjust_seqnum_);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -78,8 +84,8 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (struct i_engine *engine_);
- virtual void process_bind (class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ virtual void process_bind (class reader_t *in_pipe_,
+ class writer_t *out_pipe_, bool adjust_seqnum_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 1fd7e34..32d7755 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_P2P_INCLUDED__
-#define __ZMQ_P2P_INCLUDED__
+#ifndef __ZMQ_P2P_HPP_INCLUDED__
+#define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp"
diff --git a/src/pipe.cpp b/src/pipe.cpp
index e444520..0e15dce 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -81,7 +81,11 @@ void zmq::reader_t::term ()
void zmq::reader_t::process_revive ()
{
- endpoint->revive (this);
+ // Beacuse of command throttling mechanism, incoming termination request
+ // may not have been processed before subsequent send.
+ // In that case endpoint is NULL.
+ if (endpoint)
+ endpoint->revive (this);
}
void zmq::reader_t::process_pipe_term_ack ()
diff --git a/src/pub.hpp b/src/pub.hpp
index b3e868d..9dbcb4a 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_PUB_INCLUDED__
-#define __ZMQ_PUB_INCLUDED__
+#ifndef __ZMQ_PUB_HPP_INCLUDED__
+#define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/rep.cpp b/src/rep.cpp
index e8a9e39..f06f4ab 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
- if (in_pipes.index (pipe_) < active)
+ if (index < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);
@@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
- current++;
- if (current >= active)
- current = 0;
if (fetched) {
reply_pipe = out_pipes [current];
waiting_for_reply = true;
- return 0;
}
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
}
// No message is available. Initialise the output parameter
diff --git a/src/rep.hpp b/src/rep.hpp
index 3e87dc1..0b327aa 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REP_INCLUDED__
-#define __ZMQ_REP_INCLUDED__
+#ifndef __ZMQ_REP_HPP_INCLUDED__
+#define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/req.hpp b/src/req.hpp
index 86554b5..756cc42 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REQ_INCLUDED__
-#define __ZMQ_REQ_INCLUDED__
+#ifndef __ZMQ_REQ_HPP_INCLUDED__
+#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/session.cpp b/src/session.cpp
index eb0a963..87b47b0 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
- // The communication is unidirectional.
- // We don't expect any message to arrive.
- zmq_assert (out_pipe);
-
if (out_pipe->write (msg_)) {
zmq_msg_init (msg_);
return true;
@@ -155,8 +151,10 @@ void zmq::session_t::process_plug ()
out_pipe->set_endpoint (this);
}
- send_bind (owner, this, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
+ // Note that initial call to inc_seqnum was optimised out. Last
+ // parameter conveys the fact to the callee.
+ send_bind (owner, outbound ? &outbound->reader : NULL,
+ inbound ? &inbound->writer : NULL, false);
}
owned_t::process_plug ();
diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp
index 209ccb4..3342281 100644
--- a/src/simple_semaphore.hpp
+++ b/src/simple_semaphore.hpp
@@ -23,7 +23,11 @@
#include "platform.hpp"
#include "err.hpp"
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
+#if 0 //defined ZMQ_HAVE_LINUX
+#include <sys/syscall.h>
+#include <unistd.h>
+#include <linux/futex.h>
+#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
#include <pthread.h>
#elif defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
@@ -33,13 +37,63 @@
namespace zmq
{
-
// Simple semaphore. Only single thread may be waiting at any given time.
// Also, the semaphore may not be posted before the previous post
// was matched by corresponding wait and the waiting thread was
// released.
-#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
+#if 0 //defined ZMQ_HAVE_LINUX
+
+ // In theory, using private futexes should be more efficient on Linux
+ // platform than using mutexes. However, in uncontended cases of TCP
+ // transport on loopback interface we haven't seen any latency improvement.
+ // The code is commented out waiting for more thorough testing.
+
+ class simple_semaphore_t
+ {
+ public:
+
+ // Initialise the semaphore.
+ inline simple_semaphore_t () :
+ dummy (0)
+ {
+ }
+
+ // Destroy the semaphore.
+ inline ~simple_semaphore_t ()
+ {
+ }
+
+ // Wait for the semaphore.
+ inline void wait ()
+ {
+ int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE,
+ (int) 0, NULL, NULL, (int) 0);
+ zmq_assert (rc == 0);
+ }
+
+ // Post the semaphore.
+ inline void post ()
+ {
+ while (true) {
+ int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE,
+ (int) 1, NULL, NULL, (int) 0);
+ zmq_assert (rc != -1 && rc <= 1);
+ if (rc == 1)
+ break;
+ }
+ }
+
+ private:
+
+ int dummy;
+
+ // Disable copying of the object.
+ simple_semaphore_t (const simple_semaphore_t&);
+ void operator = (const simple_semaphore_t&);
+ };
+
+#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
// On platforms that allow for double locking of a mutex from the same
// thread, simple semaphore is implemented using mutex, as it is more
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 6583608..a614759 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
pending_term_acks (0),
ticks (0),
app_thread (parent_),
- shutting_down (false)
+ shutting_down (false),
+ sent_seqnum (0),
+ processed_seqnum (0)
{
}
@@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_)
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
+ if (addr_type == "inproc")
+ return register_endpoint (addr_args.c_str (), this);
+
if (addr_type == "tcp") {
zmq_listener_t *listener = new zmq_listener_t (
choose_io_thread (options.affinity), this, options);
@@ -126,6 +131,41 @@ int zmq::socket_base_t::connect (const char *addr_)
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
+ if (addr_type == "inproc") {
+
+ // Find the peer socket.
+ socket_base_t *peer = find_endpoint (addr_args.c_str ());
+ if (!peer)
+ return -1;
+
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
+
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new pipe_t (this, peer, options.hwm, options.lwm);
+ zmq_assert (in_pipe);
+ }
+
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new pipe_t (peer, this, options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
+
+ // Attach the pipes to this socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the peer socket. Note that peer's seqnum
+ // was incremented in find_endpoint function. The callee is notified
+ // about the fact via the last parameter.
+ send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL, true);
+
+ return 0;
+ }
+
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (),
@@ -319,13 +359,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
+ shutting_down = true;
+
+ // Let the thread know that the socket is no longer available.
app_thread->remove_socket (this);
// Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher ();
- shutting_down = true;
+ // Unregister all inproc endpoints associated with this socket.
+ // From this point we are sure that inc_seqnum won't be called again
+ // on this object.
+ dispatcher->unregister_endpoints (this);
+
+ // Wait till all undelivered commands are delivered. This should happen
+ // very quickly. There's no way to wait here for extensive period of time.
+ while (processed_seqnum != sent_seqnum.get ())
+ app_thread->process_commands (true, false);
while (true) {
@@ -364,6 +415,12 @@ int zmq::socket_base_t::close ()
return 0;
}
+void zmq::socket_base_t::inc_seqnum ()
+{
+ // NB: This function may be called from a different thread!
+ sent_seqnum.add (1);
+}
+
zmq::app_thread_t *zmq::socket_base_t::get_thread ()
{
return app_thread;
@@ -452,9 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects.insert (object_);
}
-void zmq::socket_base_t::process_bind (owned_t *session_,
- reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ bool adjust_seqnum_)
{
+ // In case of inproc transport, the seqnum should catch up here.
+ // For other transports the seqnum modification can be optimised out
+ // because final handshaking between the socket and the session ensures
+ // that no 'bind' command will be left unprocessed.
+ if (adjust_seqnum_)
+ processed_seqnum++;
+
attach_pipes (in_pipe_, out_pipe_);
}
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 49ff5a5..dd7b526 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -33,6 +33,7 @@
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
+#include "atomic_counter.hpp"
namespace zmq
{
@@ -54,6 +55,11 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_);
int close ();
+ // When another owned object wants to send command to this object
+ // it calls this function to let it know it should not shut down
+ // before the command is delivered.
+ void inc_seqnum ();
+
// This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll
// is called from.
@@ -108,8 +114,8 @@ namespace zmq
// Handlers for incoming commands.
void process_own (class owned_t *object_);
- void process_bind (class owned_t *session_,
- class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
+ bool adjust_seqnum_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
@@ -132,6 +138,12 @@ namespace zmq
// started.
bool shutting_down;
+ // Sequence number of the last command sent to this object.
+ atomic_counter_t sent_seqnum;
+
+ // Sequence number of the last command processed by this object.
+ uint64_t processed_seqnum;
+
// List of existing sessions. This list is never referenced from within
// the socket, instead it is used by I/O objects owned by the session.
// As those objects can live in different threads, the access is
diff --git a/src/sub.hpp b/src/sub.hpp
index fb881dc..8ad8a18 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_SUB_INCLUDED__
-#define __ZMQ_SUB_INCLUDED__
+#ifndef __ZMQ_SUB_HPP_INCLUDED__
+#define __ZMQ_SUB_HPP_INCLUDED__
#include <set>
#include <string>
diff --git a/src/upstream.cpp b/src/upstream.cpp
new file mode 100644
index 0000000..da202f8
--- /dev/null
+++ b/src/upstream.cpp
@@ -0,0 +1,143 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "upstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ active (0),
+ current (0)
+{
+ options.requires_in = true;
+ options.requires_out = false;
+}
+
+zmq::upstream_t::~upstream_t ()
+{
+}
+
+void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (inpipe_ && !outpipe_);
+
+ pipes.push_back (inpipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+}
+
+void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ zmq_assert (pipe_);
+ pipes_t::size_type index = pipes.index (pipe_);
+ if (index < active)
+ active--;
+ pipes.erase (index);
+}
+
+void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ // There are no outpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::upstream_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+}
+
+void zmq::upstream_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special options for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xflush ()
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
+
+bool zmq::upstream_t::xhas_in ()
+{
+ // Note that messing with current doesn't break the fairness of fair
+ // queueing algorithm. If there are no messages available current will
+ // get back to its original value. Otherwise it'll point to the first
+ // pipe holding messages, skipping only pipes with no messages available.
+ for (int count = active; count != 0; count--) {
+ if (pipes [current]->check_read ())
+ return true;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
+bool zmq::upstream_t::xhas_out ()
+{
+ return false;
+}
+
diff --git a/src/upstream.hpp b/src/upstream.hpp
new file mode 100644
index 0000000..0e2f5ad
--- /dev/null
+++ b/src/upstream.hpp
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
+#define __ZMQ_UPSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class upstream_t : public socket_base_t
+ {
+ public:
+
+ upstream_t (class app_thread_t *parent_);
+ ~upstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // Inbound pipes.
+ typedef yarray_t <class reader_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // Index of the next bound pipe to read a message from.
+ pipes_t::size_type current;
+
+ upstream_t (const upstream_t&);
+ void operator = (const upstream_t&);
+
+ };
+
+}
+
+#endif
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 7952b61..9b66be8 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
void *zmq_init (int app_threads_, int io_threads_, int flags_)
{
- // There should be at least a single thread managed by the dispatcher.
- if (app_threads_ <= 0 || io_threads_ <= 0 ||
+ // There should be at least a single application thread managed
+ // by the dispatcher. There's no need for I/O threads if 0MQ is used
+ // only for inproc messaging
+ if (app_threads_ < 1 || io_threads_ < 0 ||
app_threads_ > 63 || io_threads_ > 63) {
errno = EINVAL;
return NULL;
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index 53811a1..8040f21 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
else {
// TODO: Handle over-sized message decently.
+ // in_progress is initialised at this point so in theory we should
+ // close it before calling zmq_msg_init_size, however, it's a 0-byte
+ // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
errno_assert (rc == 0);
@@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
size_t size = (size_t) get_uint64 (tmpbuf);
// TODO: Handle over-sized message decently.
+ // in_progress is initialised at this point so in theory we should
+ // close it before calling zmq_msg_init_size, however, it's a 0-byte
+ // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size);
errno_assert (rc == 0);
@@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
bool zmq::zmq_decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
- // new message.
+ // new message. (in_progress is a 0-byte message after this point.)
if (!destination || !destination->write (&in_progress))
return false;
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 44b919b..180bda7 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready ()
bool zmq::zmq_encoder_t::message_ready ()
{
+ // Destroy content of the old message.
+ zmq_msg_close(&in_progress);
+
// Read new message from the dispatcher. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
- if (!source || !source->read (&in_progress))
+ if (!source || !source->read (&in_progress)) {
+ zmq_msg_init (&in_progress);
return false;
+ }
size_t size = zmq_msg_size (&in_progress);
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index eec41c7..0d9488d 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
has_peer_identity = true;
peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
-
return true;
}