summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
authorMartin Lucina <martin@lucina.net>2012-02-16 10:05:18 +0900
committerMartin Sustrik <sustrik@250bpm.com>2012-02-16 10:05:18 +0900
commit858b8ad76757624464c139be2367f0dce53f8c3b (patch)
treebf174496a722d151d33e861f9bdd43a199007b1d /src/socket_base.cpp
parentcfba1f07987434263843f4aaee11ec088ec6ced3 (diff)
Fix data loss for PUB/SUB and unidirectional transports (LIBZMQ-268)
With the introduction of subscription forwarding, the first message sent on a PUB socket using a unidirectional transport (e.g. PGM) is always lost due to the "subscribe to all" being done asynchronously. This patch fixes the problem and also refactors the code to have a single point where the "subscribe to all" is performed. Signed-off-by: Martin Lucina <martin@lucina.net>
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp12
1 files changed, 9 insertions, 3 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index a321a71..f3ae291 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -206,14 +206,14 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
-void xs::socket_base_t::attach_pipe (pipe_t *pipe_)
+void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
{
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
// Let the derived socket type know about new pipe.
- xattach_pipe (pipe_);
+ xattach_pipe (pipe_, icanhasall_);
// If the socket is already being closed, ask any new pipes to terminate
// straight away.
@@ -452,8 +452,14 @@ int xs::socket_base_t::connect (const char *addr_)
rc = pipepair (parents, pipes, hwms, delays);
errno_assert (rc == 0);
+ // PGM does not support subscription forwarding; ask for all data to be
+ // sent to this pipe.
+ bool icanhasall = false;
+ if (protocol == "pgm" || protocol == "epgm")
+ icanhasall = true;
+
// Attach local end of the pipe to the socket object.
- attach_pipe (pipes [0]);
+ attach_pipe (pipes [0], icanhasall);
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);