diff options
65 files changed, 1941 insertions, 64 deletions
@@ -9,6 +9,7 @@ Dirk O. Kaar Erich Heine Frank Denis George Neill +Jon Dyte Martin Hurton Martin Lucina Martin Sustrik @@ -36,3 +37,4 @@ Matt Muggeridge Paulo Henrique Silva Peter Lemenkov Robert Zhang +Vitaly Mayatskikh diff --git a/Makefile.am b/Makefile.am index fe0f1f0..af6ba2e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,8 +2,12 @@ if BUILD_PERF DIR_PERF = perf endif -SUBDIRS = src $(DIR_PERF) devices bindings -DIST_SUBDIRS = src perf devices bindings +if INSTALL_MAN +DIR_MAN = man +endif + +SUBDIRS = src $(DIR_MAN) $(DIR_PERF) devices bindings +DIST_SUBDIRS = src man perf devices bindings EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm1_basename@.tar.bz2 \ $(top_srcdir)/foreign/openpgm/@pgm2_basename@ \ diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 40750ec..ff94dd4 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -63,6 +63,9 @@ extern "C" { #ifndef EADDRNOTAVAIL #define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6) #endif +#ifndef ECONNREFUSED +#define ECONNREFUSED (ZMQ_HAUSNUMERO + 7) +#endif // Native 0MQ error codes. #define EMTHREAD (ZMQ_HAUSNUMERO + 50) @@ -180,7 +183,7 @@ ZMQ_EXPORT int zmq_term (void *context); // Socket to send requests and receive replies. Requests are // load-balanced among all the peers. This socket type allows -// only an alternated sequence of send's and recv's +// only an alternated sequence of send's and recv's. #define ZMQ_REQ 3 // Socket to receive requests and send replies. This socket type allows @@ -188,6 +191,12 @@ ZMQ_EXPORT int zmq_term (void *context); // the peer that issued the last received request. #define ZMQ_REP 4 +// Socket to receive messages from up the stream. +#define ZMQ_UPSTREAM 5 + +// Socket to send messages downstream. +#define ZMQ_DOWNSTREAM 6 + // Open a socket. 'type' is one of the socket types defined above. // // Errors: EINVAL - invalid socket type. diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java index 501bc16..396a6a0 100644 --- a/bindings/java/org/zmq/Socket.java +++ b/bindings/java/org/zmq/Socket.java @@ -34,6 +34,8 @@ public class Socket public static final int SUB = 2; public static final int REQ = 3; public static final int REP = 4; + public static final int UPSTREAM = 4; + public static final int DOWNSTREAM = 4; public static final int HWM = 1; public static final int LWM = 2; diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp index b180bcd..26ca7ac 100644 --- a/bindings/python/pyzmq.cpp +++ b/bindings/python/pyzmq.cpp @@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq () t = PyInt_FromLong (ZMQ_REP); PyDict_SetItemString (dict, "REP", t); Py_DECREF (t); + t = PyInt_FromLong (ZMQ_UPSTREAM); + PyDict_SetItemString (dict, "UPSTREAM", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_DOWNSTREAM); + PyDict_SetItemString (dict, "DOWNSTREAM", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_HWM); PyDict_SetItemString (dict, "HWM", t); Py_DECREF (t); diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp index 6112972..2a26ce1 100644 --- a/bindings/ruby/rbzmq.cpp +++ b/bindings/ruby/rbzmq.cpp @@ -275,6 +275,8 @@ extern "C" void Init_librbzmq () rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB)); rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ)); rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); + rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM)); + rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM)); rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL)); } diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj index 8927fa2..e575d67 100644 --- a/builds/msvc/libzmq/libzmq.vcproj +++ b/builds/msvc/libzmq/libzmq.vcproj @@ -182,6 +182,10 @@ > </File> <File + RelativePath="..\..\..\src\downstream.cpp" + > + </File> + <File RelativePath="..\..\..\src\epoll.cpp" > </File> @@ -290,6 +294,10 @@ > </File> <File + RelativePath="..\..\..\src\upstream.cpp" + > + </File> + <File RelativePath="..\..\..\src\uuid.cpp" > </File> @@ -376,6 +384,10 @@ > </File> <File + RelativePath="..\..\..\src\downstream.hpp" + > + </File> + <File RelativePath="..\..\..\src\epoll.hpp" > </File> @@ -532,6 +544,10 @@ > </File> <File + RelativePath="..\..\..\src\upstream.hpp" + > + </File> + <File RelativePath="..\..\..\src\uuid.hpp" > </File> diff --git a/configure.in b/configure.in index 1b1db35..bd3a0f4 100644 --- a/configure.in +++ b/configure.in @@ -49,6 +49,10 @@ on_mingw32="no" # Host speciffic checks AC_CANONICAL_HOST +# Whether or not install manual pages. +# Note that on MinGW manpages are not installed. +install_man="yes" + case "${host_os}" in *linux*) AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS]) @@ -134,6 +138,7 @@ case "${host_os}" in [AC_MSG_ERROR([Could not link with Iphlpapi.dll.])]) CFLAGS="${CFLAGS} -std=c99" on_mingw32="yes" + install_man="no" ;; *) AC_MSG_ERROR([Not supported os: $host.]) @@ -585,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then forwarder="yes" fi +# streamer device +streamer="no" +AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer], + [build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no]) + +if test "x$with_streamer" != "xno"; then + streamer="yes" +fi # Perf perf="no" @@ -613,10 +626,12 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno") -AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes") +AM_CONDITIONAL(INSTALL_MAN, test "x$install_man" = "xyes") AC_SUBST(stdint) AC_SUBST(inttypes) @@ -631,11 +646,12 @@ AC_FUNC_MALLOC AC_TYPE_SIGNAL AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) -AC_OUTPUT(Makefile src/Makefile bindings/python/Makefile \ +AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \ bindings/python/setup.py bindings/ruby/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ - devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile) + devices/Makefile devices/zmq_forwarder/Makefile \ + devices/zmq_streamer/Makefile bindings/Makefile) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) @@ -670,6 +686,7 @@ AC_MSG_RESULT([ PGM: no]) fi AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ forwarder: $forwarder]) +AC_MSG_RESULT([ streamer: $streamer]) AC_MSG_RESULT([ Performance tests: $perf]) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) diff --git a/devices/Makefile.am b/devices/Makefile.am index 4cbad14..ab18976 100644 --- a/devices/Makefile.am +++ b/devices/Makefile.am @@ -2,5 +2,9 @@ if BUILD_FORWARDER FORWARDER_DIR = zmq_forwarder endif -SUBDIRS = $(FORWARDER_DIR) -DIST_SUBDIRS = zmq_forwarder +if BUILD_STREAMER +STREAMER_DIR = zmq_streamer +endif + +SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) +DIST_SUBDIRS = zmq_forwarder zmq_streamer diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp index 32af5dd..d29ed62 100644 --- a/devices/zmq_forwarder/zmq_forwarder.cpp +++ b/devices/zmq_forwarder/zmq_forwarder.cpp @@ -23,7 +23,7 @@ int main (int argc, char *argv []) { if (argc != 2) { - fprintf (stderr, "usage: forwarder <config-file>\n"); + fprintf (stderr, "usage: zmq_forwarder <config-file>\n"); return 1; } @@ -53,8 +53,9 @@ int main (int argc, char *argv []) // TODO: make the number of I/O threads configurable. zmq::context_t ctx (1, 1); - zmq::socket_t in_socket (ctx, ZMQ_P2P); - zmq::socket_t out_socket (ctx, ZMQ_P2P); + zmq::socket_t in_socket (ctx, ZMQ_SUB); + in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1); + zmq::socket_t out_socket (ctx, ZMQ_PUB); int n = 0; while (true) { diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am new file mode 100644 index 0000000..e3681bf --- /dev/null +++ b/devices/zmq_streamer/Makefile.am @@ -0,0 +1,9 @@ +INCLUDES = -I$(top_builddir)/bindings/c + +bin_PROGRAMS = zmq_streamer + +zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la +zmq_streamer_SOURCES = zmq_streamer.cpp +zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror + + diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp new file mode 100644 index 0000000..84e6569 --- /dev/null +++ b/devices/zmq_streamer/zmq_streamer.cpp @@ -0,0 +1,122 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../../bindings/cpp/zmq.hpp" +#include "../../foreign/xmlParser/xmlParser.cpp" + +int main (int argc, char *argv []) +{ + if (argc != 2) { + fprintf (stderr, "usage: zmq_streamer <config-file>\n"); + return 1; + } + + XMLNode root = XMLNode::parseFile (argv [1]); + if (root.isEmpty ()) { + fprintf (stderr, "configuration file not found\n"); + return 1; + } + + if (strcmp (root.getName (), "streamer") != 0) { + fprintf (stderr, "root element in the configuration file should be " + "named 'streamer'\n"); + return 1; + } + + XMLNode in_node = root.getChildNode ("in"); + if (in_node.isEmpty ()) { + fprintf (stderr, "'in' node is missing in the configuration file\n"); + return 1; + } + + XMLNode out_node = root.getChildNode ("out"); + if (out_node.isEmpty ()) { + fprintf (stderr, "'out' node is missing in the configuration file\n"); + return 1; + } + + // TODO: make the number of I/O threads configurable. + zmq::context_t ctx (1, 1); + zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM); + zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM); + + int n = 0; + while (true) { + XMLNode bind = in_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = in_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.connect (addr); + n++; + } + + n = 0; + while (true) { + XMLNode bind = out_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = out_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.connect (addr); + n++; + } + + zmq::message_t msg; + while (true) { + in_socket.recv (&msg); + out_socket.send (msg); + } + + return 0; +} diff --git a/man/Makefile.am b/man/Makefile.am new file mode 100644 index 0000000..4f8c05a --- /dev/null +++ b/man/Makefile.am @@ -0,0 +1,19 @@ +dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \ + man3/zmq_socket.3 man3/zmq_close.3 man3/zmq_setsockopt.3 man3/zmq_bind.3 \ + man3/zmq_connect.3 man3/zmq_send.3 man3/zmq_flush.3 man3/zmq_recv.3 \ + man3/zmq_poll.3 man3/zmq_msg_init.3 man3/zmq_msg_init_size.3 \ + man3/zmq_msg_close.3 man3/zmq_msg_move.3 man3/zmq_msg_copy.3 \ + man3/zmq_msg_data.3 man3/zmq_msg_size.3 \ + man3/zmq_strerror.3 man7/zmq.7 + +distclean-local: + -rm *.pdf + -rm man1/*.ps + -rm man3/*.ps + -rm man7/*.ps + +dist-hook: + ./convert2pdf.sh + $(mkdir_p) $(top_distdir)/doc + cp $(top_srcdir)/man/*.pdf $(top_distdir)/doc + diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh new file mode 100644 index 0000000..85bc22c --- /dev/null +++ b/man/convert2pdf.sh @@ -0,0 +1,66 @@ +#!/bin/sh +# +# Copyright (c) 2007-2009 FastMQ Inc. +# +# This file is part of 0MQ. +# +# 0MQ is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# 0MQ is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +groff -man -Tps man1/zmq_forwarder.1 > man1/zmq_forwarder.1.ps +ps2pdf man1/zmq_forwarder.1.ps zmq_forwarder.pdf + +groff -man -Tps man3/zmq_init.3 > man3/zmq_init.3.ps +ps2pdf man3/zmq_init.3.ps zmq_init.pdf +groff -man -Tps man3/zmq_term.3 > man3/zmq_term.3.ps +ps2pdf man3/zmq_term.3.ps zmq_term.pdf +groff -man -Tps man3/zmq_socket.3 > man3/zmq_socket.3.ps +ps2pdf man3/zmq_socket.3.ps zmq_socket.pdf +groff -man -Tps man3/zmq_close.3 > man3/zmq_close.3.ps +ps2pdf man3/zmq_close.3.ps zmq_close.pdf +groff -man -Tps man3/zmq_setsockopt.3 > man3/zmq_setsockopt.3.ps +ps2pdf man3/zmq_setsockopt.3.ps zmq_setsockopt.pdf +groff -man -Tps man3/zmq_bind.3 > man3/zmq_bind.3.ps +ps2pdf man3/zmq_bind.3.ps zmq_bind.pdf +groff -man -Tps man3/zmq_connect.3 > man3/zmq_connect.3.ps +ps2pdf man3/zmq_connect.3.ps zmq_connect.pdf +groff -man -Tps man3/zmq_send.3 > man3/zmq_send.3.ps +ps2pdf man3/zmq_send.3.ps zmq_send.pdf +groff -man -Tps man3/zmq_flush.3 > man3/zmq_flush.3.ps +ps2pdf man3/zmq_flush.3.ps zmq_flush.pdf +groff -man -Tps man3/zmq_recv.3 > man3/zmq_recv.3.ps +ps2pdf man3/zmq_recv.3.ps zmq_recv.pdf +groff -man -Tps man3/zmq_poll.3 > man3/zmq_poll.3.ps +ps2pdf man3/zmq_poll.3.ps zmq_poll.pdf +groff -man -Tps man3/zmq_msg_init.3 > man3/zmq_msg_init.3.ps +ps2pdf man3/zmq_msg_init.3.ps zmq_msg_init.pdf +groff -man -Tps man3/zmq_msg_init_size.3 > man3/zmq_msg_init_size.3.ps +ps2pdf man3/zmq_msg_init_size.3.ps zmq_msg_init_size.pdf +groff -man -Tps man3/zmq_msg_init_data.3 > man3/zmq_msg_init_data.3.ps +ps2pdf man3/zmq_msg_init_data.3.ps zmq_msg_init_data.pdf +groff -man -Tps man3/zmq_msg_close.3 > man3/zmq_msg_close.3.ps +ps2pdf man3/zmq_msg_close.3.ps zmq_msg_close.pdf +groff -man -Tps man3/zmq_msg_move.3 > man3/zmq_msg_move.3.ps +ps2pdf man3/zmq_msg_move.3.ps zmq_msg_move.pdf +groff -man -Tps man3/zmq_msg_copy.3 > man3/zmq_msg_copy.3.ps +ps2pdf man3/zmq_msg_copy.3.ps zmq_msg_copy.pdf +groff -man -Tps man3/zmq_msg_data.3 > man3/zmq_msg_data.3.ps +ps2pdf man3/zmq_msg_data.3.ps zmq_msg_data.pdf +groff -man -Tps man3/zmq_msg_size.3 > man3/zmq_msg_size.3.ps +ps2pdf man3/zmq_msg_size.3.ps zmq_msg_size.pdf +groff -man -Tps man3/zmq_strerror.3 > man3/zmq_strerror.3.ps +ps2pdf man3/zmq_strerror.3.ps zmq_strerror.pdf + +groff -man -Tps man7/zmq.7 > man7/zmq.7.ps +ps2pdf man7/zmq.7.ps zmq.pdf + diff --git a/man/man1/zmq_forwarder.1 b/man/man1/zmq_forwarder.1 new file mode 100644 index 0000000..63a0b6b --- /dev/null +++ b/man/man1/zmq_forwarder.1 @@ -0,0 +1,11 @@ +.TH zmq_forwarder 1 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_forwarder \- forwards the stream of PUB/SUB messages +.SH SYNOPSIS +.SH DESCRIPTION +.SH OPTIONS +.SH "SEE ALSO" +.SH AUTHOR +Martin Sustrik <sustrik at fastmq dot com> + + diff --git a/man/man3/zmq_bind.3 b/man/man3/zmq_bind.3 new file mode 100644 index 0000000..069b966 --- /dev/null +++ b/man/man3/zmq_bind.3 @@ -0,0 +1,48 @@ +.TH zmq_bind 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_bind \- binds the socket to the specified address +.SH SYNOPSIS +.B int zmq_bind (void *s, const char *addr); +.SH DESCRIPTION +The function binds socket +.IR s to a particular transport. Actual semantics of the +command depend on the underlying transport mechanism, however, in cases where +peers connect in an asymetric manner, +.IR zmq_bind +should be called first, +.IR zmq_connect +afterwards. For actual formats of +.IR addr +parameter for different types of transport have a look at +.IR zmq(7) . +Note that single socket can be bound (and connected) to +arbitrary number of peers using different transport mechanisms. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBEPROTONOSUPPORT\fP" +unsupported protocol. +.IP "\fBENOCOMPATPROTO\fP" +protocol is not compatible with the socket type. +.IP "\fBEADDRINUSE\fP" +the given address is already in use. +.IP "\fBEADDRNOTAVAIL\fP" +a nonexistent interface was requested or the requested address was not local. +.SH EXAMPLE +.nf +void *s = zmq_socket (context, ZMQ_PUB); +assert (s); +int rc = zmq_bind (s, "inproc://my_publisher"); +assert (rc == 0); +rc = zmq_bind (s, "tcp://eth0:5555"); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_connect (3) +.BR zmq_socket (3) +.BR zmq (7) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_close.3 b/man/man3/zmq_close.3 new file mode 100644 index 0000000..cc49635 --- /dev/null +++ b/man/man3/zmq_close.3 @@ -0,0 +1,27 @@ +.TH zmq_close 3 "" "(c)200 |