diff options
author | malosek <malosek@fastmq.com> | 2009-11-30 16:45:36 +0100 |
---|---|---|
committer | malosek <malosek@fastmq.com> | 2009-11-30 16:45:36 +0100 |
commit | c637bf292d0dc97be5c94c5c96a033c2d665576c (patch) | |
tree | f6e82c3003ac1e4a646f588a7423d60c0e7dcc23 | |
parent | 9ccf2b42cf932b4c29ea20cc9c6e3d5d8e7a62b4 (diff) | |
parent | fa1641afc593be5926e558381861112b584e861a (diff) |
Merge branch 'master' of git@github.com:sustrik/zeromq2
65 files changed, 1941 insertions, 64 deletions
@@ -9,6 +9,7 @@ Dirk O. Kaar Erich Heine Frank Denis George Neill +Jon Dyte Martin Hurton Martin Lucina Martin Sustrik @@ -36,3 +37,4 @@ Matt Muggeridge Paulo Henrique Silva Peter Lemenkov Robert Zhang +Vitaly Mayatskikh diff --git a/Makefile.am b/Makefile.am index fe0f1f0..af6ba2e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -2,8 +2,12 @@ if BUILD_PERF DIR_PERF = perf endif -SUBDIRS = src $(DIR_PERF) devices bindings -DIST_SUBDIRS = src perf devices bindings +if INSTALL_MAN +DIR_MAN = man +endif + +SUBDIRS = src $(DIR_MAN) $(DIR_PERF) devices bindings +DIST_SUBDIRS = src man perf devices bindings EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm1_basename@.tar.bz2 \ $(top_srcdir)/foreign/openpgm/@pgm2_basename@ \ diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 40750ec..ff94dd4 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -63,6 +63,9 @@ extern "C" { #ifndef EADDRNOTAVAIL #define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6) #endif +#ifndef ECONNREFUSED +#define ECONNREFUSED (ZMQ_HAUSNUMERO + 7) +#endif // Native 0MQ error codes. #define EMTHREAD (ZMQ_HAUSNUMERO + 50) @@ -180,7 +183,7 @@ ZMQ_EXPORT int zmq_term (void *context); // Socket to send requests and receive replies. Requests are // load-balanced among all the peers. This socket type allows -// only an alternated sequence of send's and recv's +// only an alternated sequence of send's and recv's. #define ZMQ_REQ 3 // Socket to receive requests and send replies. This socket type allows @@ -188,6 +191,12 @@ ZMQ_EXPORT int zmq_term (void *context); // the peer that issued the last received request. #define ZMQ_REP 4 +// Socket to receive messages from up the stream. +#define ZMQ_UPSTREAM 5 + +// Socket to send messages downstream. +#define ZMQ_DOWNSTREAM 6 + // Open a socket. 'type' is one of the socket types defined above. // // Errors: EINVAL - invalid socket type. diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java index 501bc16..396a6a0 100644 --- a/bindings/java/org/zmq/Socket.java +++ b/bindings/java/org/zmq/Socket.java @@ -34,6 +34,8 @@ public class Socket public static final int SUB = 2; public static final int REQ = 3; public static final int REP = 4; + public static final int UPSTREAM = 4; + public static final int DOWNSTREAM = 4; public static final int HWM = 1; public static final int LWM = 2; diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp index b180bcd..26ca7ac 100644 --- a/bindings/python/pyzmq.cpp +++ b/bindings/python/pyzmq.cpp @@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq () t = PyInt_FromLong (ZMQ_REP); PyDict_SetItemString (dict, "REP", t); Py_DECREF (t); + t = PyInt_FromLong (ZMQ_UPSTREAM); + PyDict_SetItemString (dict, "UPSTREAM", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_DOWNSTREAM); + PyDict_SetItemString (dict, "DOWNSTREAM", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_HWM); PyDict_SetItemString (dict, "HWM", t); Py_DECREF (t); diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp index 6112972..2a26ce1 100644 --- a/bindings/ruby/rbzmq.cpp +++ b/bindings/ruby/rbzmq.cpp @@ -275,6 +275,8 @@ extern "C" void Init_librbzmq () rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB)); rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ)); rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); + rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM)); + rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM)); rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL)); } diff --git a/builds/msvc/libzmq/libzmq.vcproj b/builds/msvc/libzmq/libzmq.vcproj index 8927fa2..e575d67 100644 --- a/builds/msvc/libzmq/libzmq.vcproj +++ b/builds/msvc/libzmq/libzmq.vcproj @@ -182,6 +182,10 @@ > </File> <File + RelativePath="..\..\..\src\downstream.cpp" + > + </File> + <File RelativePath="..\..\..\src\epoll.cpp" > </File> @@ -290,6 +294,10 @@ > </File> <File + RelativePath="..\..\..\src\upstream.cpp" + > + </File> + <File RelativePath="..\..\..\src\uuid.cpp" > </File> @@ -376,6 +384,10 @@ > </File> <File + RelativePath="..\..\..\src\downstream.hpp" + > + </File> + <File RelativePath="..\..\..\src\epoll.hpp" > </File> @@ -532,6 +544,10 @@ > </File> <File + RelativePath="..\..\..\src\upstream.hpp" + > + </File> + <File RelativePath="..\..\..\src\uuid.hpp" > </File> diff --git a/configure.in b/configure.in index 1b1db35..bd3a0f4 100644 --- a/configure.in +++ b/configure.in @@ -49,6 +49,10 @@ on_mingw32="no" # Host speciffic checks AC_CANONICAL_HOST +# Whether or not install manual pages. +# Note that on MinGW manpages are not installed. +install_man="yes" + case "${host_os}" in *linux*) AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS]) @@ -134,6 +138,7 @@ case "${host_os}" in [AC_MSG_ERROR([Could not link with Iphlpapi.dll.])]) CFLAGS="${CFLAGS} -std=c99" on_mingw32="yes" + install_man="no" ;; *) AC_MSG_ERROR([Not supported os: $host.]) @@ -585,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then forwarder="yes" fi +# streamer device +streamer="no" +AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer], + [build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no]) + +if test "x$with_streamer" != "xno"; then + streamer="yes" +fi # Perf perf="no" @@ -613,10 +626,12 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno") -AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes") +AM_CONDITIONAL(INSTALL_MAN, test "x$install_man" = "xyes") AC_SUBST(stdint) AC_SUBST(inttypes) @@ -631,11 +646,12 @@ AC_FUNC_MALLOC AC_TYPE_SIGNAL AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) -AC_OUTPUT(Makefile src/Makefile bindings/python/Makefile \ +AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \ bindings/python/setup.py bindings/ruby/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ - devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile) + devices/Makefile devices/zmq_forwarder/Makefile \ + devices/zmq_streamer/Makefile bindings/Makefile) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) @@ -670,6 +686,7 @@ AC_MSG_RESULT([ PGM: no]) fi AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ forwarder: $forwarder]) +AC_MSG_RESULT([ streamer: $streamer]) AC_MSG_RESULT([ Performance tests: $perf]) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) diff --git a/devices/Makefile.am b/devices/Makefile.am index 4cbad14..ab18976 100644 --- a/devices/Makefile.am +++ b/devices/Makefile.am @@ -2,5 +2,9 @@ if BUILD_FORWARDER FORWARDER_DIR = zmq_forwarder endif -SUBDIRS = $(FORWARDER_DIR) -DIST_SUBDIRS = zmq_forwarder +if BUILD_STREAMER +STREAMER_DIR = zmq_streamer +endif + +SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) +DIST_SUBDIRS = zmq_forwarder zmq_streamer diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp index 32af5dd..d29ed62 100644 --- a/devices/zmq_forwarder/zmq_forwarder.cpp +++ b/devices/zmq_forwarder/zmq_forwarder.cpp @@ -23,7 +23,7 @@ int main (int argc, char *argv []) { if (argc != 2) { - fprintf (stderr, "usage: forwarder <config-file>\n"); + fprintf (stderr, "usage: zmq_forwarder <config-file>\n"); return 1; } @@ -53,8 +53,9 @@ int main (int argc, char *argv []) // TODO: make the number of I/O threads configurable. zmq::context_t ctx (1, 1); - zmq::socket_t in_socket (ctx, ZMQ_P2P); - zmq::socket_t out_socket (ctx, ZMQ_P2P); + zmq::socket_t in_socket (ctx, ZMQ_SUB); + in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1); + zmq::socket_t out_socket (ctx, ZMQ_PUB); int n = 0; while (true) { diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am new file mode 100644 index 0000000..e3681bf --- /dev/null +++ b/devices/zmq_streamer/Makefile.am @@ -0,0 +1,9 @@ +INCLUDES = -I$(top_builddir)/bindings/c + +bin_PROGRAMS = zmq_streamer + +zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la +zmq_streamer_SOURCES = zmq_streamer.cpp +zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror + + diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp new file mode 100644 index 0000000..84e6569 --- /dev/null +++ b/devices/zmq_streamer/zmq_streamer.cpp @@ -0,0 +1,122 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../../bindings/cpp/zmq.hpp" +#include "../../foreign/xmlParser/xmlParser.cpp" + +int main (int argc, char *argv []) +{ + if (argc != 2) { + fprintf (stderr, "usage: zmq_streamer <config-file>\n"); + return 1; + } + + XMLNode root = XMLNode::parseFile (argv [1]); + if (root.isEmpty ()) { + fprintf (stderr, "configuration file not found\n"); + return 1; + } + + if (strcmp (root.getName (), "streamer") != 0) { + fprintf (stderr, "root element in the configuration file should be " + "named 'streamer'\n"); + return 1; + } + + XMLNode in_node = root.getChildNode ("in"); + if (in_node.isEmpty ()) { + fprintf (stderr, "'in' node is missing in the configuration file\n"); + return 1; + } + + XMLNode out_node = root.getChildNode ("out"); + if (out_node.isEmpty ()) { + fprintf (stderr, "'out' node is missing in the configuration file\n"); + return 1; + } + + // TODO: make the number of I/O threads configurable. + zmq::context_t ctx (1, 1); + zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM); + zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM); + + int n = 0; + while (true) { + XMLNode bind = in_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = in_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.connect (addr); + n++; + } + + n = 0; + while (true) { + XMLNode bind = out_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = out_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.connect (addr); + n++; + } + + zmq::message_t msg; + while (true) { + in_socket.recv (&msg); + out_socket.send (msg); + } + + return 0; +} diff --git a/man/Makefile.am b/man/Makefile.am new file mode 100644 index 0000000..4f8c05a --- /dev/null +++ b/man/Makefile.am @@ -0,0 +1,19 @@ +dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \ + man3/zmq_socket.3 man3/zmq_close.3 man3/zmq_setsockopt.3 man3/zmq_bind.3 \ + man3/zmq_connect.3 man3/zmq_send.3 man3/zmq_flush.3 man3/zmq_recv.3 \ + man3/zmq_poll.3 man3/zmq_msg_init.3 man3/zmq_msg_init_size.3 \ + man3/zmq_msg_close.3 man3/zmq_msg_move.3 man3/zmq_msg_copy.3 \ + man3/zmq_msg_data.3 man3/zmq_msg_size.3 \ + man3/zmq_strerror.3 man7/zmq.7 + +distclean-local: + -rm *.pdf + -rm man1/*.ps + -rm man3/*.ps + -rm man7/*.ps + +dist-hook: + ./convert2pdf.sh + $(mkdir_p) $(top_distdir)/doc + cp $(top_srcdir)/man/*.pdf $(top_distdir)/doc + diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh new file mode 100644 index 0000000..85bc22c --- /dev/null +++ b/man/convert2pdf.sh @@ -0,0 +1,66 @@ +#!/bin/sh +# +# Copyright (c) 2007-2009 FastMQ Inc. +# +# This file is part of 0MQ. +# +# 0MQ is free software; you can redistribute it and/or modify it under +# the terms of the Lesser GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# 0MQ is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Lesser GNU General Public License for more details. +# +# You should have received a copy of the Lesser GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +groff -man -Tps man1/zmq_forwarder.1 > man1/zmq_forwarder.1.ps +ps2pdf man1/zmq_forwarder.1.ps zmq_forwarder.pdf + +groff -man -Tps man3/zmq_init.3 > man3/zmq_init.3.ps +ps2pdf man3/zmq_init.3.ps zmq_init.pdf +groff -man -Tps man3/zmq_term.3 > man3/zmq_term.3.ps +ps2pdf man3/zmq_term.3.ps zmq_term.pdf +groff -man -Tps man3/zmq_socket.3 > man3/zmq_socket.3.ps +ps2pdf man3/zmq_socket.3.ps zmq_socket.pdf +groff -man -Tps man3/zmq_close.3 > man3/zmq_close.3.ps +ps2pdf man3/zmq_close.3.ps zmq_close.pdf +groff -man -Tps man3/zmq_setsockopt.3 > man3/zmq_setsockopt.3.ps +ps2pdf man3/zmq_setsockopt.3.ps zmq_setsockopt.pdf +groff -man -Tps man3/zmq_bind.3 > man3/zmq_bind.3.ps +ps2pdf man3/zmq_bind.3.ps zmq_bind.pdf +groff -man -Tps man3/zmq_connect.3 > man3/zmq_connect.3.ps +ps2pdf man3/zmq_connect.3.ps zmq_connect.pdf +groff -man -Tps man3/zmq_send.3 > man3/zmq_send.3.ps +ps2pdf man3/zmq_send.3.ps zmq_send.pdf +groff -man -Tps man3/zmq_flush.3 > man3/zmq_flush.3.ps +ps2pdf man3/zmq_flush.3.ps zmq_flush.pdf +groff -man -Tps man3/zmq_recv.3 > man3/zmq_recv.3.ps +ps2pdf man3/zmq_recv.3.ps zmq_recv.pdf +groff -man -Tps man3/zmq_poll.3 > man3/zmq_poll.3.ps +ps2pdf man3/zmq_poll.3.ps zmq_poll.pdf +groff -man -Tps man3/zmq_msg_init.3 > man3/zmq_msg_init.3.ps +ps2pdf man3/zmq_msg_init.3.ps zmq_msg_init.pdf +groff -man -Tps man3/zmq_msg_init_size.3 > man3/zmq_msg_init_size.3.ps +ps2pdf man3/zmq_msg_init_size.3.ps zmq_msg_init_size.pdf +groff -man -Tps man3/zmq_msg_init_data.3 > man3/zmq_msg_init_data.3.ps +ps2pdf man3/zmq_msg_init_data.3.ps zmq_msg_init_data.pdf +groff -man -Tps man3/zmq_msg_close.3 > man3/zmq_msg_close.3.ps +ps2pdf man3/zmq_msg_close.3.ps zmq_msg_close.pdf +groff -man -Tps man3/zmq_msg_move.3 > man3/zmq_msg_move.3.ps +ps2pdf man3/zmq_msg_move.3.ps zmq_msg_move.pdf +groff -man -Tps man3/zmq_msg_copy.3 > man3/zmq_msg_copy.3.ps +ps2pdf man3/zmq_msg_copy.3.ps zmq_msg_copy.pdf +groff -man -Tps man3/zmq_msg_data.3 > man3/zmq_msg_data.3.ps +ps2pdf man3/zmq_msg_data.3.ps zmq_msg_data.pdf +groff -man -Tps man3/zmq_msg_size.3 > man3/zmq_msg_size.3.ps +ps2pdf man3/zmq_msg_size.3.ps zmq_msg_size.pdf +groff -man -Tps man3/zmq_strerror.3 > man3/zmq_strerror.3.ps +ps2pdf man3/zmq_strerror.3.ps zmq_strerror.pdf + +groff -man -Tps man7/zmq.7 > man7/zmq.7.ps +ps2pdf man7/zmq.7.ps zmq.pdf + diff --git a/man/man1/zmq_forwarder.1 b/man/man1/zmq_forwarder.1 new file mode 100644 index 0000000..63a0b6b --- /dev/null +++ b/man/man1/zmq_forwarder.1 @@ -0,0 +1,11 @@ +.TH zmq_forwarder 1 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_forwarder \- forwards the stream of PUB/SUB messages +.SH SYNOPSIS +.SH DESCRIPTION +.SH OPTIONS +.SH "SEE ALSO" +.SH AUTHOR +Martin Sustrik <sustrik at fastmq dot com> + + diff --git a/man/man3/zmq_bind.3 b/man/man3/zmq_bind.3 new file mode 100644 index 0000000..069b966 --- /dev/null +++ b/man/man3/zmq_bind.3 @@ -0,0 +1,48 @@ +.TH zmq_bind 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_bind \- binds the socket to the specified address +.SH SYNOPSIS +.B int zmq_bind (void *s, const char *addr); +.SH DESCRIPTION +The function binds socket +.IR s to a particular transport. Actual semantics of the +command depend on the underlying transport mechanism, however, in cases where +peers connect in an asymetric manner, +.IR zmq_bind +should be called first, +.IR zmq_connect +afterwards. For actual formats of +.IR addr +parameter for different types of transport have a look at +.IR zmq(7) . +Note that single socket can be bound (and connected) to +arbitrary number of peers using different transport mechanisms. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBEPROTONOSUPPORT\fP" +unsupported protocol. +.IP "\fBENOCOMPATPROTO\fP" +protocol is not compatible with the socket type. +.IP "\fBEADDRINUSE\fP" +the given address is already in use. +.IP "\fBEADDRNOTAVAIL\fP" +a nonexistent interface was requested or the requested address was not local. +.SH EXAMPLE +.nf +void *s = zmq_socket (context, ZMQ_PUB); +assert (s); +int rc = zmq_bind (s, "inproc://my_publisher"); +assert (rc == 0); +rc = zmq_bind (s, "tcp://eth0:5555"); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_connect (3) +.BR zmq_socket (3) +.BR zmq (7) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_close.3 b/man/man3/zmq_close.3 new file mode 100644 index 0000000..cc49635 --- /dev/null +++ b/man/man3/zmq_close.3 @@ -0,0 +1,27 @@ +.TH zmq_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_close \- destroys 0MQ socket +.SH SYNOPSIS +.B int zmq_close (void *s); +.SH DESCRIPTION +Destroys 0MQ socket (one created using +.IR zmq_socket +function). All sockets have to be properly closed before the application +terminates, otherwise memory leaks will occur. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +int rc = zmq_close (s); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_socket (3) +.BR zmq_term (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_connect.3 b/man/man3/zmq_connect.3 new file mode 100644 index 0000000..8f09e20 --- /dev/null +++ b/man/man3/zmq_connect.3 @@ -0,0 +1,47 @@ +.TH zmq_connect 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_connect \- connect the socket to the specified peer +.SH SYNOPSIS +.B int zmq_connect (void *s, const char *addr); +.SH DESCRIPTION +The function connect socket +.IR s to the peer identified by +.IR addr . +Actual semantics of the command depend on the underlying transport mechanism, +however, in cases where peers connect in an asymetric manner, +.IR zmq_bind +should be called first, +.IR zmq_connect +afterwards. For actual formats of +.IR addr +parameter for different types of transport have a look at +.IR zmq(7) . +Note that single socket can be connected (and bound) to +arbitrary number of peers using different transport mechanisms. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBEPROTONOSUPPORT\fP" +unsupported protocol. +.IP "\fBENOCOMPATPROTO\fP" +protocol is not compatible with the socket type. +.IP "\fBECONNREFUSED\fP" +no-one listening on the remote address. +.SH EXAMPLE +.nf +void *s = zmq_socket (context, ZMQ_SUB); +assert (s); +int rc = zmq_connect (s, "inproc://my_publisher"); +assert (rc == 0); +rc = zmq_connect (s, "tcp://server001:5555"); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_bind (3) +.BR zmq_socket (3) +.BR zmq (7) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_flush.3 b/man/man3/zmq_flush.3 new file mode 100644 index 0000000..194cf6c --- /dev/null +++ b/man/man3/zmq_flush.3 @@ -0,0 +1,37 @@ +.TH zmq_flush 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_flush \- flushes pre-sent messages to the socket +.SH SYNOPSIS +.B int zmq_flush (void *s); +.SH DESCRIPTION +Flushes all the pre-sent messages - i.e. those that have been sent with +ZMQ_NOFLUSH flag - to the socket. This functionality improves performance in +cases where several messages are sent during a single business operation. +It should not be used as a transaction - ACID properties are not guaranteed. +Note that calling +.IR zmq_send +without ZMQ_NOFLUSH flag automatically flushes all previously pre-sent messages. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBENOTSUP\fP" +function isn't supported by particular socket type. +.IP "\fBEFSM\fP" +function cannot be called at the moment, because socket is not in the +approprite state. +.SH EXAMPLE +.nf +rc = zmq_send (s, &msg1, ZMQ_NOFLUSH); +assert (rc == 0); +rc = zmq_send (s, &msg2, ZMQ_NOFLUSH); +assert (rc == 0); +rc = zmq_flush (s); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_send (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_init.3 b/man/man3/zmq_init.3 new file mode 100644 index 0000000..317dbba --- /dev/null +++ b/man/man3/zmq_init.3 @@ -0,0 +1,39 @@ +.TH zmq_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_init \- initialises 0MQ context +.SH SYNOPSIS +.B void *zmq_init (int app_threads, int io_threads, int flags); +.SH DESCRIPTION +Initialises 0MQ context. +.IR app_threads +specifies maximal number of application threads that can own open sockets +at the same time. At least one application thread should be defined. +.IR io_threads +specifies the size of thread pool to handle I/O operations. The value shouldn't +be negative. Zero can be used in case only in-process messaging is going to be +used, i.e. there will be no I/O traffic. +'flags' argument is a combination of the flags defined below: + +.IP "\fBZMQ_POLL\fP" +flag specifying that the sockets within this context should be pollable (see +.IR zmq_poll +). Pollable sockets may add a little latency to the message transfer when +compared to non-pollable sockets. + +.SH RETURN VALUE +Function returns context handle is successful. Otherwise it returns NULL and +sets errno to one of the values below. +.SH ERRORS +.IP "\fBEINVAL\fP" +there's less than one application thread allocated, or number of I/O threads +is negative. +.SH EXAMPLE +.nf +void *ctx = zmq_init (1, 1, ZMQ_POLL); +assert (ctx); +.fi +.SH SEE ALSO +.BR zmq_term (3) +.BR zmq_socket (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_close.3 b/man/man3/zmq_msg_close.3 new file mode 100644 index 0000000..6613360 --- /dev/null +++ b/man/man3/zmq_msg_close.3 @@ -0,0 +1,32 @@ +.TH zmq_msg_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_close \- destroys 0MQ message +.SH SYNOPSIS +.B int zmq_msg_close (zmq_msg_t *msg); +.SH DESCRIPTION +Deallocates message +.IR msg +including any associated buffers (unless the buffer is +shared with another message). Not calling this function can result in +memory leaks. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init_size (&msg, 1000000); +assert (rc = 0); +rc = zmq_msg_close (&msg); +assert (rc = 0); +.fi +.SH SEE ALSO +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_copy.3 b/man/man3/zmq_msg_copy.3 new file mode 100644 index 0000000..2f70400 --- /dev/null +++ b/man/man3/zmq_msg_copy.3 @@ -0,0 +1,43 @@ +.TH zmq_msg_copy 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_copy \- copies content of a message to another message +.SH SYNOPSIS +.B int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); +.SH DESCRIPTION +Copy the +.IR src +message to +.IR dest . +The original content of +.IR dest +is orderly deallocated. +Caution: The implementation may choose not to physically copy the data, rather +to share the buffer between two messages. Thus avoid modifying message data +after the message was copied. Doing so can modify multiple message instances. +If what you need is actual hard copy, allocate new message using +.IR zmq_msg_size +and copy the data using +.IR memcpy . +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +zmq_msg_t dest; +rc = zmq_msg_init (&dest); +assert (rc == 0); +rc = zmq_msg_copy (&dest, &src); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_msg_move (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_data.3 b/man/man3/zmq_msg_data.3 new file mode 100644 index 0000000..9876378 --- /dev/null +++ b/man/man3/zmq_msg_data.3 @@ -0,0 +1,27 @@ +.TH zmq_msg_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_data \- retrieves pointer to the message content +.SH SYNOPSIS +.B void *zmq_msg_data (zmq_msg_t *msg); +.SH DESCRIPTION +Returns pointer to message data. Always use this function to access the data, +never use +.IR zmq_msg_t +members directly. +.SH RETURN VALUE +Pointer to the message data. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init_size (&msg, 100); +memset (zmq_msg_data (&msg), 0, 100); +.fi +.SH SEE ALSO +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_init.3 b/man/man3/zmq_msg_init.3 new file mode 100644 index 0000000..a531fc1 --- /dev/null +++ b/man/man3/zmq_msg_init.3 @@ -0,0 +1,33 @@ +.TH zmq_msg_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_init \- initialises empty 0MQ message +.SH SYNOPSIS +.B int zmq_msg_init (zmq_msg_t *msg); +.SH DESCRIPTION +Initialises 0MQ message zero bytes long. The function is most useful +to initialise a +.IR zmq_msg_t +structure before receiving a message. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init (&msg); +assert (rc == 0); +rc = zmq_recv (s, &msg, 0); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_msg_close (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_init_data.3 b/man/man3/zmq_msg_init_data.3 new file mode 100644 index 0000000..a0b14c8 --- /dev/null +++ b/man/man3/zmq_msg_init_data.3 @@ -0,0 +1,48 @@ +.TH zmq_msg_init_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_init \- initialises 0MQ message from the given data +.SH SYNOPSIS +.nf +.B typedef void (zmq_free_fn) (void *data); +.B int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn); +.fi +.SH DESCRIPTION +Initialise a message from a supplied buffer. Message isn't copied, +instead 0MQ infrastructure takes ownership of the buffer located at address +.IR data , +.IR size +bytes long. +Deallocation function ( +.IR ffn +) will be called once the data are not needed anymore. Note that deallocation +function prototype is designed so that it complies with standard C +.IR free +function. When using a static constant buffer, +.IR ffn +may be NULL to prevent subsequent deallocation. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +void *data = malloc (6); +assert (data); +memcpy (data, "ABCDEF", 6); +zmq_msg_t msg; +rc = zmq_msg_init_data (&msg, data, 6, free); +assert (rc == 0); +rc = zmq_send (s, &msg, 0); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_msg_close (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_init_size.3 b/man/man3/zmq_msg_init_size.3 new file mode 100644 index 0000000..ce1ec94 --- /dev/null +++ b/man/man3/zmq_msg_init_size.3 @@ -0,0 +1,44 @@ +.TH zmq_msg_init_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_init \- initialises 0MQ message of a specified size +.SH SYNOPSIS +.B int zmq_msg_init_size (zmq_msg_t *msg, size_t size); +.SH DESCRIPTION +Initialises 0MQ message +.IR size +bytes long. The implementation chooses whether it is more efficient to store +message content on the stack (small messages) or on the heap (large messages). +Therefore, never access message data directly via +.IR zmq_msg_t +members, rather use +.IR zmq_msg_data +and +.IR zmq_msg_size +functions to get message data and size. Note that the message data are not +nullified to avoid the associated performance impact. Thus you +should expect your message to contain bogus data after this call. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBENOMEM\fP" +memory to hold the message cannot be allocated. +.SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init_size (&msg, 6); +assert (rc == 0); +memcpy (zmq_msg_data (&msg), "ABCDEF", 6); +rc = zmq_send (s, &msg, 0); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_msg_close (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_move.3 b/man/man3/zmq_msg_move.3 new file mode 100644 index 0000000..810e105 --- /dev/null +++ b/man/man3/zmq_msg_move.3 @@ -0,0 +1,38 @@ +.TH zmq_msg_move 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_move \- moves content of a message to another message +.SH SYNOPSIS +.B int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); +.SH DESCRIPTION +Move the content of the message from +.IR src +to +.IR dest . +The content isn't copied, just moved. +.IR src +becomes an empty message after the call. Original content of +.IR dest +message is deallocated. +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +zmq_msg_t dest; +rc = zmq_msg_init (&dest); +assert (rc == 0); +rc = zmq_msg_move (&dest, &src); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_msg_copy (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_msg_size.3 b/man/man3/zmq_msg_size.3 new file mode 100644 index 0000000..b51d582 --- /dev/null +++ b/man/man3/zmq_msg_size.3 @@ -0,0 +1,30 @@ +.TH zmq_msg_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_msg_size \- retrieves size of the message content +.SH SYNOPSIS +.B size_t zmq_msg_size (zmq_msg_t *msg); +.SH DESCRIPTION +Returns size of the message data. Always use this function to get the size, +never use +.IR zmq_msg_t +members directly. +.SH RETURN VALUE +Size of the message data (bytes). +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +zmq_msg_t msg; +rc = zmq_msg_init (&msg); +assert (rc == 0); +rc = zmq_recv (s, &msg, 0); +assert (rc == 0); +size_t msg_size = zmq_msg_size (&msg); +.fi +.SH SEE ALSO +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_close (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_poll.3 b/man/man3/zmq_poll.3 new file mode 100644 index 0000000..5ce5055 --- /dev/null +++ b/man/man3/zmq_poll.3 @@ -0,0 +1,65 @@ +.TH zmq_poll 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_poll \- polls for events on a set of 0MQ and POSIX sockets +.SH SYNOPSIS +.B int zmq_poll (zmq_pollitem_t *items, int nitems); +.SH DESCRIPTION +Waits for the events specified by +.IR items +parameter. Number of items in the array is determined by +.IR nitems +argument. Each item in the array looks like this: + +.nf +typedef struct +{ + void *socket; + int fd; + short events; + short revents; +} zmq_pollitem_t; +.fi + +0MQ socket to poll on is specified by +.IR socket . +In case you want to poll on standard POSIX socket, set +.IR socket +to NULL and fill the POSIX file descriptor to +.IR fd . +.IR events +specifies which events to wait for. It's a combination of the values below. +Once the call exits, +.IR revent +will be filled with events that have actually occured on the socket. The field +will contain a combination of the following values. + +.IP "\fBZMQ_POLLIN\fP" +poll for incoming messages. +.IP "\fBZMQ_POLLOUT\fP" +wait while message can be set socket. Poll will return if a message of at least +one byte can be written to the socket. However, there is no guarantee that +arbitrarily large message can be sent. + +.SH RETURN VALUE +Function returns number of items signaled or -1 in the case of error. +.SH ERRORS +.IP "\fBEFAULT\fP" +there's a 0MQ socket in the pollset belonging to a different application thread. +.IP "\fBENOTSUP\fP" +0MQ context was initialised without ZMQ_POLL flag. I/O multiplexing is disabled. +.SH EXAMPLE +.nf +zmq_pollitem_t items [2]; +items [0].socket = s; +items [0].events = POLLIN; +items [1].socket = NULL; +items [1].fd = my_fd; +items [1].events = POLLIN; + +int rc = zmq_poll (items, 2); +assert (rc != -1); +.fi +.SH SEE ALSO +.BR zmq_socket (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_recv.3 b/man/man3/zmq_recv.3 new file mode 100644 index 0000000..d3cf2fd --- /dev/null +++ b/man/man3/zmq_recv.3 @@ -0,0 +1,52 @@ +.TH zmq_recv 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_recv \- retrieves a message from the socket +.SH SYNOPSIS +.B int zmq_recv (void *s, zmq_msg_t *msg, int flags); +.SH DESCRIPTION +Receive a message from the socket +.IR s , +store it in +.IR msg . +Any content previously in +.IR msg +will be properly deallocated. +.IR flags +argument can be combination of the flags described below. + +.IP "\fBZMQ_NOBLOCK\fP" +The flag specifies that the operation should be performed in +non-blocking mode. I.e. if it cannot be processed immediately, +error should be returned with +.IR errno +set to EAGAIN. + +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBEAGAIN\fP" +it's a non-blocking receive and there's no message available at the moment. +.IP "\fBENOTSUP\fP" +function isn't supported by particular socket type. +.IP "\fBEFSM\fP" +function cannot be called at the moment, because socket is not in the +approprite state. This error may occur with sockets that switch between +several states (e.g. ZMQ_REQ). +.SH EXAMPLE +.nf +zmq_msg_t msg; +int rc = zmq_msg_init (&msg); +assert (rc == 0); +rc = zmq_recv (s, &msg, 0); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_send (3) +.BR zmq_msg_init (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_send.3 b/man/man3/zmq_send.3 new file mode 100644 index 0000000..0ebbd0c --- /dev/null +++ b/man/man3/zmq_send.3 @@ -0,0 +1,64 @@ +.TH zmq_send 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_send \- sends a message +.SH SYNOPSIS +.B int zmq_send (void *s, zmq_msg_t *msg, int flags); +.SH DESCRIPTION +Send the message +.IR msg +to the socket +.IR s . +.IR flags +argument can be combination the flags described below. + +.IP "\fBZMQ_NOBLOCK\fP" +The flag specifies that the operation should be performed in +non-blocking mode. I.e. if it cannot be processed immediately, +error should be returned with +.IR errno +set to EAGAIN. + +.IP "\fBZMQ_NOFLUSH\fP" +The flag specifies that +.IR zmq_send +should not flush the message downstream immediately. Instead, it should batch +ZMQ_NOFLUSH messages and send them downstream only once +.IR zmq_flush +is invoked. This is an optimisation for cases where several messages are sent +in a single business transaction. However, the effect is measurable only in +extremely high-perf scenarios (million messages a second or so). +If that's not your case, use standard flushing send instead. + +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBEAGAIN\fP" +it's a non-blocking send and message cannot be sent at the moment. +.IP "\fBENOTSUP\fP" +function isn't supported by particular socket type. +.IP "\fBEFSM\fP" +function cannot be called at the moment, because socket is not in the +approprite state. This error may occur with sockets that switch between +several states (e.g. ZMQ_REQ). +.SH EXAMPLE +.nf +zmq_msg_t msg; +int rc = zmq_msg_init_size (&msg, 6); +assert (rc == 0); +memset (zmq_msg_data (&msg), 'A', 6); +rc = zmq_send (s, &msg, 0); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_flush (3) +.BR zmq_recv (3) +.BR zmq_msg_init (3) +.BR zmq_msg_init_size (3) +.BR zmq_msg_init_data (3) +.BR zmq_msg_data (3) +.BR zmq_msg_size (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_setsockopt.3 b/man/man3/zmq_setsockopt.3 new file mode 100644 index 0000000..a79f879 --- /dev/null +++ b/man/man3/zmq_setsockopt.3 @@ -0,0 +1,115 @@ +.TH zmq_setsockopt 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_setsockopt \- sets a specified option on a 0MQ socket +.SH SYNOPSIS +.B int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); +.SH DESCRIPTION +Sets an option on the socket. +.IR option +argument specifies the option from the list below. +.IR optval +is a pointer to the value to set, +.IR optvallen +is the size of the value in bytes. + +.IP "\fBZMQ_HWM\fP" +High watermark for the message pipes associated with the socket. The water +mark cannot be exceeded. If the messages don't fit into the pipe emergency +mechanisms of the particular socket type are used (block, drop etc.) If HWM +is set to zero, there are no limits for the content of the pipe. +Type: int64_t Unit: bytes Default: 0 + +.IP "\fBZMQ_LWM\fP" +Low watermark makes sense only if high watermark is defined (i.e. is non-zero). +When the emergency state is reached when messages overflow the pipe, the +emergency lasts till the size of the pipe decreases to low watermark. +At that point normal state is resumed. +Type: int64_t Unit: bytes Default: 0 + +.IP "\fBZMQ_SWAP\fP" +Swap allows the pipe to exceed high watermark. However, the data are written +to the disk rather than held in the memory. Until high watermark is +exceeded there is no disk activity involved though. The value of the option +defines maximal size of the swap file. +Type: int64_t Unit: bytes Default: 0 + +.IP "\fBZMQ_AFFINITY\fP" +Affinity defines which threads in the thread pool will be used to handle +newly created sockets. This way you can dedicate some of the threads (CPUs) +to a specific work. Value of 0 means no affinity. Work is distributed +fairly among the threads in the thread pool. For non-zero values, the lowest +bit corresponds to the thread 1, second lowest bit to the thread 2 etc. +Thus, value of 3 means that from now on newly created sockets will handle +I/O activity exclusively using threads no. 1 and 2. +Type: int64_t Unit: N/A (bitmap) Default: 0 + +.IP "\fBZMQ_IDENTITY\fP" +Identity of the socket. Identity is important when restarting applications. +If the socket has no identity, each run of the application is completely +separated from other runs. However, with identity application reconnects to +existing infrastructure left by the previous run. Thus it may receive +messages that were sent in the meantime, it shares pipe limits with the +previous run etc. +Type: string Unit: N/A Default: NULL + +.IP "\fBZMQ_SUBSCRIBE\fP" +Applicable only to ZMQ_SUB socket type. It establishes new message filter. +When ZMQ_SUB socket is created all the incoming messages are filtered out. +This option allows you to subscribe for all messages ("*"), messages with +specific topic ("x.y.z") and/or messages with specific topic prefix +("x.y.*"). Topic is one-byte-size-prefixed string located at +the very beginning of the message. Multiple filters can be attached to +a single 'sub' socket. In that case message passes if it matches at least +one of the filters. +Type: string Unit: N/A Default: N/A + +.IP "\fBZMQ_UNSUBSCRIBE\fP" +Applicable only to ZMQ_SUB socket type. Removes existing message filter. +The filter specified must match the string passed to ZMQ_SUBSCRIBE options +exactly. If there were several instances of the same filter created, +this options removes only one of them, leaving the rest in place +and functional. +Type: string Unit: N/A Default: N/A + +.IP "\fBZMQ_RATE\fP" +This option applies only to sending side of multicast transports (pgm & udp). +It specifies maximal outgoing data rate that an individual sender socket +can send. +Type: uint64_t Unit: kilobits/second Default: 100 + +.IP "\fBZMQ_RECOVERY_IVL\fP" +This option applies only to multicast transports (pgm & udp). It specifies +how long can the receiver socket survive when the sender is inaccessible. +Keep in mind that large recovery intervals at high data rates result in +very large recovery buffers, meaning that you can easily overload your box +by setting say 1 minute recovery interval at 1Gb/s rate (requires +7GB in-memory buffer). +Type: uint64_t Unit: seconds Default: 10 + +.IP "\fBZMQ_MCAST_LOOP\fP" +This option applies only to multicast transports (pgm & udp). Value of 1 +means that the mutlicast packets can be received on the box they were sent +from. Setting the value to 0 disables the loopback functionality which +can have negative impact on the performance. If possible, disable +the loopback in production environments. +Type: uint64_t Unit: N/A (boolean value) Default: 1 + +.SH RETURN VALUE +In case of success the function returns zero. Otherwise it returns -1 and +sets +.IR errno +to the appropriate value. +.SH ERRORS +.IP "\fBEINVAL\fP" +unknown option, a value with incorrect length or invalid value. +.SH EXAMPLE +.nf +int rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "*", 1); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_socket (3) +.BR zmq (7) + +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_socket.3 b/man/man3/zmq_socket.3 new file mode 100644 index 0000000..a73bba5 --- /dev/null +++ b/man/man3/zmq_socket.3 @@ -0,0 +1,77 @@ +.TH zmq_socket 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_socket \- creates 0MQ socket +.SH SYNOPSIS +.B void *zmq_socket (void *context, int type); +.SH DESCRIPTION +Open a socket within the specified +.IR context . +To create a context, use +.IR zmq_init +function. +.IR type +argument can be one of the values defined below. Note that each socket is owned +by exactly one thread (the one that it was created from) and should not be used +from any other thread. + +.IP "\fBZMQ_P2P\fP" +Socket to communicate with a single peer. Allows for only a single connect or a +single bind. There's no message routing or message filtering involved. + +.IP "\fBZMQ_PUB\fP" +Socket to distribute data. Recv fuction is not implemented for this socket +type. Messages are distributed in fanout fashion to all the peers. + +.IP "\fBZMQ_SUB\fP" +Socket to subscribe for data. Send function is not implemented for this +socket type. Initially, socket is subscribed for no messages. Use ZMQ_SUBSCRIBE +option to specify which messages to subscribe for. + +.IP "\fBZMQ_REQ\fP" +Socket to send requests and receive replies. Requests are load-balanced among +all the peers. This socket type allows only an alternated sequence of +send's and recv's. + +.IP "\fBZMQ_REP\fP" +Socket to receive requests and send replies. This socket type allows +only an alternated sequence of recv's and send's. Each send is routed to +the peer that issued the last received request. + +.IP "\fBZMQ_UPSTREAM\fP" +Socket to receive messages from up the stream. Messages are fair-queued +from among all the connected peers. Send function is not implemented for +this socket type. + +.IP "\fBZMQ_DOWNSTREAM\fP" +Socket to send messages down stream. Messages are load-balanced among all the +connected peers. Send function is not implemented for this socket type. + +.SH RETURN VALUE +Function returns socket handle is successful. Otherwise it returns NULL and +sets errno to one of the values below. +.SH ERRORS +.IP "\fBEINVAL\fP" +invalid socket type. +.IP "\fBEMTHREAD\fP" +the number of application threads allowed to own 0MQ sockets was exceeded. See +.IR app_threads +parameter to +.IR zmq_init +function. +.SH EXAMPLE +.nf +void *s = zmq_socket (context, ZMQ_PUB); +assert (s); +int rc = zmq_bind (s, "tcp://192.168.0.1:5555"); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_init (3) +.BR zmq_setsockopt (3) +.BR zmq_bind (3) +.BR zmq_connect (3) +.BR zmq_send (3) +.BR zmq_flush (3) +.BR zmq_recv (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_strerror.3 b/man/man3/zmq_strerror.3 new file mode 100644 index 0000000..343c3ed --- /dev/null +++ b/man/man3/zmq_strerror.3 @@ -0,0 +1,27 @@ +.TH zmq_strerror 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_strerror \- returns string describing the error number +.SH SYNOPSIS +.B const char *zmq_strerror (int errnum); +.SH DESCRIPTION +As 0MQ defines few additional (non-POSIX) error codes, standard +.IR strerror +isn't capable of translating those errors into human readable strings. Instead, +.IR zmq_strerror +should be used. +.SH RETURN VALUE +Returns string describing the error number. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +void *ctx = zmq_init (1, 1, 0); +if (!ctx) { + printf ("error occured during zmq_init: %s\\n", zmq_strerror (errno)); + abort (); +} +.fi +.SH SEE ALSO +.BR zmq (7) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man3/zmq_term.3 b/man/man3/zmq_term.3 new file mode 100644 index 0000000..8d822b6 --- /dev/null +++ b/man/man3/zmq_term.3 @@ -0,0 +1,25 @@ +.TH zmq_term 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +zmq_init \- terminates 0MQ context +.SH SYNOPSIS +.B int zmq_term (void *context); +.SH DESCRIPTION +Destroys 0MQ context. However, if there are still any sockets open within +the context, +.IR zmq_term +succeeds but shutdown of the context is delayed till the last socket is closed. +.SH RETURN VALUE +Function returns zero is successful. Otherwise it returns -1 and +sets errno to one of the values below. +.SH ERRORS +No errors are defined. +.SH EXAMPLE +.nf +int rc = zmq_term (context); +assert (rc == 0); +.fi +.SH SEE ALSO +.BR zmq_init (3) +.BR zmq_close (3) +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> diff --git a/man/man7/zmq.7 b/man/man7/zmq.7 new file mode 100644 index 0000000..02257a7 --- /dev/null +++ b/man/man7/zmq.7 @@ -0,0 +1,9 @@ +.TH zmq 7 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals" +.SH NAME +0MQ \- a lightweight messaging kernel +.SH SYNOPSIS +.SH DESCRIPTION +.SH "SEE ALSO" +.SH AUTHOR +Martin Sustrik <sustrik at fastmq dot com> + diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index c97af11..f785e80 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -79,6 +79,9 @@ int main (int argc, char *argv []) if (elapsed == 0) elapsed = 1; + rc = zmq_msg_close (&msg); + assert (rc == 0); + throughput = (unsigned long) ((double) message_count / (double) elapsed * 1000000); megabits = (double) (throughput * message_size * 8) / 1000000; diff --git a/src/Makefile.am b/src/Makefile.am index 91fb555..3d038b7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \ decoder.hpp \ devpoll.hpp \ dispatcher.hpp \ + downstream.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ + upstream.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ + downstream.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ @@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ + upstream.cpp \ uuid.cpp \ ypollset.cpp \ zmq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbda335..a671822 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -40,11 +40,13 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "p2p.hpp" #include "pub.hpp" #include "sub.hpp" #include "req.hpp" #include "rep.hpp" -#include "p2p.hpp" +#include "upstream.hpp" +#include "downstream.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_P2P: + s = new p2p_t (this); + break; case ZMQ_PUB: s = new pub_t (this); break; @@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_REP: s = new rep_t (this); break; - case ZMQ_P2P: - s = new p2p_t (this); + case ZMQ_UPSTREAM: + s = new upstream_t (this); + break; + case ZMQ_DOWNSTREAM: + s = new downstream_t (this); break; default: // TODO: This should be EINVAL. diff --git a/src/command.hpp b/src/command.hpp index 9a2e5d5..3099852 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -69,10 +69,12 @@ namespace zmq } attach; // Sent from session to socket to establish pipe(s) between them. + // If adjust_seqnum is true, caller have used inc_seqnum beforehand + // and thus the callee should take care of catching up. struct { - class owned_t *session; class reader_t *in_pipe; class writer_t *out_pipe; + bool adjust_seqnum; } bind; // Sent by pipe writer to inform dormant pipe reader that there diff --git a/src/devpoll.cpp b/src/devpoll.cpp index f28d55e..0ee772b 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -37,7 +37,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::devpoll_t::devpoll_t () +zmq::devpoll_t::devpoll_t () : + stopping (false) { // Get limit on open files struct rlimit rl; diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 1f6b4f0..1e41ee8 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -20,6 +20,7 @@ #include "../bindings/c/zmq.h" #include "dispatcher.hpp" +#include "socket_base.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" @@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) zmq_assert (erased == 1); pipes_sync.unlock (); } + +int zmq::dispatcher_t::register_endpoint (const char *addr_, + socket_base_t *socket_) +{ + endpoints_sync.lock (); + + bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second; + if (!inserted) { + errno = EADDRINUSE; + endpoints_sync.unlock (); + return -1; + } + + endpoints_sync.unlock (); + return 0; +} + +void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.begin (); + while (it != endpoints.end ()) { + if (it->second == socket_) { + endpoints_t::iterator to_erase = it; + it++; + endpoints.erase (to_erase); + continue; + } + it++; + } + + endpoints_sync.unlock (); +} + +zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.find (addr_); + if (it == endpoints.end ()) { + endpoints_sync.unlock (); + errno = ECONNREFUSED; + return NULL; + } + socket_base_t *endpoint = it->second; + + // Increment the command sequence number of the peer so that it won't + // get deallocated until "bind" command is issued by the caller. + endpoint->inc_seqnum (); + + endpoints_sync.unlock (); + return endpoint; +} + diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 23b6a33..8364d4d 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -97,6 +97,11 @@ namespace zmq void register_pipe (class pipe_t *pipe_); void unregister_pipe (class pipe_t *pipe_); + // Management of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + private: ~dispatcher_t (); @@ -149,6 +154,13 @@ namespace zmq // and 'terminated' flag). mutex_t term_sync; + // List of inproc endpoints within this context. + typedef std::map <std::string, class socket_base_t*> endpoints_t; + endpoints_t endpoints; + + // Synchronisation of access to the list of inproc endpoints. + mutex_t endpoints_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/downstream.cpp b/src/downstream.cpp new file mode 100644 index 0000000..4f994e6 --- /dev/null +++ b/src/downstream.cpp @@ -0,0 +1,131 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "downstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::downstream_t::downstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + current (0) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::downstream_t::~downstream_t () +{ +} + +void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!inpipe_ && outpipe_); + pipes.push_back (outpipe_); +} + +void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + pipes.erase (pipes.index (pipe_)); +} + +void zmq::downstream_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + // If there are no pipes we cannot send the message. + if (pipes.empty ()) { + errno = EAGAIN; + return -1; + } + + // Move to the next pipe (load-balancing). + current++; + if (current >= pipes.size ()) + current = 0; + + // TODO: Implement this once queue limits are in-place. + zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + pipes [current]->write (msg_); + pipes [current]->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::downstream_t::xflush () +{ + // TODO: Maybe there's a point in flushing messages downstream. + // It may be useful in the case where number of messages in a single + // transaction is much greater than the number of attached pipes. + errno = ENOTSUP; + return -1; + +} + +int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::downstream_t::xhas_in () +{ + return false; +} + +bool zmq::downstream_t::xhas_out () +{ + // TODO: Modify this code once pipe limits are in place. + return true; +} + + diff --git a/src/downstream.hpp b/src/downstream.hpp new file mode 100644 index 0000000..c6a7ed8 --- /dev/null +++ b/src/downstream.hpp @@ -0,0 +1,64 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class downstream_t : public socket_base_t + { + public: + + downstream_t (class app_thread_t *parent_); + ~downstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // List of outbound pipes. + typedef yarray_t <class writer_t> pipes_t; + pipes_t pipes; + + // Points to the last pipe that the most recent message was sent to. + pipes_t::size_type current; + + downstream_t (const downstream_t&); + void operator = (const downstream_t&); + }; + +} + +#endif diff --git a/src/kqueue.cpp b/src/kqueue.cpp index f32fa36..69ad0c8 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -33,7 +33,8 @@ #include "config.hpp" #include "i_poll_events.hpp" -zmq::kqueue_t::kqueue_t () +zmq::kqueue_t::kqueue_t () : + stopping (false) { // Create event queue kqueue_fd = kqueue (); diff --git a/src/object.cpp b/src/object.cpp index 1433b7b..b5d5eee 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_) return; case command_t::bind: - process_bind (cmd_.args.bind.session, - cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + cmd_.args.bind.adjust_seqnum); return; case command_t::pipe_term: @@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_) dispatcher->unregister_pipe (pipe_); } +int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) +{ + return dispatcher->register_endpoint (addr_, socket_); +} + +void zmq::object_t::unregister_endpoints (socket_base_t *socket_) +{ + return dispatcher->unregister_endpoints (socket_); +} + +zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) +{ + return dispatcher->find_endpoint (addr_); +} + zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return dispatcher->choose_io_thread (taskset_); @@ -168,15 +183,15 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) send_command (cmd); } -void zmq::object_t::send_bind (object_t *destination_, owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::send_bind (object_t *destination_, + reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_) { command_t cmd; cmd.destination = destination_; cmd.type = command_t::bind; - cmd.args.bind.session = session_; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + cmd.args.bind.adjust_seqnum = adjust_seqnum_; send_command (cmd); } @@ -250,8 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_) zmq_assert (false); } -void zmq::object_t::process_bind (owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + bool adjust_seqnum_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 1954071..4fd0a8e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -49,6 +49,12 @@ namespace zmq protected: + // Using following function, socket is able to access global + // repository of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + // Derived object can use following functions to interact with // global repositories. See dispatcher.hpp for function details. int thread_slot_count (); @@ -62,8 +68,8 @@ namespace zmq class owned_t *object_); void send_attach (class session_t *destination_, struct i_engine *engine_); - void send_bind (object_t *destination_, class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + void send_bind (object_t *destination_, class reader_t *in_pipe_, + class writer_t *out_pipe_, bool adjust_seqnum_); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -78,8 +84,8 @@ namespace zmq virtual void process_plug (); virtual void process_own (class owned_t *object_); virtual void process_attach (struct i_engine *engine_); - virtual void process_bind (class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + virtual void process_bind (class reader_t *in_pipe_, + class writer_t *out_pipe_, bool adjust_seqnum_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/p2p.hpp b/src/p2p.hpp index 1fd7e34..32d7755 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_P2P_INCLUDED__ -#define __ZMQ_P2P_INCLUDED__ +#ifndef __ZMQ_P2P_HPP_INCLUDED__ +#define __ZMQ_P2P_HPP_INCLUDED__ #include "socket_base.hpp" diff --git a/src/pipe.cpp b/src/pipe.cpp index e444520..0e15dce 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -81,7 +81,11 @@ void zmq::reader_t::term () void zmq::reader_t::process_revive () { - endpoint->revive (this); + // Beacuse of command throttling mechanism, incoming termination request + // may not have been processed before subsequent send. + // In that case endpoint is NULL. + if (endpoint) + endpoint->revive (this); } void zmq::reader_t::process_pipe_term_ack () diff --git a/src/pub.hpp b/src/pub.hpp index b3e868d..9dbcb4a 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_PUB_INCLUDED__ -#define __ZMQ_PUB_INCLUDED__ +#ifndef __ZMQ_PUB_HPP_INCLUDED__ +#define __ZMQ_PUB_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/rep.cpp b/src/rep.cpp index e8a9e39..f06f4ab 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) } // Now both inpipe and outpipe are detached. Remove them from the lists. - if (in_pipes.index (pipe_) < active) + if (index < active) active--; in_pipes.erase (index); out_pipes.erase (index); @@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) // Round-robin over the pipes to get next message. for (int count = active; count != 0; count--) { bool fetched = in_pipes [current]->read (msg_); - current++; - if (current >= active) - current = 0; if (fetched) { reply_pipe = out_pipes [current]; waiting_for_reply = true; - return 0; } + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; } // No message is available. Initialise the output parameter diff --git a/src/rep.hpp b/src/rep.hpp index 3e87dc1..0b327aa 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_REP_INCLUDED__ -#define __ZMQ_REP_INCLUDED__ +#ifndef __ZMQ_REP_HPP_INCLUDED__ +#define __ZMQ_REP_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/req.hpp b/src/req.hpp index 86554b5..756cc42 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_REQ_INCLUDED__ -#define __ZMQ_REQ_INCLUDED__ +#ifndef __ZMQ_REQ_HPP_INCLUDED__ +#define __ZMQ_REQ_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/session.cpp b/src/session.cpp index eb0a963..87b47b0 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_) { - // The communication is unidirectional. - // We don't expect any message to arrive. - zmq_assert (out_pipe); - if (out_pipe->write (msg_)) { zmq_msg_init (msg_); return true; @@ -155,8 +151,10 @@ void zmq::session_t::process_plug () out_pipe->set_endpoint (this); } - send_bind (owner, this, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); + // Note that initial call to inc_seqnum was optimised out. Last + // parameter conveys the fact to the callee. + send_bind (owner, outbound ? &outbound->reader : NULL, + inbound ? &inbound->writer : NULL, false); } owned_t::process_plug (); diff --git a/src/simple_semaphore.hpp b/src/simple_semaphore.hpp index 209ccb4..3342281 100644 --- a/src/simple_semaphore.hpp +++ b/src/simple_semaphore.hpp @@ -23,7 +23,11 @@ #include "platform.hpp" #include "err.hpp" -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS +#if 0 //defined ZMQ_HAVE_LINUX +#include <sys/syscall.h> +#include <unistd.h> +#include <linux/futex.h> +#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS #include <pthread.h> #elif defined ZMQ_HAVE_WINDOWS #include "windows.hpp" @@ -33,13 +37,63 @@ namespace zmq { - // Simple semaphore. Only single thread may be waiting at any given time. // Also, the semaphore may not be posted before the previous post // was matched by corresponding wait and the waiting thread was // released. -#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS +#if 0 //defined ZMQ_HAVE_LINUX + + // In theory, using private futexes should be more efficient on Linux + // platform than using mutexes. However, in uncontended cases of TCP + // transport on loopback interface we haven't seen any latency improvement. + // The code is commented out waiting for more thorough testing. + + class simple_semaphore_t + { + public: + + // Initialise the semaphore. + inline simple_semaphore_t () : + dummy (0) + { + } + + // Destroy the semaphore. + inline ~simple_semaphore_t () + { + } + + // Wait for the semaphore. + inline void wait () + { + int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE, + (int) 0, NULL, NULL, (int) 0); + zmq_assert (rc == 0); + } + + // Post the semaphore. + inline void post () + { + while (true) { + int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE, + (int) 1, NULL, NULL, (int) 0); + zmq_assert (rc != -1 && rc <= 1); + if (rc == 1) + break; + } + } + + private: + + int dummy; + + // Disable copying of the object. + simple_semaphore_t (const simple_semaphore_t&); + void operator = (const simple_semaphore_t&); + }; + +#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS // On platforms that allow for double locking of a mutex from the same // thread, simple semaphore is implemented using mutex, as it is more diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6583608..a614759 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : pending_term_acks (0), ticks (0), app_thread (parent_), - shutting_down (false) + shutting_down (false), + sent_seqnum (0), + processed_seqnum (0) { } @@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") + return register_endpoint (addr_args.c_str (), this); + if (addr_type == "tcp") { zmq_listener_t *listener = new zmq_listener_t ( choose_io_thread (options.affinity), this, options); @@ -126,6 +131,41 @@ int zmq::socket_base_t::connect (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") { + + // Find the peer socket. + socket_base_t *peer = find_endpoint (addr_args.c_str ()); + if (!peer) + return -1; + + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; + + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new pipe_t (this, peer, options.hwm, options.lwm); + zmq_assert (in_pipe); + } + + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new pipe_t (peer, this, options.hwm, options.lwm); + zmq_assert (out_pipe); + } + + // Attach the pipes to this socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the peer socket. Note that peer's seqnum + // was incremented in find_endpoint function. The callee is notified + // about the fact via the last parameter. + send_bind (peer, out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL, true); + + return 0; + } + // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); session_t *session = new session_t (io_thread, this, session_name.c_str (), @@ -319,13 +359,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { + shutting_down = true; + + // Let the thread know that the socket is no longer available. app_thread->remove_socket (this); // Pointer to the dispatcher must be retrieved before the socket is // deallocated. Afterwards it is not available. dispatcher_t *dispatcher = get_dispatcher (); - shutting_down = true; + // Unregister all inproc endpoints associated with this socket. + // From this point we are sure that inc_seqnum won't be called again + // on this object. + dispatcher->unregister_endpoints (this); + + // Wait till all undelivered commands are delivered. This should happen + // very quickly. There's no way to wait here for extensive period of time. + while (processed_seqnum != sent_seqnum.get ()) + app_thread->process_commands (true, false); while (true) { @@ -364,6 +415,12 @@ int zmq::socket_base_t::close () return 0; } +void zmq::socket_base_t::inc_seqnum () +{ + // NB: This function may be called from a different thread! + sent_seqnum.add (1); +} + zmq::app_thread_t *zmq::socket_base_t::get_thread () { return app_thread; @@ -452,9 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (owned_t *session_, - reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + bool adjust_seqnum_) { + // In case of inproc transport, the seqnum should catch up here. + // For other transports the seqnum modification can be optimised out + // because final handshaking between the socket and the session ensures + // that no 'bind' command will be left unprocessed. + if (adjust_seqnum_) + processed_seqnum++; + attach_pipes (in_pipe_, out_pipe_); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 49ff5a5..dd7b526 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -33,6 +33,7 @@ #include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" +#include "atomic_counter.hpp" namespace zmq { @@ -54,6 +55,11 @@ namespace zmq int recv (zmq_msg_t *msg_, int flags_); int close (); + // When another owned object wants to send command to this object + // it calls this function to let it know it should not shut down + // before the command is delivered. + void inc_seqnum (); + // This function is used by the polling mechanism to determine // whether the socket belongs to the application thread the poll // is called from. @@ -108,8 +114,8 @@ namespace zmq // Handlers for incoming commands. void process_own (class owned_t *object_); - void process_bind (class owned_t *session_, - class reader_t *in_pipe_, class writer_t *out_pipe_); + void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, + bool adjust_seqnum_); void process_term_req (class owned_t *object_); void process_term_ack (); @@ -132,6 +138,12 @@ namespace zmq // started. bool shutting_down; + // Sequence number of the last command sent to this object. + atomic_counter_t sent_seqnum; + + // Sequence number of the last command processed by this object. + uint64_t processed_seqnum; + // List of existing sessions. This list is never referenced from within // the socket, instead it is used by I/O objects owned by the session. // As those objects can live in different threads, the access is diff --git a/src/sub.hpp b/src/sub.hpp index fb881dc..8ad8a18 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_SUB_INCLUDED__ -#define __ZMQ_SUB_INCLUDED__ +#ifndef __ZMQ_SUB_HPP_INCLUDED__ +#define __ZMQ_SUB_HPP_INCLUDED__ #include <set> #include <string> diff --git a/src/upstream.cpp b/src/upstream.cpp new file mode 100644 index 0000000..da202f8 --- /dev/null +++ b/src/upstream.cpp @@ -0,0 +1,143 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../bindings/c/zmq.h" + +#include "upstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::upstream_t::upstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + active (0), + current (0) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::upstream_t::~upstream_t () +{ +} + +void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && !outpipe_); + + pipes.push_back (inpipe_); + pipes.swap (active, pipes.size () - 1); + active++; +} + +void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + zmq_assert (pipe_); + pipes_t::size_type index = pipes.index (pipe_); + if (index < active) + active--; + pipes.erase (index); +} + +void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::upstream_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + active--; + pipes.swap (pipes.index (pipe_), active); +} + +void zmq::upstream_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + pipes.swap (pipes.index (pipe_), active); + active++; +} + +int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xflush () +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + +bool zmq::upstream_t::xhas_in () +{ + // Note that messing with current doesn't break the fairness of fair + // queueing algorithm. If there are no messages available current will + // get back to its original value. Otherwise it'll point to the first + // pipe holding messages, skipping only pipes with no messages available. + for (int count = active; count != 0; count--) { + if (pipes [current]->check_read ()) + return true; + current++; + if (current >= active) + current = 0; + } + + return false; +} + +bool zmq::upstream_t::xhas_out () +{ + return false; +} + diff --git a/src/upstream.hpp b/src/upstream.hpp new file mode 100644 index 0000000..0e2f5ad --- /dev/null +++ b/src/upstream.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ +#define __ZMQ_UPSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class upstream_t : public socket_base_t + { + public: + + upstream_t (class app_thread_t *parent_); + ~upstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Inbound pipes. + typedef yarray_t <class reader_t> pipes_t; + pipes_t pipes; + + // Number of active pipes. All the active pipes are located at the + // beginning of the pipes array. + pipes_t::size_type active; + + // Index of the next bound pipe to read a message from. + pipes_t::size_type current; + + upstream_t (const upstream_t&); + void operator = (const upstream_t&); + + }; + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 7952b61..9b66be8 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_) void *zmq_init (int app_threads_, int io_threads_, int flags_) { - // There should be at least a single thread managed by the dispatcher. - if (app_threads_ <= 0 || io_threads_ <= 0 || + // There should be at least a single application thread managed + // by the dispatcher. There's no need for I/O threads if 0MQ is used + // only for inproc messaging + if (app_threads_ < 1 || io_threads_ < 0 || app_threads_ > 63 || io_threads_ > 63) { errno = EINVAL; return NULL; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index 53811a1..8040f21 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () else { // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should + // close it before calling zmq_msg_init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, *tmpbuf); errno_assert (rc == 0); @@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () size_t size = (size_t) get_uint64 (tmpbuf); // TODO: Handle over-sized message decently. + // in_progress is initialised at this point so in theory we should + // close it before calling zmq_msg_init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... int rc = zmq_msg_init_size (&in_progress, size); errno_assert (rc == 0); @@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () bool zmq::zmq_decoder_t::message_ready () { // Message is completely read. Push it further and start reading - // new message. + // new message. (in_progress is a 0-byte message after this point.) if (!destination || !destination->write (&in_progress)) return false; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 44b919b..180bda7 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready () bool zmq::zmq_encoder_t::message_ready () { + // Destroy content of the old message. + zmq_msg_close(&in_progress); + // Read new message from the dispatcher. If there is none, return false. // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!source || !source->read (&in_progress)) + if (!source || !source->read (&in_progress)) { + zmq_msg_init (&in_progress); return false; + } size_t size = zmq_msg_size (&in_progress); diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp index eec41c7..0d9488d 100644 --- a/src/zmq_listener_init.cpp +++ b/src/zmq_listener_init.cpp @@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) has_peer_identity = true; peer_identity.assign ((const char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); - return true; } |