summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp43
1 files changed, 43 insertions, 0 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 48fc3e5..fd7223c 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
}
}
+void zmq::pipe_t::process_hiccup (void *pipe_)
+{
+ // Destroy old outpipe. Note that the read end of the pipe was already
+ // migrated to this thread.
+ zmq_assert (outpipe);
+ outpipe->flush ();
+ msg_t msg;
+ while (outpipe->read (&msg)) {
+ int rc = msg.close ();
+ errno_assert (rc == 0);
+ }
+ delete outpipe;
+
+ // Plug in the new outpipe.
+ zmq_assert (pipe_);
+ outpipe = (upipe_t*) pipe_;
+ out_active = true;
+
+ // If appropriate, notify the user about the hiccup.
+ if (state == active)
+ sink->hiccuped (this);
+}
+
void zmq::pipe_t::process_pipe_term ()
{
// This is the simple case of peer-induced termination. If there are no
@@ -379,3 +402,23 @@ void zmq::pipe_t::delimit ()
// Delimiter in any other state is invalid.
zmq_assert (false);
}
+
+void zmq::pipe_t::hiccup ()
+{
+ // If termination is already under way do nothing.
+ if (state != active)
+ return;
+
+ // We'll drop the pointer to the inpipe. From now on, the peer is
+ // responsible for deallocating it.
+ inpipe = NULL;
+
+ // Create new inpipe.
+ inpipe = new (std::nothrow) pipe_t::upipe_t ();
+ alloc_assert (inpipe);
+ in_active = true;
+
+ // Notify the peer about the hiccup.
+ send_hiccup (peer, (void*) inpipe);
+}
+