From eb9bc1b0648d2132e612e2237a0ace47004d6f5c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 30 Apr 2011 06:48:18 +0200 Subject: Message atomicity problem in PUB socket fixed. Reaching the HWM caused breaking message atomicity when the flow was reestablished - initial parts of multipart messages may have been lost. Signed-off-by: Martin Sustrik --- src/dist.hpp | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) (limited to 'src/dist.hpp') diff --git a/src/dist.hpp b/src/dist.hpp index ea05305..fd522b9 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -56,24 +56,22 @@ namespace zmq // Put the message to all active pipes. void distribute (class msg_t *msg_, int flags_); - // Plug in all the delayed pipes. - void clear_new_pipes (); - // List of outbound pipes. typedef array_t pipes_t; pipes_t pipes; - // List of new pipes that were not yet inserted into 'pipes' list. - // These pipes are moves to 'pipes' list once the current multipart - // message is fully sent. This way we avoid sending incomplete messages - // to peers. - typedef std::vector new_pipes_t; - new_pipes_t new_pipes; - // Number of active pipes. All the active pipes are located at the - // beginning of the pipes array. + // beginning of the pipes array. These are the pipes the messages + // can be sent to at the moment. pipes_t::size_type active; + // Number of pipes eligible for sending messages to. This includes all + // the active pipes plus all the pipes that we can in theory send + // messages to (the HWM is not yet reached), but sending a message + // to them would result in partial message being delivered, ie. message + // with initial parts missing. + pipes_t::size_type eligible; + // True if last we are in the middle of a multipart message. bool more; -- cgit v1.2.3