From e924381bae58ca828c9691b9d4d772a20a91c717 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 11 May 2012 16:23:06 +0200 Subject: Command throttling breaking HWM algorithms -- fixed. When HWM was set to small value it may have happened that command indicating that pipe is ready for writing wasn't processed because of command throttling. Signed-off-by: Martin Sustrik --- src/socket_base.cpp | 15 +++++++++++---- tests/hwm.cpp | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ad8666a..86c01b0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -614,11 +614,18 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_) if (unlikely (errno != EAGAIN)) return -1; - // In case of non-blocking send we'll simply propagate - // the error - including EAGAIN - up the stack. int timeout = sndtimeo (); - if (flags_ & XS_DONTWAIT || timeout == 0) - return -1; + if (flags_ & XS_DONTWAIT || timeout == 0) { + + // It looks like pipe is full. However, previous process_commands may + // have done nothing because of the throttling. Thus, let's give it + // last try and force commands to be processed. Then try to re-send + // the message. + rc = process_commands (0, false); + if (unlikely (rc != 0)) + return -1; + return xsend (msg_, flags_); + } // Compute the time when the timeout should occur. // If the timeout is infite, don't care. diff --git a/tests/hwm.cpp b/tests/hwm.cpp index 93e8910..10afa36 100644 --- a/tests/hwm.cpp +++ b/tests/hwm.cpp @@ -74,6 +74,41 @@ int XS_TEST_MAIN () rc = xs_close (sb); assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + // Following part of the tests checks whether small HWMs don't interact + // with command throttling in strange ways. + + ctx = xs_init (); + assert (ctx); + void *s1 = xs_socket (ctx, XS_PULL); + assert (s1); + void *s2 = xs_socket (ctx, XS_PUSH); + assert (s2); + + hwm = 5; + rc = xs_setsockopt (s2, XS_SNDHWM, &hwm, sizeof (hwm)); + assert (rc == 0); + + rc = xs_bind (s1, "tcp://127.0.0.1:5858"); + assert (rc >= 0); + rc = xs_connect (s2, "tcp://127.0.0.1:5858"); + assert (rc >= 0); + + for (int i = 0; i < 10; i++) + { + rc = xs_send (s2, "test", 4, XS_DONTWAIT); + assert (rc == 4); + char buf [4]; + rc = xs_recv (s1, buf, sizeof (buf), 0); + assert (rc == 4); + } + + rc = xs_close (s2); + assert (rc == 0); + rc = xs_close (s1); + assert (rc == 0); rc = xs_term (ctx); assert (rc == 0); -- cgit v1.2.3