summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-07-21 19:12:51 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-07-21 19:12:51 +0200
commit6b873d4ffdd13263f184ca046565168f0ad66a6b (patch)
tree834cf7a728869eba136a29e304e64038d91683fd
parenta1e09facb2438f6487b32cdcfff21f0ece735460 (diff)
ROUTER socket blocks on SNDHWM
Till now the message was droppen in such case. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rwxr-xr-xsrc/router.cpp64
1 files changed, 34 insertions, 30 deletions
diff --git a/src/router.cpp b/src/router.cpp
index 9f0dbc6..2c9ade9 100755
--- a/src/router.cpp
+++ b/src/router.cpp
@@ -129,40 +129,44 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
if (!more_out) {
zmq_assert (!current_out);
- // If we have malformed message (prefix with no subsequent message)
- // then just silently ignore it.
- // TODO: The connections should be killed instead.
- if (msg_->flags () & msg_t::label) {
-
- more_out = true;
-
- // Find the pipe associated with the peer ID stored in the prefix.
- if (unlikely (msg_->size () != 4)) {
- errno = ECANTROUTE;
- return -1;
- }
- uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
- outpipes_t::iterator it = outpipes.find (peer_id);
- if (unlikely (it == outpipes.end ())) {
- errno = ECANTROUTE;
- return -1;
- }
-
- // Check whether the pipe is available for writing.
- current_out = it->second.pipe;
- msg_t empty;
- int rc = empty.init ();
- errno_assert (rc == 0);
- if (!current_out->check_write (&empty)) {
- it->second.active = false;
- more_out = false;
- current_out = NULL;
- }
+ // The first message part has to be label.
+ if (unlikely (!(msg_->flags () & msg_t::label))) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Find the pipe associated with the peer ID stored in the message.
+ if (unlikely (msg_->size () != 4)) {
+ errno = ECANTROUTE;
+ return -1;
+ }
+ uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
+ outpipes_t::iterator it = outpipes.find (peer_id);
+ if (unlikely (it == outpipes.end ())) {
+ errno = ECANTROUTE;
+ return -1;
+ }
+
+ // Check whether the pipe is available for writing.
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!it->second.pipe->check_write (&empty)) {
rc = empty.close ();
errno_assert (rc == 0);
+ it->second.active = false;
+ errno = EAGAIN;
+ return -1;
}
+ rc = empty.close ();
+ errno_assert (rc == 0);
- int rc = msg_->close ();
+ // Mark the pipe to send the message to.
+ current_out = it->second.pipe;
+ more_out = true;
+
+ // Clean up the message object.
+ rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);