summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-05-11 16:23:06 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-05-11 16:23:06 +0200
commite924381bae58ca828c9691b9d4d772a20a91c717 (patch)
treed65fc8ea3352e0e4fc94144b1f733b54f03f5c65
parentf7f7eb1613b405026410299f6ca6a4e340bf47c8 (diff)
Command throttling breaking HWM algorithms -- fixed.
When HWM was set to small value it may have happened that command indicating that pipe is ready for writing wasn't processed because of command throttling. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/socket_base.cpp15
-rw-r--r--tests/hwm.cpp35
2 files changed, 46 insertions, 4 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ad8666a..86c01b0 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -614,11 +614,18 @@ int xs::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (errno != EAGAIN))
return -1;
- // In case of non-blocking send we'll simply propagate
- // the error - including EAGAIN - up the stack.
int timeout = sndtimeo ();
- if (flags_ & XS_DONTWAIT || timeout == 0)
- return -1;
+ if (flags_ & XS_DONTWAIT || timeout == 0) {
+
+ // It looks like pipe is full. However, previous process_commands may
+ // have done nothing because of the throttling. Thus, let's give it
+ // last try and force commands to be processed. Then try to re-send
+ // the message.
+ rc = process_commands (0, false);
+ if (unlikely (rc != 0))
+ return -1;
+ return xsend (msg_, flags_);
+ }
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
diff --git a/tests/hwm.cpp b/tests/hwm.cpp
index 93e8910..10afa36 100644
--- a/tests/hwm.cpp
+++ b/tests/hwm.cpp
@@ -77,5 +77,40 @@ int XS_TEST_MAIN ()
rc = xs_term (ctx);
assert (rc == 0);
+ // Following part of the tests checks whether small HWMs don't interact
+ // with command throttling in strange ways.
+
+ ctx = xs_init ();
+ assert (ctx);
+ void *s1 = xs_socket (ctx, XS_PULL);
+ assert (s1);
+ void *s2 = xs_socket (ctx, XS_PUSH);
+ assert (s2);
+
+ hwm = 5;
+ rc = xs_setsockopt (s2, XS_SNDHWM, &hwm, sizeof (hwm));
+ assert (rc == 0);
+
+ rc = xs_bind (s1, "tcp://127.0.0.1:5858");
+ assert (rc >= 0);
+ rc = xs_connect (s2, "tcp://127.0.0.1:5858");
+ assert (rc >= 0);
+
+ for (int i = 0; i < 10; i++)
+ {
+ rc = xs_send (s2, "test", 4, XS_DONTWAIT);
+ assert (rc == 4);
+ char buf [4];
+ rc = xs_recv (s1, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ }
+
+ rc = xs_close (s2);
+ assert (rc == 0);
+ rc = xs_close (s1);
+ assert (rc == 0);
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
return 0;
}