summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/decoder.cpp26
-rw-r--r--src/decoder.hpp23
-rw-r--r--src/zmq_engine.cpp26
3 files changed, 53 insertions, 22 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp
index 131ee24..1217193 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -54,16 +54,22 @@ bool zmq::decoder_t::one_byte_size_ready ()
next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);
else {
- // TODO: Handle over-sized message decently.
-
// There has to be at least one byte (the flags) in the message).
- zmq_assert (*tmpbuf > 0);
+ if (!*tmpbuf) {
+ decoding_error ();
+ return false;
+ }
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
+ if (rc != 0 && errno == ENOMEM) {
+ decoding_error ();
+ return false;
+ }
errno_assert (rc == 0);
+
next_step (tmpbuf, 1, &decoder_t::flags_ready);
}
return true;
@@ -75,19 +81,23 @@ bool zmq::decoder_t::eight_byte_size_ready ()
// read the message data into it.
size_t size = (size_t) get_uint64 (tmpbuf);
- // TODO: Handle over-sized message decently.
-
// There has to be at least one byte (the flags) in the message).
- zmq_assert (size > 0);
-
+ if (!size) {
+ decoding_error ();
+ return false;
+ }
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size - 1);
+ if (rc != 0 && errno == ENOMEM) {
+ decoding_error ();
+ return false;
+ }
errno_assert (rc == 0);
- next_step (tmpbuf, 1, &decoder_t::flags_ready);
+ next_step (tmpbuf, 1, &decoder_t::flags_ready);
return true;
}
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 87982a0..ab7d454 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -98,9 +98,13 @@ namespace zmq
read_pos += size_;
to_read -= size_;
- while (!to_read)
- if (!(static_cast <T*> (this)->*next) ())
+ while (!to_read) {
+ if (!(static_cast <T*> (this)->*next) ()) {
+ if (unlikely (!(static_cast <T*> (this)->next)))
+ return (size_t) -1;
return size_;
+ }
+ }
return size_;
}
@@ -109,9 +113,13 @@ namespace zmq
// Try to get more space in the message to fill in.
// If none is available, return.
- while (!to_read)
- if (!(static_cast <T*> (this)->*next) ())
+ while (!to_read) {
+ if (!(static_cast <T*> (this)->*next) ()) {
+ if (unlikely (!(static_cast <T*> (this)->next)))
+ return (size_t) -1;
return pos;
+ }
+ }
// If there are no more data in the buffer, return.
if (pos == size_)
@@ -142,6 +150,13 @@ namespace zmq
next = next_;
}
+ // This function should be called from the derived class to
+ // abort decoder state machine.
+ inline void decoding_error ()
+ {
+ next = NULL;
+ }
+
private:
unsigned char *read_pos;
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 815697c..761f6fe 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -119,18 +119,24 @@ void zmq::zmq_engine_t::in_event ()
// Push the data to the decoder.
size_t processed = decoder.process_buffer (inpos, insize);
- // Stop polling for input if we got stuck.
- if (processed < insize) {
-
- // This may happen if queue limits are in effect or when
- // init object reads all required information from the socket
- // and rejects to read more data.
- reset_pollin (handle);
+ if (unlikely (processed == (size_t) -1)) {
+ disconnection = true;
}
+ else {
+
+ // Stop polling for input if we got stuck.
+ if (processed < insize) {
- // Adjust the buffer.
- inpos += processed;
- insize -= processed;
+ // This may happen if queue limits are in effect or when
+ // init object reads all required information from the socket
+ // and rejects to read more data.
+ reset_pollin (handle);
+ }
+
+ // Adjust the buffer.
+ inpos += processed;
+ insize -= processed;
+ }
// Flush all messages the decoder may have produced.
inout->flush ();