From 6b873d4ffdd13263f184ca046565168f0ad66a6b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 21 Jul 2011 19:12:51 +0200 Subject: ROUTER socket blocks on SNDHWM Till now the message was droppen in such case. Signed-off-by: Martin Sustrik --- src/router.cpp | 64 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/src/router.cpp b/src/router.cpp index 9f0dbc6..2c9ade9 100755 --- a/src/router.cpp +++ b/src/router.cpp @@ -129,40 +129,44 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) if (!more_out) { zmq_assert (!current_out); - // If we have malformed message (prefix with no subsequent message) - // then just silently ignore it. - // TODO: The connections should be killed instead. - if (msg_->flags () & msg_t::label) { - - more_out = true; - - // Find the pipe associated with the peer ID stored in the prefix. - if (unlikely (msg_->size () != 4)) { - errno = ECANTROUTE; - return -1; - } - uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); - outpipes_t::iterator it = outpipes.find (peer_id); - if (unlikely (it == outpipes.end ())) { - errno = ECANTROUTE; - return -1; - } - - // Check whether the pipe is available for writing. - current_out = it->second.pipe; - msg_t empty; - int rc = empty.init (); - errno_assert (rc == 0); - if (!current_out->check_write (&empty)) { - it->second.active = false; - more_out = false; - current_out = NULL; - } + // The first message part has to be label. + if (unlikely (!(msg_->flags () & msg_t::label))) { + errno = EFSM; + return -1; + } + + // Find the pipe associated with the peer ID stored in the message. + if (unlikely (msg_->size () != 4)) { + errno = ECANTROUTE; + return -1; + } + uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); + outpipes_t::iterator it = outpipes.find (peer_id); + if (unlikely (it == outpipes.end ())) { + errno = ECANTROUTE; + return -1; + } + + // Check whether the pipe is available for writing. + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); + if (!it->second.pipe->check_write (&empty)) { rc = empty.close (); errno_assert (rc == 0); + it->second.active = false; + errno = EAGAIN; + return -1; } + rc = empty.close (); + errno_assert (rc == 0); - int rc = msg_->close (); + // Mark the pipe to send the message to. + current_out = it->second.pipe; + more_out = true; + + // Clean up the message object. + rc = msg_->close (); errno_assert (rc == 0); rc = msg_->init (); errno_assert (rc == 0); -- cgit v1.2.3