summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2011-09-16 09:29:43 +0200
committerMartin Sustrik <sustrik@250bpm.com>2011-09-16 09:29:43 +0200
commit06bdf2c4f96a6324b3fe667cebb03d44cd100a73 (patch)
treee7678e0f861ae538fe03c75484d708042f62659d
parentf78d9b6bfca13e298c29fadabbbc870b37a0a573 (diff)
Check message syntax in REQ asynchronously
This patch adds support for checking messages as they arrive (as opposed to when they are recv'd by the user) and drop the connection if they are malformed. It also uses this new feature to check for validity of inbound messages in REQ socket. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--src/decoder.cpp9
-rw-r--r--src/decoder.hpp8
-rw-r--r--src/encoder.cpp10
-rw-r--r--src/encoder.hpp9
-rw-r--r--src/req.cpp20
-rw-r--r--src/req.hpp8
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/session_base.hpp4
8 files changed, 74 insertions, 14 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp
index 9e93b73..d57265a 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -23,6 +23,7 @@
#include "decoder.hpp"
#include "session_base.hpp"
+#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp"
@@ -136,8 +137,14 @@ bool zmq::decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
- if (!session || !session->write (&in_progress))
+ if (unlikely (!session))
return false;
+ int rc = session->write (&in_progress);
+ if (unlikely (rc != 0)) {
+ if (errno != EAGAIN)
+ decoding_error ();
+ return false;
+ }
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
return true;
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 01021c4..de63a09 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -164,10 +164,18 @@ namespace zmq
private:
+ // Where to store the read data.
unsigned char *read_pos;
+
+ // How much data to read before taking next step.
size_t to_read;
+
+ // Next step. If set to NULL, it means that associated data stream
+ // is dead. Note that there can be still data in the process in such
+ // case.
step_t next;
+ // The duffer for data to decode.
size_t bufsize;
unsigned char *buf;
diff --git a/src/encoder.cpp b/src/encoder.cpp
index 6d09384..8689e45 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -20,6 +20,7 @@
#include "encoder.hpp"
#include "session_base.hpp"
+#include "likely.hpp"
#include "wire.hpp"
zmq::encoder_t::encoder_t (size_t bufsize_) :
@@ -62,7 +63,14 @@ bool zmq::encoder_t::message_ready ()
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
- if (!session || !session->read (&in_progress)) {
+ if (unlikely (!session)) {
+ rc = in_progress.init ();
+ errno_assert (rc == 0);
+ return false;
+ }
+ rc = session->read (&in_progress);
+ if (unlikely (rc != 0)) {
+ errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
diff --git a/src/encoder.hpp b/src/encoder.hpp
index f7e3cbc..949cbdc 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -142,11 +142,20 @@ namespace zmq
private:
+ // Where to get the data to write from.
unsigned char *write_pos;
+
+ // How much data to write before next step should be executed.
size_t to_write;
+
+ // Next step. If set to NULL, it means that associated data stream
+ // is dead.
step_t next;
+
+ // If true, first byte of the message is being written.
bool beginning;
+ // The buffer for encoded data.
size_t bufsize;
unsigned char *buf;
diff --git a/src/req.cpp b/src/req.cpp
index 323e058..04a19fb 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -158,3 +158,23 @@ zmq::req_session_t::~req_session_t ()
{
}
+int zmq::req_session_t::write (msg_t *msg_)
+{
+ if (state == request_id) {
+ if (msg_->flags () == msg_t::label && msg_->size () == 4) {
+ state = body;
+ return xreq_session_t::write (msg_);
+ }
+ }
+ else {
+ if (msg_->flags () == msg_t::more)
+ return xreq_session_t::write (msg_);
+ if (msg_->flags () == 0) {
+ state = request_id;
+ return xreq_session_t::write (msg_);
+ }
+ }
+ errno = EFAULT;
+ return -1;
+}
+
diff --git a/src/req.hpp b/src/req.hpp
index 2c2cbc4..0207a4f 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -67,8 +67,16 @@ namespace zmq
const char *protocol_, const char *address_);
~req_session_t ();
+ // Overloads of the functions from session_base_t.
+ int write (msg_t *msg_);
+
private:
+ enum {
+ request_id,
+ body
+ } state;
+
req_session_t (const req_session_t&);
const req_session_t &operator = (const req_session_t&);
};
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 7d4c5ab..32dcd4f 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -148,28 +148,28 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
pipe->set_event_sink (this);
}
-bool zmq::session_base_t::read (msg_t *msg_)
+int zmq::session_base_t::read (msg_t *msg_)
{
- if (!pipe)
- return false;
-
- if (!pipe->read (msg_))
- return false;
+ if (!pipe || !pipe->read (msg_)) {
+ errno = EAGAIN;
+ return -1;
+ }
incomplete_in =
msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
- return true;
+ return 0;
}
-bool zmq::session_base_t::write (msg_t *msg_)
+int zmq::session_base_t::write (msg_t *msg_)
{
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
- return true;
+ return 0;
}
- return false;
+ errno = EAGAIN;
+ return -1;
}
void zmq::session_base_t::flush ()
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 175a11d..e388d42 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -48,8 +48,8 @@ namespace zmq
void attach_pipe (class pipe_t *pipe_);
// Following functions are the interface exposed towards the engine.
- bool read (msg_t *msg_);
- bool write (msg_t *msg_);
+ virtual int read (msg_t *msg_);
+ virtual int write (msg_t *msg_);
void flush ();
void detach ();