summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS3
-rw-r--r--COPYING.LESSER20
-rwxr-xr-xautogen.sh2
-rw-r--r--configure.in13
-rw-r--r--doc/zmq_getsockopt.txt29
-rw-r--r--doc/zmq_recv.txt7
-rw-r--r--doc/zmq_recvmsg.txt7
-rw-r--r--doc/zmq_send.txt11
-rw-r--r--doc/zmq_sendmsg.txt11
-rw-r--r--doc/zmq_setsockopt.txt17
-rw-r--r--doc/zmq_socket.txt33
-rw-r--r--include/zmq.h13
-rw-r--r--include/zmq_utils.h2
-rw-r--r--perf/inproc_lat.cpp3
-rw-r--r--perf/inproc_thr.cpp3
-rw-r--r--perf/local_lat.cpp3
-rw-r--r--perf/local_thr.cpp3
-rw-r--r--perf/remote_lat.cpp3
-rw-r--r--perf/remote_thr.cpp3
-rw-r--r--src/Makefile.am7
-rw-r--r--src/array.hpp3
-rw-r--r--src/atomic_counter.hpp3
-rw-r--r--src/atomic_ptr.hpp3
-rw-r--r--src/blob.hpp35
-rw-r--r--src/clock.cpp4
-rw-r--r--src/clock.hpp4
-rw-r--r--src/command.hpp3
-rw-r--r--src/config.hpp3
-rw-r--r--src/ctx.cpp1
-rw-r--r--src/ctx.hpp3
-rw-r--r--src/decoder.cpp3
-rw-r--r--src/decoder.hpp3
-rw-r--r--src/devpoll.cpp3
-rw-r--r--src/devpoll.hpp3
-rw-r--r--src/dist.cpp20
-rw-r--r--src/dist.hpp4
-rw-r--r--src/encoder.cpp8
-rw-r--r--src/encoder.hpp3
-rw-r--r--src/epoll.cpp3
-rw-r--r--src/epoll.hpp3
-rw-r--r--src/err.cpp3
-rw-r--r--src/err.hpp3
-rw-r--r--src/fd.hpp2
-rw-r--r--src/fq.cpp6
-rw-r--r--src/fq.hpp3
-rw-r--r--src/i_engine.hpp3
-rw-r--r--src/i_poll_events.hpp3
-rw-r--r--src/io_object.cpp3
-rw-r--r--src/io_object.hpp3
-rw-r--r--src/io_thread.cpp3
-rw-r--r--src/io_thread.hpp3
-rw-r--r--src/ip.cpp3
-rw-r--r--src/ip.hpp3
-rw-r--r--src/ipc_address.cpp4
-rw-r--r--src/ipc_address.hpp4
-rw-r--r--src/ipc_connecter.cpp4
-rw-r--r--src/ipc_connecter.hpp4
-rw-r--r--src/ipc_listener.cpp4
-rw-r--r--src/ipc_listener.hpp4
-rw-r--r--src/kqueue.cpp3
-rw-r--r--src/kqueue.hpp3
-rw-r--r--src/lb.cpp9
-rw-r--r--src/lb.hpp3
-rw-r--r--src/likely.hpp4
-rw-r--r--src/mailbox.cpp3
-rw-r--r--src/mailbox.hpp3
-rw-r--r--src/msg.cpp9
-rw-r--r--src/msg.hpp12
-rw-r--r--src/mtrie.cpp4
-rw-r--r--src/mtrie.hpp4
-rw-r--r--src/mutex.hpp3
-rw-r--r--src/object.cpp3
-rw-r--r--src/object.hpp3
-rw-r--r--src/options.cpp32
-rw-r--r--src/options.hpp14
-rw-r--r--src/own.cpp4
-rw-r--r--src/own.hpp4
-rw-r--r--src/pair.cpp3
-rw-r--r--src/pair.hpp3
-rw-r--r--src/pgm_receiver.cpp4
-rw-r--r--src/pgm_receiver.hpp4
-rw-r--r--src/pgm_sender.cpp4
-rw-r--r--src/pgm_sender.hpp4
-rw-r--r--src/pgm_socket.cpp4
-rw-r--r--src/pgm_socket.hpp4
-rw-r--r--src/pipe.cpp21
-rw-r--r--src/pipe.hpp13
-rw-r--r--src/poll.cpp3
-rw-r--r--src/poll.hpp3
-rw-r--r--src/poller.hpp3
-rw-r--r--src/poller_base.cpp4
-rw-r--r--src/poller_base.hpp4
-rw-r--r--src/pub.cpp3
-rw-r--r--src/pub.hpp3
-rw-r--r--src/pull.cpp3
-rw-r--r--src/pull.hpp3
-rw-r--r--src/push.cpp3
-rw-r--r--src/push.hpp3
-rw-r--r--src/random.cpp4
-rw-r--r--src/random.hpp4
-rw-r--r--src/reaper.cpp4
-rw-r--r--src/reaper.hpp4
-rw-r--r--src/rep.cpp20
-rw-r--r--src/rep.hpp1
-rw-r--r--src/req.cpp56
-rw-r--r--src/req.hpp11
-rwxr-xr-xsrc/router.cpp285
-rwxr-xr-xsrc/router.hpp123
-rw-r--r--src/select.cpp3
-rw-r--r--src/select.hpp3
-rw-r--r--src/session_base.cpp45
-rw-r--r--src/session_base.hpp8
-rw-r--r--src/signaler.cpp4
-rw-r--r--src/signaler.hpp4
-rw-r--r--src/socket_base.cpp76
-rw-r--r--src/socket_base.hpp10
-rw-r--r--src/stdint.hpp2
-rw-r--r--src/stream_engine.cpp3
-rw-r--r--src/stream_engine.hpp3
-rw-r--r--src/sub.cpp3
-rw-r--r--src/sub.hpp3
-rw-r--r--src/tcp_address.cpp3
-rw-r--r--src/tcp_address.hpp3
-rw-r--r--src/tcp_connecter.cpp3
-rw-r--r--src/tcp_connecter.hpp3
-rw-r--r--src/tcp_listener.cpp3
-rw-r--r--src/tcp_listener.hpp3
-rw-r--r--src/thread.cpp1
-rw-r--r--src/thread.hpp1
-rw-r--r--src/trie.cpp3
-rw-r--r--src/trie.hpp3
-rw-r--r--src/vtcp_connecter.cpp251
-rw-r--r--src/vtcp_connecter.hpp120
-rw-r--r--src/vtcp_listener.cpp124
-rw-r--r--src/vtcp_listener.hpp71
-rw-r--r--src/windows.hpp3
-rw-r--r--src/wire.hpp2
-rw-r--r--src/xpub.cpp8
-rw-r--r--src/xpub.hpp4
-rw-r--r--src/xrep.cpp120
-rw-r--r--src/xrep.hpp7
-rw-r--r--src/xreq.cpp20
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/xsub.cpp14
-rw-r--r--src/xsub.hpp4
-rw-r--r--src/ypipe.hpp3
-rw-r--r--src/yqueue.hpp3
-rw-r--r--src/zmq.cpp3
-rw-r--r--src/zmq_utils.cpp3
-rw-r--r--tests/Makefile.am2
-rw-r--r--tests/test_hwm.cpp7
-rw-r--r--tests/test_invalid_rep.cpp33
-rw-r--r--tests/test_pair_inproc.cpp7
-rw-r--r--tests/test_pair_ipc.cpp7
-rw-r--r--tests/test_pair_tcp.cpp8
-rw-r--r--tests/test_reqrep_device.cpp38
-rw-r--r--tests/test_reqrep_drop.cpp143
-rw-r--r--tests/test_reqrep_inproc.cpp7
-rw-r--r--tests/test_reqrep_ipc.cpp7
-rw-r--r--tests/test_reqrep_tcp.cpp8
-rw-r--r--tests/test_shutdown_stress.cpp8
-rw-r--r--tests/test_sub_forward.cpp8
-rw-r--r--tests/test_timeo.cpp7
-rw-r--r--tests/testutil.hpp1
164 files changed, 696 insertions, 1655 deletions
diff --git a/AUTHORS b/AUTHORS
index 589a160..5ebf390 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -5,6 +5,7 @@ Alexej Lotz <alexej.lotz@arcor.de>
Andrew Thompson <andy@fud.org.nz>
Asko Kauppi <askok@dnainternet.net>
Barak Amar <barak.amar@gmail.com>
+Ben Gray <ben@benjamg.com>
Bernd Prager <bernd@prager.ws>
Bernd Melchers <melchers@ZEDAT.FU-Berlin.DE>
Bob Beaty <rbeaty@peak6.com>
@@ -50,9 +51,11 @@ Mikko Koppanen <mkoppanen@php.net>
Min Ragan-Kelley <benjaminrk@gmail.com>
Neale Ferguson <neale@sinenomine.net>
Nir Soffer <nirsof@gmail.com>
+Paul Betts <paul@paulbetts.org>
Paul Colomiets <pc@gafol.net>
Pavel Gushcha <pavimus@gmail.com>
Pavol Malosek <malosek@fastmq.com>
+Perry Kundert <perry@kundert.ca>
Peter Bourgon <peter.bourgon@gmail.com>
Pieter Hintjens <ph@imatix.com>
Piotr Trojanek <piotr.trojanek@gmail.com>
diff --git a/COPYING.LESSER b/COPYING.LESSER
index 3d13ddb..9925774 100644
--- a/COPYING.LESSER
+++ b/COPYING.LESSER
@@ -166,16 +166,16 @@ Library.
--------------------------------------------------------------------------------
- SPECIAL EXCEPTION GRANTED BY IMATIX
-
-As a special exception, iMatix gives you permission to link this library with
-independent modules to produce an executable, regardless of the license terms
-of these independent modules, and to copy and distribute the resulting
-executable under terms of your choice, provided that you also meet, for each
-linked independent module, the terms and conditions of the license of that
-module. An independent module is a module which is not derived from or based on
-this library. If you modify this library, you must extend this exception to your
-version of the library.
+ SPECIAL EXCEPTION GRANTED BY COPYRIGHT HOLDERS
+
+As a special exception, copyright holders give you permission to link this
+library with independent modules to produce an executable, regardless of
+the license terms of these independent modules, and to copy and distribute
+the resulting executable under terms of your choice, provided that you also
+meet, for each linked independent module, the terms and conditions of
+the license of that module. An independent module is a module which is not
+derived from or based on this library. If you modify this library, you must
+extend this exception to your version of the library.
--------------------------------------------------------------------------------
diff --git a/autogen.sh b/autogen.sh
index d03084b..52a8812 100755
--- a/autogen.sh
+++ b/autogen.sh
@@ -1,6 +1,6 @@
#!/bin/sh
-# Copyright (c) 2007-2011 iMatix Corporation
+# Copyright (c) 2007-2009 iMatix Corporation
# Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
#
# This file is part of 0MQ.
diff --git a/configure.in b/configure.in
index b0ee7a1..86147ea 100644
--- a/configure.in
+++ b/configure.in
@@ -348,19 +348,6 @@ fi
AC_SUBST(pgm_basename)
-# VTCP extension
-libzmq_vtcp="no"
-
-AC_ARG_WITH([vtcp], [AS_HELP_STRING([--with-vtcp],
- [build libzmq with VTCP extension [default=no]])],
- [with_vtcp=$withval], [with_vtcp=no])
-
-if test "x$with_vtcp" != "xno"; then
- AC_DEFINE(ZMQ_HAVE_VTCP, 1, [Have VTCP extension])
- AC_CHECK_LIB(vtcp, vtcp_bind, ,
- [AC_MSG_ERROR([cannot link with -lvtcp, install libvtcp.])])
-fi
-
# Set -Wall, -Werror and -pedantic
AC_LANG_PUSH([C++])
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 99065c0..252834e 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -39,19 +39,6 @@ Default value:: N/A
Applicable socket types:: all
-ZMQ_RCVLABEL: Inquires whether last message received was a label
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_RCVLABEL' option shall return True (1) if the message part last
-received from the 'socket' was an address label. Otherwise, this option
-shall return False (0).
-
-[horizontal]
-Option value type:: int
-Option value unit:: boolean
-Default value:: N/A
-Applicable socket types:: all
-
-
ZMQ_RCVMORE: More message data parts to follow
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RCVMORE' option shall return True (1) if the message part last
@@ -130,6 +117,22 @@ Option value unit:: N/A (bitmap)
Default value:: 0
Applicable socket types:: N/A
+ZMQ_IDENTITY: Set socket identity
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'.
+Socket identity is used only by request/reply pattern. Namely, it can be used
+in tandem with ROUTER socket to route messages to the peer with specific
+identity.
+
+Identity should be at least one byte and at most 255 bytes long. Identities
+starting with binary zero are reserved for use by 0MQ infrastructure.
+
+[horizontal]
+Option value type:: binary data
+Option value unit:: N/A
+Default value:: NULL
+Applicable socket types:: all
+
ZMQ_RATE: Retrieve multicast data rate
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/doc/zmq_recv.txt b/doc/zmq_recv.txt
index c9c3ce8..d1ef3e1 100644
--- a/doc/zmq_recv.txt
+++ b/doc/zmq_recv.txt
@@ -29,8 +29,7 @@ function shall fail with 'errno' set to EAGAIN.
Multi-part messages
~~~~~~~~~~~~~~~~~~~
-A 0MQ message is composed of 1 or more message parts, starting with zero or
-more address 'label' parts, followed by 1 or more 'data' parts. Each message
+A 0MQ message is composed of 1 or more message parts. Each message
part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic
delivery of messages; peers shall receive either all _message parts_ of a
message or none at all. The total number of message parts is unlimited except
@@ -38,9 +37,7 @@ by available memory.
An application that processes multipart messages must use the _ZMQ_RCVMORE_
linkzmq:zmq_getsockopt[3] option after calling _zmq_recv()_ to determine if
-there are further parts to receive. An application that manipulates address
-labels must use _ZMQ_RCVLABEL_ to determine the zero or more label parts
-that precede the data part(s).
+there are further parts to receive.
RETURN VALUE
------------
diff --git a/doc/zmq_recvmsg.txt b/doc/zmq_recvmsg.txt
index 358ea3f..6e41b1e 100644
--- a/doc/zmq_recvmsg.txt
+++ b/doc/zmq_recvmsg.txt
@@ -29,8 +29,7 @@ function shall fail with 'errno' set to EAGAIN.
Multi-part messages
~~~~~~~~~~~~~~~~~~~
-A 0MQ message is composed of 1 or more message parts, starting with zero or
-more address 'label' parts, followed by 1 or more 'data' parts. Each message
+A 0MQ message is composed of 1 or more message parts. Each message
part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic
delivery of messages; peers shall receive either all _message parts_ of a
message or none at all. The total number of message parts is unlimited except
@@ -38,9 +37,7 @@ by available memory.
An application that processes multipart messages must use the _ZMQ_RCVMORE_
linkzmq:zmq_getsockopt[3] option after calling _zmq_recvmsg()_ to determine if
-there are further parts to receive. An application that manipulates address
-labels must use _ZMQ_RCVLABEL_ to determine the zero or more label parts
-that precede the data part(s).
+there are further parts to receive.
RETURN VALUE
diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt
index 133de97..f00e449 100644
--- a/doc/zmq_send.txt
+++ b/doc/zmq_send.txt
@@ -23,11 +23,6 @@ Specifies that the operation should be performed in non-blocking mode. If the
message cannot be queued on the 'socket', the _zmq_send()_ function shall
fail with 'errno' set to EAGAIN.
-*ZMQ_SNDLABEL*::
-Specifies that the message part being sent is an address label, and that
-further message parts are to follow. Refer to linkzmq:zmq_socket[3] for the
-semantics of address labels in each socket pattern.
-
*ZMQ_SNDMORE*::
Specifies that the message being sent is a multi-part message, and that further
message parts are to follow. Refer to the section regarding multi-part messages
@@ -40,16 +35,14 @@ the 'socket' and 0MQ has assumed responsibility for the message.
Multi-part messages
~~~~~~~~~~~~~~~~~~~
-A 0MQ message is composed of 1 or more message parts, starting with zero or
-more address 'label' parts, followed by 1 or more 'data' parts. Each message
+A 0MQ message is composed of 1 or more message parts. Each message
part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic
delivery of messages; peers shall receive either all _message parts_ of a
message or none at all. The total number of message parts is unlimited except
by available memory.
An application that sends multipart messages must use the _ZMQ_SNDMORE_ flag
-when sending each data part except the final one. An application that sends
-address labels must use _ZMQ_SNDLABEL_ when sending each label.
+when sending each data part except the final one.
RETURN VALUE
diff --git a/doc/zmq_sendmsg.txt b/doc/zmq_sendmsg.txt
index 244c0bd..d069bd7 100644
--- a/doc/zmq_sendmsg.txt
+++ b/doc/zmq_sendmsg.txt
@@ -23,11 +23,6 @@ Specifies that the operation should be performed in non-blocking mode. If the
message cannot be queued on the 'socket', the _zmq_sendmsg()_ function shall
fail with 'errno' set to EAGAIN.
-*ZMQ_SNDLABEL*::
-Specifies that the message part being sent is an address 'label', and that
-further message parts are to follow. Refer to linkzmq:zmq_socket[3] for the
-semantics of address labels in each socket pattern.
-
*ZMQ_SNDMORE*::
Specifies that the message being sent is a multi-part message, and that further
message parts are to follow. Refer to the section regarding multi-part messages
@@ -44,16 +39,14 @@ the 'socket' and 0MQ has assumed responsibility for the message.
Multi-part messages
~~~~~~~~~~~~~~~~~~~
-A 0MQ message is composed of 1 or more message parts, starting with zero or
-more address 'label' parts, followed by 1 or more 'data' parts. Each message
+A 0MQ message is composed of 1 or more message parts. Each message
part is an independent 'zmq_msg_t' in its own right. 0MQ ensures atomic
delivery of messages; peers shall receive either all _message parts_ of a
message or none at all. The total number of message parts is unlimited except
by available memory.
An application that sends multipart messages must use the _ZMQ_SNDMORE_ flag
-when sending each data part except the final one. An application that sends
-address labels must use _ZMQ_SNDLABEL_ when sending each label.
+when sending each data part except the final one.
RETURN VALUE
------------
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index fd004f4..72d1faa 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -122,6 +122,23 @@ Default value:: N/A
Applicable socket types:: ZMQ_SUB
+ZMQ_IDENTITY: Set socket identity
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'.
+Socket identity is used only by request/reply pattern. Namely, it can be used
+in tandem with ROUTER socket to route messages to the peer with specific
+identity.
+
+Identity should be at least one byte and at most 255 bytes long. Identities
+starting with binary zero are reserved for use by 0MQ infrastructure.
+
+[horizontal]
+Option value type:: binary data
+Option value unit:: N/A
+Default value:: NULL
+Applicable socket types:: all
+
+
ZMQ_RATE: Set multicast data rate
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RATE' option shall set the maximum send or receive data rate for
diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt
index e138ebe..1ba2f42 100644
--- a/doc/zmq_socket.txt
+++ b/doc/zmq_socket.txt
@@ -81,11 +81,6 @@ any linkzmq:zmq_send[3] operations on the socket shall block until the
exceptional state ends or at least one _service_ becomes available for sending;
messages are not discarded.
-'ZMQ_REQ' socket adds a unique 'request ID' label to every outbound message.
-When receiving a reply, it checks whether the 'request ID' of the reply matches
-the last 'request ID' sent. If it does not, the message is silently dropped and
-waiting for the reply is resumed.
-
[horizontal]
.Summary of ZMQ_REQ characteristics
Compatible peer sockets:: 'ZMQ_REP'
@@ -108,10 +103,6 @@ When a 'ZMQ_REP' socket enters an exceptional state due to having reached the
high water mark for a _client_, then any replies sent to the _client_ in
question shall be dropped until the exceptional state ends.
-'ZMQ_REP' socket strips all the labels from the incoming message, stores them
-and passes the remaining data parts to the user. When user sends the reply,
-the stored labels are re-attached to the reply.
-
[horizontal]
.Summary of ZMQ_REP characteristics
Compatible peer sockets:: 'ZMQ_REQ'
@@ -136,8 +127,6 @@ linkzmq:zmq_send[3] operations on the socket shall block until the exceptional
state ends or at least one peer becomes available for sending; messages are not
discarded.
-'ZMQ_XREQ' socket doesn't inspect or modify the message labels.
-
[horizontal]
.Summary of ZMQ_XREQ characteristics
Compatible peer sockets:: 'ZMQ_XREP', 'ZMQ_REP'
@@ -162,14 +151,6 @@ messages sent to the socket shall be dropped until the exceptional state ends.
Likewise, any messages to be routed to a non-existent peer or a peer for which
the individual high water mark has been reached shall also be dropped.
-When receiving messages a 'ZMQ_XREP' socket attaches a label uniquely
-identifying the originating peer to the message before passing it to the
-application.
-
-When sending messages a 'ZMQ_XREP' socket removes the first label from the
-message and uses it to determine which the peer the message shall be routed to.
-If the peer does not exist anymore the message is silently discarded.
-
[horizontal]
.Summary of ZMQ_XREP characteristics
Compatible peer sockets:: 'ZMQ_XREQ', 'ZMQ_REQ'
@@ -196,8 +177,6 @@ high water mark for a _subscriber_, then any messages that would be sent to the
_subscriber_ in question shall instead be dropped until the exceptional state
ends. The _zmq_send()_ function shall never block for this socket type.
-This socket type doesn't use message labels.
-
[horizontal]
.Summary of ZMQ_PUB characteristics
Compatible peer sockets:: 'ZMQ_SUB', 'ZMQ_XSUB'
@@ -215,8 +194,6 @@ any messages, use the 'ZMQ_SUBSCRIBE' option of linkzmq:zmq_setsockopt[3] to
specify which messages to subscribe to. The _zmq_send()_ function is not
implemented for this socket type.
-This socket type doesn't use message labels.
-
[horizontal]
.Summary of ZMQ_SUB characteristics
Compatible peer sockets:: 'ZMQ_PUB', 'ZMQ_XPUB'
@@ -233,8 +210,6 @@ in form of incoming messages. Subscription message is a byte 1 (for
subscriptions) or byte 0 (for unsubscriptions) followed by the subscription
body.
-This socket type doesn't use message labels.
-
[horizontal]
.Summary of ZMQ_XPUB characteristics
Compatible peer sockets:: 'ZMQ_SUB', 'ZMQ_XSUB'
@@ -250,8 +225,6 @@ Same as ZMQ_SUB except that you subscribe by sending subscription messages to
the socket. Subscription message is a byte 1 (for subscriptions) or byte 0
(for unsubscriptions) followed by the subscription body.
-This socket type doesn't use message labels.
-
[horizontal]
.Summary of ZMQ_XSUB characteristics
Compatible peer sockets:: 'ZMQ_PUB', 'ZMQ_XPUB'
@@ -282,8 +255,6 @@ _nodes_ at all, then any linkzmq:zmq_send[3] operations on the socket shall
block until the exceptional state ends or at least one downstream _node_
becomes available for sending; messages are not discarded.
-This socket type doesn't use message labels.
-
[horizontal]
.Summary of ZMQ_PUSH characteristics
Compatible peer sockets:: 'ZMQ_PULL'
@@ -301,8 +272,6 @@ from upstream pipeline _nodes_. Messages are fair-queued from among all
connected upstream _nodes_. The _zmq_send()_ function is not implemented for
this socket type.
-This socket type doesn't use message labels.
-
[horizontal]
.Summary of ZMQ_PULL characteristics
Compatible peer sockets:: 'ZMQ_PUSH'
@@ -330,8 +299,6 @@ high water mark for the connected peer, or if no peer is connected, then
any linkzmq:zmq_send[3] operations on the socket shall block until the peer
becomes available for sending; messages are not discarded.
-This socket type doesn't use message labels.
-
NOTE: 'ZMQ_PAIR' sockets are experimental, and are currently missing several
features such as auto-reconnection.
diff --git a/include/zmq.h b/include/zmq.h
index e236b2a..09ed89c 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -162,10 +164,13 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_PUSH 8
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
-#define ZMQ_ROUTER 13
+
+#define ZMQ_ROUTER ZMQ_XREP
+#define ZMQ_DEALER ZMQ_XREQ
/* Socket options. */
#define ZMQ_AFFINITY 4
+#define ZMQ_IDENTITY 5
#define ZMQ_SUBSCRIBE 6
#define ZMQ_UNSUBSCRIBE 7
#define ZMQ_RATE 8
@@ -186,15 +191,11 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_MULTICAST_HOPS 25
#define ZMQ_RCVTIMEO 27
#define ZMQ_SNDTIMEO 28
-#define ZMQ_RCVLABEL 29
-#define ZMQ_RCVCMD 30
#define ZMQ_IPV4ONLY 31
/* Send/recv options. */
#define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2
-#define ZMQ_SNDLABEL 4
-#define ZMQ_SNDCMD 8
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
diff --git a/include/zmq_utils.h b/include/zmq_utils.h
index 6d8a458..341d639 100644
--- a/include/zmq_utils.h
+++ b/include/zmq_utils.h
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/perf/inproc_lat.cpp b/perf/inproc_lat.cpp
index 7c15013..5b6a830 100644
--- a/perf/inproc_lat.cpp
+++ b/perf/inproc_lat.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/perf/inproc_thr.cpp b/perf/inproc_thr.cpp
index 4673eba..b4cadfc 100644
--- a/perf/inproc_thr.cpp
+++ b/perf/inproc_thr.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/perf/local_lat.cpp b/perf/local_lat.cpp
index 999e799..714b8c0 100644
--- a/perf/local_lat.cpp
+++ b/perf/local_lat.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp
index a21707f..5c495d8 100644
--- a/perf/local_thr.cpp
+++ b/perf/local_thr.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/perf/remote_lat.cpp b/perf/remote_lat.cpp
index 0d438e9..9eb76b0 100644
--- a/perf/remote_lat.cpp
+++ b/perf/remote_lat.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp
index c8df333..328bdce 100644
--- a/perf/remote_thr.cpp
+++ b/perf/remote_thr.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/Makefile.am b/src/Makefile.am
index 3b7dec6..4d3cba3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -9,6 +9,7 @@ libzmq_la_SOURCES = \
array.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
clock.hpp \
command.hpp \
config.hpp \
@@ -55,7 +56,6 @@ libzmq_la_SOURCES = \
reaper.hpp \
rep.hpp \
req.hpp \
- router.hpp \
select.hpp \
session_base.hpp \
signaler.hpp \
@@ -68,8 +68,6 @@ libzmq_la_SOURCES = \
tcp_listener.hpp \
thread.hpp \
trie.hpp \
- vtcp_connecter.hpp \
- vtcp_listener.hpp \
windows.hpp \
wire.hpp \
xpub.hpp \
@@ -113,7 +111,6 @@ libzmq_la_SOURCES = \
reaper.cpp \
pub.cpp \
random.cpp \
- router.cpp \
rep.cpp \
req.cpp \
select.cpp \
@@ -127,8 +124,6 @@ libzmq_la_SOURCES = \
tcp_listener.cpp \
thread.cpp \
trie.cpp \
- vtcp_connecter.cpp \
- vtcp_listener.cpp \
xpub.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/array.hpp b/src/array.hpp
index b1f6eca..7e4ddd4 100644
--- a/src/array.hpp
+++ b/src/array.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp
index d7116d8..a0a67bf 100644
--- a/src/atomic_counter.hpp
+++ b/src/atomic_counter.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp
index c106cd5..c59ab81 100644
--- a/src/atomic_ptr.hpp
+++ b/src/atomic_ptr.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..b8039c4
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,35 @@
+/*
+ Copyright (c) 2010 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
+
diff --git a/src/clock.cpp b/src/clock.cpp
index f98a2f4..92fc4be 100644
--- a/src/clock.cpp
+++ b/src/clock.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/clock.hpp b/src/clock.hpp
index 1b34989..b3b19b2 100644
--- a/src/clock.hpp
+++ b/src/clock.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/command.hpp b/src/command.hpp
index 1513ca8..ecf2d93 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/config.hpp b/src/config.hpp
index 8e512bb..f7b4c50 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 8aa10d9..d8783be 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 22ac932..619d57e 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/decoder.cpp b/src/decoder.cpp
index d57265a..48f457f 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/decoder.hpp b/src/decoder.hpp
index de63a09..c6f9100 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index c4b3c54..0c46d14 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index a668e9a..1de1af0 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/dist.cpp b/src/dist.cpp
index 795e13e..d220c43 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -111,8 +112,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// Push the message to matching pipes.
distribute (msg_, flags_);
@@ -137,6 +137,16 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_)
return;
}
+ if (msg_->is_vsm ()) {
+ for (pipes_t::size_type i = 0; i < matching; ++i)
+ write (pipes [i], msg_);
+ int rc = msg_->close();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
+ return;
+ }
+
// Add matching-1 references to the message. We already hold one reference,
// that's why -1.
msg_->add_refs ((int) matching - 1);
@@ -171,7 +181,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
eligible--;
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}
diff --git a/src/dist.hpp b/src/dist.hpp
index c8d121c..a72de6e 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/encoder.cpp b/src/encoder.cpp
index 8689e45..030b3ef 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -89,14 +91,14 @@ bool zmq::encoder_t::message_ready ()
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
return true;
}
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 949cbdc..8001c4e 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 39b4547..a62345d 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/epoll.hpp b/src/epoll.hpp
index dc6b3ed..9bc31a5 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/err.cpp b/src/err.cpp
index ff81e03..028d752 100644
--- a/src/err.cpp
+++ b/src/err.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/err.hpp b/src/err.hpp
index 7c7a9d8..53a6569 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/fd.hpp b/src/fd.hpp
index 3b15024..773e380 100644
--- a/src/fd.hpp
+++ b/src/fd.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/fq.cpp b/src/fq.cpp
index abd4160..429c038 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -91,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
if (pipe_)
*pipe_ = pipes [current];
more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ msg_->flags () & msg_t::more ? true : false;
if (!more) {
current++;
if (current >= active)
diff --git a/src/fq.hpp b/src/fq.hpp
index be9c695..24d7b85 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 26e475b..19359b7 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp
index fa9fb25..9cf47fd 100644
--- a/src/i_poll_events.hpp
+++ b/src/i_poll_events.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_object.cpp b/src/io_object.cpp
index e68917c..81b9ce5 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_object.hpp b/src/io_object.hpp
index fb0d1e3..bf7a625 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index c6f3880..40bbef9 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index f578d4e..986c88d 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ip.cpp b/src/ip.cpp
index 8090a8a..0b4596a 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ip.hpp b/src/ip.hpp
index d8553de..c5f31db 100644
--- a/src/ip.hpp
+++ b/src/ip.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_address.cpp b/src/ipc_address.cpp
index 6a471a6..d601c56 100644
--- a/src/ipc_address.cpp
+++ b/src/ipc_address.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_address.hpp b/src/ipc_address.hpp
index 453f5fd..4a7f230 100644
--- a/src/ipc_address.hpp
+++ b/src/ipc_address.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index a54e8fe..dc0ee21 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index 721bcf4..c02245a 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index 5ba41be..07a7dff 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp
index 4cd881b..0f06d23 100644
--- a/src/ipc_listener.hpp
+++ b/src/ipc_listener.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index cbf38d1..0b07fab 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index 4ded81e..14f4e49 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/lb.cpp b/src/lb.cpp
index da7cb9d..2a0f769 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -75,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
if (!more)
dropping = false;
@@ -88,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
while (active > 0) {
if (pipes [current]->write (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
break;
}
diff --git a/src/lb.hpp b/src/lb.hpp
index 0dfd25e..1de8549 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/likely.hpp b/src/likely.hpp
index a524a50..e604464 100644
--- a/src/likely.hpp
+++ b/src/likely.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2009-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 9fd3ac4..ff16afe 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 0675b99..c059c2a 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/msg.cpp b/src/msg.cpp
index e51ab67..60d5bf3 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -237,6 +238,11 @@ bool zmq::msg_t::is_delimiter ()
return u.base.type == type_delimiter;
}
+bool zmq::msg_t::is_vsm ()
+{
+ return u.base.type == type_vsm;
+}
+
void zmq::msg_t::add_refs (int refs_)
{
zmq_assert (refs_ >= 0);
@@ -279,3 +285,4 @@ bool zmq::msg_t::rm_refs (int refs_)
return true;
}
+
diff --git a/src/msg.hpp b/src/msg.hpp
index 514f95b..8c84670 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -47,10 +49,9 @@ namespace zmq
// Mesage flags.
enum
{
- label = 1,
- command = 2,
- shared = 64,
- more = 128
+ more = 1,
+ identity = 64,
+ shared = 128
};
bool check ();
@@ -68,6 +69,7 @@ namespace zmq
void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_);
bool is_delimiter ();
+ bool is_vsm ();
// After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy.
diff --git a/src/mtrie.cpp b/src/mtrie.cpp
index 66bea20..1c96c98 100644
--- a/src/mtrie.cpp
+++ b/src/mtrie.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mtrie.hpp b/src/mtrie.hpp
index 2c2cc32..8bbc22d 100644
--- a/src/mtrie.hpp
+++ b/src/mtrie.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mutex.hpp b/src/mutex.hpp
index 9b13ffa..8d7068a 100644
--- a/src/mutex.hpp
+++ b/src/mutex.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/object.cpp b/src/object.cpp
index 807fb04..622754c 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/object.hpp b/src/object.hpp
index 1a38b24..f832596 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/options.cpp b/src/options.cpp
index 8a3e527..4db1a6c 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -27,6 +29,7 @@ zmq::options_t::options_t () :
sndhwm (1000),
rcvhwm (1000),
affinity (0),
+ identity_size (0),
rate (100),
recovery_ivl (10000),
multicast_hops (1),
@@ -43,7 +46,9 @@ zmq::options_t::options_t () :
ipv4only (1),
delay_on_close (true),
delay_on_disconnect (true),
- filter (false)
+ filter (false),
+ send_identity (false),
+ recv_identity (false)
{
}
@@ -76,6 +81,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
affinity = *((uint64_t*) optval_);
return 0;
+ case ZMQ_IDENTITY:
+
+ // Empty identity is invalid as well as identity longer than
+ // 255 bytes. Identity starting with binary zero is invalid
+ // as these are used for auto-generated identities.
+ if (optvallen_ < 1 || optvallen_ > 255 ||
+ *((const unsigned char*) optval_) == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ identity_size = optvallen_;
+ memcpy (identity, optval_, identity_size);
+ return 0;
+
case ZMQ_RATE:
if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) {
errno = EINVAL;
@@ -232,6 +251,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
+ case ZMQ_IDENTITY:
+ if (*optvallen_ < identity_size) {
+ errno = EINVAL;
+ return -1;
+ }
+ memcpy (optval_, identity, identity_size);
+ *optvallen_ = identity_size;
+ return 0;
+
case ZMQ_RATE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 4689522..bfc9dc7 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -41,6 +43,10 @@ namespace zmq
// I/O thread affinity.
uint64_t affinity;
+ // Socket identity
+ unsigned char identity_size;
+ unsigned char identity [256];
+
// Maximum tranfer rate [kb/s]. Default 100kb/s.
int rate;
@@ -93,6 +99,12 @@ namespace zmq
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
bool filter;
+
+ // Sends identity to all new connections.
+ bool send_identity;
+
+ // Receivers identity from all new connections.
+ bool recv_identity;
};
}
diff --git a/src/own.cpp b/src/own.cpp
index f2ca4b2..d6dd309 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/own.hpp b/src/own.hpp
index 0902f73..ad5c452 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pair.cpp b/src/pair.cpp
index 2fa4eac..6c652db 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pair.hpp b/src/pair.hpp
index e7390d6..67de2fd 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 6c292cd..122d110 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index b9e9a05..3c1d394 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 733b1ec..759802f 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index d3d5924..d8f046d 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 378370c..0274ee4 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp
index 8b1be54..5a5ef99 100644
--- a/src/pgm_socket.hpp
+++ b/src/pgm_socket.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pipe.cpp b/src/pipe.cpp
index c52deb9..25dd51c 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -63,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL),
sink (NULL),
state (active),
- delay (delay_),
- pipe_id (0)
+ delay (delay_)
{
}
@@ -86,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}
-void zmq::pipe_t::set_pipe_id (uint32_t id_)
+void zmq::pipe_t::set_identity (const blob_t &identity_)
{
- pipe_id = id_;
+ identity = identity_;
}
-uint32_t zmq::pipe_t::get_pipe_id ()
+zmq::blob_t zmq::pipe_t::get_identity ()
{
- return pipe_id;
+ return identity;
}
bool zmq::pipe_t::check_read ()
@@ -136,7 +137,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@@ -165,7 +166,7 @@ bool zmq::pipe_t::write (msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
outpipe->write (*msg_, more);
if (!more)
msgs_written++;
@@ -179,7 +180,7 @@ void zmq::pipe_t::rollback ()
msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & (msg_t::more | msg_t::label));
+ zmq_assert (msg.flags () & msg_t::more);
int rc = msg.close ();
errno_assert (rc == 0);
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 437d84d..75a2021 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -27,6 +29,7 @@
#include "object.hpp"
#include "stdint.hpp"
#include "array.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -70,8 +73,8 @@ namespace zmq
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an opaque ID to be used by its clients.
- void set_pipe_id (uint32_t id_);
- uint32_t get_pipe_id ();
+ void set_identity (const blob_t &identity_);
+ blob_t get_identity ();
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
@@ -182,8 +185,8 @@ namespace zmq
// asks us to.
bool delay;
- // Opaque ID. To be used by the clients, not the pipe itself.
- uint32_t pipe_id;
+ // Identity of the writer. Used uniquely by the reader side.
+ blob_t identity;
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
diff --git a/src/poll.cpp b/src/poll.cpp
index 9d1978b..1d1c423 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poll.hpp b/src/poll.hpp
index 42f3af1..700256d 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poller.hpp b/src/poller.hpp
index a8936ce..a989328 100644
--- a/src/poller.hpp
+++ b/src/poller.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poller_base.cpp b/src/poller_base.cpp
index d5fb985..6e532ae 100644
--- a/src/poller_base.cpp
+++ b/src/poller_base.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poller_base.hpp b/src/poller_base.hpp
index 44fe9f1..808ed38 100644
--- a/src/poller_base.hpp
+++ b/src/poller_base.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pub.cpp b/src/pub.cpp
index 15ec291..7458d5f 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pub.hpp b/src/pub.hpp
index 4a4da0f..d418fd4 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pull.cpp b/src/pull.cpp
index 06575da..6028118 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pull.hpp b/src/pull.hpp
index 6a46ead..fa36d49 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/push.cpp b/src/push.cpp
index e91b789..a0ed992 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/push.hpp b/src/push.hpp
index 1feb71d..ea93693 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/random.cpp b/src/random.cpp
index 9f7768c..326a3d9 100644
--- a/src/random.cpp
+++ b/src/random.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/random.hpp b/src/random.hpp
index d88b5ee..ca3d39a 100644
--- a/src/random.hpp
+++ b/src/random.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/reaper.cpp b/src/reaper.cpp
index 4c67b37..716f638 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/reaper.hpp b/src/reaper.hpp
index edcc319..1c1533f 100644
--- a/src/reaper.hpp
+++ b/src/reaper.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/rep.cpp b/src/rep.cpp
index 564fa89..02a825c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
@@ -42,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return -1;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
@@ -71,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- if (!(msg_->flags () & msg_t::label))
- break;
+ zmq_assert (msg_->flags () & msg_t::more);
+ bool bottom = (msg_->size () == 0);
rc = xrep_t::xsend (msg_, flags_);
errno_assert (rc == 0);
+ if (bottom)
+ break;
}
request_begins = false;
}
- else {
- int rc = xrep_t::xrecv (msg_, flags_);
- if (rc != 0)
- return rc;
- }
- zmq_assert (!(msg_->flags () & msg_t::label));
+
+ // Get next message part to return to the user.
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
// If whole request is read, flip the FSM to reply-sending state.
if (!(msg_->flags () & msg_t::more)) {
diff --git a/src/rep.hpp b/src/rep.hpp
index 55d57bd..de9c2b8 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/req.cpp b/src/req.cpp
index 04a19fb..3ba1ec0 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -28,8 +30,7 @@
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
- message_begins (true),
- request_id (generate_random ())
+ message_begins (true)
{
options.type = ZMQ_REQ;
}
@@ -49,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
// First part of the request is the request identity.
if (message_begins) {
- msg_t prefix;
- int rc = prefix.init_size (4);
+ msg_t bottom;
+ int rc = bottom.init ();
errno_assert (rc == 0);
- prefix.set_flags (msg_t::label);
- unsigned char *data = (unsigned char*) prefix.data ();
- put_uint32 (data, request_id);
- rc = xreq_t::xsend (&prefix, flags_);
+ bottom.set_flags (msg_t::more);
+ rc = xreq_t::xsend (&bottom, 0);
if (rc != 0)
- return rc;
+ return -1;
message_begins = false;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -91,19 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// TODO: This should also close the connection with the peer!
- if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) {
- errno = EAGAIN;
- return -1;
- }
-
- unsigned char *data = (unsigned char*) msg_->data ();
- if (unlikely (get_uint32 (data) != request_id)) {
-
- // The request ID does not match. Drop the entire message.
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) {
int rc = xreq_t::xrecv (msg_, flags_);
errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
+ if (!(msg_->flags () & msg_t::more))
break;
}
msg_->close ();
@@ -111,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
+
message_begins = false;
}
@@ -119,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
- request_id++;
+ if (!(msg_->flags () & msg_t::more)) {
receiving_reply = false;
message_begins = true;
}
@@ -156,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
zmq::req_session_t::~req_session_t ()
{
+ state = options.recv_identity ? identity : bottom;
}
int zmq::req_session_t::write (msg_t *msg_)
{
- if (state == request_id) {
- if (msg_->flags () == msg_t::label && msg_->size () == 4) {
+ switch (state) {
+ case bottom:
+ if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body;
return xreq_session_t::write (msg_);
}
- }
- else {
+ break;
+ case body:
if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_);
if (msg_->flags () == 0) {
- state = request_id;
+ state = bottom;
+ return xreq_session_t::write (msg_);
+ }
+ break;
+ case identity:
+ if (msg_->flags () == 0) {
+ state = bottom;
return xreq_session_t::write (msg_);
}
+ break;
}
errno = EFAULT;
return -1;
diff --git a/src/req.hpp b/src/req.hpp
index 0207a4f..8fae9d4 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -50,10 +52,6 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom).
bool message_begins;
- // Request ID. Request numbers gradually increase (and wrap over)
- // so that we don't have to generate random ID for each request.
- uint32_t request_id;
-
req_t (const req_t&);
const req_t &operator = (const req_t&);
};
@@ -73,7 +71,8 @@ namespace zmq
private:
enum {
- request_id,
+ identity,
+ bottom,
body
} state;
diff --git a/src/router.cpp b/src/router.cpp
deleted file mode 100755
index b7e19fb..0000000
--- a/src/router.cpp
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "router.hpp"
-#include "pipe.hpp"
-#include "wire.hpp"
-#include "random.hpp"
-#include "likely.hpp"
-#include "wire.hpp"
-#include "err.hpp"
-
-zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- prefetched (false),
- more_in (false),
- current_out (NULL),
- more_out (false),
- next_peer_id (generate_random ())
-{
- options.type = ZMQ_ROUTER;
-
- prefetched_msg.init ();
-}
-
-zmq::router_t::~router_t ()
-{
- zmq_assert (outpipes.empty ());
- prefetched_msg.close ();
-}
-
-void zmq::router_t::xattach_pipe (pipe_t *pipe_)
-{
- zmq_assert (pipe_);
-
- // Generate a new peer ID. Take care to avoid duplicates.
- outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
- if (!outpipes.empty ()) {
- while (true) {
- if (it == outpipes.end ())
- it = outpipes.begin ();
- if (it->first != next_peer_id)
- break;
- ++next_peer_id;
- ++it;
- }
- }
-
- // Add the pipe to the map out outbound pipes.
- outpipe_t outpipe = {pipe_, true};
- bool ok = outpipes.insert (outpipes_t::value_type (
- next_peer_id, outpipe)).second;
- zmq_assert (ok);
-
- // Add the pipe to the list of inbound pipes.
- pipe_->set_pipe_id (next_peer_id);
- fq.attach (pipe_);
-
- // Queue the connection command.
- pending_command_t cmd = {1, next_peer_id};
- pending_commands.push_back (cmd);
-
- // Advance next peer ID so that if new connection is dropped shortly after
- // its creation we don't accidentally get two subsequent peers with
- // the same ID.
- ++next_peer_id;
-}
-
-void zmq::router_t::xterminated (pipe_t *pipe_)
-{
- fq.terminated (pipe_);
-
- for (outpipes_t::iterator it = outpipes.begin ();
- it != outpipes.end (); ++it) {
- if (it->second.pipe == pipe_) {
-
- // Queue the disconnection command.
- pending_command_t cmd = {2, it->first};
- pending_commands.push_back (cmd);
-
- // Remove the pipe.
- outpipes.erase (it);
- if (pipe_ == current_out)
- current_out = NULL;
- return;
- }
- }
- zmq_assert (false);
-}
-
-void zmq::router_t::xread_activated (pipe_t *pipe_)
-{
- fq.activated (pipe_);
-}
-
-void zmq::router_t::xwrite_activated (pipe_t *pipe_)
-{
- for (outpipes_t::iterator it = outpipes.begin ();
- it != outpipes.end (); ++it) {
- if (it->second.pipe == pipe_) {
- zmq_assert (!it->second.active);
- it->second.active = true;
- return;
- }
- }
- zmq_assert (false);
-}
-
-int zmq::router_t::xsend (msg_t *msg_, int flags_)
-{
- // If this is the first part of the message it's the ID of the
- // peer to send the message to.
- if (!more_out) {
- zmq_assert (!current_out);
-
- // The first message part has to be label.
- if (unlikely (!(msg_->flags () & msg_t::label))) {
- errno = EFSM;
- return -1;
- }
-
- // Find the pipe associated with the peer ID stored in the message.
- if (unlikely (msg_->size () != 4)) {
- errno = ECANTROUTE;
- return -1;
- }
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
- if (unlikely (it == outpipes.end ())) {
- errno = ECANTROUTE;
- return -1;
- }
-
- // Check whether the pipe is available for writing.
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!it->second.pipe->check_write (&empty)) {
- rc = empty.close ();
- errno_assert (rc == 0);
- it->second.active = false;
- errno = EAGAIN;
- return -1;
- }
- rc = empty.close ();
- errno_assert (rc == 0);
-
- // Mark the pipe to send the message to.
- current_out = it->second.pipe;
- more_out = true;
-
- // Clean up the message object.
- rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init ();
- errno_assert (rc == 0);
- return 0;
- }
-
- // Check whether this is the last part of the message.
- more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
-
- // Push the message into the pipe. If there's no out pipe, just drop it.
- if (current_out) {
- bool ok = current_out->write (msg_);
- if (unlikely (!ok))
- current_out = NULL;
- else if (!more_out) {
- current_out->flush ();
- current_out = NULL;
- }
- }
- else {
- int rc = msg_->close ();
- errno_assert (rc == 0);
- }
-
- // Detach the message from the data buffer.
- int rc = msg_->init ();
- errno_assert (rc == 0);
-
- return 0;
-}
-
-int zmq::router_t::xrecv (msg_t *msg_, int flags_)
-{
- // If there's a queued command, pass it to the caller.
- if (unlikely (!more_in && !pending_commands.empty ())) {
- msg_->init_size (5);
- unsigned char *data = (unsigned char*) msg_->data ();
- put_uint8 (data, pending_commands.front ().cmd);
- put_uint32 (data + 1, pending_commands.front ().peer);
- msg_->set_flags (msg_t::command);
- pending_commands.pop_front ();
- return 0;
- }
-
- // If there is a prefetched message, return it.
- if (prefetched) {
- int rc = msg_->move (prefetched_msg);
- errno_assert (rc == 0);
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
- prefetched = false;
- return 0;
- }
-
- // Get next message part.
- pipe_t *pipe;
- int rc = fq.recvpipe (msg_, flags_, &pipe);
- if (rc != 0)
- return -1;
-
- // If we are in the middle of reading a message, just return the next part.
- if (more_in) {
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
- return 0;
- }
-
- // We are at the beginning of a new message. Move the message part we
- // have to the prefetched and return the ID of the peer instead.
- rc = prefetched_msg.move (*msg_);
- errno_assert (rc == 0);
- prefetched = true;
- rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init_size (4);
- errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
- msg_->set_flags (msg_t::label);
- return 0;
-}
-
-int zmq::router_t::rollback (void)
-{
- if (current_out) {
- current_out->rollback ();
- current_out = NULL;
- more_out = false;
- }
- return 0;
-}
-
-bool zmq::router_t::xhas_in ()
-{
- if (prefetched)
- return true;
- return fq.has_in () || !pending_commands.empty();
-}
-
-bool zmq::router_t::xhas_out ()
-{
- // In theory, GENERIC socket is always ready for writing. Whether actual
- // attempt to write succeeds depends on whitch pipe the message is going
- // to be routed to.
- return true;
-}
-
-zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_,
- socket_base_t *socket_, const options_t &options_,
- const char *protocol_, const char *address_) :
- session_base_t (io_thread_, connect_, socket_, options_, protocol_,
- address_)
-{
-}
-
-zmq::router_session_t::~router_session_t ()
-{
-}
-
diff --git a/src/router.hpp b/src/router.hpp
deleted file mode 100755
index 9a5c0f9..0000000
--- a/src/router.hpp
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ROUTER_HPP_INCLUDED__
-#define __ZMQ_ROUTER_HPP_INCLUDED__
-
-#include <map>
-#include <deque>
-
-#include "socket_base.hpp"
-#include "session_base.hpp"
-#include "stdint.hpp"
-#include "msg.hpp"
-#include "fq.hpp"
-
-namespace zmq
-{
-
- class router_t :
- public socket_base_t
- {
- public:
-
- router_t (class ctx_t *parent_, uint32_t tid_);
- ~router_t ();
-
- // Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_);
- int xsend (class msg_t *msg_, int flags_);
- int xrecv (class msg_t *msg_, int flags_);
- bool xhas_in ();
- bool xhas_out ();
- void xread_activated (class pipe_t *pipe_);
- void xwrite_activated (class pipe_t *pipe_);
- void xterminated (class pipe_t *pipe_);
-
- protected:
-
- // Rollback any message parts that were sent but not yet flushed.
- int rollback ();
-
- private:
-
- // Fair queueing object for inbound pipes.
- fq_t fq;
-
- // Have we prefetched a message.
- bool prefetched;
-
- // Holds the prefetched message.
- msg_t prefetched_msg;
-
- // If true, more incoming message parts are expected.
- bool more_in;
-
- struct outpipe_t
- {
- class pipe_t *pipe;
- bool active;
- };
-
- // Outbound pipes indexed by the peer IDs.
- typedef std::map <uint32_t, outpipe_t> outpipes_t;
- outpipes_t outpipes;
-
- // The pipe we are currently writing to.
- class pipe_t *current_out;
-
- // If true, more outgoing message parts are expected.
- bool more_out;
-
- // Peer ID are generated. It's a simple increment and wrap-over
- // algorithm. This value is the next ID to use (if not used already).
- uint32_t next_peer_id;
-
- // Commands to be delivered to the user.
- struct pending_command_t
- {
- uint8_t cmd;
- uint32_t peer;
- };
- typedef std::deque <pending_command_t> pending_commands_t;
- pending_commands_t pending_commands;
-
- router_t (const router_t&);
- const router_t &operator = (const router_t&);
- };
-
- class router_session_t : public session_base_t
- {
- public:
-
- router_session_t (class io_thread_t *io_thread_, bool connect_,
- class socket_base_t *socket_, const options_t &options_,
- const char *protocol_, const char *address_);
- ~router_session_t ();
-
- private:
-
- router_session_t (const router_session_t&);
- const router_session_t &operator = (const router_session_t&);
- };
-
-}
-
-#endif
diff --git a/src/select.cpp b/src/select.cpp
index 0ecdcd7..56b87ae 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/select.hpp b/src/select.hpp
index 55bc883..9231b6c 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 32dcd4f..f2ee713 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -26,7 +28,6 @@
#include "likely.hpp"
#include "tcp_connecter.hpp"
#include "ipc_connecter.hpp"
-#include "vtcp_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
@@ -40,7 +41,6 @@
#include "xsub.hpp"
#include "push.hpp"
#include "pull.hpp"
-#include "router.hpp"
#include "pair.hpp"
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
@@ -88,10 +88,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pull_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
- case ZMQ_ROUTER:
- s = new (std::nothrow) router_session_t (io_thread_, connect_,
- socket_, options_, protocol_, address_);
- break;
case ZMQ_PAIR:
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
@@ -116,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
engine (NULL),
socket (socket_),
io_thread (io_thread_),
- has_linger_timer (false)
+ has_linger_timer (false),
+ send_identity (options_.send_identity),
+ recv_identity (options_.recv_identity)
{
if (protocol_)
protocol = protocol_;
@@ -150,18 +148,33 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::read (msg_t *msg_)
{
+ // First message to send is identity (if required).
+ if (send_identity) {
+ zmq_assert (!(msg_->flags () & msg_t::more));
+ msg_->init_size (options.identity_size);
+ memcpy (msg_->data (), options.identity, options.identity_size);
+ send_identity = false;
+ incomplete_in = false;
+ return 0;
+ }
+
if (!pipe || !pipe->read (msg_)) {
errno = EAGAIN;
return -1;
}
+ incomplete_in = msg_->flags () & msg_t::more ? true : false;
- incomplete_in =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
return 0;
}
int zmq::session_base_t::write (msg_t *msg_)
{
+ // First message to receive is identity (if required).
+ if (recv_identity) {
+ msg_->set_flags (msg_t::identity);
+ recv_identity = false;
+ }
+
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
@@ -398,18 +411,6 @@ void zmq::session_base_t::start_connecting (bool wait_)
}
#endif
-#if defined ZMQ_HAVE_VTCP
- if (protocol == "vtcp") {
-
- vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t (
- io_thread, this, options, address.c_str (),
- wait_);
- alloc_assert (connecter);
- launch_child (connecter);
- return;
- }
-#endif
-
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
diff --git a/src/session_base.hpp b/src/session_base.hpp
index e388d42..c89628f 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -118,6 +120,10 @@ namespace zmq
// True is linger timer is running.
bool has_linger_timer;
+ // If true, identity is to be sent/recvd from the network.
+ bool send_identity;
+ bool recv_identity;
+
// Protocol and address to use when connecting.
std::string protocol;
std::string address;
diff --git a/src/signaler.cpp b/src/signaler.cpp
index e89f45a..e7191d3 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/signaler.hpp b/src/signaler.hpp
index dd474d9..4466c98 100644
--- a/src/signaler.hpp
+++ b/src/signaler.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index a4d89db..a59ba69 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -36,7 +38,6 @@
#include "socket_base.hpp"
#include "tcp_listener.hpp"
#include "ipc_listener.hpp"
-#include "vtcp_listener.hpp"
#include "tcp_connecter.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
@@ -60,7 +61,6 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
-#include "router.hpp"
bool zmq::socket_base_t::check_tag ()
{
@@ -106,9 +106,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
break;
- case ZMQ_ROUTER:
- s = new (std::nothrow) router_t (parent_, tid_);
- break;
default:
errno = EINVAL;
return NULL;
@@ -124,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
destroyed (false),
last_tsc (0),
ticks (0),
- rcvlabel (false),
- rcvcmd (false),
rcvmore (false)
{
}
@@ -176,8 +171,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
- protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys" &&
- protocol_ != "vtcp") {
+ protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") {
errno = EPROTONOSUPPORT;
return -1;
}
@@ -191,14 +185,6 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
}
#endif
- // If 0MQ is not compiled with VTCP, vtcp transport is not avaialble.
-#if !defined ZMQ_HAVE_VTCP
- if (protocol_ == "vtcp") {
- errno = EPROTONOSUPPORT;
- return -1;
- }
-#endif
-
// IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
if (protocol_ == "ipc") {
@@ -265,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1;
}
- if (option_ == ZMQ_RCVLABEL) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvlabel ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
- if (option_ == ZMQ_RCVCMD) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvcmd ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -392,21 +358,6 @@ int zmq::socket_base_t::bind (const char *addr_)
}
#endif
-#if defined ZMQ_HAVE_VTCP
- if (protocol == "vtcp") {
- vtcp_listener_t *listener = new (std::nothrow) vtcp_listener_t (
- io_thread, this, options);
- alloc_assert (listener);
- int rc = listener->set_address (address.c_str ());
- if (rc != 0) {
- delete listener;
- return -1;
- }
- launch_child (listener);
- return 0;
- }
-#endif
-
zmq_assert (false);
return -1;
}
@@ -524,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return -1;
// At this point we impose the flags on the message.
- if (flags_ & ZMQ_SNDLABEL)
- msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
- if (flags_ & ZMQ_SNDCMD)
- msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -898,13 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
+ // Test whether IDENTITY flag is valid for this socket type.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (options.recv_identity);
+printf ("identity recvd\n");
+ }
+
+
+ // Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;
if (rcvmore)
msg_->reset_flags (msg_t::more);
- rcvcmd = msg_->flags () & msg_t::command ? true : false;
- if (rcvcmd)
- msg_->reset_flags (msg_t::command);
}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index c7c86e7..bc978ba 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -183,12 +185,6 @@ namespace zmq
// Number of messages received since last command processing.
int ticks;
- // True if the last message received had LABEL flag set.
- bool rcvlabel;
-
- // True if the last message received had COMMAND flag set.
- bool rcvcmd;
-
// True if the last message received had MORE flag set.
bool rcvmore;
diff --git a/src/stdint.hpp b/src/stdint.hpp
index 73186d3..b78afcd 100644
--- a/src/stdint.hpp
+++ b/src/stdint.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 2647795..11ec264 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 92fc55f..6d122ed 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/sub.cpp b/src/sub.cpp
index d9f2f2e..3249aea 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/sub.hpp b/src/sub.hpp
index 7d3cf0b..bb46641 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp
index 0aa564a..1b7577f 100644
--- a/src/tcp_address.cpp
+++ b/src/tcp_address.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_address.hpp b/src/tcp_address.hpp
index 58ac540..d4768c7 100644
--- a/src/tcp_address.hpp
+++ b/src/tcp_address.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index fe99252..75079da 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index d1a93cd..e420c82 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 9b6068c..0b7a90d 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 60713e3..e712998 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/thread.cpp b/src/thread.cpp
index d1c6729..00628e5 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/thread.hpp b/src/thread.hpp
index f3f5f8d..52769b1 100644
--- a/src/thread.hpp
+++ b/src/thread.hpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/trie.cpp b/src/trie.cpp
index cd6cb7b..9718c77 100644
--- a/src/trie.cpp
+++ b/src/trie.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/trie.hpp b/src/trie.hpp
index a2b55c6..76e4fd9 100644
--- a/src/trie.hpp
+++ b/src/trie.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp
deleted file mode 100644
index 5dc147e..0000000
--- a/src/vtcp_connecter.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "vtcp_connecter.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include <new>
-#include <string>
-
-#include "stream_engine.hpp"
-#include "io_thread.hpp"
-#include "platform.hpp"
-#include "random.hpp"
-#include "likely.hpp"
-#include "err.hpp"
-#include "ip.hpp"
-
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#else
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <fcntl.h>
-#ifdef ZMQ_HAVE_OPENVMS
-#include <ioctl.h>
-#endif
-#endif
-
-zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_,
- class session_base_t *session_, const options_t &options_,
- const char *address_, bool wait_) :
- own_t (io_thread_, options_),
- io_object_t (io_thread_),
- s (retired_fd),
- handle_valid (false),
- wait (wait_),
- session (session_),
- current_reconnect_ivl(options.reconnect_ivl)
-{
- subport = 0;
-
- int rc = set_address (address_);
- zmq_assert (rc == 0);
-}
-
-zmq::vtcp_connecter_t::~vtcp_connecter_t ()
-{
- if (wait)
- cancel_timer (reconnect_timer_id);
- if (handle_valid)
- rm_fd (handle);
-
- if (s != retired_fd)
- close ();
-}
-
-int zmq::vtcp_connecter_t::set_address (const char *addr_)
-{
- const char *delimiter = strrchr (addr_, '.');
- if (!delimiter) {
- delimiter = strrchr (addr_, ':');
- if (!delimiter) {
- errno = EINVAL;
- return -1;
- }
- std::string addr_str (addr_, delimiter - addr_);
- addr_str += ":9220";
- std::string subport_str (delimiter + 1);
- subport = (vtcp_subport_t) atoi (subport_str.c_str ());
- int rc = address.resolve (addr_str.c_str (), false, true);
- if (rc != 0)
- return -1;
- }
- else {
- std::string addr_str (addr_, delimiter - addr_);
- std::string subport_str (delimiter + 1);
- subport = (vtcp_subport_t) atoi (subport_str.c_str ());
- int rc = address.resolve (addr_str.c_str (), false, true);
- if (rc != 0)
- return -1;
- }
-
- return 0;
-}
-
-void zmq::vtcp_connecter_t::process_plug ()
-{
- if (wait)
- add_reconnect_timer();
- else
- start_connecting ();
-}
-
-void zmq::vtcp_connecter_t::in_event ()
-{
- // We are not polling for incomming data, so we are actually called
- // because of error here. However, we can get error on out event as well
- // on some platforms, so we'll simply handle both events in the same way.
- out_event ();
-}
-
-void zmq::vtcp_connecter_t::out_event ()
-{
- fd_t fd = connect ();
- rm_fd (handle);
- handle_valid = false;
-
- // Handle the error condition by attempt to reconnect.
- if (fd == retired_fd) {
- close ();
- wait = true;
- add_reconnect_timer();
- return;
- }
-
- // Create the engine object for this connection.
- stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
- alloc_assert (engine);
-
- // Attach the engine to the corresponding session object.
- send_attach (session, engine);
-
- // Shut the connecter down.
- terminate ();
-}
-
-void zmq::vtcp_connecter_t::timer_event (int id_)
-{
- zmq_assert (id_ == reconnect_timer_id);
- wait = false;
- start_connecting ();
-}
-
-void zmq::vtcp_connecter_t::start_connecting ()
-{
- // Open the connecting socket.
- int rc = open ();
-
- // Handle error condition by eventual reconnect.
- if (unlikely (rc != 0)) {
- errno_assert (false);
- wait = true;
- add_reconnect_timer();
- return;
- }
-
- // Connection establishment may be dealyed. Poll for its completion.
- handle = add_fd (s);
- handle_valid = true;
- set_pollout (handle);
-}
-
-void zmq::vtcp_connecter_t::add_reconnect_timer()
-{
- add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
-}
-
-int zmq::vtcp_connecter_t::get_new_reconnect_ivl ()
-{
- // The new interval is the current interval + random value.
- int this_interval = current_reconnect_ivl +
- (generate_random () % options.reconnect_ivl);
-
- // Only change the current reconnect interval if the maximum reconnect
- // interval was set and if it's larger than the reconnect interval.
- if (options.reconnect_ivl_max > 0 &&
- options.reconnect_ivl_max > options.reconnect_ivl) {
-
- // Calculate the next interval
- current_reconnect_ivl = current_reconnect_ivl * 2;
- if(current_reconnect_ivl >= options.reconnect_ivl_max) {
- current_reconnect_ivl = options.reconnect_ivl_max;
- }
- }
- return this_interval;
-}
-
-int zmq::vtcp_connecter_t::open ()
-{
- zmq_assert (s == retired_fd);
-
- // Start the connection procedure.
- sockaddr_in *paddr = (sockaddr_in*) address.addr ();
- s = vtcp_connect (paddr->sin_addr.s_addr, ntohs (paddr->sin_port));
-
- // Connect was successfull immediately.
- if (s != retired_fd)
- return 0;
-
- // Asynchronous connect was launched.
- if (errno == EINPROGRESS) {
- errno = EAGAIN;
- return -1;
- }
-
- // Error occured.
- return -1;
-}
-
-zmq::fd_t zmq::vtcp_connecter_t::connect ()
-{
- int rc = vtcp_acceptc (s, subport);
- if (rc != 0) {
- int err = errno;
- close ();
- errno = err;
- return retired_fd;
- }
-
- tune_tcp_socket (s);
-
- fd_t result = s;
- s = retired_fd;
- return result;
-}
-
-int zmq::vtcp_connecter_t::close ()
-{
- zmq_assert (s != retired_fd);
- int rc = ::close (s);
- if (rc != 0)
- return -1;
- s = retired_fd;
- return 0;
-}
-
-#endif
-
diff --git a/src/vtcp_connecter.hpp b/src/vtcp_connecter.hpp
deleted file mode 100644
index fe5260e..0000000
--- a/src/vtcp_connecter.hpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __VTCP_CONNECTER_HPP_INCLUDED__
-#define __VTCP_CONNECTER_HPP_INCLUDED__
-
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include <vtcp.h>
-
-#include "fd.hpp"
-#include "own.hpp"
-#include "stdint.hpp"
-#include "io_object.hpp"
-#include "tcp_address.hpp"
-
-namespace zmq
-{
-
- class vtcp_connecter_t : public own_t, public io_object_t
- {
- public:
-
- // If 'delay' is true connecter first waits for a while, then starts
- // connection process.
- vtcp_connecter_t (class io_thread_t *io_thread_,
- class session_base_t *session_, const options_t &options_,
- const char *address_, bool delay_);
- ~vtcp_connecter_t ();
-
- private:
-
- // ID of the timer used to delay the reconnection.
- enum {reconnect_timer_id = 1};
-
- // Handlers for incoming commands.
- void process_plug ();
-
- // Handlers for I/O events.
- void in_event ();
- void out_event ();
- void timer_event (int id_);
-
- // Internal function to start the actual connection establishment.
- void start_connecting ();
-
- // Internal function to add a reconnect timer
- void add_reconnect_timer();
-
- // Internal function to return a reconnect backoff delay.
- // Will modify the current_reconnect_ivl used for next call
- // Returns the currently used interval
- int get_new_reconnect_ivl ();
-
- // Set address to connect to.
- int set_address (const char *addr_);
-
- // Open TCP connecting socket. Returns -1 in case of error,
- // 0 if connect was successfull immediately and 1 if async connect
- // was launched.
- int open ();
-
- // Close the connecting socket.
- int close ();
-
- // Get the file descriptor of newly created connection. Returns
- // retired_fd if the connection was unsuccessfull.
- fd_t connect ();
-
- // Address to connect to.
- tcp_address_t address;
- vtcp_subport_t subport;
-
- // Underlying socket.
- fd_t s;
-
- // Handle corresponding to the listening socket.
- handle_t handle;
-
- // If true file descriptor is registered with the poller and 'handle'
- // contains valid value.
- bool handle_valid;
-
- // If true, connecter is waiting a while before trying to connect.
- bool wait;
-
- // Reference to the session we belong to.
- class session_base_t *session;
-
- // Current reconnect ivl, updated for backoff strategy
- int current_reconnect_ivl;
-
- vtcp_connecter_t (const vtcp_connecter_t&);
- const vtcp_connecter_t &operator = (const vtcp_connecter_t&);
- };
-
-}
-
-#endif
-
-#endif
diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp
deleted file mode 100644
index 7e496e5..0000000
--- a/src/vtcp_listener.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "vtcp_listener.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include <string>
-#include <string.h>
-#include <vtcp.h>
-
-#include "stream_engine.hpp"
-#include "session_base.hpp"
-#include "stdint.hpp"
-#include "err.hpp"
-#include "ip.hpp"
-
-zmq::vtcp_listener_t::vtcp_listener_t (io_thread_t *io_thread_,
- socket_base_t *socket_, options_t &options_) :
- own_t (io_thread_, options_),
- io_object_t (io_thread_),
- s (retired_fd),
- socket (socket_)
-{
-}
-
-zmq::vtcp_listener_t::~vtcp_listener_t ()
-{
- if (s != retired_fd) {
- int rc = ::close (s);
- errno_assert (rc == 0);
- s = retired_fd;
- }
-}
-
-int zmq::vtcp_listener_t::set_address (const char *addr_)
-{
- // VTCP doesn't allow for binding to a specific interface. Connection
- // string has to begin with *: (INADDR_ANY).
- if (strlen (addr_) < 2 || addr_ [0] != '*' || addr_ [1] != ':') {
- errno = EADDRNOTAVAIL;
- return -1;
- }
-
- // Parse port and subport.
- uint16_t port;
- uint32_t subport;
- const char *delimiter = strrchr (addr_, '.');
- if (!delimiter) {
- port = 9220;
- subport = (uint32_t) atoi (addr_ + 2);
- }
- else {
- std::string port_str (addr_ + 2, delimiter - addr_ - 2);
- std::string subport_str (delimiter + 1);
- port = (uint16_t) atoi (port_str.c_str ());
- subport = (uint32_t) atoi (subport_str.c_str ());
- }
-
- // Start listening.
- s = vtcp_bind (port, subport);
- if (s == retired_fd)
- return -1;
-
- return 0;
-}
-
-void zmq::vtcp_listener_t::process_plug ()
-{
- // Start polling for incoming connections.
- handle = add_fd (s);
- set_pollin (handle);
-}
-
-void zmq::vtcp_listener_t::process_term (int linger_)
-{
- rm_fd (handle);
- own_t::process_term (linger_);
-}
-
-void zmq::vtcp_listener_t::in_event ()
-{
- fd_t fd = vtcp_acceptb (s);
- if (fd == retired_fd)
- return;
-
- tune_tcp_socket (fd);
-
- // Create the engine object for this connection.
- stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
- alloc_assert (engine);
-
- // Choose I/O thread to run connecter in. Given that we are already
- // running in an I/O thread, there must be at least one available.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_assert (io_thread);
-
- // Create and launch a session object.
- session_base_t *session = session_base_t::create (io_thread, false, socket,
- options, NULL, NULL);
- alloc_assert (session);
- session->inc_seqnum ();
- launch_child (session);
- send_attach (session, engine, false);
-}
-
-#endif
diff --git a/src/vtcp_listener.hpp b/src/vtcp_listener.hpp
deleted file mode 100644
index 78f3b51..0000000
--- a/src/vtcp_listener.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__
-#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__
-
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include "own.hpp"
-#include "io_object.hpp"
-#include "fd.hpp"
-
-namespace zmq
-{
-
- class vtcp_listener_t : public own_t, public io_object_t
- {
- public:
-
- vtcp_listener_t (class io_thread_t *io_thread_,
- class socket_base_t *socket_, class options_t &options_);
- ~vtcp_listener_t ();
-
- int set_address (const char *addr_);
-
- private:
-
- // Handlers for incoming commands.
- void process_plug ();
- void process_term (int linger_);
-
- // Handlers for I/O events.
- void in_event ();
-
- // VTCP listener socket.
- fd_t s;
-
- // Handle corresponding to the listening socket.
- handle_t handle;
-
- // Socket the listerner belongs to.
- class socket_base_t *socket;
-
- vtcp_listener_t (const vtcp_listener_t&);
- const vtcp_listener_t &operator = (const vtcp_listener_t&);
- };
-
-}
-
-#endif
-
-#endif
diff --git a/src/windows.hpp b/src/windows.hpp
index 8f39914..5e986b2 100644
--- a/src/windows.hpp
+++ b/src/windows.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/wire.hpp b/src/wire.hpp
index bc9dfe5..b0f4e0e 100644
--- a/src/wire.hpp
+++ b/src/wire.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index a245fea..5d7a97c 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -100,8 +101,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// For the first part of multi-part message, find the matching pipes.
if (!more)
diff --git a/src/xpub.hpp b/src/xpub.hpp
index b410e6c..14ffc58 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 9f2a947..ea19e56 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -35,9 +37,14 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
+ // TODO: Uncomment the following line when XREP will become true XREP
+ // rather than generic router socket.
// If peer disconnect there's noone to send reply to anyway. We can drop
// all the outstanding requests from that peer.
- options.delay_on_disconnect = false;
+ // options.delay_on_disconnect = false;
+
+ options.send_identity = true;
+ options.recv_identity = true;
prefetched_msg.init ();
}
@@ -52,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
- // Generate a new peer ID. Take care to avoid duplicates.
- outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
- if (!outpipes.empty ()) {
- while (true) {
- if (it == outpipes.end ())
- it = outpipes.begin ();
- if (it->first != next_peer_id)
- break;
- ++next_peer_id;
- ++it;
- }
- }
+ // Generate a new unique peer identity.
+ unsigned char buf [5];
+ buf [0] = 0;
+ put_uint32 (buf + 1, next_peer_id);
+ blob_t identity (buf, 5);
+ ++next_peer_id;
// Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
- next_peer_id, outpipe)).second;
+ identity, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- pipe_->set_pipe_id (next_peer_id);
- fq.attach (pipe_);
-
- // Advance next peer ID so that if new connection is dropped shortly after
- // its creation we don't accidentally get two subsequent peers with
- // the same ID.
- ++next_peer_id;
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@@ -125,30 +121,29 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
- if (msg_->flags () & msg_t::label) {
+ if (msg_->flags () & msg_t::more) {
more_out = true;
- // Find the pipe associated with the peer ID stored in the prefix.
+ // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
- if (msg_->size () == 4) {
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
-
- if (it != outpipes.end ()) {
- current_out = it->second.pipe;
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!current_out->check_write (&empty)) {
- it->second.active = false;
- more_out = false;
- current_out = NULL;
- }
- rc = empty.close ();
- errno_assert (rc == 0);
+ blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
}
+ rc = empty.close ();
+ errno_assert (rc == 0);
}
+
}
int rc = msg_->close ();
@@ -159,7 +154,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
- more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_out = msg_->flags () & msg_t::more ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -189,7 +184,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = false;
return 0;
}
@@ -200,9 +195,40 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0)
return -1;
+ // If identity is received, change the key assigned to the pipe.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (!more_in);
+
+ // Empty identity means we can preserve the auto-generated identity.
+ if (msg_->size () != 0) {
+
+ // Actual change of the identity.
+ outpipes_t::iterator it = outpipes.begin ();
+ while (it != outpipes.end ()) {
+ if (it->second.pipe == pipe) {
+ blob_t identity ((unsigned char*) msg_->data (),
+ msg_->size ());
+ pipe->set_identity (identity);
+ outpipes.erase (it);
+ outpipe_t outpipe = {pipe, true};
+ outpipes.insert (outpipes_t::value_type (identity,
+ outpipe));
+ break;
+ }
+ ++it;
+ }
+ zmq_assert (it != outpipes.end ());
+ }
+
+ // After processing the identity, try to get the next message.
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+ }
+
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -213,10 +239,12 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
prefetched = true;
rc = msg_->close ();
errno_assert (rc == 0);
- rc = msg_->init_size (4);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
- msg_->set_flags (msg_t::label);
+ memcpy (msg_->data (), identity.data (), identity.size ());
+ msg_->set_flags (msg_t::more);
return 0;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 562f87d..fc02b11 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -26,6 +28,7 @@
#include "socket_base.hpp"
#include "session_base.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
#include "msg.hpp"
#include "fq.hpp"
@@ -77,7 +80,7 @@ namespace zmq
};
// Outbound pipes indexed by the peer IDs.
- typedef std::map <uint32_t, outpipe_t> outpipes_t;
+ typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
// The pipe we are currently writing to.
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 79b3b94..91317f7 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -27,9 +28,14 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREQ;
+ // TODO: Uncomment the following line when XREQ will become true XREQ
+ // rather than generic dealer socket.
// If the socket is closing we can drop all the outbound requests. There'll
// be noone to receive the replies anyway.
- options.delay_on_close = false;
+ // options.delay_on_close = false;
+
+ options.send_identity = true;
+ options.recv_identity = true;
}
zmq::xreq_t::~xreq_t ()
@@ -50,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
{
- return fq.recv (msg_, flags_);
+ // XREQ socket doesn't use identities. We can safely drop it and
+ while (true) {
+ int rc = fq.recv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ if (likely (!(msg_->flags () & msg_t::identity)))
+ break;
+ }
+ return 0;
}
bool zmq::xreq_t::xhas_in ()
diff --git a/src/xreq.hpp b/src/xreq.hpp
index d7e28c4..1d979c5 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -1,8 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
diff --git a/src/xsub.cpp b/src/xsub.cpp
index b24f082..aae2654 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -116,7 +117,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -136,14 +137,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || !options.filter || match (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags () & (msg_t::more | msg_t::label)) {
+ while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -183,7 +183,7 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags () & (msg_t::more | msg_t::label)) {
+ while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 310df6e..1eac390 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index da4e85a..74a96bc 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/yqueue.hpp b/src/yqueue.hpp
index e436ea4..1c83cb8 100644
--- a/src/yqueue.hpp
+++ b/src/yqueue.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 0f54fab..b06b122 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
@@ -603,7 +604,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
}
#else
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
- if (unlikely (rc == -1) {
+ if (unlikely (rc == -1)) {
if (errno == EINTR || errno == EBADF)
return -1;
errno_assert (false);
diff --git a/src/zmq_utils.cpp b/src/zmq_utils.cpp
index c7eb60f..8f34134 100644
--- a/src/zmq_utils.cpp
+++ b/src/zmq_utils.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 6ed3762..5f0cfc1 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -7,7 +7,6 @@ noinst_PROGRAMS = test_pair_inproc \
test_reqrep_tcp \
test_hwm \
test_reqrep_device \
- test_reqrep_drop \
test_sub_forward \
test_invalid_rep
@@ -24,7 +23,6 @@ test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp
test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp
test_hwm_SOURCES = test_hwm.cpp
test_reqrep_device_SOURCES = test_reqrep_device.cpp
-test_reqrep_drop_SOURCES = test_reqrep_drop.cpp
test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp
diff --git a/tests/test_hwm.cpp b/tests/test_hwm.cpp
index 10b26e1..d887b31 100644
--- a/tests/test_hwm.cpp
+++ b/tests/test_hwm.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -20,11 +20,14 @@
#include <assert.h>
+#include <stdio.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_hwm running...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
diff --git a/tests/test_invalid_rep.cpp b/tests/test_invalid_rep.cpp
index 2657c20..9c77cc4 100644
--- a/tests/test_invalid_rep.cpp
+++ b/tests/test_invalid_rep.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -20,9 +21,12 @@
#include "../include/zmq.h"
#include <assert.h>
+#include <stdio.h>
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_invalid_rep running...\n");
+
// Create REQ/XREP wiring.
void *ctx = zmq_init (1);
assert (ctx);
@@ -45,25 +49,26 @@ int main (int argc, char *argv [])
assert (rc == 1);
// Receive the request.
- char addr [4];
- char seqn [4];
+ char addr [32];
+ int addr_size;
+ char bottom [1];
char body [1];
- rc = zmq_recv (xrep_socket, addr, sizeof (addr), 0);
- assert (rc == 4);
- rc = zmq_recv (xrep_socket, seqn, sizeof (seqn), 0);
- assert (rc == 4);
+ addr_size = zmq_recv (xrep_socket, addr, sizeof (addr), 0);
+ assert (addr_size >= 0);
+ rc = zmq_recv (xrep_socket, bottom, sizeof (bottom), 0);
+ assert (rc == 0);
rc = zmq_recv (xrep_socket, body, sizeof (body), 0);
assert (rc == 1);
// Send invalid reply.
- rc = zmq_send (xrep_socket, addr, 4, 0);
- assert (rc == 4);
+ rc = zmq_send (xrep_socket, addr, addr_size, 0);
+ assert (rc == addr_size);
// Send valid reply.
- rc = zmq_send (xrep_socket, addr, 4, ZMQ_SNDLABEL);
- assert (rc == 4);
- rc = zmq_send (xrep_socket, seqn, 4, ZMQ_SNDLABEL);
- assert (rc == 4);
+ rc = zmq_send (xrep_socket, addr, addr_size, ZMQ_SNDMORE);
+ assert (rc == addr_size);
+ rc = zmq_send (xrep_socket, bottom, 0, ZMQ_SNDMORE);
+ assert (rc == 0);
rc = zmq_send (xrep_socket, "b", 1, 0);
assert (rc == 1);
diff --git a/tests/test_pair_inproc.cpp b/tests/test_pair_inproc.cpp
index 6194f2a..6705cc5 100644
--- a/tests/test_pair_inproc.cpp
+++ b/tests/test_pair_inproc.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -19,10 +19,13 @@
*/
#include <assert.h>
+#include <stdio.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_pair_inproc running...\n");
+
void *ctx = zmq_init (0);
assert (ctx);
diff --git a/tests/test_pair_ipc.cpp b/tests/test_pair_ipc.cpp
index 2c83a69..96a265f 100644
--- a/tests/test_pair_ipc.cpp
+++ b/tests/test_pair_ipc.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -19,10 +19,13 @@
*/
#include <assert.h>
+#include <stdio.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_pair_ipc running...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
diff --git a/tests/test_pair_tcp.cpp b/tests/test_pair_tcp.cpp
index 8ecfef5..464be5a 100644
--- a/tests/test_pair_tcp.cpp
+++ b/tests/test_pair_tcp.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -19,10 +20,13 @@
*/
#include <assert.h>
+#include <stdio.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_pair_tcp running...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
diff --git a/tests/test_reqrep_device.cpp b/tests/test_reqrep_device.cpp
index f6f06c9..d861cec 100644
--- a/tests/test_reqrep_device.cpp
+++ b/tests/test_reqrep_device.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -20,11 +21,14 @@
#include <assert.h>
#include <string.h>
+#include <stdio.h>
#include "../include/zmq.h"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_reqrep_device running...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
@@ -63,15 +67,11 @@ int main (int argc, char *argv [])
assert (rc == 0);
rc = zmq_recvmsg (xrep, &msg, 0);
assert (rc >= 0);
- int rcvlabel;
- size_t sz = sizeof (rcvlabel);
- rc = zmq_getsockopt (xrep, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
int rcvmore;
+ size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
- rc = zmq_sendmsg (xreq, &msg,
- (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
+ rc = zmq_sendmsg (xreq, &msg, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
}
@@ -80,21 +80,14 @@ int main (int argc, char *argv [])
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "ABC", 3) == 0);
- int rcvlabel;
- size_t sz = sizeof (rcvlabel);
- rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
int rcvmore;
+ size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (rep, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "DEF", 3) == 0);
- rc = zmq_getsockopt (rep, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
@@ -112,15 +105,10 @@ int main (int argc, char *argv [])
assert (rc == 0);
rc = zmq_recvmsg (xreq, &msg, 0);
assert (rc >= 0);
- int rcvlabel;
- size_t sz = sizeof (rcvlabel);
- rc = zmq_getsockopt (xreq, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
int rcvmore;
rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
- rc = zmq_sendmsg (xrep, &msg,
- (rcvlabel ? ZMQ_SNDLABEL : 0) | (rcvmore ? ZMQ_SNDMORE : 0));
+ rc = zmq_sendmsg (xrep, &msg, rcvmore ? ZMQ_SNDMORE : 0);
assert (rc >= 0);
}
@@ -128,18 +116,12 @@ int main (int argc, char *argv [])
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "GHI", 3) == 0);
- rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (rcvmore);
rc = zmq_recv (req, buff, 3, 0);
assert (rc == 3);
assert (memcmp (buff, "JKL", 3) == 0);
- rc = zmq_getsockopt (req, ZMQ_RCVLABEL, &rcvlabel, &sz);
- assert (rc == 0);
- assert (!rcvlabel);
rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
diff --git a/tests/test_reqrep_drop.cpp b/tests/test_reqrep_drop.cpp
deleted file mode 100644
index 6531357..0000000
--- a/tests/test_reqrep_drop.cpp
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <assert.h>
-
-#include "../include/zmq.h"
-#include "../include/zmq_utils.h"
-
-int main (int argc, char *argv [])
-{
- void *ctx = zmq_init (1);
- assert (ctx);
-
- // Check whether requests are discarded because of disconnected requester.
-
- // Create a server.
- void *xrep = zmq_socket (ctx, ZMQ_XREP);
- assert (xrep);
- int rc = zmq_bind (xrep, "tcp://127.0.0.1:5560");
- assert (rc == 0);
-
- // Create a client.
- void *xreq = zmq_socket (ctx, ZMQ_XREQ);
- assert (xreq);
- rc = zmq_connect (xreq, "tcp://127.0.0.1:5560");
- assert (rc == 0);
-
- // Send requests.
- rc = zmq_send (xreq, "ABC", 3, 0);
- assert (rc == 3);
- rc = zmq_send (xreq, "DEF", 3, 0);
- assert (rc == 3);
-
- // Disconnect client.
- rc = zmq_close (xreq);
- assert (rc == 0);
-
- // Wait a while for disconnect to happen.
- zmq_sleep (1);
-
- // Try to receive a request -- it should have been discarded.
- char buff [3];
- rc = zmq_recv (xrep, buff, 3, ZMQ_DONTWAIT);
- assert (rc < 0);
- assert (errno == EAGAIN);
-
- // Clean up.
- rc = zmq_close (xrep);
- assert (rc == 0);
-
- // New test. Check whether reply is dropped because of HWM overflow.
-
- int one = 1;
- xreq = zmq_socket (ctx, ZMQ_XREQ);
- assert (xreq);
- rc = zmq_setsockopt (xreq, ZMQ_RCVHWM, &one, sizeof(one));
- assert (rc == 0);
- rc = zmq_bind (xreq, "inproc://a");
- assert (rc == 0);
-
- void *rep = zmq_socket (ctx, ZMQ_REP);
- assert (rep);
- rc = zmq_setsockopt (rep, ZMQ_SNDHWM, &one, sizeof(one));
- assert (rc == 0);
- rc = zmq_connect (rep, "inproc://a");
- assert (rc == 0);
-
- // Send request 1
- rc = zmq_send (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Send request 2
- rc = zmq_send (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Receive request 1
- rc = zmq_recv (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Send request 3
- rc = zmq_send (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Send reply 1
- rc = zmq_send (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Receive request 2
- rc = zmq_recv (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Send reply 2
- rc = zmq_send (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Receive request 3
- rc = zmq_recv (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Send reply 3
- rc = zmq_send (rep, buff, 1, 0);
- assert (rc == 1);
-
- // Receive reply 1
- rc = zmq_recv (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Receive reply 2
- rc = zmq_recv (xreq, buff, 1, 0);
- assert (rc == 1);
-
- // Try to receive reply 3, it should have been dropped.
- rc = zmq_recv (xreq, buff, 1, ZMQ_DONTWAIT);
- assert (rc == -1 && errno == EAGAIN);
-
- // Clean up.
- rc = zmq_close (xreq);
- assert (rc == 0);
- rc = zmq_close (rep);
- assert (rc == 0);
-
- rc = zmq_term (ctx);
- assert (rc == 0);
-
- return 0 ;
-}
diff --git a/tests/test_reqrep_inproc.cpp b/tests/test_reqrep_inproc.cpp
index aeff7ef..f710968 100644
--- a/tests/test_reqrep_inproc.cpp
+++ b/tests/test_reqrep_inproc.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -19,10 +19,13 @@
*/
#include <assert.h>
+#include <stdio.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_reqrep_inproc running...\n");
+
void *ctx = zmq_init (0);
assert (ctx);
diff --git a/tests/test_reqrep_ipc.cpp b/tests/test_reqrep_ipc.cpp
index af15998..fd9b28d 100644
--- a/tests/test_reqrep_ipc.cpp
+++ b/tests/test_reqrep_ipc.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -19,10 +19,13 @@
*/
#include <assert.h>
+#include <stdio.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_reqrep_ipc running...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
diff --git a/tests/test_reqrep_tcp.cpp b/tests/test_reqrep_tcp.cpp
index c713e26..1e6bbbb 100644
--- a/tests/test_reqrep_tcp.cpp
+++ b/tests/test_reqrep_tcp.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -19,10 +20,13 @@
*/
#include <assert.h>
+#include <stdio.h>
#include "testutil.hpp"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_reqrep_tcp running...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
diff --git a/tests/test_shutdown_stress.cpp b/tests/test_shutdown_stress.cpp
index b3ee90f..811637c 100644
--- a/tests/test_shutdown_stress.cpp
+++ b/tests/test_shutdown_stress.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -22,6 +23,7 @@
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
+#include <stdio.h>
#define THREAD_COUNT 100
@@ -52,6 +54,8 @@ int main (int argc, char *argv [])
int rc;
pthread_t threads [THREAD_COUNT];
+ fprintf (stderr, "test_shutdown_stress running...\n");
+
for (j = 0; j != 10; j++) {
// Check the shutdown with many parallel I/O threads.
diff --git a/tests/test_sub_forward.cpp b/tests/test_sub_forward.cpp
index d69f923..36a7f4a 100644
--- a/tests/test_sub_forward.cpp
+++ b/tests/test_sub_forward.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -19,12 +20,15 @@
*/
#include <assert.h>
+#include <stdio.h>
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_sub_forward running...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
diff --git a/tests/test_timeo.cpp b/tests/test_timeo.cpp
index a8a3fc0..29ba8a0 100644
--- a/tests/test_timeo.cpp
+++ b/tests/test_timeo.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -21,6 +21,7 @@
#include <assert.h>
#include <string.h>
#include <pthread.h>
+#include <stdio.h>
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
@@ -45,6 +46,8 @@ extern "C"
int main (int argc, char *argv [])
{
+ fprintf (stderr, "test_timeo...\n");
+
void *ctx = zmq_init (1);
assert (ctx);
diff --git a/tests/testutil.hpp b/tests/testutil.hpp
index 57db3c4..e68dc32 100644
--- a/tests/testutil.hpp
+++ b/tests/testutil.hpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file