diff options
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r-- | src/pipe.cpp | 43 |
1 files changed, 43 insertions, 0 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp index 48fc3e5..fd7223c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) } } +void zmq::pipe_t::process_hiccup (void *pipe_) +{ + // Destroy old outpipe. Note that the read end of the pipe was already + // migrated to this thread. + zmq_assert (outpipe); + outpipe->flush (); + msg_t msg; + while (outpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } + delete outpipe; + + // Plug in the new outpipe. + zmq_assert (pipe_); + outpipe = (upipe_t*) pipe_; + out_active = true; + + // If appropriate, notify the user about the hiccup. + if (state == active) + sink->hiccuped (this); +} + void zmq::pipe_t::process_pipe_term () { // This is the simple case of peer-induced termination. If there are no @@ -379,3 +402,23 @@ void zmq::pipe_t::delimit () // Delimiter in any other state is invalid. zmq_assert (false); } + +void zmq::pipe_t::hiccup () +{ + // If termination is already under way do nothing. + if (state != active) + return; + + // We'll drop the pointer to the inpipe. From now on, the peer is + // responsible for deallocating it. + inpipe = NULL; + + // Create new inpipe. + inpipe = new (std::nothrow) pipe_t::upipe_t (); + alloc_assert (inpipe); + in_active = true; + + // Notify the peer about the hiccup. + send_hiccup (peer, (void*) inpipe); +} + |