summaryrefslogtreecommitdiff
path: root/src/pipe.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-05-30 10:07:34 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-05-30 10:07:34 +0200
commit0b59866a84f733e5a53b0d2f32570581691747ef (patch)
tree8861d97915544dc4385177931f299a6f27603c92 /src/pipe.cpp
parent311fb0d852374e769d8ff791c9df38f0464960c6 (diff)
Patches from sub-forward branch incorporated
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/pipe.cpp')
-rw-r--r--src/pipe.cpp43
1 files changed, 43 insertions, 0 deletions
diff --git a/src/pipe.cpp b/src/pipe.cpp
index 48fc3e5..fd7223c 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -205,6 +205,29 @@ void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
}
}
+void zmq::pipe_t::process_hiccup (void *pipe_)
+{
+ // Destroy old outpipe. Note that the read end of the pipe was already
+ // migrated to this thread.
+ zmq_assert (outpipe);
+ outpipe->flush ();
+ msg_t msg;
+ while (outpipe->read (&msg)) {
+ int rc = msg.close ();
+ errno_assert (rc == 0);
+ }
+ delete outpipe;
+
+ // Plug in the new outpipe.
+ zmq_assert (pipe_);
+ outpipe = (upipe_t*) pipe_;
+ out_active = true;
+
+ // If appropriate, notify the user about the hiccup.
+ if (state == active)
+ sink->hiccuped (this);
+}
+
void zmq::pipe_t::process_pipe_term ()
{
// This is the simple case of peer-induced termination. If there are no
@@ -379,3 +402,23 @@ void zmq::pipe_t::delimit ()
// Delimiter in any other state is invalid.
zmq_assert (false);
}
+
+void zmq::pipe_t::hiccup ()
+{
+ // If termination is already under way do nothing.
+ if (state != active)
+ return;
+
+ // We'll drop the pointer to the inpipe. From now on, the peer is
+ // responsible for deallocating it.
+ inpipe = NULL;
+
+ // Create new inpipe.
+ inpipe = new (std::nothrow) pipe_t::upipe_t ();
+ alloc_assert (inpipe);
+ in_active = true;
+
+ // Notify the peer about the hiccup.
+ send_hiccup (peer, (void*) inpipe);
+}
+