summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/decoder.cpp9
-rw-r--r--src/decoder.hpp8
-rw-r--r--src/encoder.cpp10
-rw-r--r--src/encoder.hpp9
-rw-r--r--src/req.cpp20
-rw-r--r--src/req.hpp8
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/session_base.hpp4
8 files changed, 74 insertions, 14 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp
index 9e93b73..d57265a 100644
--- a/src/decoder.cpp
+++ b/src/decoder.cpp
@@ -23,6 +23,7 @@
#include "decoder.hpp"
#include "session_base.hpp"
+#include "likely.hpp"
#include "wire.hpp"
#include "err.hpp"
@@ -136,8 +137,14 @@ bool zmq::decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.)
- if (!session || !session->write (&in_progress))
+ if (unlikely (!session))
return false;
+ int rc = session->write (&in_progress);
+ if (unlikely (rc != 0)) {
+ if (errno != EAGAIN)
+ decoding_error ();
+ return false;
+ }
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
return true;
diff --git a/src/decoder.hpp b/src/decoder.hpp
index 01021c4..de63a09 100644
--- a/src/decoder.hpp
+++ b/src/decoder.hpp
@@ -164,10 +164,18 @@ namespace zmq
private:
+ // Where to store the read data.
unsigned char *read_pos;
+
+ // How much data to read before taking next step.
size_t to_read;
+
+ // Next step. If set to NULL, it means that associated data stream
+ // is dead. Note that there can be still data in the process in such
+ // case.
step_t next;
+ // The duffer for data to decode.
size_t bufsize;
unsigned char *buf;
diff --git a/src/encoder.cpp b/src/encoder.cpp
index 6d09384..8689e45 100644
--- a/src/encoder.cpp
+++ b/src/encoder.cpp
@@ -20,6 +20,7 @@
#include "encoder.hpp"
#include "session_base.hpp"
+#include "likely.hpp"
#include "wire.hpp"
zmq::encoder_t::encoder_t (size_t bufsize_) :
@@ -62,7 +63,14 @@ bool zmq::encoder_t::message_ready ()
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
- if (!session || !session->read (&in_progress)) {
+ if (unlikely (!session)) {
+ rc = in_progress.init ();
+ errno_assert (rc == 0);
+ return false;
+ }
+ rc = session->read (&in_progress);
+ if (unlikely (rc != 0)) {
+ errno_assert (errno == EAGAIN);
rc = in_progress.init ();
errno_assert (rc == 0);
return false;
diff --git a/src/encoder.hpp b/src/encoder.hpp
index f7e3cbc..949cbdc 100644
--- a/src/encoder.hpp
+++ b/src/encoder.hpp
@@ -142,11 +142,20 @@ namespace zmq
private:
+ // Where to get the data to write from.
unsigned char *write_pos;
+
+ // How much data to write before next step should be executed.
size_t to_write;
+
+ // Next step. If set to NULL, it means that associated data stream
+ // is dead.
step_t next;
+
+ // If true, first byte of the message is being written.
bool beginning;
+ // The buffer for encoded data.
size_t bufsize;
unsigned char *buf;
diff --git a/src/req.cpp b/src/req.cpp
index 323e058..04a19fb 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -158,3 +158,23 @@ zmq::req_session_t::~req_session_t ()
{
}
+int zmq::req_session_t::write (msg_t *msg_)
+{
+ if (state == request_id) {
+ if (msg_->flags () == msg_t::label && msg_->size () == 4) {
+ state = body;
+ return xreq_session_t::write (msg_);
+ }
+ }
+ else {
+ if (msg_->flags () == msg_t::more)
+ return xreq_session_t::write (msg_);
+ if (msg_->flags () == 0) {
+ state = request_id;
+ return xreq_session_t::write (msg_);
+ }
+ }
+ errno = EFAULT;
+ return -1;
+}
+
diff --git a/src/req.hpp b/src/req.hpp
index 2c2cbc4..0207a4f 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -67,8 +67,16 @@ namespace zmq
const char *protocol_, const char *address_);
~req_session_t ();
+ // Overloads of the functions from session_base_t.
+ int write (msg_t *msg_);
+
private:
+ enum {
+ request_id,
+ body
+ } state;
+
req_session_t (const req_session_t&);
const req_session_t &operator = (const req_session_t&);
};
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 7d4c5ab..32dcd4f 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -148,28 +148,28 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
pipe->set_event_sink (this);
}
-bool zmq::session_base_t::read (msg_t *msg_)
+int zmq::session_base_t::read (msg_t *msg_)
{
- if (!pipe)
- return false;
-
- if (!pipe->read (msg_))
- return false;
+ if (!pipe || !pipe->read (msg_)) {
+ errno = EAGAIN;
+ return -1;
+ }
incomplete_in =
msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
- return true;
+ return 0;
}
-bool zmq::session_base_t::write (msg_t *msg_)
+int zmq::session_base_t::write (msg_t *msg_)
{
if (pipe && pipe->write (msg_)) {
int rc = msg_->init ();
errno_assert (rc == 0);
- return true;
+ return 0;
}
- return false;
+ errno = EAGAIN;
+ return -1;
}
void zmq::session_base_t::flush ()
diff --git a/src/session_base.hpp b/src/session_base.hpp
index 175a11d..e388d42 100644
--- a/src/session_base.hpp
+++ b/src/session_base.hpp
@@ -48,8 +48,8 @@ namespace zmq
void attach_pipe (class pipe_t *pipe_);
// Following functions are the interface exposed towards the engine.
- bool read (msg_t *msg_);
- bool write (msg_t *msg_);
+ virtual int read (msg_t *msg_);
+ virtual int write (msg_t *msg_);
void flush ();
void detach ();