summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am7
-rw-r--r--src/array.hpp3
-rw-r--r--src/atomic_counter.hpp3
-rw-r--r--src/atomic_ptr.hpp3
-rw-r--r--src/blob.hpp35
-rw-r--r--src/clock.cpp4
-rw-r--r--src/clock.hpp4
-rw-r--r--src/command.hpp3
-rw-r--r--src/config.hpp3
-rw-r--r--src/ctx.cpp1
-rw-r--r--src/ctx.hpp3
-rw-r--r--src/decoder.cpp3
-rw-r--r--src/decoder.hpp3
-rw-r--r--src/devpoll.cpp3
-rw-r--r--src/devpoll.hpp3
-rw-r--r--src/dist.cpp10
-rw-r--r--src/dist.hpp4
-rw-r--r--src/encoder.cpp8
-rw-r--r--src/encoder.hpp3
-rw-r--r--src/epoll.cpp3
-rw-r--r--src/epoll.hpp3
-rw-r--r--src/err.cpp3
-rw-r--r--src/err.hpp3
-rw-r--r--src/fd.hpp2
-rw-r--r--src/fq.cpp6
-rw-r--r--src/fq.hpp3
-rw-r--r--src/i_engine.hpp3
-rw-r--r--src/i_poll_events.hpp3
-rw-r--r--src/io_object.cpp3
-rw-r--r--src/io_object.hpp3
-rw-r--r--src/io_thread.cpp3
-rw-r--r--src/io_thread.hpp3
-rw-r--r--src/ip.cpp3
-rw-r--r--src/ip.hpp3
-rw-r--r--src/ipc_address.cpp4
-rw-r--r--src/ipc_address.hpp4
-rw-r--r--src/ipc_connecter.cpp4
-rw-r--r--src/ipc_connecter.hpp4
-rw-r--r--src/ipc_listener.cpp4
-rw-r--r--src/ipc_listener.hpp4
-rw-r--r--src/kqueue.cpp3
-rw-r--r--src/kqueue.hpp3
-rw-r--r--src/lb.cpp9
-rw-r--r--src/lb.hpp3
-rw-r--r--src/likely.hpp4
-rw-r--r--src/mailbox.cpp3
-rw-r--r--src/mailbox.hpp3
-rw-r--r--src/msg.cpp3
-rw-r--r--src/msg.hpp11
-rw-r--r--src/mtrie.cpp4
-rw-r--r--src/mtrie.hpp4
-rw-r--r--src/mutex.hpp3
-rw-r--r--src/object.cpp3
-rw-r--r--src/object.hpp3
-rw-r--r--src/options.cpp32
-rw-r--r--src/options.hpp14
-rw-r--r--src/own.cpp4
-rw-r--r--src/own.hpp4
-rw-r--r--src/pair.cpp3
-rw-r--r--src/pair.hpp3
-rw-r--r--src/pgm_receiver.cpp4
-rw-r--r--src/pgm_receiver.hpp4
-rw-r--r--src/pgm_sender.cpp4
-rw-r--r--src/pgm_sender.hpp4
-rw-r--r--src/pgm_socket.cpp4
-rw-r--r--src/pgm_socket.hpp4
-rw-r--r--src/pipe.cpp21
-rw-r--r--src/pipe.hpp13
-rw-r--r--src/poll.cpp3
-rw-r--r--src/poll.hpp3
-rw-r--r--src/poller.hpp3
-rw-r--r--src/poller_base.cpp4
-rw-r--r--src/poller_base.hpp4
-rw-r--r--src/pub.cpp3
-rw-r--r--src/pub.hpp3
-rw-r--r--src/pull.cpp3
-rw-r--r--src/pull.hpp3
-rw-r--r--src/push.cpp3
-rw-r--r--src/push.hpp3
-rw-r--r--src/random.cpp4
-rw-r--r--src/random.hpp4
-rw-r--r--src/reaper.cpp4
-rw-r--r--src/reaper.hpp4
-rw-r--r--src/rep.cpp20
-rw-r--r--src/rep.hpp1
-rw-r--r--src/req.cpp62
-rw-r--r--src/req.hpp11
-rwxr-xr-xsrc/router.cpp285
-rwxr-xr-xsrc/router.hpp123
-rw-r--r--src/select.cpp3
-rw-r--r--src/select.hpp3
-rw-r--r--src/session_base.cpp45
-rw-r--r--src/session_base.hpp8
-rw-r--r--src/signaler.cpp4
-rw-r--r--src/signaler.hpp4
-rw-r--r--src/socket_base.cpp76
-rw-r--r--src/socket_base.hpp10
-rw-r--r--src/stdint.hpp2
-rw-r--r--src/stream_engine.cpp3
-rw-r--r--src/stream_engine.hpp3
-rw-r--r--src/sub.cpp3
-rw-r--r--src/sub.hpp3
-rw-r--r--src/tcp_address.cpp3
-rw-r--r--src/tcp_address.hpp3
-rw-r--r--src/tcp_connecter.cpp3
-rw-r--r--src/tcp_connecter.hpp3
-rw-r--r--src/tcp_listener.cpp3
-rw-r--r--src/tcp_listener.hpp3
-rw-r--r--src/thread.cpp1
-rw-r--r--src/thread.hpp1
-rw-r--r--src/trie.cpp3
-rw-r--r--src/trie.hpp3
-rw-r--r--src/vtcp_connecter.cpp251
-rw-r--r--src/vtcp_connecter.hpp120
-rw-r--r--src/vtcp_listener.cpp124
-rw-r--r--src/vtcp_listener.hpp71
-rw-r--r--src/windows.hpp3
-rw-r--r--src/wire.hpp2
-rw-r--r--src/xpub.cpp8
-rw-r--r--src/xpub.hpp4
-rw-r--r--src/xrep.cpp120
-rw-r--r--src/xrep.hpp7
-rw-r--r--src/xreq.cpp20
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/xsub.cpp14
-rw-r--r--src/xsub.hpp4
-rw-r--r--src/ypipe.hpp3
-rw-r--r--src/yqueue.hpp3
-rw-r--r--src/zmq.cpp1
-rw-r--r--src/zmq_utils.cpp3
130 files changed, 519 insertions, 1342 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 3b7dec6..4d3cba3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -9,6 +9,7 @@ libzmq_la_SOURCES = \
array.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
clock.hpp \
command.hpp \
config.hpp \
@@ -55,7 +56,6 @@ libzmq_la_SOURCES = \
reaper.hpp \
rep.hpp \
req.hpp \
- router.hpp \
select.hpp \
session_base.hpp \
signaler.hpp \
@@ -68,8 +68,6 @@ libzmq_la_SOURCES = \
tcp_listener.hpp \
thread.hpp \
trie.hpp \
- vtcp_connecter.hpp \
- vtcp_listener.hpp \
windows.hpp \
wire.hpp \
xpub.hpp \
@@ -113,7 +111,6 @@ libzmq_la_SOURCES = \
reaper.cpp \
pub.cpp \
random.cpp \
- router.cpp \
rep.cpp \
req.cpp \
select.cpp \
@@ -127,8 +124,6 @@ libzmq_la_SOURCES = \
tcp_listener.cpp \
thread.cpp \
trie.cpp \
- vtcp_connecter.cpp \
- vtcp_listener.cpp \
xpub.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/array.hpp b/src/array.hpp
index b1f6eca..7e4ddd4 100644
--- a/src/array.hpp
+++ b/src/array.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/atomic_counter.hpp b/src/atomic_counter.hpp
index d7116d8..a0a67bf 100644
--- a/src/atomic_counter.hpp
+++ b/src/atomic_counter.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/atomic_ptr.hpp b/src/atomic_ptr.hpp
index c106cd5..c59ab81 100644
--- a/src/atomic_ptr.hpp
+++ b/src/atomic_ptr.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..b8039c4
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,35 @@
+/*
+ Copyright (c) 2010 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
+
diff --git a/src/clock.cpp b/src/clock.cpp
index f98a2f4..92fc4be 100644
--- a/src/clock.cpp
+++ b/src/clock.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/clock.hpp b/src/clock.hpp
index 1b34989..b3b19b2 100644
--- a/src/clock.hpp
+++ b/src/clock.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/command.hpp b/src/command.hpp
index 1513ca8..ecf2d93 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/config.hpp b/src/config.hpp
index 96e39de..c6ac264 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 8aa10d9..d8783be 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 22ac932..619d57e 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/decoder.cpp b/src/decoder.cpp
index d57265a..48f457f 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/decoder.hpp b/src/decoder.hpp
index de63a09..c6f9100 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/devpoll.cpp b/src/devpoll.cpp
index c4b3c54..0c46d14 100644
--- a/src/devpoll.cpp
+++ b/src/devpoll.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/devpoll.hpp b/src/devpoll.hpp
index a668e9a..1de1af0 100644
--- a/src/devpoll.hpp
+++ b/src/devpoll.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/dist.cpp b/src/dist.cpp
index b4fae6f..d220c43 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -111,8 +112,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// Push the message to matching pipes.
distribute (msg_, flags_);
@@ -181,7 +181,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
eligible--;
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}
diff --git a/src/dist.hpp b/src/dist.hpp
index c8d121c..a72de6e 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/encoder.cpp b/src/encoder.cpp
index 8689e45..030b3ef 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -89,14 +91,14 @@ bool zmq::encoder_t::message_ready ()
tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 2, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
else {
tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
next_step (tmpbuf, 10, &encoder_t::size_ready,
- !(in_progress.flags () & (msg_t::more | msg_t::label)));
+ !(in_progress.flags () & msg_t::more));
}
return true;
}
diff --git a/src/encoder.hpp b/src/encoder.hpp
index 949cbdc..8001c4e 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/epoll.cpp b/src/epoll.cpp
index 39b4547..a62345d 100644
--- a/src/epoll.cpp
+++ b/src/epoll.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/epoll.hpp b/src/epoll.hpp
index dc6b3ed..9bc31a5 100644
--- a/src/epoll.hpp
+++ b/src/epoll.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/err.cpp b/src/err.cpp
index ff81e03..028d752 100644
--- a/src/err.cpp
+++ b/src/err.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/err.hpp b/src/err.hpp
index 7c7a9d8..53a6569 100644
--- a/src/err.hpp
+++ b/src/err.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/fd.hpp b/src/fd.hpp
index 3b15024..773e380 100644
--- a/src/fd.hpp
+++ b/src/fd.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/fq.cpp b/src/fq.cpp
index abd4160..429c038 100644
--- a/src/fq.cpp
+++ b/src/fq.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -91,7 +93,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
if (pipe_)
*pipe_ = pipes [current];
more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ msg_->flags () & msg_t::more ? true : false;
if (!more) {
current++;
if (current >= active)
diff --git a/src/fq.hpp b/src/fq.hpp
index be9c695..24d7b85 100644
--- a/src/fq.hpp
+++ b/src/fq.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index 26e475b..19359b7 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/i_poll_events.hpp b/src/i_poll_events.hpp
index fa9fb25..9cf47fd 100644
--- a/src/i_poll_events.hpp
+++ b/src/i_poll_events.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_object.cpp b/src/io_object.cpp
index e68917c..81b9ce5 100644
--- a/src/io_object.cpp
+++ b/src/io_object.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_object.hpp b/src/io_object.hpp
index fb0d1e3..bf7a625 100644
--- a/src/io_object.hpp
+++ b/src/io_object.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index c6f3880..40bbef9 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index f578d4e..986c88d 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ip.cpp b/src/ip.cpp
index 8090a8a..0b4596a 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ip.hpp b/src/ip.hpp
index d8553de..c5f31db 100644
--- a/src/ip.hpp
+++ b/src/ip.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_address.cpp b/src/ipc_address.cpp
index 6a471a6..d601c56 100644
--- a/src/ipc_address.cpp
+++ b/src/ipc_address.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_address.hpp b/src/ipc_address.hpp
index 453f5fd..4a7f230 100644
--- a/src/ipc_address.hpp
+++ b/src/ipc_address.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp
index a54e8fe..dc0ee21 100644
--- a/src/ipc_connecter.cpp
+++ b/src/ipc_connecter.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp
index 721bcf4..c02245a 100644
--- a/src/ipc_connecter.hpp
+++ b/src/ipc_connecter.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp
index 5ba41be..07a7dff 100644
--- a/src/ipc_listener.cpp
+++ b/src/ipc_listener.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ipc_listener.hpp b/src/ipc_listener.hpp
index 4cd881b..0f06d23 100644
--- a/src/ipc_listener.hpp
+++ b/src/ipc_listener.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index cbf38d1..0b07fab 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/kqueue.hpp b/src/kqueue.hpp
index 4ded81e..14f4e49 100644
--- a/src/kqueue.hpp
+++ b/src/kqueue.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/lb.cpp b/src/lb.cpp
index da7cb9d..2a0f769 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -75,7 +77,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
// switch back to non-dropping mode.
if (dropping) {
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
if (!more)
dropping = false;
@@ -88,8 +90,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
while (active > 0) {
if (pipes [current]->write (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
break;
}
diff --git a/src/lb.hpp b/src/lb.hpp
index 0dfd25e..1de8549 100644
--- a/src/lb.hpp
+++ b/src/lb.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/likely.hpp b/src/likely.hpp
index a524a50..e604464 100644
--- a/src/likely.hpp
+++ b/src/likely.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2009-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mailbox.cpp b/src/mailbox.cpp
index 9fd3ac4..ff16afe 100644
--- a/src/mailbox.cpp
+++ b/src/mailbox.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mailbox.hpp b/src/mailbox.hpp
index 0675b99..c059c2a 100644
--- a/src/mailbox.hpp
+++ b/src/mailbox.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/msg.cpp b/src/msg.cpp
index fba7ec9..60d5bf3 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/msg.hpp b/src/msg.hpp
index 6b4e216..8c84670 100644
--- a/src/msg.hpp
+++ b/src/msg.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -47,10 +49,9 @@ namespace zmq
// Mesage flags.
enum
{
- label = 1,
- command = 2,
- shared = 64,
- more = 128
+ more = 1,
+ identity = 64,
+ shared = 128
};
bool check ();
diff --git a/src/mtrie.cpp b/src/mtrie.cpp
index 66bea20..1c96c98 100644
--- a/src/mtrie.cpp
+++ b/src/mtrie.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mtrie.hpp b/src/mtrie.hpp
index 2c2cc32..8bbc22d 100644
--- a/src/mtrie.hpp
+++ b/src/mtrie.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/mutex.hpp b/src/mutex.hpp
index 9b13ffa..8d7068a 100644
--- a/src/mutex.hpp
+++ b/src/mutex.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/object.cpp b/src/object.cpp
index 807fb04..622754c 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/object.hpp b/src/object.hpp
index 1a38b24..f832596 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/options.cpp b/src/options.cpp
index 8a3e527..4db1a6c 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -27,6 +29,7 @@ zmq::options_t::options_t () :
sndhwm (1000),
rcvhwm (1000),
affinity (0),
+ identity_size (0),
rate (100),
recovery_ivl (10000),
multicast_hops (1),
@@ -43,7 +46,9 @@ zmq::options_t::options_t () :
ipv4only (1),
delay_on_close (true),
delay_on_disconnect (true),
- filter (false)
+ filter (false),
+ send_identity (false),
+ recv_identity (false)
{
}
@@ -76,6 +81,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
affinity = *((uint64_t*) optval_);
return 0;
+ case ZMQ_IDENTITY:
+
+ // Empty identity is invalid as well as identity longer than
+ // 255 bytes. Identity starting with binary zero is invalid
+ // as these are used for auto-generated identities.
+ if (optvallen_ < 1 || optvallen_ > 255 ||
+ *((const unsigned char*) optval_) == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ identity_size = optvallen_;
+ memcpy (identity, optval_, identity_size);
+ return 0;
+
case ZMQ_RATE:
if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) {
errno = EINVAL;
@@ -232,6 +251,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
+ case ZMQ_IDENTITY:
+ if (*optvallen_ < identity_size) {
+ errno = EINVAL;
+ return -1;
+ }
+ memcpy (optval_, identity, identity_size);
+ *optvallen_ = identity_size;
+ return 0;
+
case ZMQ_RATE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index 4689522..bfc9dc7 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -41,6 +43,10 @@ namespace zmq
// I/O thread affinity.
uint64_t affinity;
+ // Socket identity
+ unsigned char identity_size;
+ unsigned char identity [256];
+
// Maximum tranfer rate [kb/s]. Default 100kb/s.
int rate;
@@ -93,6 +99,12 @@ namespace zmq
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
bool filter;
+
+ // Sends identity to all new connections.
+ bool send_identity;
+
+ // Receivers identity from all new connections.
+ bool recv_identity;
};
}
diff --git a/src/own.cpp b/src/own.cpp
index f2ca4b2..d6dd309 100644
--- a/src/own.cpp
+++ b/src/own.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/own.hpp b/src/own.hpp
index 0902f73..ad5c452 100644
--- a/src/own.hpp
+++ b/src/own.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pair.cpp b/src/pair.cpp
index 2fa4eac..6c652db 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pair.hpp b/src/pair.hpp
index e7390d6..67de2fd 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 6c292cd..122d110 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index b9e9a05..3c1d394 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 733b1ec..759802f 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index d3d5924..d8f046d 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 378370c..0274ee4 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp
index 8b1be54..5a5ef99 100644
--- a/src/pgm_socket.hpp
+++ b/src/pgm_socket.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2010-2011 Miru Limited
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pipe.cpp b/src/pipe.cpp
index c52deb9..25dd51c 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -63,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
peer (NULL),
sink (NULL),
state (active),
- delay (delay_),
- pipe_id (0)
+ delay (delay_)
{
}
@@ -86,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}
-void zmq::pipe_t::set_pipe_id (uint32_t id_)
+void zmq::pipe_t::set_identity (const blob_t &identity_)
{
- pipe_id = id_;
+ identity = identity_;
}
-uint32_t zmq::pipe_t::get_pipe_id ()
+zmq::blob_t zmq::pipe_t::get_identity ()
{
- return pipe_id;
+ return identity;
}
bool zmq::pipe_t::check_read ()
@@ -136,7 +137,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
return false;
}
- if (!(msg_->flags () & (msg_t::more | msg_t::label)))
+ if (!(msg_->flags () & msg_t::more))
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
@@ -165,7 +166,7 @@ bool zmq::pipe_t::write (msg_t *msg_)
if (unlikely (!check_write (msg_)))
return false;
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
outpipe->write (*msg_, more);
if (!more)
msgs_written++;
@@ -179,7 +180,7 @@ void zmq::pipe_t::rollback ()
msg_t msg;
if (outpipe) {
while (outpipe->unwrite (&msg)) {
- zmq_assert (msg.flags () & (msg_t::more | msg_t::label));
+ zmq_assert (msg.flags () & msg_t::more);
int rc = msg.close ();
errno_assert (rc == 0);
}
diff --git a/src/pipe.hpp b/src/pipe.hpp
index 437d84d..75a2021 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -27,6 +29,7 @@
#include "object.hpp"
#include "stdint.hpp"
#include "array.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -70,8 +73,8 @@ namespace zmq
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an opaque ID to be used by its clients.
- void set_pipe_id (uint32_t id_);
- uint32_t get_pipe_id ();
+ void set_identity (const blob_t &identity_);
+ blob_t get_identity ();
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
@@ -182,8 +185,8 @@ namespace zmq
// asks us to.
bool delay;
- // Opaque ID. To be used by the clients, not the pipe itself.
- uint32_t pipe_id;
+ // Identity of the writer. Used uniquely by the reader side.
+ blob_t identity;
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
diff --git a/src/poll.cpp b/src/poll.cpp
index 9d1978b..1d1c423 100644
--- a/src/poll.cpp
+++ b/src/poll.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poll.hpp b/src/poll.hpp
index 42f3af1..700256d 100644
--- a/src/poll.hpp
+++ b/src/poll.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poller.hpp b/src/poller.hpp
index a8936ce..a989328 100644
--- a/src/poller.hpp
+++ b/src/poller.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poller_base.cpp b/src/poller_base.cpp
index d5fb985..6e532ae 100644
--- a/src/poller_base.cpp
+++ b/src/poller_base.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/poller_base.hpp b/src/poller_base.hpp
index 44fe9f1..808ed38 100644
--- a/src/poller_base.hpp
+++ b/src/poller_base.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pub.cpp b/src/pub.cpp
index 15ec291..7458d5f 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pub.hpp b/src/pub.hpp
index 4a4da0f..d418fd4 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pull.cpp b/src/pull.cpp
index 06575da..6028118 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/pull.hpp b/src/pull.hpp
index 6a46ead..fa36d49 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/push.cpp b/src/push.cpp
index e91b789..a0ed992 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/push.hpp b/src/push.hpp
index 1feb71d..ea93693 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/random.cpp b/src/random.cpp
index 9f7768c..326a3d9 100644
--- a/src/random.cpp
+++ b/src/random.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/random.hpp b/src/random.hpp
index d88b5ee..ca3d39a 100644
--- a/src/random.hpp
+++ b/src/random.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/reaper.cpp b/src/reaper.cpp
index 4c67b37..716f638 100644
--- a/src/reaper.cpp
+++ b/src/reaper.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/reaper.hpp b/src/reaper.hpp
index edcc319..1c1533f 100644
--- a/src/reaper.hpp
+++ b/src/reaper.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2011 250bpm s.r.o.
+ Copyright (c) 2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/rep.cpp b/src/rep.cpp
index 564fa89..02a825c 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
@@ -42,7 +43,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
return -1;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
@@ -71,19 +72,20 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
int rc = xrep_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
- if (!(msg_->flags () & msg_t::label))
- break;
+ zmq_assert (msg_->flags () & msg_t::more);
+ bool bottom = (msg_->size () == 0);
rc = xrep_t::xsend (msg_, flags_);
errno_assert (rc == 0);
+ if (bottom)
+ break;
}
request_begins = false;
}
- else {
- int rc = xrep_t::xrecv (msg_, flags_);
- if (rc != 0)
- return rc;
- }
- zmq_assert (!(msg_->flags () & msg_t::label));
+
+ // Get next message part to return to the user.
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
// If whole request is read, flip the FSM to reply-sending state.
if (!(msg_->flags () & msg_t::more)) {
diff --git a/src/rep.hpp b/src/rep.hpp
index 55d57bd..de9c2b8 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/req.cpp b/src/req.cpp
index 0832f60..3ba1ec0 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -28,8 +30,7 @@
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
- message_begins (true),
- request_id (generate_random ())
+ message_begins (true)
{
options.type = ZMQ_REQ;
}
@@ -49,19 +50,17 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
// First part of the request is the request identity.
if (message_begins) {
- msg_t prefix;
- int rc = prefix.init_size (4);
+ msg_t bottom;
+ int rc = bottom.init ();
errno_assert (rc == 0);
- prefix.set_flags (msg_t::label);
- unsigned char *data = (unsigned char*) prefix.data ();
- put_uint32 (data, request_id);
- rc = xreq_t::xsend (&prefix, flags_);
+ bottom.set_flags (msg_t::more);
+ rc = xreq_t::xsend (&bottom, 0);
if (rc != 0)
- return rc;
+ return -1;
message_begins = false;
}
- bool more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool more = msg_->flags () & msg_t::more ? true : false;
int rc = xreq_t::xsend (msg_, flags_);
if (rc != 0)
@@ -91,25 +90,11 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// TODO: This should also close the connection with the peer!
- if (unlikely (!(msg_->flags () & msg_t::label) || msg_->size () != 4)) {
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) {
int rc = xreq_t::xrecv (msg_, flags_);
errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
- break;
- }
- msg_->close ();
- msg_->init ();
- errno = EAGAIN;
- return -1;
- }
-
- unsigned char *data = (unsigned char*) msg_->data ();
- if (unlikely (get_uint32 (data) != request_id)) {
- while (true) {
- int rc = xreq_t::xrecv (msg_, flags_);
- errno_assert (rc == 0);
- if (!(msg_->flags () & (msg_t::label | msg_t::more)))
+ if (!(msg_->flags () & msg_t::more))
break;
}
msg_->close ();
@@ -117,6 +102,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}
+
message_begins = false;
}
@@ -125,8 +111,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
return rc;
// If the reply is fully received, flip the FSM into request-sending state.
- if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
- request_id++;
+ if (!(msg_->flags () & msg_t::more)) {
receiving_reply = false;
message_begins = true;
}
@@ -162,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
zmq::req_session_t::~req_session_t ()
{
+ state = options.recv_identity ? identity : bottom;
}
int zmq::req_session_t::write (msg_t *msg_)
{
- if (state == request_id) {
- if (msg_->flags () == msg_t::label && msg_->size () == 4) {
+ switch (state) {
+ case bottom:
+ if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body;
return xreq_session_t::write (msg_);
}
- }
- else {
+ break;
+ case body:
if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_);
if (msg_->flags () == 0) {
- state = request_id;
+ state = bottom;
+ return xreq_session_t::write (msg_);
+ }
+ break;
+ case identity:
+ if (msg_->flags () == 0) {
+ state = bottom;
return xreq_session_t::write (msg_);
}
+ break;
}
errno = EFAULT;
return -1;
diff --git a/src/req.hpp b/src/req.hpp
index 0207a4f..8fae9d4 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -50,10 +52,6 @@ namespace zmq
// of the message must be empty message part (backtrace stack bottom).
bool message_begins;
- // Request ID. Request numbers gradually increase (and wrap over)
- // so that we don't have to generate random ID for each request.
- uint32_t request_id;
-
req_t (const req_t&);
const req_t &operator = (const req_t&);
};
@@ -73,7 +71,8 @@ namespace zmq
private:
enum {
- request_id,
+ identity,
+ bottom,
body
} state;
diff --git a/src/router.cpp b/src/router.cpp
deleted file mode 100755
index b7e19fb..0000000
--- a/src/router.cpp
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "router.hpp"
-#include "pipe.hpp"
-#include "wire.hpp"
-#include "random.hpp"
-#include "likely.hpp"
-#include "wire.hpp"
-#include "err.hpp"
-
-zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_) :
- socket_base_t (parent_, tid_),
- prefetched (false),
- more_in (false),
- current_out (NULL),
- more_out (false),
- next_peer_id (generate_random ())
-{
- options.type = ZMQ_ROUTER;
-
- prefetched_msg.init ();
-}
-
-zmq::router_t::~router_t ()
-{
- zmq_assert (outpipes.empty ());
- prefetched_msg.close ();
-}
-
-void zmq::router_t::xattach_pipe (pipe_t *pipe_)
-{
- zmq_assert (pipe_);
-
- // Generate a new peer ID. Take care to avoid duplicates.
- outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
- if (!outpipes.empty ()) {
- while (true) {
- if (it == outpipes.end ())
- it = outpipes.begin ();
- if (it->first != next_peer_id)
- break;
- ++next_peer_id;
- ++it;
- }
- }
-
- // Add the pipe to the map out outbound pipes.
- outpipe_t outpipe = {pipe_, true};
- bool ok = outpipes.insert (outpipes_t::value_type (
- next_peer_id, outpipe)).second;
- zmq_assert (ok);
-
- // Add the pipe to the list of inbound pipes.
- pipe_->set_pipe_id (next_peer_id);
- fq.attach (pipe_);
-
- // Queue the connection command.
- pending_command_t cmd = {1, next_peer_id};
- pending_commands.push_back (cmd);
-
- // Advance next peer ID so that if new connection is dropped shortly after
- // its creation we don't accidentally get two subsequent peers with
- // the same ID.
- ++next_peer_id;
-}
-
-void zmq::router_t::xterminated (pipe_t *pipe_)
-{
- fq.terminated (pipe_);
-
- for (outpipes_t::iterator it = outpipes.begin ();
- it != outpipes.end (); ++it) {
- if (it->second.pipe == pipe_) {
-
- // Queue the disconnection command.
- pending_command_t cmd = {2, it->first};
- pending_commands.push_back (cmd);
-
- // Remove the pipe.
- outpipes.erase (it);
- if (pipe_ == current_out)
- current_out = NULL;
- return;
- }
- }
- zmq_assert (false);
-}
-
-void zmq::router_t::xread_activated (pipe_t *pipe_)
-{
- fq.activated (pipe_);
-}
-
-void zmq::router_t::xwrite_activated (pipe_t *pipe_)
-{
- for (outpipes_t::iterator it = outpipes.begin ();
- it != outpipes.end (); ++it) {
- if (it->second.pipe == pipe_) {
- zmq_assert (!it->second.active);
- it->second.active = true;
- return;
- }
- }
- zmq_assert (false);
-}
-
-int zmq::router_t::xsend (msg_t *msg_, int flags_)
-{
- // If this is the first part of the message it's the ID of the
- // peer to send the message to.
- if (!more_out) {
- zmq_assert (!current_out);
-
- // The first message part has to be label.
- if (unlikely (!(msg_->flags () & msg_t::label))) {
- errno = EFSM;
- return -1;
- }
-
- // Find the pipe associated with the peer ID stored in the message.
- if (unlikely (msg_->size () != 4)) {
- errno = ECANTROUTE;
- return -1;
- }
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
- if (unlikely (it == outpipes.end ())) {
- errno = ECANTROUTE;
- return -1;
- }
-
- // Check whether the pipe is available for writing.
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!it->second.pipe->check_write (&empty)) {
- rc = empty.close ();
- errno_assert (rc == 0);
- it->second.active = false;
- errno = EAGAIN;
- return -1;
- }
- rc = empty.close ();
- errno_assert (rc == 0);
-
- // Mark the pipe to send the message to.
- current_out = it->second.pipe;
- more_out = true;
-
- // Clean up the message object.
- rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init ();
- errno_assert (rc == 0);
- return 0;
- }
-
- // Check whether this is the last part of the message.
- more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
-
- // Push the message into the pipe. If there's no out pipe, just drop it.
- if (current_out) {
- bool ok = current_out->write (msg_);
- if (unlikely (!ok))
- current_out = NULL;
- else if (!more_out) {
- current_out->flush ();
- current_out = NULL;
- }
- }
- else {
- int rc = msg_->close ();
- errno_assert (rc == 0);
- }
-
- // Detach the message from the data buffer.
- int rc = msg_->init ();
- errno_assert (rc == 0);
-
- return 0;
-}
-
-int zmq::router_t::xrecv (msg_t *msg_, int flags_)
-{
- // If there's a queued command, pass it to the caller.
- if (unlikely (!more_in && !pending_commands.empty ())) {
- msg_->init_size (5);
- unsigned char *data = (unsigned char*) msg_->data ();
- put_uint8 (data, pending_commands.front ().cmd);
- put_uint32 (data + 1, pending_commands.front ().peer);
- msg_->set_flags (msg_t::command);
- pending_commands.pop_front ();
- return 0;
- }
-
- // If there is a prefetched message, return it.
- if (prefetched) {
- int rc = msg_->move (prefetched_msg);
- errno_assert (rc == 0);
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
- prefetched = false;
- return 0;
- }
-
- // Get next message part.
- pipe_t *pipe;
- int rc = fq.recvpipe (msg_, flags_, &pipe);
- if (rc != 0)
- return -1;
-
- // If we are in the middle of reading a message, just return the next part.
- if (more_in) {
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
- return 0;
- }
-
- // We are at the beginning of a new message. Move the message part we
- // have to the prefetched and return the ID of the peer instead.
- rc = prefetched_msg.move (*msg_);
- errno_assert (rc == 0);
- prefetched = true;
- rc = msg_->close ();
- errno_assert (rc == 0);
- rc = msg_->init_size (4);
- errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
- msg_->set_flags (msg_t::label);
- return 0;
-}
-
-int zmq::router_t::rollback (void)
-{
- if (current_out) {
- current_out->rollback ();
- current_out = NULL;
- more_out = false;
- }
- return 0;
-}
-
-bool zmq::router_t::xhas_in ()
-{
- if (prefetched)
- return true;
- return fq.has_in () || !pending_commands.empty();
-}
-
-bool zmq::router_t::xhas_out ()
-{
- // In theory, GENERIC socket is always ready for writing. Whether actual
- // attempt to write succeeds depends on whitch pipe the message is going
- // to be routed to.
- return true;
-}
-
-zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_,
- socket_base_t *socket_, const options_t &options_,
- const char *protocol_, const char *address_) :
- session_base_t (io_thread_, connect_, socket_, options_, protocol_,
- address_)
-{
-}
-
-zmq::router_session_t::~router_session_t ()
-{
-}
-
diff --git a/src/router.hpp b/src/router.hpp
deleted file mode 100755
index 9a5c0f9..0000000
--- a/src/router.hpp
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ROUTER_HPP_INCLUDED__
-#define __ZMQ_ROUTER_HPP_INCLUDED__
-
-#include <map>
-#include <deque>
-
-#include "socket_base.hpp"
-#include "session_base.hpp"
-#include "stdint.hpp"
-#include "msg.hpp"
-#include "fq.hpp"
-
-namespace zmq
-{
-
- class router_t :
- public socket_base_t
- {
- public:
-
- router_t (class ctx_t *parent_, uint32_t tid_);
- ~router_t ();
-
- // Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_);
- int xsend (class msg_t *msg_, int flags_);
- int xrecv (class msg_t *msg_, int flags_);
- bool xhas_in ();
- bool xhas_out ();
- void xread_activated (class pipe_t *pipe_);
- void xwrite_activated (class pipe_t *pipe_);
- void xterminated (class pipe_t *pipe_);
-
- protected:
-
- // Rollback any message parts that were sent but not yet flushed.
- int rollback ();
-
- private:
-
- // Fair queueing object for inbound pipes.
- fq_t fq;
-
- // Have we prefetched a message.
- bool prefetched;
-
- // Holds the prefetched message.
- msg_t prefetched_msg;
-
- // If true, more incoming message parts are expected.
- bool more_in;
-
- struct outpipe_t
- {
- class pipe_t *pipe;
- bool active;
- };
-
- // Outbound pipes indexed by the peer IDs.
- typedef std::map <uint32_t, outpipe_t> outpipes_t;
- outpipes_t outpipes;
-
- // The pipe we are currently writing to.
- class pipe_t *current_out;
-
- // If true, more outgoing message parts are expected.
- bool more_out;
-
- // Peer ID are generated. It's a simple increment and wrap-over
- // algorithm. This value is the next ID to use (if not used already).
- uint32_t next_peer_id;
-
- // Commands to be delivered to the user.
- struct pending_command_t
- {
- uint8_t cmd;
- uint32_t peer;
- };
- typedef std::deque <pending_command_t> pending_commands_t;
- pending_commands_t pending_commands;
-
- router_t (const router_t&);
- const router_t &operator = (const router_t&);
- };
-
- class router_session_t : public session_base_t
- {
- public:
-
- router_session_t (class io_thread_t *io_thread_, bool connect_,
- class socket_base_t *socket_, const options_t &options_,
- const char *protocol_, const char *address_);
- ~router_session_t ();
-
- private:
-
- router_session_t (const router_session_t&);
- const router_session_t &operator = (const router_session_t&);
- };
-
-}
-
-#endif
diff --git a/src/select.cpp b/src/select.cpp
index 0ecdcd7..56b87ae 100644
--- a/src/select.cpp
+++ b/src/select.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/select.hpp b/src/select.hpp
index 55bc883..9231b6c 100644
--- a/src/select.hpp
+++ b/src/select.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 32dcd4f..f2ee713 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -26,7 +28,6 @@
#include "likely.hpp"
#include "tcp_connecter.hpp"
#include "ipc_connecter.hpp"
-#include "vtcp_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
@@ -40,7 +41,6 @@
#include "xsub.hpp"
#include "push.hpp"
#include "pull.hpp"
-#include "router.hpp"
#include "pair.hpp"
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
@@ -88,10 +88,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pull_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
- case ZMQ_ROUTER:
- s = new (std::nothrow) router_session_t (io_thread_, connect_,
- socket_, options_, protocol_, address_);
- break;
case ZMQ_PAIR:
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
@@ -116,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
engine (NULL),
socket (socket_),
io_thread (io_thread_),
- has_linger_timer (false)
+ has_linger_timer (false),
+ send_identity (options_.send_identity),
+ recv_identity (options_.recv_identity)
{
if (protocol_)
protocol = protocol_;
@@ -150,18 +148,33 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::read (msg_t *msg_)
{
+ // First message to send is identity (if required).
+ if (send_identity) {
+ zmq_assert (!(msg_->flags () & msg_t::more));
+ msg_->init_size (options.identity_size);
+ memcpy (msg_->data (), options.identity, options.identity_size);
+ send_identity = false;
+ incomplete_in = false;
+ return 0;
+ }
+
if (!pipe || !pipe->read (msg_)) {
errno = EAGAIN;
return -1;
}
+ incomplete_in = msg_->flags () & msg_t::more ? true : false;
- incomplete_in =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
return 0;
}
int zmq::session_base_t::write (msg_t *msg_)
{
+ // First message to receive is identity (if required).
+ if (recv_identity) {
+ msg_->set_flags (msg_t::identity);
+ recv_identity = false;
+ }
+
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
@@ -398,18 +411,6 @@ void zmq::session_base_t::start_connecting (bool wait_)
}
#endif
-#if defined ZMQ_HAVE_VTCP
- if (protocol == "vtcp") {
-
- vtcp_connecter_t *connecter = new (std::nothrow) vtcp_connecter_t (
- io_thread, this, options, address.c_str (),
- wait_);
- alloc_assert (connecter);
- launch_child (connecter);
- return;
- }
-#endif
-
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
diff --git a/src/session_base.hpp b/src/session_base.hpp
index e388d42..c89628f 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -118,6 +120,10 @@ namespace zmq
// True is linger timer is running.
bool has_linger_timer;
+ // If true, identity is to be sent/recvd from the network.
+ bool send_identity;
+ bool recv_identity;
+
// Protocol and address to use when connecting.
std::string protocol;
std::string address;
diff --git a/src/signaler.cpp b/src/signaler.cpp
index aac3e7c..7f8495f 100644
--- a/src/signaler.cpp
+++ b/src/signaler.cpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/signaler.hpp b/src/signaler.hpp
index dd474d9..4466c98 100644
--- a/src/signaler.hpp
+++ b/src/signaler.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index a4d89db..a59ba69 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -36,7 +38,6 @@
#include "socket_base.hpp"
#include "tcp_listener.hpp"
#include "ipc_listener.hpp"
-#include "vtcp_listener.hpp"
#include "tcp_connecter.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
@@ -60,7 +61,6 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
-#include "router.hpp"
bool zmq::socket_base_t::check_tag ()
{
@@ -106,9 +106,6 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
break;
- case ZMQ_ROUTER:
- s = new (std::nothrow) router_t (parent_, tid_);
- break;
default:
errno = EINVAL;
return NULL;
@@ -124,8 +121,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
destroyed (false),
last_tsc (0),
ticks (0),
- rcvlabel (false),
- rcvcmd (false),
rcvmore (false)
{
}
@@ -176,8 +171,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
- protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys" &&
- protocol_ != "vtcp") {
+ protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") {
errno = EPROTONOSUPPORT;
return -1;
}
@@ -191,14 +185,6 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
}
#endif
- // If 0MQ is not compiled with VTCP, vtcp transport is not avaialble.
-#if !defined ZMQ_HAVE_VTCP
- if (protocol_ == "vtcp") {
- errno = EPROTONOSUPPORT;
- return -1;
- }
-#endif
-
// IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
if (protocol_ == "ipc") {
@@ -265,26 +251,6 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return -1;
}
- if (option_ == ZMQ_RCVLABEL) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvlabel ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
- if (option_ == ZMQ_RCVCMD) {
- if (*optvallen_ < sizeof (int)) {
- errno = EINVAL;
- return -1;
- }
- *((int*) optval_) = rcvcmd ? 1 : 0;
- *optvallen_ = sizeof (int);
- return 0;
- }
-
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -392,21 +358,6 @@ int zmq::socket_base_t::bind (const char *addr_)
}
#endif
-#if defined ZMQ_HAVE_VTCP
- if (protocol == "vtcp") {
- vtcp_listener_t *listener = new (std::nothrow) vtcp_listener_t (
- io_thread, this, options);
- alloc_assert (listener);
- int rc = listener->set_address (address.c_str ());
- if (rc != 0) {
- delete listener;
- return -1;
- }
- launch_child (listener);
- return 0;
- }
-#endif
-
zmq_assert (false);
return -1;
}
@@ -524,12 +475,8 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return -1;
// At this point we impose the flags on the message.
- if (flags_ & ZMQ_SNDLABEL)
- msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
- if (flags_ & ZMQ_SNDCMD)
- msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -898,13 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
void zmq::socket_base_t::extract_flags (msg_t *msg_)
{
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
+ // Test whether IDENTITY flag is valid for this socket type.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (options.recv_identity);
+printf ("identity recvd\n");
+ }
+
+
+ // Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false;
if (rcvmore)
msg_->reset_flags (msg_t::more);
- rcvcmd = msg_->flags () & msg_t::command ? true : false;
- if (rcvcmd)
- msg_->reset_flags (msg_t::command);
}
+
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index c7c86e7..bc978ba 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -183,12 +185,6 @@ namespace zmq
// Number of messages received since last command processing.
int ticks;
- // True if the last message received had LABEL flag set.
- bool rcvlabel;
-
- // True if the last message received had COMMAND flag set.
- bool rcvcmd;
-
// True if the last message received had MORE flag set.
bool rcvmore;
diff --git a/src/stdint.hpp b/src/stdint.hpp
index 73186d3..b78afcd 100644
--- a/src/stdint.hpp
+++ b/src/stdint.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 2647795..11ec264 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index 92fc55f..6d122ed 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/sub.cpp b/src/sub.cpp
index d9f2f2e..3249aea 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/sub.hpp b/src/sub.hpp
index 7d3cf0b..bb46641 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp
index 0aa564a..1b7577f 100644
--- a/src/tcp_address.cpp
+++ b/src/tcp_address.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_address.hpp b/src/tcp_address.hpp
index 58ac540..d4768c7 100644
--- a/src/tcp_address.hpp
+++ b/src/tcp_address.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index fe99252..75079da 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index d1a93cd..e420c82 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 9b6068c..0b7a90d 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 60713e3..e712998 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2010 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/thread.cpp b/src/thread.cpp
index d1c6729..00628e5 100644
--- a/src/thread.cpp
+++ b/src/thread.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/thread.hpp b/src/thread.hpp
index f3f5f8d..52769b1 100644
--- a/src/thread.hpp
+++ b/src/thread.hpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/trie.cpp b/src/trie.cpp
index cd6cb7b..9718c77 100644
--- a/src/trie.cpp
+++ b/src/trie.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/trie.hpp b/src/trie.hpp
index a2b55c6..76e4fd9 100644
--- a/src/trie.hpp
+++ b/src/trie.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/vtcp_connecter.cpp b/src/vtcp_connecter.cpp
deleted file mode 100644
index 5dc147e..0000000
--- a/src/vtcp_connecter.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "vtcp_connecter.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include <new>
-#include <string>
-
-#include "stream_engine.hpp"
-#include "io_thread.hpp"
-#include "platform.hpp"
-#include "random.hpp"
-#include "likely.hpp"
-#include "err.hpp"
-#include "ip.hpp"
-
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#else
-#include <unistd.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <fcntl.h>
-#ifdef ZMQ_HAVE_OPENVMS
-#include <ioctl.h>
-#endif
-#endif
-
-zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_,
- class session_base_t *session_, const options_t &options_,
- const char *address_, bool wait_) :
- own_t (io_thread_, options_),
- io_object_t (io_thread_),
- s (retired_fd),
- handle_valid (false),
- wait (wait_),
- session (session_),
- current_reconnect_ivl(options.reconnect_ivl)
-{
- subport = 0;
-
- int rc = set_address (address_);
- zmq_assert (rc == 0);
-}
-
-zmq::vtcp_connecter_t::~vtcp_connecter_t ()
-{
- if (wait)
- cancel_timer (reconnect_timer_id);
- if (handle_valid)
- rm_fd (handle);
-
- if (s != retired_fd)
- close ();
-}
-
-int zmq::vtcp_connecter_t::set_address (const char *addr_)
-{
- const char *delimiter = strrchr (addr_, '.');
- if (!delimiter) {
- delimiter = strrchr (addr_, ':');
- if (!delimiter) {
- errno = EINVAL;
- return -1;
- }
- std::string addr_str (addr_, delimiter - addr_);
- addr_str += ":9220";
- std::string subport_str (delimiter + 1);
- subport = (vtcp_subport_t) atoi (subport_str.c_str ());
- int rc = address.resolve (addr_str.c_str (), false, true);
- if (rc != 0)
- return -1;
- }
- else {
- std::string addr_str (addr_, delimiter - addr_);
- std::string subport_str (delimiter + 1);
- subport = (vtcp_subport_t) atoi (subport_str.c_str ());
- int rc = address.resolve (addr_str.c_str (), false, true);
- if (rc != 0)
- return -1;
- }
-
- return 0;
-}
-
-void zmq::vtcp_connecter_t::process_plug ()
-{
- if (wait)
- add_reconnect_timer();
- else
- start_connecting ();
-}
-
-void zmq::vtcp_connecter_t::in_event ()
-{
- // We are not polling for incomming data, so we are actually called
- // because of error here. However, we can get error on out event as well
- // on some platforms, so we'll simply handle both events in the same way.
- out_event ();
-}
-
-void zmq::vtcp_connecter_t::out_event ()
-{
- fd_t fd = connect ();
- rm_fd (handle);
- handle_valid = false;
-
- // Handle the error condition by attempt to reconnect.
- if (fd == retired_fd) {
- close ();
- wait = true;
- add_reconnect_timer();
- return;
- }
-
- // Create the engine object for this connection.
- stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
- alloc_assert (engine);
-
- // Attach the engine to the corresponding session object.
- send_attach (session, engine);
-
- // Shut the connecter down.
- terminate ();
-}
-
-void zmq::vtcp_connecter_t::timer_event (int id_)
-{
- zmq_assert (id_ == reconnect_timer_id);
- wait = false;
- start_connecting ();
-}
-
-void zmq::vtcp_connecter_t::start_connecting ()
-{
- // Open the connecting socket.
- int rc = open ();
-
- // Handle error condition by eventual reconnect.
- if (unlikely (rc != 0)) {
- errno_assert (false);
- wait = true;
- add_reconnect_timer();
- return;
- }
-
- // Connection establishment may be dealyed. Poll for its completion.
- handle = add_fd (s);
- handle_valid = true;
- set_pollout (handle);
-}
-
-void zmq::vtcp_connecter_t::add_reconnect_timer()
-{
- add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
-}
-
-int zmq::vtcp_connecter_t::get_new_reconnect_ivl ()
-{
- // The new interval is the current interval + random value.
- int this_interval = current_reconnect_ivl +
- (generate_random () % options.reconnect_ivl);
-
- // Only change the current reconnect interval if the maximum reconnect
- // interval was set and if it's larger than the reconnect interval.
- if (options.reconnect_ivl_max > 0 &&
- options.reconnect_ivl_max > options.reconnect_ivl) {
-
- // Calculate the next interval
- current_reconnect_ivl = current_reconnect_ivl * 2;
- if(current_reconnect_ivl >= options.reconnect_ivl_max) {
- current_reconnect_ivl = options.reconnect_ivl_max;
- }
- }
- return this_interval;
-}
-
-int zmq::vtcp_connecter_t::open ()
-{
- zmq_assert (s == retired_fd);
-
- // Start the connection procedure.
- sockaddr_in *paddr = (sockaddr_in*) address.addr ();
- s = vtcp_connect (paddr->sin_addr.s_addr, ntohs (paddr->sin_port));
-
- // Connect was successfull immediately.
- if (s != retired_fd)
- return 0;
-
- // Asynchronous connect was launched.
- if (errno == EINPROGRESS) {
- errno = EAGAIN;
- return -1;
- }
-
- // Error occured.
- return -1;
-}
-
-zmq::fd_t zmq::vtcp_connecter_t::connect ()
-{
- int rc = vtcp_acceptc (s, subport);
- if (rc != 0) {
- int err = errno;
- close ();
- errno = err;
- return retired_fd;
- }
-
- tune_tcp_socket (s);
-
- fd_t result = s;
- s = retired_fd;
- return result;
-}
-
-int zmq::vtcp_connecter_t::close ()
-{
- zmq_assert (s != retired_fd);
- int rc = ::close (s);
- if (rc != 0)
- return -1;
- s = retired_fd;
- return 0;
-}
-
-#endif
-
diff --git a/src/vtcp_connecter.hpp b/src/vtcp_connecter.hpp
deleted file mode 100644
index fe5260e..0000000
--- a/src/vtcp_connecter.hpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __VTCP_CONNECTER_HPP_INCLUDED__
-#define __VTCP_CONNECTER_HPP_INCLUDED__
-
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include <vtcp.h>
-
-#include "fd.hpp"
-#include "own.hpp"
-#include "stdint.hpp"
-#include "io_object.hpp"
-#include "tcp_address.hpp"
-
-namespace zmq
-{
-
- class vtcp_connecter_t : public own_t, public io_object_t
- {
- public:
-
- // If 'delay' is true connecter first waits for a while, then starts
- // connection process.
- vtcp_connecter_t (class io_thread_t *io_thread_,
- class session_base_t *session_, const options_t &options_,
- const char *address_, bool delay_);
- ~vtcp_connecter_t ();
-
- private:
-
- // ID of the timer used to delay the reconnection.
- enum {reconnect_timer_id = 1};
-
- // Handlers for incoming commands.
- void process_plug ();
-
- // Handlers for I/O events.
- void in_event ();
- void out_event ();
- void timer_event (int id_);
-
- // Internal function to start the actual connection establishment.
- void start_connecting ();
-
- // Internal function to add a reconnect timer
- void add_reconnect_timer();
-
- // Internal function to return a reconnect backoff delay.
- // Will modify the current_reconnect_ivl used for next call
- // Returns the currently used interval
- int get_new_reconnect_ivl ();
-
- // Set address to connect to.
- int set_address (const char *addr_);
-
- // Open TCP connecting socket. Returns -1 in case of error,
- // 0 if connect was successfull immediately and 1 if async connect
- // was launched.
- int open ();
-
- // Close the connecting socket.
- int close ();
-
- // Get the file descriptor of newly created connection. Returns
- // retired_fd if the connection was unsuccessfull.
- fd_t connect ();
-
- // Address to connect to.
- tcp_address_t address;
- vtcp_subport_t subport;
-
- // Underlying socket.
- fd_t s;
-
- // Handle corresponding to the listening socket.
- handle_t handle;
-
- // If true file descriptor is registered with the poller and 'handle'
- // contains valid value.
- bool handle_valid;
-
- // If true, connecter is waiting a while before trying to connect.
- bool wait;
-
- // Reference to the session we belong to.
- class session_base_t *session;
-
- // Current reconnect ivl, updated for backoff strategy
- int current_reconnect_ivl;
-
- vtcp_connecter_t (const vtcp_connecter_t&);
- const vtcp_connecter_t &operator = (const vtcp_connecter_t&);
- };
-
-}
-
-#endif
-
-#endif
diff --git a/src/vtcp_listener.cpp b/src/vtcp_listener.cpp
deleted file mode 100644
index 7e496e5..0000000
--- a/src/vtcp_listener.cpp
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "vtcp_listener.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include <string>
-#include <string.h>
-#include <vtcp.h>
-
-#include "stream_engine.hpp"
-#include "session_base.hpp"
-#include "stdint.hpp"
-#include "err.hpp"
-#include "ip.hpp"
-
-zmq::vtcp_listener_t::vtcp_listener_t (io_thread_t *io_thread_,
- socket_base_t *socket_, options_t &options_) :
- own_t (io_thread_, options_),
- io_object_t (io_thread_),
- s (retired_fd),
- socket (socket_)
-{
-}
-
-zmq::vtcp_listener_t::~vtcp_listener_t ()
-{
- if (s != retired_fd) {
- int rc = ::close (s);
- errno_assert (rc == 0);
- s = retired_fd;
- }
-}
-
-int zmq::vtcp_listener_t::set_address (const char *addr_)
-{
- // VTCP doesn't allow for binding to a specific interface. Connection
- // string has to begin with *: (INADDR_ANY).
- if (strlen (addr_) < 2 || addr_ [0] != '*' || addr_ [1] != ':') {
- errno = EADDRNOTAVAIL;
- return -1;
- }
-
- // Parse port and subport.
- uint16_t port;
- uint32_t subport;
- const char *delimiter = strrchr (addr_, '.');
- if (!delimiter) {
- port = 9220;
- subport = (uint32_t) atoi (addr_ + 2);
- }
- else {
- std::string port_str (addr_ + 2, delimiter - addr_ - 2);
- std::string subport_str (delimiter + 1);
- port = (uint16_t) atoi (port_str.c_str ());
- subport = (uint32_t) atoi (subport_str.c_str ());
- }
-
- // Start listening.
- s = vtcp_bind (port, subport);
- if (s == retired_fd)
- return -1;
-
- return 0;
-}
-
-void zmq::vtcp_listener_t::process_plug ()
-{
- // Start polling for incoming connections.
- handle = add_fd (s);
- set_pollin (handle);
-}
-
-void zmq::vtcp_listener_t::process_term (int linger_)
-{
- rm_fd (handle);
- own_t::process_term (linger_);
-}
-
-void zmq::vtcp_listener_t::in_event ()
-{
- fd_t fd = vtcp_acceptb (s);
- if (fd == retired_fd)
- return;
-
- tune_tcp_socket (fd);
-
- // Create the engine object for this connection.
- stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
- alloc_assert (engine);
-
- // Choose I/O thread to run connecter in. Given that we are already
- // running in an I/O thread, there must be at least one available.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_assert (io_thread);
-
- // Create and launch a session object.
- session_base_t *session = session_base_t::create (io_thread, false, socket,
- options, NULL, NULL);
- alloc_assert (session);
- session->inc_seqnum ();
- launch_child (session);
- send_attach (session, engine, false);
-}
-
-#endif
diff --git a/src/vtcp_listener.hpp b/src/vtcp_listener.hpp
deleted file mode 100644
index 78f3b51..0000000
--- a/src/vtcp_listener.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__
-#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__
-
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_VTCP
-
-#include "own.hpp"
-#include "io_object.hpp"
-#include "fd.hpp"
-
-namespace zmq
-{
-
- class vtcp_listener_t : public own_t, public io_object_t
- {
- public:
-
- vtcp_listener_t (class io_thread_t *io_thread_,
- class socket_base_t *socket_, class options_t &options_);
- ~vtcp_listener_t ();
-
- int set_address (const char *addr_);
-
- private:
-
- // Handlers for incoming commands.
- void process_plug ();
- void process_term (int linger_);
-
- // Handlers for I/O events.
- void in_event ();
-
- // VTCP listener socket.
- fd_t s;
-
- // Handle corresponding to the listening socket.
- handle_t handle;
-
- // Socket the listerner belongs to.
- class socket_base_t *socket;
-
- vtcp_listener_t (const vtcp_listener_t&);
- const vtcp_listener_t &operator = (const vtcp_listener_t&);
- };
-
-}
-
-#endif
-
-#endif
diff --git a/src/windows.hpp b/src/windows.hpp
index 8f39914..5e986b2 100644
--- a/src/windows.hpp
+++ b/src/windows.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/wire.hpp b/src/wire.hpp
index bc9dfe5..b0f4e0e 100644
--- a/src/wire.hpp
+++ b/src/wire.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/xpub.cpp b/src/xpub.cpp
index a245fea..5d7a97c 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -100,8 +101,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{
- bool msg_more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ bool msg_more = msg_->flags () & msg_t::more ? true : false;
// For the first part of multi-part message, find the matching pipes.
if (!more)
diff --git a/src/xpub.hpp b/src/xpub.hpp
index b410e6c..14ffc58 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 9f2a947..ea19e56 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -35,9 +37,14 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREP;
+ // TODO: Uncomment the following line when XREP will become true XREP
+ // rather than generic router socket.
// If peer disconnect there's noone to send reply to anyway. We can drop
// all the outstanding requests from that peer.
- options.delay_on_disconnect = false;
+ // options.delay_on_disconnect = false;
+
+ options.send_identity = true;
+ options.recv_identity = true;
prefetched_msg.init ();
}
@@ -52,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
- // Generate a new peer ID. Take care to avoid duplicates.
- outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
- if (!outpipes.empty ()) {
- while (true) {
- if (it == outpipes.end ())
- it = outpipes.begin ();
- if (it->first != next_peer_id)
- break;
- ++next_peer_id;
- ++it;
- }
- }
+ // Generate a new unique peer identity.
+ unsigned char buf [5];
+ buf [0] = 0;
+ put_uint32 (buf + 1, next_peer_id);
+ blob_t identity (buf, 5);
+ ++next_peer_id;
// Add the pipe to the map out outbound pipes.
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (
- next_peer_id, outpipe)).second;
+ identity, outpipe)).second;
zmq_assert (ok);
// Add the pipe to the list of inbound pipes.
- pipe_->set_pipe_id (next_peer_id);
- fq.attach (pipe_);
-
- // Advance next peer ID so that if new connection is dropped shortly after
- // its creation we don't accidentally get two subsequent peers with
- // the same ID.
- ++next_peer_id;
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
@@ -125,30 +121,29 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
- if (msg_->flags () & msg_t::label) {
+ if (msg_->flags () & msg_t::more) {
more_out = true;
- // Find the pipe associated with the peer ID stored in the prefix.
+ // Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe just silently ignore the message.
- if (msg_->size () == 4) {
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
-
- if (it != outpipes.end ()) {
- current_out = it->second.pipe;
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!current_out->check_write (&empty)) {
- it->second.active = false;
- more_out = false;
- current_out = NULL;
- }
- rc = empty.close ();
- errno_assert (rc == 0);
+ blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
}
+ rc = empty.close ();
+ errno_assert (rc == 0);
}
+
}
int rc = msg_->close ();
@@ -159,7 +154,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
}
// Check whether this is the last part of the message.
- more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_out = msg_->flags () & msg_t::more ? true : false;
// Push the message into the pipe. If there's no out pipe, just drop it.
if (current_out) {
@@ -189,7 +184,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
prefetched = false;
return 0;
}
@@ -200,9 +195,40 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
if (rc != 0)
return -1;
+ // If identity is received, change the key assigned to the pipe.
+ if (unlikely (msg_->flags () & msg_t::identity)) {
+ zmq_assert (!more_in);
+
+ // Empty identity means we can preserve the auto-generated identity.
+ if (msg_->size () != 0) {
+
+ // Actual change of the identity.
+ outpipes_t::iterator it = outpipes.begin ();
+ while (it != outpipes.end ()) {
+ if (it->second.pipe == pipe) {
+ blob_t identity ((unsigned char*) msg_->data (),
+ msg_->size ());
+ pipe->set_identity (identity);
+ outpipes.erase (it);
+ outpipe_t outpipe = {pipe, true};
+ outpipes.insert (outpipes_t::value_type (identity,
+ outpipe));
+ break;
+ }
+ ++it;
+ }
+ zmq_assert (it != outpipes.end ());
+ }
+
+ // After processing the identity, try to get the next message.
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+ }
+
// If we are in the middle of reading a message, just return the next part.
if (more_in) {
- more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more_in = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -213,10 +239,12 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
prefetched = true;
rc = msg_->close ();
errno_assert (rc == 0);
- rc = msg_->init_size (4);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
errno_assert (rc == 0);
- put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
- msg_->set_flags (msg_t::label);
+ memcpy (msg_->data (), identity.data (), identity.size ());
+ msg_->set_flags (msg_t::more);
return 0;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 562f87d..fc02b11 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -1,5 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 iMatix Corporation
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -26,6 +28,7 @@
#include "socket_base.hpp"
#include "session_base.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
#include "msg.hpp"
#include "fq.hpp"
@@ -77,7 +80,7 @@ namespace zmq
};
// Outbound pipes indexed by the peer IDs.
- typedef std::map <uint32_t, outpipe_t> outpipes_t;
+ typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
// The pipe we are currently writing to.
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 79b3b94..91317f7 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -27,9 +28,14 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
{
options.type = ZMQ_XREQ;
+ // TODO: Uncomment the following line when XREQ will become true XREQ
+ // rather than generic dealer socket.
// If the socket is closing we can drop all the outbound requests. There'll
// be noone to receive the replies anyway.
- options.delay_on_close = false;
+ // options.delay_on_close = false;
+
+ options.send_identity = true;
+ options.recv_identity = true;
}
zmq::xreq_t::~xreq_t ()
@@ -50,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
{
- return fq.recv (msg_, flags_);
+ // XREQ socket doesn't use identities. We can safely drop it and
+ while (true) {
+ int rc = fq.recv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ if (likely (!(msg_->flags () & msg_t::identity)))
+ break;
+ }
+ return 0;
}
bool zmq::xreq_t::xhas_in ()
diff --git a/src/xreq.hpp b/src/xreq.hpp
index d7e28c4..1d979c5 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -1,8 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
diff --git a/src/xsub.cpp b/src/xsub.cpp
index b24f082..aae2654 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -1,6 +1,7 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2011 VMware, Inc.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
@@ -116,7 +117,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
- more = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
@@ -136,14 +137,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || !options.filter || match (msg_)) {
- more =
- msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+ more = msg_->flags () & msg_t::more ? true : false;
return 0;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (msg_->flags () & (msg_t::more | msg_t::label)) {
+ while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
@@ -183,7 +183,7 @@ bool zmq::xsub_t::xhas_in ()
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
- while (message.flags () & (msg_t::more | msg_t::label)) {
+ while (message.flags () & msg_t::more) {
rc = fq.recv (&message, ZMQ_DONTWAIT);
zmq_assert (rc == 0);
}
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 310df6e..1eac390 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -1,6 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/ypipe.hpp b/src/ypipe.hpp
index da4e85a..74a96bc 100644
--- a/src/ypipe.hpp
+++ b/src/ypipe.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/yqueue.hpp b/src/yqueue.hpp
index e436ea4..1c83cb8 100644
--- a/src/yqueue.hpp
+++ b/src/yqueue.hpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
diff --git a/src/zmq.cpp b/src/zmq.cpp
index e2bc509..b06b122 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -1,4 +1,5 @@
/*
+ Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
diff --git a/src/zmq_utils.cpp b/src/zmq_utils.cpp
index c7eb60f..8f34134 100644
--- a/src/zmq_utils.cpp
+++ b/src/zmq_utils.cpp
@@ -1,5 +1,6 @@
/*
- Copyright (c) 2007-2011 iMatix Corporation
+ Copyright (c) 2009-2011 250bpm s.r.o.
+ Copyright (c) 2007-2009 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.