From 858b8ad76757624464c139be2367f0dce53f8c3b Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Thu, 16 Feb 2012 10:05:18 +0900 Subject: Fix data loss for PUB/SUB and unidirectional transports (LIBZMQ-268) With the introduction of subscription forwarding, the first message sent on a PUB socket using a unidirectional transport (e.g. PGM) is always lost due to the "subscribe to all" being done asynchronously. This patch fixes the problem and also refactors the code to have a single point where the "subscribe to all" is performed. Signed-off-by: Martin Lucina --- src/pair.cpp | 2 +- src/pair.hpp | 2 +- src/pgm_sender.cpp | 10 ---------- src/pull.cpp | 2 +- src/pull.hpp | 2 +- src/push.cpp | 2 +- src/push.hpp | 2 +- src/socket_base.cpp | 12 +++++++++--- src/socket_base.hpp | 5 +++-- src/xpub.cpp | 7 ++++++- src/xpub.hpp | 2 +- src/xrep.cpp | 2 +- src/xrep.hpp | 2 +- src/xreq.cpp | 2 +- src/xreq.hpp | 2 +- src/xsub.cpp | 2 +- src/xsub.hpp | 2 +- 17 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/pair.cpp b/src/pair.cpp index aa64ce9..dd14d8d 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -36,7 +36,7 @@ xs::pair_t::~pair_t () xs_assert (!pipe); } -void xs::pair_t::xattach_pipe (pipe_t *pipe_) +void xs::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (!pipe); pipe = pipe_; diff --git a/src/pair.hpp b/src/pair.hpp index c03ee1e..8ae3acf 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -42,7 +42,7 @@ namespace xs ~pair_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (xs::pipe_t *pipe_); + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index f68c044..dde6284 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -92,16 +92,6 @@ void xs::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) // Set POLLOUT for downlink_socket_handle. set_pollout (handle); - - // PGM is not able to pass subscriptions upstream, thus we have no idea - // what messages are peers interested in. Because of that we have to - // subscribe for all the messages. - msg_t msg; - msg.init_size (1); - *(unsigned char*) msg.data () = 1; - int rc = session_->write (&msg); - errno_assert (rc == 0); - session_->flush (); } void xs::pgm_sender_t::unplug () diff --git a/src/pull.cpp b/src/pull.cpp index 9495e12..e168c03 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -34,7 +34,7 @@ xs::pull_t::~pull_t () { } -void xs::pull_t::xattach_pipe (pipe_t *pipe_) +void xs::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); fq.attach (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index 446956c..29ca4d9 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -45,7 +45,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. - void xattach_pipe (xs::pipe_t *pipe_); + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xrecv (xs::msg_t *msg_, int flags_); bool xhas_in (); void xread_activated (xs::pipe_t *pipe_); diff --git a/src/push.cpp b/src/push.cpp index b0002e9..45f0c62 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -34,7 +34,7 @@ xs::push_t::~push_t () { } -void xs::push_t::xattach_pipe (pipe_t *pipe_) +void xs::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); lb.attach (pipe_); diff --git a/src/push.hpp b/src/push.hpp index f90479c..85c2279 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -45,7 +45,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. - void xattach_pipe (xs::pipe_t *pipe_); + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); void xwrite_activated (xs::pipe_t *pipe_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a321a71..f3ae291 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -206,14 +206,14 @@ int xs::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void xs::socket_base_t::attach_pipe (pipe_t *pipe_) +void xs::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); pipes.push_back (pipe_); // Let the derived socket type know about new pipe. - xattach_pipe (pipe_); + xattach_pipe (pipe_, icanhasall_); // If the socket is already being closed, ask any new pipes to terminate // straight away. @@ -452,8 +452,14 @@ int xs::socket_base_t::connect (const char *addr_) rc = pipepair (parents, pipes, hwms, delays); errno_assert (rc == 0); + // PGM does not support subscription forwarding; ask for all data to be + // sent to this pipe. + bool icanhasall = false; + if (protocol == "pgm" || protocol == "epgm") + icanhasall = true; + // Attach local end of the pipe to the socket object. - attach_pipe (pipes [0]); + attach_pipe (pipes [0], icanhasall); // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 01a1564..1e35ffa 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -102,7 +102,8 @@ namespace xs // Concrete algorithms for the x- methods are to be defined by // individual socket types. - virtual void xattach_pipe (xs::pipe_t *pipe_) = 0; + virtual void xattach_pipe (xs::pipe_t *pipe_, + bool icanhasall_ = false) = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, overload this @@ -157,7 +158,7 @@ namespace xs int check_protocol (const std::string &protocol_); // Register the pipe with this socket. - void attach_pipe (xs::pipe_t *pipe_); + void attach_pipe (xs::pipe_t *pipe_, bool icanhasall_ = false); // Processes commands sent to this socket (if any). If timeout is -1, // returns only after at least one command was processed. diff --git a/src/xpub.cpp b/src/xpub.cpp index bf73399..b4bc135 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -37,11 +37,16 @@ xs::xpub_t::~xpub_t () { } -void xs::xpub_t::xattach_pipe (pipe_t *pipe_) +void xs::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); dist.attach (pipe_); + // If icanhasall_ is specified, the caller would like to subscribe + // to all data on this pipe, implicitly. + if (icanhasall_) + subscriptions.add (NULL, 0, pipe_); + // The pipe is active when attached. Let's read the subscriptions from // it, if any. xread_activated (pipe_); diff --git a/src/xpub.hpp b/src/xpub.hpp index 92f97a1..a26076e 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -48,7 +48,7 @@ namespace xs ~xpub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (xs::pipe_t *pipe_); + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); int xrecv (xs::msg_t *msg_, int flags_); diff --git a/src/xrep.cpp b/src/xrep.cpp index d65f21b..051159d 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -55,7 +55,7 @@ xs::xrep_t::~xrep_t () prefetched_msg.close (); } -void xs::xrep_t::xattach_pipe (pipe_t *pipe_) +void xs::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); diff --git a/src/xrep.hpp b/src/xrep.hpp index d73549b..d8723db 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -48,7 +48,7 @@ namespace xs ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (xs::pipe_t *pipe_); + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xreq.cpp b/src/xreq.cpp index eaf318a..d16f60e 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -46,7 +46,7 @@ xs::xreq_t::~xreq_t () prefetched_msg.close (); } -void xs::xreq_t::xattach_pipe (pipe_t *pipe_) +void xs::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); fq.attach (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index 837cc8a..ab3a360 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -46,7 +46,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. - void xattach_pipe (xs::pipe_t *pipe_); + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); int xrecv (xs::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xsub.cpp b/src/xsub.cpp index 7797bdf..80cc205 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -48,7 +48,7 @@ xs::xsub_t::~xsub_t () errno_assert (rc == 0); } -void xs::xsub_t::xattach_pipe (pipe_t *pipe_) +void xs::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { xs_assert (pipe_); fq.attach (pipe_); diff --git a/src/xsub.hpp b/src/xsub.hpp index 6b1e26b..f9e5210 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -45,7 +45,7 @@ namespace xs protected: // Overloads of functions from socket_base_t. - void xattach_pipe (xs::pipe_t *pipe_); + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); int xsend (xs::msg_t *msg_, int flags_); bool xhas_out (); int xrecv (xs::msg_t *msg_, int flags_); -- cgit v1.2.3