diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-11-04 09:48:25 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-11-04 09:48:25 +0100 |
commit | 05ce301f3571e3e690792a189cb927328163f0bc (patch) | |
tree | 1145b921033a2b88e9b987ec1d3e03060559702d | |
parent | a8362abf11b51dd553766fb07a9e60f28e788126 (diff) | |
parent | 6cdd720400ea456ccbfdf09cdc5054ab07dbdc6f (diff) |
Merge branch 'master' of github.com:zeromq/libzmq
164 files changed, 696 insertions, 1655 deletions
@@ -5,6 +5,7 @@ Alexej Lotz <alexej.lotz@arcor.de> Andrew Thompson <andy@fud.org.nz> Asko Kauppi <askok@dnainternet.net> Barak Amar <barak.amar@gmail.com> +Ben Gray <ben@benjamg.com> Bernd Prager <bernd@prager.ws> Bernd Melchers <melchers@ZEDAT.FU-Berlin.DE> Bob Beaty <rbeaty@peak6.com> @@ -50,9 +51,11 @@ Mikko Koppanen <mkoppanen@php.net> Min Ragan-Kelley <benjaminrk@gmail.com> Neale Ferguson <neale@sinenomine.net> Nir Soffer <nirsof@gmail.com> +Paul Betts <paul@paulbetts.org> Paul Colomiets <pc@gafol.net> Pavel Gushcha <pavimus@gmail.com> Pavol Malosek <malosek@fastmq.com> +Perry Kundert <perry@kundert.ca> Peter Bourgon <peter.bourgon@gmail.com> Pieter Hintjens <ph@imatix.com> Piotr Trojanek <piotr.trojanek@gmail.com> diff --git a/COPYING.LESSER b/COPYING.LESSER index 3d13ddb..9925774 100644 --- a/COPYING.LESSER +++ b/COPYING.LESSER @@ -166,16 +166,16 @@ Library. -------------------------------------------------------------------------------- - SPECIAL EXCEPTION GRANTED BY IMATIX - -As a special exception, iMatix gives you permission to link this library with -independent modules to produce an executable, regardless of the license terms -of these independent modules, and to copy and distribute the resulting -executable under terms of your choice, provided that you also meet, for each -linked independent module, the terms and conditions of the license of that -module. An independent module is a module which is not derived from or based on -this library. If you modify this library, you must extend this exception to your -version of the library. + SPECIAL EXCEPTION GRANTED BY COPYRIGHT HOLDERS + +As a special exception, copyright holders give you permission to link this +library with independent modules to produce an executable, regardless of +the license terms of these independent modules, and to copy and distribute +the resulting executable under terms of your choice, provided that you also +meet, for each linked independent module, the terms and conditions of +the license of that module. An independent module is a module which is not +derived from or based on this library. If you modify this library, you must +extend this exception to your version of the library. -------------------------------------------------------------------------------- @@ -1,6 +1,6 @@ #!/bin/sh -# Copyright (c) 2007-2011 iMatix Corporation +# Copyright (c) 2007-2009 iMatix Corporation # Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file # # This file is part of 0MQ. diff --git a/configure.in b/configure.in index b0ee7a1..86147ea 100644 --- a/configure.in +++ b/configure.in @@ -348,19 +348,6 @@ fi AC_SUBST(pgm_basename) -# VTCP extension -libzmq_vtcp="no" - -AC_ARG_WITH([vtcp], [AS_HELP_STRING([--with-vtcp], - [build libzmq with VTCP extension [default=no]])], - [with_vtcp=$withval], [with_vtcp=no]) - -if test "x$with_vtcp" != "xno"; then - AC_DEFINE(ZMQ_HAVE_VTCP, 1, [Have VTCP extension]) - AC_CHECK_LIB(vtcp, vtcp_bind, , - [AC_MSG_ERROR([cannot link with -lvtcp, install libvtcp.])]) -fi - # Set -Wall, -Werror and -pedantic AC_LANG_PUSH([C++]) diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 99065c0..252834e 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -39,19 +39,6 @@ Default value:: N/A Applicable socket types:: all -ZMQ_RCVLABEL: Inquires whether last message received was a label -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_RCVLABEL' option shall return True (1) if the message part last -received from the 'socket' was an address label. Otherwise, this option -shall return False (0). - -[horizontal] -Option value type:: int -Option value unit:: boolean -Default value:: N/A -Applicable socket types:: all - - ZMQ_RCVMORE: More message data parts to follow ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_RCVMORE' option shall return True (1) if the message part last @@ -130,6 +117,22 @@ Option value unit:: N/A (bitmap) Default value:: 0 Applicable socket types:: N/A +ZMQ_IDENTITY: Set socket identity +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'. +Socket identity is used only by request/reply pattern. Namely, it can be used +in tandem with ROUTER socket to route messages to the peer with specific +identity. + +Identity should be at least one byte and at most 255 bytes long. Identities +starting with binary zero are reserved for use by 0MQ infrastructure. + +[horizontal] +Option value type:: binary data +Option value unit:: N/A +Default value:: NULL +Applicable socket types:: all + ZMQ_RATE: Retrieve multicast data rate ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/zmq_recv.txt b/doc/zmq_recv.txt index c9c3ce8..d1ef3e1 100644 --- a/doc/zmq_recv.txt +++ b/doc/zmq_recv.txt @@ -29,8 +29,7 @@ function shall fail with 'errno' set to EAGAIN. Multi-part messages ~~~~~~~~~~~~~~~~~~~ -A 0MQ message is composed of 1 or more message parts, starting with zero or -more address 'label' parts, followed by 1 or more 'data' parts. Each message +A 0MQ message is composed of 1 or more message parts. Each message part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic delivery of messages; peers shall receive either all _message parts_ of a message or none at all. The total number of message parts is unlimited except @@ -38,9 +37,7 @@ by available memory. An application that processes multipart messages must use the _ZMQ_RCVMORE_ linkzmq:zmq_getsockopt[3] option after calling _zmq_recv()_ to determine if -there are further parts to receive. An application that manipulates address -labels must use _ZMQ_RCVLABEL_ to determine the zero or more label parts -that precede the data part(s). +there are further parts to receive. RETURN VALUE ------------ diff --git a/doc/zmq_recvmsg.txt b/doc/zmq_recvmsg.txt index 358ea3f..6e41b1e 100644 --- a/doc/zmq_recvmsg.txt +++ b/doc/zmq_recvmsg.txt @@ -29,8 +29,7 @@ function shall fail with 'errno' set to EAGAIN. Multi-part messages ~~~~~~~~~~~~~~~~~~~ -A 0MQ message is composed of 1 or more message parts, starting with zero or -more address 'label' parts, followed by 1 or more 'data' parts. Each message +A 0MQ message is composed of 1 or more message parts. Each message part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic delivery of messages; peers shall receive either all _message parts_ of a message or none at all. The total number of message parts is unlimited except @@ -38,9 +37,7 @@ by available memory. An application that processes multipart messages must use the _ZMQ_RCVMORE_ linkzmq:zmq_getsockopt[3] option after calling _zmq_recvmsg()_ to determine if -there are further parts to receive. An application that manipulates address -labels must use _ZMQ_RCVLABEL_ to determine the zero or more label parts -that precede the data part(s). +there are further parts to receive. RETURN VALUE diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt index 133de97..f00e449 100644 --- a/doc/zmq_send.txt +++ b/doc/zmq_send.txt @@ -23,11 +23,6 @@ Specifies that the operation should be performed in non-blocking mode. If the message cannot be queued on the 'socket', the _zmq_send()_ function shall fail with 'errno' set to EAGAIN. -*ZMQ_SNDLABEL*:: -Specifies that the message part being sent is an address label, and that -further message parts are to follow. Refer to linkzmq:zmq_socket[3] for the -semantics of address labels in each socket pattern. - *ZMQ_SNDMORE*:: Specifies that the message being sent is a multi-part message, and that further message parts are to follow. Refer to the section regarding multi-part messages @@ -40,16 +35,14 @@ the 'socket' and 0MQ has assumed responsibility for the message. Multi-part messages ~~~~~~~~~~~~~~~~~~~ -A 0MQ message is composed of 1 or more message parts, starting with zero or -more address 'label' parts, followed by 1 or more 'data' parts. Each message +A 0MQ message is composed of 1 or more message parts. Each message part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic delivery of messages; peers shall receive either all _message parts_ of a message or none at all. The total number of message parts is unlimited except by available memory. An application that sends multipart messages must use the _ZMQ_SNDMORE_ flag -when sending each data part except the final one. An application that sends -address labels must use _ZMQ_SNDLABEL_ when sending each label. +when sending each data part except the final one. RETURN VALUE diff --git a/doc/zmq_sendmsg.txt b/doc/zmq_sendmsg.txt index 244c0bd..d069bd7 100644 --- a/doc/zmq_sendmsg.txt +++ b/doc/zmq_sendmsg.txt @@ -23,11 +23,6 @@ Specifies that the operation should be performed in non-blocking mode. If the message cannot be queued on the 'socket', the _zmq_sendmsg()_ function shall fail with 'errno' set to EAGAIN. -*ZMQ_SNDLABEL*:: -Specifies that the message part being sent is an address 'label', and that -further message parts are to follow. Refer to linkzmq:zmq_socket[3] for the -semantics of address labels in each socket pattern. - *ZMQ_SNDMORE*:: Specifies that the message being sent is a multi-part message, and that further message parts are to follow. Refer to the section regarding multi-part messages @@ -44,16 +39,14 @@ the 'socket' and 0MQ has assumed responsibility for the message. Multi-part messages ~~~~~~~~~~~~~~~~~~~ -A 0MQ message is composed of 1 or more message parts, starting with zero or -more address 'label' parts, followed by 1 or more 'data' parts. Each message +A 0MQ message is composed of 1 or more message parts. Each message part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic delivery of messages; peers shall receive either all _message parts_ of a message or none at all. The total number of message parts is unlimited except by available memory. An application that sends multipart messages must use the _ZMQ_SNDMORE_ flag -when sending each data part except the final one. An application that sends -address labels must use _ZMQ_SNDLABEL_ when sending each label. +when sending each data part except the final one. RETURN VALUE ------------ diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index fd004f4..72d1faa 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -122,6 +122,23 @@ Default value:: N/A Applicable socket types:: ZMQ_SUB +ZMQ_IDENTITY: Set socket identity +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'. +Socket identity is used only by request/reply pattern. Namely, it can be used +in tandem with ROUTER socket to route messages to the peer with specific +identity. + +Identity should be at least one byte and at most 255 bytes long. Identities +starting with binary zero are reserved for use by 0MQ infrastructure. + +[horizontal] +Option value type:: binary data +Option value unit:: N/A +Default value:: NULL +Applicable socket types:: all + + ZMQ_RATE: Set multicast data rate ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_RATE' option shall set the maximum send or receive data rate for diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index e138ebe..1ba2f42 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -81,11 +81,6 @@ any linkzmq:zmq_send[3] operations on the socket shall block until the exceptional state ends or at least one _service_ becomes available for sending; messages are not discarded. -'ZMQ_REQ' socket adds a unique 'request ID' label to every outbound message. -When receiving a reply, it checks whether the 'request ID' of the reply matches -the last 'request ID' sent. If it does not, the message is silently dropped and -waiting for the reply is resumed. - [horizontal] .Summary of ZMQ_REQ characteristics Compatible peer sockets:: 'ZMQ_REP' @@ -108,10 +103,6 @@ When a 'ZMQ_REP' socket enters an exceptional state due to having reached the high water mark for a _client_, then any replies sent to the _client_ in question shall be dropped until the exceptional state ends. -'ZMQ_REP' socket strips all the labels from the incoming message, stores them -and passes the remaining data parts to the user. When user sends the reply, -the stored labels are re-attached to the reply. - [horizontal] .Summary of ZMQ_REP characteristics Compatible peer sockets:: 'ZMQ_REQ' @@ -136,8 +127,6 @@ linkzmq:zmq_send[3] operations on the socket shall block until the exceptional state ends or at least one peer becomes available for sending; messages are not discarded. -'ZMQ_XREQ' socket doesn't inspect or modify the message labels. - [horizontal] .Summary of ZMQ_XREQ characteristics Compatible peer sockets:: 'ZMQ_XREP', 'ZMQ_REP' @@ -162,14 +151,6 @@ messages sent to the socket shall be dropped until the exceptional state ends. Likewise, any messages to be routed to a non-existent peer or a peer for which the individual high water mark has been reached shall also be dropped. -When receiving messages a 'ZMQ_XREP' socket attaches a label uniquely -identifying the originating peer to the message before passing it to the -application. - -When sending messages a 'ZMQ_XREP' socket removes the first label from the -message and uses it to determine which the peer the message shall be routed to. -If the peer does not exist anymore the message is silently discarded. - [horizontal] .Summary of ZMQ_XREP characteristics Compatible peer sockets:: 'ZMQ_XREQ', 'ZMQ_REQ' @@ -196,8 +177,6 @@ high water mark for a _subscriber_, then any messages that would be sent to the _subscriber_ in question shall instead be dropped until the exceptional state ends. The _zmq_send()_ function shall never block for this socket type. -This socket type doesn't use message labels. - [horizontal] .Summary of ZMQ_PUB characteristics Compatible peer sockets:: 'ZMQ_SUB', 'ZMQ_XSUB' @@ -215,8 +194,6 @@ any messages, use the 'ZMQ_SUBSCRIBE' option of linkzmq:zmq_setsockopt[3] to specify which messages to subscribe to. The _zmq_send()_ function is not implemented for this socket type. -This socket type doesn't use message labels. - [horizontal] .Summary of ZMQ_SUB characteristics Compatible peer sockets:: 'ZMQ_PUB', 'ZMQ_XPUB' @@ -233,8 +210,6 @@ in form of incoming messages. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. -This socket type doesn't use message labels. - [horizontal] .Summary of ZMQ_XPUB characteristics Compatible peer sockets:: 'ZMQ_SUB', 'ZMQ_XSUB' @@ -250,8 +225,6 @@ Same as ZMQ_SUB except that you subscribe by sending subscription messages to the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. -This socket type doesn't use message labels. - [horizontal] .Summary of ZMQ_XSUB characteristics Compatible peer sockets:: 'ZMQ_PUB', 'ZMQ_XPUB' @@ -282,8 +255,6 @@ _nodes_ at all, then any linkzmq:zmq_send[3] operations on the socket shall block until the exceptional state ends or at least one downstream _node_ becomes available for sending; messages are not discarded. -This socket type doesn't use message labels. - [horizontal] .Summary of ZMQ_PUSH characteristics Compatible peer sockets:: 'ZMQ_PULL' @@ -301,8 +272,6 @@ from upstream pipeline _nodes_. Messages are fair-queued from among all connected upstream _nodes_. The _zmq_send()_ function is not implemented for this socket type. -This socket type doesn't use message labels. - [horizontal] .Summary of ZMQ_PULL characteristics Compatible peer sockets:: 'ZMQ_PUSH' @@ -330,8 +299,6 @@ high water mark for the connected peer, or if no peer is connected, then any linkzmq:zmq_send[3] operations on the socket shall block until the peer becomes available for sending; messages are not discarded. -This socket type doesn't use message labels. - NOTE: 'ZMQ_PAIR' sockets are experimental, and are currently missing several features such as auto-reconnection. diff --git a/include/zmq.h b/include/zmq.h index e236b2a..09ed89c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2010 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -162,10 +164,13 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_PUSH 8 #define ZMQ_XPUB 9 #define ZMQ_XSUB 10 -#define ZMQ_ROUTER 13 + +#define ZMQ_ROUTER ZMQ_XREP +#define ZMQ_DEALER ZMQ_XREQ /* Socket options. */ #define ZMQ_AFFINITY 4 +#define ZMQ_IDENTITY 5 #define ZMQ_SUBSCRIBE 6 #define ZMQ_UNSUBSCRIBE 7 #define ZMQ_RATE 8 @@ -186,15 +191,11 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_MULTICAST_HOPS 25 #define ZMQ_RCVTIMEO 27 #define ZMQ_SNDTIMEO 28 -#define ZMQ_RCVLABEL 29 -#define ZMQ_RCVCMD 30 #define ZMQ_IPV4ONLY 31 /* Send/recv options. */ #define ZMQ_DONTWAIT 1 #define ZMQ_SNDMORE 2 -#define ZMQ_SNDLABEL 4 -#define ZMQ_SNDCMD 8 ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/include/zmq_utils.h b/include/zmq_utils.h index 6d8a458..341d639 100644 --- a/include/zmq_utils.h +++ b/include/zmq_utils.h @@ -1,5 +1,5 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/perf/inproc_lat.cpp b/perf/inproc_lat.cpp index 7c15013..5b6a830 100644 --- a/perf/inproc_lat.cpp +++ b/perf/inproc_lat.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/perf/inproc_thr.cpp b/perf/inproc_thr.cpp index 4673eba..b4cadfc 100644 --- a/perf/inproc_thr.cpp +++ b/perf/inproc_thr.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp index 999e799..714b8c0 100644 --- a/perf/local_lat.cpp +++ b/perf/local_lat.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp index a21707f..5c495d8 100644 --- a/perf/local_thr.cpp +++ b/perf/local_thr.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp index 0d438e9..9eb76b0 100644 --- a/perf/remote_lat.cpp +++ b/perf/remote_lat.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index c8df333..328bdce 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/Makefile.am b/src/Makefile.am index 3b7dec6..4d3cba3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -9,6 +9,7 @@ libzmq_la_SOURCES = \ array.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ + blob.hpp \ clock.hpp \ command.hpp \ config.hpp \ @@ -55,7 +56,6 @@ libzmq_la_SOURCES = \ reaper.hpp \ rep.hpp \ req.hpp \ - router.hpp \ select.hpp \ session_base.hpp \ signaler.hpp \ @@ -68,8 +68,6 @@ libzmq_la_SOURCES = \ tcp_listener.hpp \ thread.hpp \ trie.hpp \ - vtcp_connecter.hpp \ - vtcp_listener.hpp \ windows.hpp \ wire.hpp \ xpub.hpp \ @@ -113,7 +111,6 @@ libzmq_la_SOURCES = \ reaper.cpp \ pub.cpp \ random.cpp \ - router.cpp \ rep.cpp \ req.cpp \ select.cpp \ @@ -127,8 +124,6 @@ libzmq_la_SOURCES = \ tcp_listener.cpp \ thread.cpp \ trie.cpp \ - vtcp_connecter.cpp \ - vtcp_listener.cpp \ xpub.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/array.hpp b/src/array.hpp index b1f6eca..7e4ddd4 100644 --- a/src/array.hpp +++ b/src/array.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp index d7116d8..a0a67bf 100644 --- a/src/atomic_counter.hpp +++ b/src/atomic_counter.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp index c106cd5..c59ab81 100644 --- a/src/atomic_ptr.hpp +++ b/src/atomic_ptr.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/blob.hpp b/src/blob.hpp new file mode 100644 index 0000000..b8039c4 --- /dev/null +++ b/src/blob.hpp @@ -0,0 +1,35 @@ +/* + Copyright (c) 2010 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_BLOB_HPP_INCLUDED__ +#define __ZMQ_BLOB_HPP_INCLUDED__ + +#include <string> + +namespace zmq +{ + + // Object to hold dynamically allocated opaque binary data. + typedef std::basic_string <unsigned char> blob_t; + +} + +#endif + diff --git a/src/clock.cpp b/src/clock.cpp index f98a2f4..92fc4be 100644 --- a/src/clock.cpp +++ b/src/clock.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/clock.hpp b/src/clock.hpp index 1b34989..b3b19b2 100644 --- a/src/clock.hpp +++ b/src/clock.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/command.hpp b/src/command.hpp index 1513ca8..ecf2d93 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/config.hpp b/src/config.hpp index 8e512bb..f7b4c50 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ctx.cpp b/src/ctx.cpp index 8aa10d9..d8783be 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file diff --git a/src/ctx.hpp b/src/ctx.hpp index 22ac932..619d57e 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/decoder.cpp b/src/decoder.cpp index d57265a..48f457f 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/decoder.hpp b/src/decoder.hpp index de63a09..c6f9100 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/devpoll.cpp b/src/devpoll.cpp index c4b3c54..0c46d14 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/devpoll.hpp b/src/devpoll.hpp index a668e9a..1de1af0 100644 --- a/src/devpoll.hpp +++ b/src/devpoll.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/dist.cpp b/src/dist.cpp index 795e13e..d220c43 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -111,8 +112,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool msg_more = msg_->flags () & msg_t::more ? true : false; // Push the message to matching pipes. distribute (msg_, flags_); @@ -137,6 +137,16 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_) return; } + if (msg_->is_vsm ()) { + for (pipes_t::size_type i = 0; i < matching; ++i) + write (pipes [i], msg_); + int rc = msg_->close(); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + return; + } + // Add matching-1 references to the message. We already hold one reference, // that's why -1. msg_->add_refs ((int) matching - 1); @@ -171,7 +181,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) eligible--; return false; } - if (!(msg_->flags () & (msg_t::more | msg_t::label))) + if (!(msg_->flags () & msg_t::more)) pipe_->flush (); return true; } diff --git a/src/dist.hpp b/src/dist.hpp index c8d121c..a72de6e 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/encoder.cpp b/src/encoder.cpp index 8689e45..030b3ef 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -89,14 +91,14 @@ bool zmq::encoder_t::message_ready () tmpbuf [0] = (unsigned char) size; tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 2, &encoder_t::size_ready, - !(in_progress.flags () & (msg_t::more | msg_t::label))); + !(in_progress.flags () & msg_t::more)); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 10, &encoder_t::size_ready, - !(in_progress.flags () & (msg_t::more | msg_t::label))); + !(in_progress.flags () & msg_t::more)); } return true; } diff --git a/src/encoder.hpp b/src/encoder.hpp index 949cbdc..8001c4e 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/epoll.cpp b/src/epoll.cpp index 39b4547..a62345d 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/epoll.hpp b/src/epoll.hpp index dc6b3ed..9bc31a5 100644 --- a/src/epoll.hpp +++ b/src/epoll.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/err.cpp b/src/err.cpp index ff81e03..028d752 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/err.hpp b/src/err.hpp index 7c7a9d8..53a6569 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -1,5 +1,5 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -91,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_) if (pipe_) *pipe_ = pipes [current]; more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + msg_->flags () & msg_t::more ? true : false; if (!more) { current++; if (current >= active) @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 26e475b..19359b7 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp index fa9fb25..9cf47fd 100644 --- a/src/i_poll_events.hpp +++ b/src/i_poll_events.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/io_object.cpp b/src/io_object.cpp index e68917c..81b9ce5 100644 --- a/src/io_object.cpp +++ b/src/io_object.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/io_object.hpp b/src/io_object.hpp index fb0d1e3..bf7a625 100644 --- a/src/io_object.hpp +++ b/src/io_object.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/io_thread.cpp b/src/io_thread.cpp index c6f3880..40bbef9 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/io_thread.hpp b/src/io_thread.hpp index f578d4e..986c88d 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ipc_address.cpp b/src/ipc_address.cpp index 6a471a6..d601c56 100644 --- a/src/ipc_address.cpp +++ b/src/ipc_address.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ipc_address.hpp b/src/ipc_address.hpp index 453f5fd..4a7f230 100644 --- a/src/ipc_address.hpp +++ b/src/ipc_address.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index a54e8fe..dc0ee21 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 721bcf4..c02245a 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 5ba41be..07a7dff 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp index 4cd881b..0f06d23 100644 --- a/src/ipc_listener.hpp +++ b/src/ipc_listener.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/kqueue.cpp b/src/kqueue.cpp index cbf38d1..0b07fab 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/kqueue.hpp b/src/kqueue.hpp index 4ded81e..14f4e49 100644 --- a/src/kqueue.hpp +++ b/src/kqueue.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -75,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) // switch back to non-dropping mode. if (dropping) { - more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; if (!more) dropping = false; @@ -88,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) while (active > 0) { if (pipes [current]->write (msg_)) { - more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; break; } @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/likely.hpp b/src/likely.hpp index a524a50..e604464 100644 --- a/src/likely.hpp +++ b/src/likely.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2009-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/mailbox.cpp b/src/mailbox.cpp index 9fd3ac4..ff16afe 100644 --- a/src/mailbox.cpp +++ b/src/mailbox.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/mailbox.hpp b/src/mailbox.hpp index 0675b99..c059c2a 100644 --- a/src/mailbox.hpp +++ b/src/mailbox.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/msg.cpp b/src/msg.cpp index e51ab67..60d5bf3 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -237,6 +238,11 @@ bool zmq::msg_t::is_delimiter () return u.base.type == type_delimiter; } +bool zmq::msg_t::is_vsm () +{ + return u.base.type == type_vsm; +} + void zmq::msg_t::add_refs (int refs_) { zmq_assert (refs_ >= 0); @@ -279,3 +285,4 @@ bool zmq::msg_t::rm_refs (int refs_) return true; } + diff --git a/src/msg.hpp b/src/msg.hpp index 514f95b..8c84670 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -47,10 +49,9 @@ namespace zmq // Mesage flags. enum { - label = 1, - command = 2, - shared = 64, - more = 128 + more = 1, + identity = 64, + shared = 128 }; bool check (); @@ -68,6 +69,7 @@ namespace zmq void set_flags (unsigned char flags_); void reset_flags (unsigned char flags_); bool is_delimiter (); + bool is_vsm (); // After calling this function you can copy the message in POD-style // refs_ times. No need to call copy. diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 66bea20..1c96c98 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/mtrie.hpp b/src/mtrie.hpp index 2c2cc32..8bbc22d 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/mutex.hpp b/src/mutex.hpp index 9b13ffa..8d7068a 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/object.cpp b/src/object.cpp index 807fb04..622754c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/object.hpp b/src/object.hpp index 1a38b24..f832596 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/options.cpp b/src/options.cpp index 8a3e527..4db1a6c 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -27,6 +29,7 @@ zmq::options_t::options_t () : sndhwm (1000), rcvhwm (1000), affinity (0), + identity_size (0), rate (100), recovery_ivl (10000), multicast_hops (1), @@ -43,7 +46,9 @@ zmq::options_t::options_t () : ipv4only (1), delay_on_close (true), delay_on_disconnect (true), - filter (false) + filter (false), + send_identity (false), + recv_identity (false) { } @@ -76,6 +81,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, affinity = *((uint64_t*) optval_); return 0; + case ZMQ_IDENTITY: + + // Empty identity is invalid as well as identity longer than + // 255 bytes. Identity starting with binary zero is invalid + // as these are used for auto-generated identities. + if (optvallen_ < 1 || optvallen_ > 255 || + *((const unsigned char*) optval_) == 0) { + errno = EINVAL; + return -1; + } + identity_size = optvallen_; + memcpy (identity, optval_, identity_size); + return 0; + case ZMQ_RATE: if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) { errno = EINVAL; @@ -232,6 +251,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (uint64_t); return 0; + case ZMQ_IDENTITY: + if (*optvallen_ < identity_size) { + errno = EINVAL; + return -1; + } + memcpy (optval_, identity, identity_size); + *optvallen_ = identity_size; + return 0; + case ZMQ_RATE: if (*optvallen_ < sizeof (int)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 4689522..bfc9dc7 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -41,6 +43,10 @@ namespace zmq // I/O thread affinity. uint64_t affinity; + // Socket identity + unsigned char identity_size; + unsigned char identity [256]; + // Maximum tranfer rate [kb/s]. Default 100kb/s. int rate; @@ -93,6 +99,12 @@ namespace zmq // If 1, (X)SUB socket should filter the messages. If 0, it should not. bool filter; + + // Sends identity to all new connections. + bool send_identity; + + // Receivers identity from all new connections. + bool recv_identity; }; } diff --git a/src/own.cpp b/src/own.cpp index f2ca4b2..d6dd309 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/own.hpp b/src/own.hpp index 0902f73..ad5c452 100644 --- a/src/own.hpp +++ b/src/own.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pair.cpp b/src/pair.cpp index 2fa4eac..6c652db 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pair.hpp b/src/pair.hpp index e7390d6..67de2fd 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 6c292cd..122d110 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2010-2011 Miru Limited Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index b9e9a05..3c1d394 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2010-2011 Miru Limited Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 733b1ec..759802f 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2010-2011 Miru Limited Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index d3d5924..d8f046d 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2010-2011 Miru Limited Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 378370c..0274ee4 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2010-2011 Miru Limited Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 8b1be54..5a5ef99 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2010-2011 Miru Limited Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pipe.cpp b/src/pipe.cpp index c52deb9..25dd51c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -63,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_), - pipe_id (0) + delay (delay_) { } @@ -86,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) sink = sink_; } -void zmq::pipe_t::set_pipe_id (uint32_t id_) +void zmq::pipe_t::set_identity (const blob_t &identity_) { - pipe_id = id_; + identity = identity_; } -uint32_t zmq::pipe_t::get_pipe_id () +zmq::blob_t zmq::pipe_t::get_identity () { - return pipe_id; + return identity; } bool zmq::pipe_t::check_read () @@ -136,7 +137,7 @@ bool zmq::pipe_t::read (msg_t *msg_) return false; } - if (!(msg_->flags () & (msg_t::more | msg_t::label))) + if (!(msg_->flags () & msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -165,7 +166,7 @@ bool zmq::pipe_t::write (msg_t *msg_) if (unlikely (!check_write (msg_))) return false; - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; outpipe->write (*msg_, more); if (!more) msgs_written++; @@ -179,7 +180,7 @@ void zmq::pipe_t::rollback () msg_t msg; if (outpipe) { while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & (msg_t::more | msg_t::label)); + zmq_assert (msg.flags () & msg_t::more); int rc = msg.close (); errno_assert (rc == 0); } diff --git a/src/pipe.hpp b/src/pipe.hpp index 437d84d..75a2021 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -27,6 +29,7 @@ #include "object.hpp" #include "stdint.hpp" #include "array.hpp" +#include "blob.hpp" namespace zmq { @@ -70,8 +73,8 @@ namespace zmq void set_event_sink (i_pipe_events *sink_); // Pipe endpoint can store an opaque ID to be used by its clients. - void set_pipe_id (uint32_t id_); - uint32_t get_pipe_id (); + void set_identity (const blob_t &identity_); + blob_t get_identity (); // Returns true if there is at least one message to read in the pipe. bool check_read (); @@ -182,8 +185,8 @@ namespace zmq // asks us to. bool delay; - // Opaque ID. To be used by the clients, not the pipe itself. - uint32_t pipe_id; + // Identity of the writer. Used uniquely by the reader side. + blob_t identity; // Returns true if the message is delimiter; false otherwise. static bool is_delimiter (msg_t &msg_); diff --git a/src/poll.cpp b/src/poll.cpp index 9d1978b..1d1c423 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/poll.hpp b/src/poll.hpp index 42f3af1..700256d 100644 --- a/src/poll.hpp +++ b/src/poll.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/poller.hpp b/src/poller.hpp index a8936ce..a989328 100644 --- a/src/poller.hpp +++ b/src/poller.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/poller_base.cpp b/src/poller_base.cpp index d5fb985..6e532ae 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/poller_base.hpp b/src/poller_base.hpp index 44fe9f1..808ed38 100644 --- a/src/poller_base.hpp +++ b/src/poller_base.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pub.cpp b/src/pub.cpp index 15ec291..7458d5f 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pub.hpp b/src/pub.hpp index 4a4da0f..d418fd4 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pull.cpp b/src/pull.cpp index 06575da..6028118 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2010 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/pull.hpp b/src/pull.hpp index 6a46ead..fa36d49 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2010 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/push.cpp b/src/push.cpp index e91b789..a0ed992 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2010 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/push.hpp b/src/push.hpp index 1feb71d..ea93693 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2010 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/random.cpp b/src/random.cpp index 9f7768c..326a3d9 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/random.hpp b/src/random.hpp index d88b5ee..ca3d39a 100644 --- a/src/random.hpp +++ b/src/random.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/reaper.cpp b/src/reaper.cpp index 4c67b37..716f638 100644 --- a/src/reaper.cpp +++ b/src/reaper.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/reaper.hpp b/src/reaper.hpp index edcc319..1c1533f 100644 --- a/src/reaper.hpp +++ b/src/reaper.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/rep.cpp b/src/rep.cpp index 564fa89..02a825c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file @@ -42,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_) return -1; } - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; // Push message to the reply pipe. int rc = xrep_t::xsend (msg_, flags_); @@ -71,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) int rc = xrep_t::xrecv (msg_, flags_); if (rc != 0) return rc; - if (!(msg_->flags () & msg_t::label)) - break; + zmq_assert (msg_->flags () & msg_t::more); + bool bottom = (msg_->size () == 0); rc = xrep_t::xsend (msg_, flags_); errno_assert (rc == 0); + if (bottom) + break; } request_begins = false; } - else { - int rc = xrep_t::xrecv (msg_, flags_); - if (rc != 0) - return rc; - } - zmq_assert (!(msg_->flags () & msg_t::label)); + + // Get next message part to return to the user. + int rc = xrep_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; // If whole request is read, flip the FSM to reply-sending state. if (!(msg_->flags () & msg_t::more)) { diff --git a/src/rep.hpp b/src/rep.hpp index 55d57bd..de9c2b8 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file diff --git a/src/req.cpp b/src/req.cpp index 04a19fb..3ba1ec0 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -28,8 +30,7 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : xreq_t (parent_, tid_), receiving_reply (false), - message_begins (true), - request_id (generate_random ()) + message_begins (true) { options.type = ZMQ_REQ; } @@ -49,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) // First part of the request is the request identity. if (message_begins) { - msg_t prefix; - int rc = prefix.init_size (4); + msg_t bottom; + int rc = bottom.init (); errno_assert (rc == 0); - prefix.set_flags (msg_t::label); - unsigned char *data = (unsigned char*) prefix.data (); - put_uint32 (data, request_id); - rc = xreq_t::xsend (&prefix, flags_); + bottom.set_flags (msg_t::more); + rc = xreq_t::xsend (&bottom, 0); if (rc != 0) - return rc; + return -1; message_begins = false; } - bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool more = msg_->flags () & msg_t::more ? true : false; int rc = xreq_t::xsend (msg_, flags_); if (rc != 0) @@ -91,19 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // TODO: This should also close the connection with the peer! - if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) { - errno = EAGAIN; - return -1; - } - - unsigned char *data = (unsigned char*) msg_->data (); - if (unlikely (get_uint32 (data) != request_id)) { - - // The request ID does not match. Drop the entire message. + if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) { while (true) { int rc = xreq_t::xrecv (msg_, flags_); errno_assert (rc == 0); - if (!(msg_->flags () & (msg_t::label | msg_t::more))) + if (!(msg_->flags () & msg_t::more)) break; } msg_->close (); @@ -111,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) errno = EAGAIN; return -1; } + message_begins = false; } @@ -119,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // If the reply is fully received, flip the FSM into request-sending state. - if (!(msg_->flags () & (msg_t::more | msg_t::label))) { - request_id++; + if (!(msg_->flags () & msg_t::more)) { receiving_reply = false; message_begins = true; } @@ -156,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, zmq::req_session_t::~req_session_t () { + state = options.recv_identity ? identity : bottom; } int zmq::req_session_t::write (msg_t *msg_) { - if (state == request_id) { - if (msg_->flags () == msg_t::label && msg_->size () == 4) { + switch (state) { + case bottom: + if (msg_->flags () == msg_t::more && msg_->size () == 0) { state = body; return xreq_session_t::write (msg_); } - } - else { + break; + case body: if (msg_->flags () == msg_t::more) return xreq_session_t::write (msg_); if (msg_->flags () == 0) { - state = request_id; + state = bottom; + return xreq_session_t::write (msg_); + } + break; + case identity: + if (msg_->flags () == 0) { + state = bottom; return xreq_session_t::write (msg_); } + break; } errno = EFAULT; return -1; diff --git a/src/req.hpp b/src/req.hpp index 0207a4f..8fae9d4 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -50,10 +52,6 @@ namespace zmq // of the message must be empty message part (backtrace stack bottom). bool message_begins; - // Request ID. Request numbers gradually increase (and wrap over) - // so that we don't have to generate random ID for each request. - uint32_t request_id; - req_t (const req_t&); const req_t &operator = (const req_t&); }; @@ -73,7 +71,8 @@ namespace zmq private: enum { - request_id, + identity, + bottom, body } state; diff --git a/src/router.cpp b/src/router.cpp deleted file mode 100755 index b7e19fb..0000000 --- a/src/router.cpp +++ /dev/null @@ -1,285 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "router.hpp" -#include "pipe.hpp" -#include "wire.hpp" -#include "random.hpp" -#include "likely.hpp" -#include "wire.hpp" -#include "err.hpp" - -zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_), - prefetched (false), - more_in (false), - current_out (NULL), - more_out (false), - next_peer_id (generate_random ()) -{ - options.type = ZMQ_ROUTER; - - prefetched_msg.init (); -} - -zmq::router_t::~router_t () -{ - zmq_assert (outpipes.empty ()); - prefetched_msg.close (); -} - -void zmq::router_t::xattach_pipe (pipe_t *pipe_) -{ - zmq_assert (pipe_); - - // Generate a new peer ID. Take care to avoid duplicates. - outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); - if (!outpipes.empty ()) { - while (true) { - if (it == outpipes.end ()) - it = outpipes.begin (); - if (it->first != next_peer_id) - break; - ++next_peer_id; - ++it; - } - } - - // Add the pipe to the map out outbound pipes. - outpipe_t outpipe = {pipe_, true}; - bool ok = outpipes.insert (outpipes_t::value_type ( - next_peer_id, outpipe)).second; - zmq_assert (ok); - - // Add the pipe to the list of inbound pipes. - pipe_->set_pipe_id (next_peer_id); - fq.attach (pipe_); - - // Queue the connection command. - pending_command_t cmd = {1, next_peer_id}; - pending_commands.push_back (cmd); - - // Advance next peer ID so that if new connection is dropped shortly after - // its creation we don't accidentally get two subsequent peers with - // the same ID. - ++next_peer_id; -} - -void zmq::router_t::xterminated (pipe_t *pipe_) -{ - fq.terminated (pipe_); - - for (outpipes_t::iterator it = outpipes.begin (); - it != outpipes.end (); ++it) { - if (it->second.pipe == pipe_) { - - // Queue the disconnection command. - pending_command_t cmd = {2, it->first}; - pending_commands.push_back (cmd); - - // Remove the pipe. - outpipes.erase (it); - if (pipe_ == current_out) - current_out = NULL; - return; - } - } - zmq_assert (false); -} - -void zmq::router_t::xread_activated (pipe_t *pipe_) -{ - fq.activated (pipe_); -} - -void zmq::router_t::xwrite_activated (pipe_t *pipe_) -{ - for (outpipes_t::iterator it = outpipes.begin (); - it != outpipes.end (); ++it) { - if (it->second.pipe == pipe_) { - zmq_assert (!it->second.active); - it->second.active = true; - return; - } - } - zmq_assert (false); -} - -int zmq::router_t::xsend (msg_t *msg_, int flags_) -{ - // If this is the first part of the message it's the ID of the - // peer to send the message to. - if (!more_out) { - zmq_assert (!current_out); - - // The first message part has to be label. - if (unlikely (!(msg_->flags () & msg_t::label))) { - errno = EFSM; - return -1; - } - - // Find the pipe associated with the peer ID stored in the message. - if (unlikely (msg_->size () != 4)) { - errno = ECANTROUTE; - return -1; - } - uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); - outpipes_t::iterator it = outpipes.find (peer_id); - if (unlikely (it == outpipes.end ())) { - errno = ECANTROUTE; - return -1; - } - - // Check whether the pipe is available for writing. - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!it->second.pipe->check_write (&empty)) { - rc = empty.close (); - errno_assert (rc == 0); - it->second.active = false; - errno = EAGAIN; - return -1; - } - rc = empty.close (); - errno_assert (rc == 0); - - // Mark the pipe to send the message to. - current_out = it->second.pipe; - more_out = true; - - // Clean up the message object. - rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - return 0; - } - - // Check whether this is the last part of the message. - more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - - // Push the message into the pipe. If there's no out pipe, just drop it. - if (current_out) { - bool ok = current_out->write (msg_); - if (unlikely (!ok)) - current_out = NULL; - else if (!more_out) { - current_out->flush (); - current_out = NULL; - } - } - else { - int rc = msg_->close (); - errno_assert (rc == 0); - } - - // Detach the message from the data buffer. - int rc = msg_->init (); - errno_assert (rc == 0); - - return 0; -} - -int zmq::router_t::xrecv (msg_t *msg_, int flags_) -{ - // If there's a queued command, pass it to the caller. - if (unlikely (!more_in && !pending_commands.empty ())) { - msg_->init_size (5); - unsigned char *data = (unsigned char*) msg_->data (); - put_uint8 (data, pending_commands.front ().cmd); - put_uint32 (data + 1, pending_commands.front ().peer); - msg_->set_flags (msg_t::command); - pending_commands.pop_front (); - return 0; - } - - // If there is a prefetched message, return it. - if (prefetched) { - int rc = msg_->move (prefetched_msg); - errno_assert (rc == 0); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - prefetched = false; - return 0; - } - - // Get next message part. - pipe_t *pipe; - int rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; - - // If we are in the middle of reading a message, just return the next part. - if (more_in) { - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - return 0; - } - - // We are at the beginning of a new message. Move the message part we - // have to the prefetched and return the ID of the peer instead. - rc = prefetched_msg.move (*msg_); - errno_assert (rc == 0); - prefetched = true; - rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init_size (4); - errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); - msg_->set_flags (msg_t::label); - return 0; -} - -int zmq::router_t::rollback (void) -{ - if (current_out) { - current_out->rollback (); - current_out = NULL; - more_out = false; - } - return 0; -} - -bool zmq::router_t::xhas_in () -{ - if (prefetched) - return true; - return fq.has_in () || !pending_commands.empty(); -} - -bool zmq::router_t::xhas_out () -{ - // In theory, GENERIC socket is always ready for writing. Whether actual - // attempt to write succeeds depends on whitch pipe the message is going - // to be routed to. - return true; -} - -zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_, - socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_) : - session_base_t (io_thread_, connect_, socket_, options_, protocol_, - address_) -{ -} - -zmq::router_session_t::~router_session_t () -{ -} - diff --git a/src/router.hpp b/src/router.hpp deleted file mode 100755 index 9a5c0f9..0000000 --- a/src/router.hpp +++ /dev/null @@ -1,123 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_ROUTER_HPP_INCLUDED__ -#define __ZMQ_ROUTER_HPP_INCLUDED__ - -#include <map> -#include <deque> - -#include "socket_base.hpp" -#include "session_base.hpp" -#include "stdint.hpp" -#include "msg.hpp" -#include "fq.hpp" - -namespace zmq -{ - - class router_t : - public socket_base_t - { - public: - - router_t (class ctx_t *parent_, uint32_t tid_); - ~router_t (); - - // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_); - int xsend (class msg_t *msg_, int flags_); - int xrecv (class msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - void xread_activated (class pipe_t *pipe_); - void xwrite_activated (class pipe_t *pipe_); - void xterminated (class pipe_t *pipe_); - - protected: - - // Rollback any message parts that were sent but not yet flushed. - int rollback (); - - private: - - // Fair queueing object for inbound pipes. - fq_t fq; - - // Have we prefetched a message. - bool prefetched; - - // Holds the prefetched message. - msg_t prefetched_msg; - - // If true, more incoming message parts are expected. - bool more_in; - - struct outpipe_t - { - class pipe_t *pipe; - bool active; - }; - - // Outbound pipes indexed by the peer IDs. - typedef std::map <uint32_t, outpipe_t> outpipes_t; - outpipes_t outpipes; - - // The pipe we are currently writing to. - class pipe_t *current_out; - - // If true, more outgoing message parts are expected. - bool more_out; - - // Peer ID are generated. It's a simple increment and wrap-over - // algorithm. This value is the next ID to use (if not used already). - uint32_t next_peer_id; - - // Commands to be delivered to the user. - struct pending_command_t - { - uint8_t cmd; - uint32_t peer; - }; - typedef std::deque <pending_command_t> pending_commands_t; - pending_commands_t pending_commands; - - router_t (const router_t&); - const router_t &operator = (const router_t&); - }; - - class router_session_t : public session_base_t - { - public: - - router_session_t (class io_thread_t *io_thread_, bool connect_, - class socket_base_t *socket_, const options_t &options_, - const char *protocol_, const char *address_); - ~router_session_t (); - - private: - - router_session_t (const router_session_t&); - const router_session_t &operator = (const router_session_t&); - }; - -} - -#endif diff --git a/src/select.cpp b/src/select.cpp index 0ecdcd7..56b87ae 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/select.hpp b/src/select.hpp index 55bc883..9231b6c 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/session_base.cpp b/src/session_base.cpp index 32dcd4f..f2ee713 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -26,7 +28,6 @@ #include "likely.hpp" #include "tcp_connecter.hpp" #include "ipc_connecter.hpp" -#include "vtcp_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" @@ -40,7 +41,6 @@ #include "xsub.hpp" #include "push.hpp" #include "pull.hpp" -#include "router.hpp" #include "pair.hpp" zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, @@ -88,10 +88,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, s = new (std::nothrow) pull_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; - case ZMQ_ROUTER: - s = new (std::nothrow) router_session_t (io_thread_, connect_, - socket_, options_, protocol_, address_); - break; case ZMQ_PAIR: s = new (std::nothrow) pair_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); @@ -116,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, engine (NULL), socket (socket_), io_thread (io_thread_), - has_linger_timer (false) + has_linger_timer (false), + send_identity (options_.send_identity), + recv_identity (options_.recv_identity) { if (protocol_) protocol = protocol_; @@ -150,18 +148,33 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) int zmq::session_base_t::read (msg_t *msg_) { + // First message to send is identity (if required). + if (send_identity) { + zmq_assert (!(msg_->flags () & msg_t::more)); + msg_->init_size (options.identity_size); + memcpy (msg_->data (), options.identity, options.identity_size); + send_identity = false; + incomplete_in = false; + return 0; + } + if (!pipe || !pipe->read (msg_)) { errno = EAGAIN; return -1; } + incomplete_in = msg_->flags () & msg_t::more ? true : false; - incomplete_in = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; return 0; } int zmq::session_base_t::write (msg_t *msg_) { + // First message to receive is identity (if required). + if (recv_identity) { + msg_->set_flags (msg_t::identity); + recv_identity = false; + } + if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); @@ -398,18 +411,6 @@ void zmq::session_base_t::start_connecting (bool wait_) } #endif -#if defined ZMQ_HAVE_VTCP - if (protocol == "vtcp") { - - vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t ( - io_thread, this, options, address.c_str (), - wait_); - alloc_assert (connecter); - launch_child (connecter); - return; - } -#endif - #if defined ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. diff --git a/src/session_base.hpp b/src/session_base.hpp index e388d42..c89628f 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -118,6 +120,10 @@ namespace zmq // True is linger timer is running. bool has_linger_timer; + // If true, identity is to be sent/recvd from the network. + bool send_identity; + bool recv_identity; + // Protocol and address to use when connecting. std::string protocol; std::string address; diff --git a/src/signaler.cpp b/src/signaler.cpp index e89f45a..e7191d3 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/signaler.hpp b/src/signaler.hpp index dd474d9..4466c98 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a4d89db..a59ba69 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -36,7 +38,6 @@ #include "socket_base.hpp" #include "tcp_listener.hpp" #include "ipc_listener.hpp" -#include "vtcp_listener.hpp" #include "tcp_connecter.hpp" #include "io_thread.hpp" #include "session_base.hpp" @@ -60,7 +61,6 @@ #include "xrep.hpp" #include "xpub.hpp" #include "xsub.hpp" -#include "router.hpp" bool zmq::socket_base_t::check_tag () { @@ -106,9 +106,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_); break; - case ZMQ_ROUTER: - s = new (std::nothrow) router_t (parent_, tid_); - break; default: errno = EINVAL; return NULL; @@ -124,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : destroyed (false), last_tsc (0), ticks (0), - rcvlabel (false), - rcvcmd (false), rcvmore (false) { } @@ -176,8 +171,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys" && - protocol_ != "vtcp") { + protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") { errno = EPROTONOSUPPORT; return -1; } @@ -191,14 +185,6 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) } #endif - // If 0MQ is not compiled with VTCP, vtcp transport is not avaialble. -#if !defined ZMQ_HAVE_VTCP - if (protocol_ == "vtcp") { - errno = EPROTONOSUPPORT; - return -1; - } -#endif - // IPC transport is not available on Windows and OpenVMS. #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS if (protocol_ == "ipc") { @@ -265,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return -1; } - if (option_ == ZMQ_RCVLABEL) { - if (*optvallen_ < sizeof (int)) { - errno = EINVAL; - return -1; - } - *((int*) optval_) = rcvlabel ? 1 : 0; - *optvallen_ = sizeof (int); - return 0; - } - - if (option_ == ZMQ_RCVCMD) { - if (*optvallen_ < sizeof (int)) { - errno = EINVAL; - return -1; - } - *((int*) optval_) = rcvcmd ? 1 : 0; - *optvallen_ = sizeof (int); - return 0; - } - if (option_ == ZMQ_RCVMORE) { if (*optvallen_ < sizeof (int)) { errno = EINVAL; @@ -392,21 +358,6 @@ int zmq::socket_base_t::bind (const char *addr_) } #endif -#if defined ZMQ_HAVE_VTCP - if (protocol == "vtcp") { - vtcp_listener_t *listener = new (std::nothrow) vtcp_listener_t ( - io_thread, this, options); - alloc_assert (listener); - int rc = listener->set_address (address.c_str ()); - if (rc != 0) { - delete listener; - return -1; - } - launch_child (listener); - return 0; - } -#endif - zmq_assert (false); return -1; } @@ -524,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) return -1; // At this point we impose the flags on the message. - if (flags_ & ZMQ_SNDLABEL) - msg_->set_flags (msg_t::label); if (flags_ & ZMQ_SNDMORE) msg_->set_flags (msg_t::more); - if (flags_ & ZMQ_SNDCMD) - msg_->set_flags (msg_t::command); // Try to send the message. rc = xsend (msg_, flags_); @@ -898,13 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_) void zmq::socket_base_t::extract_flags (msg_t *msg_) { - rcvlabel = msg_->flags () & msg_t::label; - if (rcvlabel) - msg_->reset_flags (msg_t::label); + // Test whether IDENTITY flag is valid for this socket type. + if (unlikely (msg_->flags () & msg_t::identity)) { + zmq_assert (options.recv_identity); +printf ("identity recvd\n"); + } + + + // Remove MORE flag. rcvmore = msg_->flags () & msg_t::more ? true : false; if (rcvmore) msg_->reset_flags (msg_t::more); - rcvcmd = msg_->flags () & msg_t::command ? true : false; - if (rcvcmd) - msg_->reset_flags (msg_t::command); } + diff --git a/src/socket_base.hpp b/src/socket_base.hpp index c7c86e7..bc978ba 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -183,12 +185,6 @@ namespace zmq // Number of messages received since last command processing. int ticks; - // True if the last message received had LABEL flag set. - bool rcvlabel; - - // True if the last message received had COMMAND flag set. - bool rcvcmd; - // True if the last message received had MORE flag set. bool rcvmore; diff --git a/src/stdint.hpp b/src/stdint.hpp index 73186d3..b78afcd 100644 --- a/src/stdint.hpp +++ b/src/stdint.hpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 2647795..11ec264 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 92fc55f..6d122ed 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/sub.cpp b/src/sub.cpp index d9f2f2e..3249aea 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/sub.hpp b/src/sub.hpp index 7d3cf0b..bb46641 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp index 0aa564a..1b7577f 100644 --- a/src/tcp_address.cpp +++ b/src/tcp_address.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/tcp_address.hpp b/src/tcp_address.hpp index 58ac540..d4768c7 100644 --- a/src/tcp_address.hpp +++ b/src/tcp_address.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index fe99252..75079da 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index d1a93cd..e420c82 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 9b6068c..0b7a90d 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2010 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 60713e3..e712998 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2010 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/thread.cpp b/src/thread.cpp index d1c6729..00628e5 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2010-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file diff --git a/src/thread.hpp b/src/thread.hpp index f3f5f8d..52769b1 100644 --- a/src/thread.hpp +++ b/src/thread.hpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2010-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file diff --git a/src/trie.cpp b/src/trie.cpp index cd6cb7b..9718c77 100644 --- a/src/trie.cpp +++ b/src/trie.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/trie.hpp b/src/trie.hpp index a2b55c6..76e4fd9 100644 --- a/src/trie.hpp +++ b/src/trie.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp deleted file mode 100644 index 5dc147e..0000000 --- a/src/vtcp_connecter.cpp +++ /dev/null @@ -1,251 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "vtcp_connecter.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <new> -#include <string> - -#include "stream_engine.hpp" -#include "io_thread.hpp" -#include "platform.hpp" -#include "random.hpp" -#include "likely.hpp" -#include "err.hpp" -#include "ip.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include <unistd.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> -#include <netinet/in.h> -#include <netdb.h> -#include <fcntl.h> -#ifdef ZMQ_HAVE_OPENVMS -#include <ioctl.h> -#endif -#endif - -zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, const options_t &options_, - const char *address_, bool wait_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - s (retired_fd), - handle_valid (false), - wait (wait_), - session (session_), - current_reconnect_ivl(options.reconnect_ivl) -{ - subport = 0; - - int rc = set_address (address_); - zmq_assert (rc == 0); -} - -zmq::vtcp_connecter_t::~vtcp_connecter_t () -{ - if (wait) - cancel_timer (reconnect_timer_id); - if (handle_valid) - rm_fd (handle); - - if (s != retired_fd) - close (); -} - -int zmq::vtcp_connecter_t::set_address (const char *addr_) -{ - const char *delimiter = strrchr (addr_, '.'); - if (!delimiter) { - delimiter = strrchr (addr_, ':'); - if (!delimiter) { - errno = EINVAL; - return -1; - } - std::string addr_str (addr_, delimiter - addr_); - addr_str += ":9220"; - std::string subport_str (delimiter + 1); - subport = (vtcp_subport_t) atoi (subport_str.c_str ()); - int rc = address.resolve (addr_str.c_str (), false, true); - if (rc != 0) - return -1; - } - else { - std::string addr_str (addr_, delimiter - addr_); - std::string subport_str (delimiter + 1); - subport = (vtcp_subport_t) atoi (subport_str.c_str ()); - int rc = address.resolve (addr_str.c_str (), false, true); - if (rc != 0) - return -1; - } - - return 0; -} - -void zmq::vtcp_connecter_t::process_plug () -{ - if (wait) - add_reconnect_timer(); - else - start_connecting (); -} - -void zmq::vtcp_connecter_t::in_event () -{ - // We are not polling for incomming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - -void zmq::vtcp_connecter_t::out_event () -{ - fd_t fd = connect (); - rm_fd (handle); - handle_valid = false; - - // Handle the error condition by attempt to reconnect. - if (fd == retired_fd) { - close (); - wait = true; - add_reconnect_timer(); - return; - } - - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); - alloc_assert (engine); - - // Attach the engine to the corresponding session object. - send_attach (session, engine); - - // Shut the connecter down. - terminate (); -} - -void zmq::vtcp_connecter_t::timer_event (int id_) -{ - zmq_assert (id_ == reconnect_timer_id); - wait = false; - start_connecting (); -} - -void zmq::vtcp_connecter_t::start_connecting () -{ - // Open the connecting socket. - int rc = open (); - - // Handle error condition by eventual reconnect. - if (unlikely (rc != 0)) { - errno_assert (false); - wait = true; - add_reconnect_timer(); - return; - } - - // Connection establishment may be dealyed. Poll for its completion. - handle = add_fd (s); - handle_valid = true; - set_pollout (handle); -} - -void zmq::vtcp_connecter_t::add_reconnect_timer() -{ - add_timer (get_new_reconnect_ivl(), reconnect_timer_id); -} - -int zmq::vtcp_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - int this_interval = current_reconnect_ivl + - (generate_random () % options.reconnect_ivl); - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 && - options.reconnect_ivl_max > options.reconnect_ivl) { - - // Calculate the next interval - current_reconnect_ivl = current_reconnect_ivl * 2; - if(current_reconnect_ivl >= options.reconnect_ivl_max) { - current_reconnect_ivl = options.reconnect_ivl_max; - } - } - return this_interval; -} - -int zmq::vtcp_connecter_t::open () -{ - zmq_assert (s == retired_fd); - - // Start the connection procedure. - sockaddr_in *paddr = (sockaddr_in*) address.addr (); - s = vtcp_connect (paddr->sin_addr.s_addr, ntohs (paddr->sin_port)); - - // Connect was successfull immediately. - if (s != retired_fd) - return 0; - - // Asynchronous connect was launched. - if (errno == EINPROGRESS) { - errno = EAGAIN; - return -1; - } - - // Error occured. - return -1; -} - -zmq::fd_t zmq::vtcp_connecter_t::connect () -{ - int rc = vtcp_acceptc (s, subport); - if (rc != 0) { - int err = errno; - close (); - errno = err; - return retired_fd; - } - - tune_tcp_socket (s); - - fd_t result = s; - s = retired_fd; - return result; -} - -int zmq::vtcp_connecter_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - if (rc != 0) - return -1; - s = retired_fd; - return 0; -} - -#endif - diff --git a/src/vtcp_connecter.hpp b/src/vtcp_connecter.hpp deleted file mode 100644 index fe5260e..0000000 --- a/src/vtcp_connecter.hpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __VTCP_CONNECTER_HPP_INCLUDED__ -#define __VTCP_CONNECTER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <vtcp.h> - -#include "fd.hpp" -#include "own.hpp" -#include "stdint.hpp" -#include "io_object.hpp" -#include "tcp_address.hpp" - -namespace zmq -{ - - class vtcp_connecter_t : public own_t, public io_object_t - { - public: - - // If 'delay' is true connecter first waits for a while, then starts - // connection process. - vtcp_connecter_t (class io_thread_t *io_thread_, - class session_base_t *session_, const options_t &options_, - const char *address_, bool delay_); - ~vtcp_connecter_t (); - - private: - - // ID of the timer used to delay the reconnection. - enum {reconnect_timer_id = 1}; - - // Handlers for incoming commands. - void process_plug (); - - // Handlers for I/O events. - void in_event (); - void out_event (); - void timer_event (int id_); - - // Internal function to start the actual connection establishment. - void start_connecting (); - - // Internal function to add a reconnect timer - void add_reconnect_timer(); - - // Internal function to return a reconnect backoff delay. - // Will modify the current_reconnect_ivl used for next call - // Returns the currently used interval - int get_new_reconnect_ivl (); - - // Set address to connect to. - int set_address (const char *addr_); - - // Open TCP connecting socket. Returns -1 in case of error, - // 0 if connect was successfull immediately and 1 if async connect - // was launched. - int open (); - - // Close the connecting socket. - int close (); - - // Get the file descriptor of newly created connection. Returns - // retired_fd if the connection was unsuccessfull. - fd_t connect (); - - // Address to connect to. - tcp_address_t address; - vtcp_subport_t subport; - - // Underlying socket. - fd_t s; - - // Handle corresponding to the listening socket. - handle_t handle; - - // If true file descriptor is registered with the poller and 'handle' - // contains valid value. - bool handle_valid; - - // If true, connecter is waiting a while before trying to connect. - bool wait; - - // Reference to the session we belong to. - class session_base_t *session; - - // Current reconnect ivl, updated for backoff strategy - int current_reconnect_ivl; - - vtcp_connecter_t (const vtcp_connecter_t&); - const vtcp_connecter_t &operator = (const vtcp_connecter_t&); - }; - -} - -#endif - -#endif diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp deleted file mode 100644 index 7e496e5..0000000 --- a/src/vtcp_listener.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "vtcp_listener.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include <string> -#include <string.h> -#include <vtcp.h> - -#include "stream_engine.hpp" -#include "session_base.hpp" -#include "stdint.hpp" -#include "err.hpp" -#include "ip.hpp" - -zmq::vtcp_listener_t::vtcp_listener_t (io_thread_t *io_thread_, - socket_base_t *socket_, options_t &options_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - s (retired_fd), - socket (socket_) -{ -} - -zmq::vtcp_listener_t::~vtcp_listener_t () -{ - if (s != retired_fd) { - int rc = ::close (s); - errno_assert (rc == 0); - s = retired_fd; - } -} - -int zmq::vtcp_listener_t::set_address (const char *addr_) -{ - // VTCP doesn't allow for binding to a specific interface. Connection - // string has to begin with *: (INADDR_ANY). - if (strlen (addr_) < 2 || addr_ [0] != '*' || addr_ [1] != ':') { - errno = EADDRNOTAVAIL; - return -1; - } - - // Parse port and subport. - uint16_t port; - uint32_t subport; - const char *delimiter = strrchr (addr_, '.'); - if (!delimiter) { - port = 9220; - subport = (uint32_t) atoi (addr_ + 2); - } - else { - std::string port_str (addr_ + 2, delimiter - addr_ - 2); - std::string subport_str (delimiter + 1); - port = (uint16_t) atoi (port_str.c_str ()); - subport = (uint32_t) atoi (subport_str.c_str ()); - } - - // Start listening. - s = vtcp_bind (port, subport); - if (s == retired_fd) - return -1; - - return 0; -} - -void zmq::vtcp_listener_t::process_plug () -{ - // Start polling for incoming connections. - handle = add_fd (s); - set_pollin (handle); -} - -void zmq::vtcp_listener_t::process_term (int linger_) -{ - rm_fd (handle); - own_t::process_term (linger_); -} - -void zmq::vtcp_listener_t::in_event () -{ - fd_t fd = vtcp_acceptb (s); - if (fd == retired_fd) - return; - - tune_tcp_socket (fd); - - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); - alloc_assert (engine); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create and launch a session object. - session_base_t *session = session_base_t::create (io_thread, false, socket, - options, NULL, NULL); - alloc_assert (session); - session->inc_seqnum (); - launch_child (session); - send_attach (session, engine, false); -} - -#endif diff --git a/src/vtcp_listener.hpp b/src/vtcp_listener.hpp deleted file mode 100644 index 78f3b51..0000000 --- a/src/vtcp_listener.hpp +++ /dev/null @@ -1,71 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ -#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__ - -#include "platform.hpp" - -#if defined ZMQ_HAVE_VTCP - -#include "own.hpp" -#include "io_object.hpp" -#include "fd.hpp" - -namespace zmq -{ - - class vtcp_listener_t : public own_t, public io_object_t - { - public: - - vtcp_listener_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, class options_t &options_); - ~vtcp_listener_t (); - - int set_address (const char *addr_); - - private: - - // Handlers for incoming commands. - void process_plug (); - void process_term (int linger_); - - // Handlers for I/O events. - void in_event (); - - // VTCP listener socket. - fd_t s; - - // Handle corresponding to the listening socket. - handle_t handle; - - // Socket the listerner belongs to. - class socket_base_t *socket; - - vtcp_listener_t (const vtcp_listener_t&); - const vtcp_listener_t &operator = (const vtcp_listener_t&); - }; - -} - -#endif - -#endif diff --git a/src/windows.hpp b/src/windows.hpp index 8f39914..5e986b2 100644 --- a/src/windows.hpp +++ b/src/windows.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/wire.hpp b/src/wire.hpp index bc9dfe5..b0f4e0e 100644 --- a/src/wire.hpp +++ b/src/wire.hpp @@ -1,5 +1,5 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/xpub.cpp b/src/xpub.cpp index a245fea..5d7a97c 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -100,8 +101,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_) { - bool msg_more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + bool msg_more = msg_->flags () & msg_t::more ? true : false; // For the first part of multi-part message, find the matching pipes. if (!more) diff --git a/src/xpub.hpp b/src/xpub.hpp index b410e6c..14ffc58 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/xrep.cpp b/src/xrep.cpp index 9f2a947..ea19e56 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -35,9 +37,14 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREP; + // TODO: Uncomment the following line when XREP will become true XREP + // rather than generic router socket. // If peer disconnect there's noone to send reply to anyway. We can drop // all the outstanding requests from that peer. - options.delay_on_disconnect = false; + // options.delay_on_disconnect = false; + + options.send_identity = true; + options.recv_identity = true; prefetched_msg.init (); } @@ -52,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); - // Generate a new peer ID. Take care to avoid duplicates. - outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); - if (!outpipes.empty ()) { - while (true) { - if (it == outpipes.end ()) - it = outpipes.begin (); - if (it->first != next_peer_id) - break; - ++next_peer_id; - ++it; - } - } + // Generate a new unique peer identity. + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id); + blob_t identity (buf, 5); + ++next_peer_id; // Add the pipe to the map out outbound pipes. outpipe_t outpipe = {pipe_, true}; bool ok = outpipes.insert (outpipes_t::value_type ( - next_peer_id, outpipe)).second; + identity, outpipe)).second; zmq_assert (ok); // Add the pipe to the list of inbound pipes. - pipe_->set_pipe_id (next_peer_id); - fq.attach (pipe_); - - // Advance next peer ID so that if new connection is dropped shortly after - // its creation we don't accidentally get two subsequent peers with - // the same ID. - ++next_peer_id; + pipe_->set_identity (identity); + fq.attach (pipe_); } void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -125,30 +121,29 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::label) { + if (msg_->flags () & msg_t::more) { more_out = true; - // Find the pipe associated with the peer ID stored in the prefix. + // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe just silently ignore the message. - if (msg_->size () == 4) { - uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); - outpipes_t::iterator it = outpipes.find (peer_id); - - if (it != outpipes.end ()) { - current_out = it->second.pipe; - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!current_out->check_write (&empty)) { - it->second.active = false; - more_out = false; - current_out = NULL; - } - rc = empty.close (); - errno_assert (rc == 0); + blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); + outpipes_t::iterator it = outpipes.find (identity); + + if (it != outpipes.end ()) { + current_out = it->second.pipe; + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); + if (!current_out->check_write (&empty)) { + it->second.active = false; + more_out = false; + current_out = NULL; } + rc = empty.close (); + errno_assert (rc == 0); } + } int rc = msg_->close (); @@ -159,7 +154,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_out = msg_->flags () & msg_t::more ? true : false; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -189,7 +184,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; prefetched = false; return 0; } @@ -200,9 +195,40 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (rc != 0) return -1; + // If identity is received, change the key assigned to the pipe. + if (unlikely (msg_->flags () & msg_t::identity)) { + zmq_assert (!more_in); + + // Empty identity means we can preserve the auto-generated identity. + if (msg_->size () != 0) { + + // Actual change of the identity. + outpipes_t::iterator it = outpipes.begin (); + while (it != outpipes.end ()) { + if (it->second.pipe == pipe) { + blob_t identity ((unsigned char*) msg_->data (), + msg_->size ()); + pipe->set_identity (identity); + outpipes.erase (it); + outpipe_t outpipe = {pipe, true}; + outpipes.insert (outpipes_t::value_type (identity, + outpipe)); + break; + } + ++it; + } + zmq_assert (it != outpipes.end ()); + } + + // After processing the identity, try to get the next message. + rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + } + // If we are in the middle of reading a message, just return the next part. if (more_in) { - more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more_in = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -213,10 +239,12 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) prefetched = true; rc = msg_->close (); errno_assert (rc == 0); - rc = msg_->init_size (4); + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); errno_assert (rc == 0); - put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); - msg_->set_flags (msg_t::label); + memcpy (msg_->data (), identity.data (), identity.size ()); + msg_->set_flags (msg_t::more); return 0; } diff --git a/src/xrep.hpp b/src/xrep.hpp index 562f87d..fc02b11 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -1,5 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -26,6 +28,7 @@ #include "socket_base.hpp" #include "session_base.hpp" #include "stdint.hpp" +#include "blob.hpp" #include "msg.hpp" #include "fq.hpp" @@ -77,7 +80,7 @@ namespace zmq }; // Outbound pipes indexed by the peer IDs. - typedef std::map <uint32_t, outpipe_t> outpipes_t; + typedef std::map <blob_t, outpipe_t> outpipes_t; outpipes_t outpipes; // The pipe we are currently writing to. diff --git a/src/xreq.cpp b/src/xreq.cpp index 79b3b94..91317f7 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -27,9 +28,14 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : { options.type = ZMQ_XREQ; + // TODO: Uncomment the following line when XREQ will become true XREQ + // rather than generic dealer socket. // If the socket is closing we can drop all the outbound requests. There'll // be noone to receive the replies anyway. - options.delay_on_close = false; + // options.delay_on_close = false; + + options.send_identity = true; + options.recv_identity = true; } zmq::xreq_t::~xreq_t () @@ -50,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) { - return fq.recv (msg_, flags_); + // XREQ socket doesn't use identities. We can safely drop it and + while (true) { + int rc = fq.recv (msg_, flags_); + if (rc != 0) + return rc; + if (likely (!(msg_->flags () & msg_t::identity))) + break; + } + return 0; } bool zmq::xreq_t::xhas_in () diff --git a/src/xreq.hpp b/src/xreq.hpp index d7e28c4..1d979c5 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -1,8 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under diff --git a/src/xsub.cpp b/src/xsub.cpp index b24f082..aae2654 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -116,7 +117,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->move (message); errno_assert (rc == 0); has_message = false; - more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; return 0; } @@ -136,14 +137,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) // Check whether the message matches at least one subscription. // Non-initial parts of the message are passed if (more || !options.filter || match (msg_)) { - more = - msg_->flags () & (msg_t::more | msg_t::label) ? true : false; + more = msg_->flags () & msg_t::more ? true : false; return 0; } // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (msg_->flags () & (msg_t::more | msg_t::label)) { + while (msg_->flags () & msg_t::more) { rc = fq.recv (msg_, ZMQ_DONTWAIT); zmq_assert (rc == 0); } @@ -183,7 +183,7 @@ bool zmq::xsub_t::xhas_in () // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (message.flags () & (msg_t::more | msg_t::label)) { + while (message.flags () & msg_t::more) { rc = fq.recv (&message, ZMQ_DONTWAIT); zmq_assert (rc == 0); } diff --git a/src/xsub.hpp b/src/xsub.hpp index 310df6e..1eac390 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/ypipe.hpp b/src/ypipe.hpp index da4e85a..74a96bc 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/yqueue.hpp b/src/yqueue.hpp index e436ea4..1c83cb8 100644 --- a/src/yqueue.hpp +++ b/src/yqueue.hpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/src/zmq.cpp b/src/zmq.cpp index 0f54fab..b06b122 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file @@ -603,7 +604,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } #else int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); - if (unlikely (rc == -1) { + if (unlikely (rc == -1)) { if (errno == EINTR || errno == EBADF) return -1; errno_assert (false); diff --git a/src/zmq_utils.cpp b/src/zmq_utils.cpp index c7eb60f..8f34134 100644 --- a/src/zmq_utils.cpp +++ b/src/zmq_utils.cpp @@ -1,5 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. diff --git a/tests/Makefile.am b/tests/Makefile.am index 6ed3762..5f0cfc1 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -7,7 +7,6 @@ noinst_PROGRAMS = test_pair_inproc \ test_reqrep_tcp \ test_hwm \ test_reqrep_device \ - test_reqrep_drop \ test_sub_forward \ test_invalid_rep @@ -24,7 +23,6 @@ test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp test_hwm_SOURCES = test_hwm.cpp test_reqrep_device_SOURCES = test_reqrep_device.cpp -test_reqrep_drop_SOURCES = test_reqrep_drop.cpp test_sub_forward_SOURCES = test_sub_forward.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp index 10b26e1..d887b31 100644 --- a/tests/test_hwm.cpp +++ b/tests/test_hwm.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -20,11 +20,14 @@ #include <assert.h> +#include <stdio.h> #include "testutil.hpp" int main (int argc, char *argv []) { + fprintf (stderr, "test_hwm running...\n"); + void *ctx = zmq_init (1); assert (ctx); diff --git a/tests/test_invalid_rep.cpp b/tests/test_invalid_rep.cpp index 2657c20..9c77cc4 100644 --- a/tests/test_invalid_rep.cpp +++ b/tests/test_invalid_rep.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -20,9 +21,12 @@ #include "../include/zmq.h" #include <assert.h> +#include <stdio.h> int main (int argc, char *argv []) { + fprintf (stderr, "test_invalid_rep running...\n"); + // Create REQ/XREP wiring. void *ctx = zmq_init (1); assert (ctx); @@ -45,25 +49,26 @@ int main (int argc, char *argv []) assert (rc == 1); // Receive the request. - char addr [4]; - char seqn [4]; + char addr [32]; + int addr_size; + char bottom [1]; char body [1]; - rc = zmq_recv (xrep_socket, addr, sizeof (addr), 0); - assert (rc == 4); - rc = zmq_recv (xrep_socket, seqn, sizeof (seqn), 0); - assert (rc == 4); + addr_size = zmq_recv (xrep_socket, addr, sizeof (addr), 0); + assert (addr_size >= 0); + rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0); + assert (rc == 0); rc = zmq_recv (xrep_socket, body, sizeof (body), 0); assert (rc == 1); // Send invalid reply. - rc = zmq_send (xrep_socket, addr, 4, 0); - assert (rc == 4); + rc = zmq_send (xrep_socket, addr, addr_size, 0); + assert (rc == addr_size); // Send valid reply. - rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDLABEL); - assert (rc == 4); - rc = zmq_send (xrep_socket, seqn, 4, ZMQ_SNDLABEL); - assert (rc == 4); + rc = zmq_send (xrep_socket, addr, addr_size, ZMQ_SNDMORE); + assert (rc == addr_size); + rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE); + assert (rc == 0); rc = zmq_send (xrep_socket, "b", 1, 0); assert (rc == 1); diff --git a/tests/test_pair_inproc.cpp b/tests/test_pair_inproc.cpp index 6194f2a..6705cc5 100644 --- a/tests/test_pair_inproc.cpp +++ b/tests/test_pair_inproc.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,10 +19,13 @@ */ #include <assert.h> +#include <stdio.h> #include "testutil.hpp" int main (int argc, char *argv []) { + fprintf (stderr, "test_pair_inproc running...\n"); + void *ctx = zmq_init (0); assert (ctx); diff --git a/tests/test_pair_ipc.cpp b/tests/test_pair_ipc.cpp index 2c83a69..96a265f 100644 --- a/tests/test_pair_ipc.cpp +++ b/tests/test_pair_ipc.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,10 +19,13 @@ */ #include <assert.h> +#include <stdio.h> #include "testutil.hpp" int main (int argc, char *argv []) { + fprintf (stderr, "test_pair_ipc running...\n"); + void *ctx = zmq_init (1); assert (ctx); diff --git a/tests/test_pair_tcp.cpp b/tests/test_pair_tcp.cpp index 8ecfef5..464be5a 100644 --- a/tests/test_pair_tcp.cpp +++ b/tests/test_pair_tcp.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,10 +20,13 @@ */ #include <assert.h> +#include <stdio.h> #include "testutil.hpp" int main (int argc, char *argv []) { + fprintf (stderr, "test_pair_tcp running...\n"); + void *ctx = zmq_init (1); assert (ctx); diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp index f6f06c9..d861cec 100644 --- a/tests/test_reqrep_device.cpp +++ b/tests/test_reqrep_device.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -20,11 +21,14 @@ #include <assert.h> #include <string.h> +#include <stdio.h> #include "../include/zmq.h" int main (int argc, char *argv []) { + fprintf (stderr, "test_reqrep_device running...\n"); + void *ctx = zmq_init (1); assert (ctx); @@ -63,15 +67,11 @@ int main (int argc, char *argv []) assert (rc == 0); rc = zmq_recvmsg (xrep, &msg, 0); assert (rc >= 0); - int rcvlabel; - size_t sz = sizeof (rcvlabel); - rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); int rcvmore; + size_t sz = sizeof (rcvmore); rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); - rc = zmq_sendmsg (xreq, &msg, - (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + rc = zmq_sendmsg (xreq, &msg, rcvmore ? ZMQ_SNDMORE : 0); assert (rc >= 0); } @@ -80,21 +80,14 @@ int main (int argc, char *argv []) rc = zmq_recv (rep, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "ABC", 3) == 0); - int rcvlabel; - size_t sz = sizeof (rcvlabel); - rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); int rcvmore; + size_t sz = sizeof (rcvmore); rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (rcvmore); rc = zmq_recv (rep, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "DEF", 3) == 0); - rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (!rcvmore); @@ -112,15 +105,10 @@ int main (int argc, char *argv []) assert (rc == 0); rc = zmq_recvmsg (xreq, &msg, 0); assert (rc >= 0); - int rcvlabel; - size_t sz = sizeof (rcvlabel); - rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); int rcvmore; rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); - rc = zmq_sendmsg (xrep, &msg, - (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0)); + rc = zmq_sendmsg (xrep, &msg, rcvmore ? ZMQ_SNDMORE : 0); assert (rc >= 0); } @@ -128,18 +116,12 @@ int main (int argc, char *argv []) rc = zmq_recv (req, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "GHI", 3) == 0); - rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (rcvmore); rc = zmq_recv (req, buff, 3, 0); assert (rc == 3); assert (memcmp (buff, "JKL", 3) == 0); - rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz); - assert (rc == 0); - assert (!rcvlabel); rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); assert (rc == 0); assert (!rcvmore); diff --git a/tests/test_reqrep_drop.cpp b/tests/test_reqrep_drop.cpp deleted file mode 100644 index 6531357..0000000 --- a/tests/test_reqrep_drop.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <assert.h> - -#include "../include/zmq.h" -#include "../include/zmq_utils.h" - -int main (int argc, char *argv []) -{ - void *ctx = zmq_init (1); - assert (ctx); - - // Check whether requests are discarded because of disconnected requester. - - // Create a server. - void *xrep = zmq_socket (ctx, ZMQ_XREP); - assert (xrep); - int rc = zmq_bind (xrep, "tcp://127.0.0.1:5560"); - assert (rc == 0); - - // Create a client. - void *xreq = zmq_socket (ctx, ZMQ_XREQ); - assert (xreq); - rc = zmq_connect (xreq, "tcp://127.0.0.1:5560"); - assert (rc == 0); - - // Send requests. - rc = zmq_send (xreq, "ABC", 3, 0); - assert (rc == 3); - rc = zmq_send (xreq, "DEF", 3, 0); - assert (rc == 3); - - // Disconnect client. - rc = zmq_close (xreq); - assert (rc == 0); - - // Wait a while for disconnect to happen. - zmq_sleep (1); - - // Try to receive a request -- it should have been discarded. - char buff [3]; - rc = zmq_recv (xrep, buff, 3, ZMQ_DONTWAIT); - assert (rc < 0); - assert (errno == EAGAIN); - - // Clean up. - rc = zmq_close (xrep); - assert (rc == 0); - - // New test. Check whether reply is dropped because of HWM overflow. - - int one = 1; - xreq = zmq_socket (ctx, ZMQ_XREQ); - assert (xreq); - rc = zmq_setsockopt (xreq, ZMQ_RCVHWM, &one, sizeof(one)); - assert (rc == 0); - rc = zmq_bind (xreq, "inproc://a"); - assert (rc == 0); - - void *rep = zmq_socket (ctx, ZMQ_REP); - assert (rep); - rc = zmq_setsockopt (rep, ZMQ_SNDHWM, &one, sizeof(one)); - assert (rc == 0); - rc = zmq_connect (rep, "inproc://a"); - assert (rc == 0); - - // Send request 1 - rc = zmq_send (xreq, buff, 1, 0); - assert (rc == 1); - - // Send request 2 - rc = zmq_send (xreq, buff, 1, 0); - assert (rc == 1); - - // Receive request 1 - rc = zmq_recv (rep, buff, 1, 0); - assert (rc == 1); - - // Send request 3 - rc = zmq_send (xreq, buff, 1, 0); - assert (rc == 1); - - // Send reply 1 - rc = zmq_send (rep, buff, 1, 0); - assert (rc == 1); - - // Receive request 2 - rc = zmq_recv (rep, buff, 1, 0); - assert (rc == 1); - - // Send reply 2 - rc = zmq_send (rep, buff, 1, 0); - assert (rc == 1); - - // Receive request 3 - rc = zmq_recv (rep, buff, 1, 0); - assert (rc == 1); - - // Send reply 3 - rc = zmq_send (rep, buff, 1, 0); - assert (rc == 1); - - // Receive reply 1 - rc = zmq_recv (xreq, buff, 1, 0); - assert (rc == 1); - - // Receive reply 2 - rc = zmq_recv (xreq, buff, 1, 0); - assert (rc == 1); - - // Try to receive reply 3, it should have been dropped. - rc = zmq_recv (xreq, buff, 1, ZMQ_DONTWAIT); - assert (rc == -1 && errno == EAGAIN); - - // Clean up. - rc = zmq_close (xreq); - assert (rc == 0); - rc = zmq_close (rep); - assert (rc == 0); - - rc = zmq_term (ctx); - assert (rc == 0); - - return 0 ; -} diff --git a/tests/test_reqrep_inproc.cpp b/tests/test_reqrep_inproc.cpp index aeff7ef..f710968 100644 --- a/tests/test_reqrep_inproc.cpp +++ b/tests/test_reqrep_inproc.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,10 +19,13 @@ */ #include <assert.h> +#include <stdio.h> #include "testutil.hpp" int main (int argc, char *argv []) { + fprintf (stderr, "test_reqrep_inproc running...\n"); + void *ctx = zmq_init (0); assert (ctx); diff --git a/tests/test_reqrep_ipc.cpp b/tests/test_reqrep_ipc.cpp index af15998..fd9b28d 100644 --- a/tests/test_reqrep_ipc.cpp +++ b/tests/test_reqrep_ipc.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,10 +19,13 @@ */ #include <assert.h> +#include <stdio.h> #include "testutil.hpp" int main (int argc, char *argv []) { + fprintf (stderr, "test_reqrep_ipc running...\n"); + void *ctx = zmq_init (1); assert (ctx); diff --git a/tests/test_reqrep_tcp.cpp b/tests/test_reqrep_tcp.cpp index c713e26..1e6bbbb 100644 --- a/tests/test_reqrep_tcp.cpp +++ b/tests/test_reqrep_tcp.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,10 +20,13 @@ */ #include <assert.h> +#include <stdio.h> #include "testutil.hpp" int main (int argc, char *argv []) { + fprintf (stderr, "test_reqrep_tcp running...\n"); + void *ctx = zmq_init (1); assert (ctx); diff --git a/tests/test_shutdown_stress.cpp b/tests/test_shutdown_stress.cpp index b3ee90f..811637c 100644 --- a/tests/test_shutdown_stress.cpp +++ b/tests/test_shutdown_stress.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -22,6 +23,7 @@ #include <assert.h> #include <pthread.h> #include <stddef.h> +#include <stdio.h> #define THREAD_COUNT 100 @@ -52,6 +54,8 @@ int main (int argc, char *argv []) int rc; pthread_t threads [THREAD_COUNT]; + fprintf (stderr, "test_shutdown_stress running...\n"); + for (j = 0; j != 10; j++) { // Check the shutdown with many parallel I/O threads. diff --git a/tests/test_sub_forward.cpp b/tests/test_sub_forward.cpp index d69f923..36a7f4a 100644 --- a/tests/test_sub_forward.cpp +++ b/tests/test_sub_forward.cpp @@ -1,6 +1,7 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2011 iMatix Corporation + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -19,12 +20,15 @@ */ #include <assert.h> +#include <stdio.h> #include "../include/zmq.h" #include "../include/zmq_utils.h" int main (int argc, char *argv []) { + fprintf (stderr, "test_sub_forward running...\n"); + void *ctx = zmq_init (1); assert (ctx); diff --git a/tests/test_timeo.cpp b/tests/test_timeo.cpp index a8a3fc0..29ba8a0 100644 --- a/tests/test_timeo.cpp +++ b/tests/test_timeo.cpp @@ -1,6 +1,6 @@ /* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2010-2011 250bpm s.r.o. + Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -21,6 +21,7 @@ #include <assert.h> #include <string.h> #include <pthread.h> +#include <stdio.h> #include "../include/zmq.h" #include "../include/zmq_utils.h" @@ -45,6 +46,8 @@ extern "C" int main (int argc, char *argv []) { + fprintf (stderr, "test_timeo...\n"); + void *ctx = zmq_init (1); assert (ctx); diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 57db3c4..e68dc32 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2011 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file |