diff options
author | Martin Lucina <martin@lucina.net> | 2012-02-16 10:05:18 +0900 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2012-02-16 10:05:18 +0900 |
commit | 858b8ad76757624464c139be2367f0dce53f8c3b (patch) | |
tree | bf174496a722d151d33e861f9bdd43a199007b1d /src/socket_base.cpp | |
parent | cfba1f07987434263843f4aaee11ec088ec6ced3 (diff) |
Fix data loss for PUB/SUB and unidirectional transports (LIBZMQ-268)
With the introduction of subscription forwarding, the first message
sent on a PUB socket using a unidirectional transport (e.g. PGM) is
always lost due to the "subscribe to all" being done asynchronously.
This patch fixes the problem and also refactors the code to have a
single point where the "subscribe to all" is performed.
Signed-off-by: Martin Lucina <martin@lucina.net>
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a321a71..f3ae291 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -206,14 +206,14 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void xs::socket_base_t::attach_pipe (pipe_t *pipe_) +void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); pipes.push_back (pipe_); // Let the derived socket type know about new pipe. - xattach_pipe (pipe_); + xattach_pipe (pipe_, icanhasall_); // If the socket is already being closed, ask any new pipes to terminate // straight away. @@ -452,8 +452,14 @@ int xs::socket_base_t::connect (const char *addr_) rc = pipepair (parents, pipes, hwms, delays); errno_assert (rc == 0); + // PGM does not support subscription forwarding; ask for all data to be + // sent to this pipe. + bool icanhasall = false; + if (protocol == "pgm" || protocol == "epgm") + icanhasall = true; + // Attach local end of the pipe to the socket object. - attach_pipe (pipes [0]); + attach_pipe (pipes [0], icanhasall); // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); |