diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2011-03-20 11:50:51 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2011-03-20 11:50:51 +0100 |
commit | 92c7c18367f91c6341fc617026f5e25000466b05 (patch) | |
tree | f4c8714fa17ecb6c22906757153533fc41d1bdbd /src/dist.hpp | |
parent | fac9c2da56073d15cfe25ea2bb5833cb5b9cfff6 (diff) |
Message atomicity problem solved in PUB socket
When new peer connects to a PUB socket while it is in the middle
of sending of multi-part messages, it gets just the remaining
part of the message, i.e. message atomicity is broken.
This patch drops the tail part of the message and starts sending
to the peer only when new message is started.
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/dist.hpp')
-rw-r--r-- | src/dist.hpp | 15 |
1 files changed, 15 insertions, 0 deletions
diff --git a/src/dist.hpp b/src/dist.hpp index 7eef4ad..ad9767a 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -21,6 +21,8 @@ #ifndef __ZMQ_DIST_HPP_INCLUDED__ #define __ZMQ_DIST_HPP_INCLUDED__ +#include <vector> + #include "array.hpp" #include "pipe.hpp" @@ -51,10 +53,23 @@ namespace zmq // fails. In such a case false is returned. bool write (class writer_t *pipe_, zmq_msg_t *msg_); + // Put the message to all active pipes. + void distribute (zmq_msg_t *msg_, int flags_); + + // Plug in all the delayed pipes. + void clear_new_pipes (); + // List of outbound pipes. typedef array_t <class writer_t> pipes_t; pipes_t pipes; + // List of new pipes that were not yet inserted into 'pipes' list. + // These pipes are moves to 'pipes' list once the current multipart + // message is fully sent. This way we avoid sending incomplete messages + // to peers. + typedef std::vector <class writer_t*> new_pipes_t; + new_pipes_t new_pipes; + // Number of active pipes. All the active pipes are located at the // beginning of the pipes array. pipes_t::size_type active; |