diff options
-rw-r--r-- | src/decoder.cpp | 9 | ||||
-rw-r--r-- | src/decoder.hpp | 8 | ||||
-rw-r--r-- | src/encoder.cpp | 10 | ||||
-rw-r--r-- | src/encoder.hpp | 9 | ||||
-rw-r--r-- | src/req.cpp | 20 | ||||
-rw-r--r-- | src/req.hpp | 8 | ||||
-rw-r--r-- | src/session_base.cpp | 20 | ||||
-rw-r--r-- | src/session_base.hpp | 4 |
8 files changed, 74 insertions, 14 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp index 9e93b73..d57265a 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -23,6 +23,7 @@ #include "decoder.hpp" #include "session_base.hpp" +#include "likely.hpp" #include "wire.hpp" #include "err.hpp" @@ -136,8 +137,14 @@ bool zmq::decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) - if (!session || !session->write (&in_progress)) + if (unlikely (!session)) return false; + int rc = session->write (&in_progress); + if (unlikely (rc != 0)) { + if (errno != EAGAIN) + decoding_error (); + return false; + } next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); return true; diff --git a/src/decoder.hpp b/src/decoder.hpp index 01021c4..de63a09 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -164,10 +164,18 @@ namespace zmq private: + // Where to store the read data. unsigned char *read_pos; + + // How much data to read before taking next step. size_t to_read; + + // Next step. If set to NULL, it means that associated data stream + // is dead. Note that there can be still data in the process in such + // case. step_t next; + // The duffer for data to decode. size_t bufsize; unsigned char *buf; diff --git a/src/encoder.cpp b/src/encoder.cpp index 6d09384..8689e45 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -20,6 +20,7 @@ #include "encoder.hpp" #include "session_base.hpp" +#include "likely.hpp" #include "wire.hpp" zmq::encoder_t::encoder_t (size_t bufsize_) : @@ -62,7 +63,14 @@ bool zmq::encoder_t::message_ready () // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!session || !session->read (&in_progress)) { + if (unlikely (!session)) { + rc = in_progress.init (); + errno_assert (rc == 0); + return false; + } + rc = session->read (&in_progress); + if (unlikely (rc != 0)) { + errno_assert (errno == EAGAIN); rc = in_progress.init (); errno_assert (rc == 0); return false; diff --git a/src/encoder.hpp b/src/encoder.hpp index f7e3cbc..949cbdc 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -142,11 +142,20 @@ namespace zmq private: + // Where to get the data to write from. unsigned char *write_pos; + + // How much data to write before next step should be executed. size_t to_write; + + // Next step. If set to NULL, it means that associated data stream + // is dead. step_t next; + + // If true, first byte of the message is being written. bool beginning; + // The buffer for encoded data. size_t bufsize; unsigned char *buf; diff --git a/src/req.cpp b/src/req.cpp index 323e058..04a19fb 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -158,3 +158,23 @@ zmq::req_session_t::~req_session_t () { } +int zmq::req_session_t::write (msg_t *msg_) +{ + if (state == request_id) { + if (msg_->flags () == msg_t::label && msg_->size () == 4) { + state = body; + return xreq_session_t::write (msg_); + } + } + else { + if (msg_->flags () == msg_t::more) + return xreq_session_t::write (msg_); + if (msg_->flags () == 0) { + state = request_id; + return xreq_session_t::write (msg_); + } + } + errno = EFAULT; + return -1; +} + diff --git a/src/req.hpp b/src/req.hpp index 2c2cbc4..0207a4f 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -67,8 +67,16 @@ namespace zmq const char *protocol_, const char *address_); ~req_session_t (); + // Overloads of the functions from session_base_t. + int write (msg_t *msg_); + private: + enum { + request_id, + body + } state; + req_session_t (const req_session_t&); const req_session_t &operator = (const req_session_t&); }; diff --git a/src/session_base.cpp b/src/session_base.cpp index 7d4c5ab..32dcd4f 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -148,28 +148,28 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) pipe->set_event_sink (this); } -bool zmq::session_base_t::read (msg_t *msg_) +int zmq::session_base_t::read (msg_t *msg_) { - if (!pipe) - return false; - - if (!pipe->read (msg_)) - return false; + if (!pipe || !pipe->read (msg_)) { + errno = EAGAIN; + return -1; + } incomplete_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - return true; + return 0; } -bool zmq::session_base_t::write (msg_t *msg_) +int zmq::session_base_t::write (msg_t *msg_) { if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); - return true; + return 0; } - return false; + errno = EAGAIN; + return -1; } void zmq::session_base_t::flush () diff --git a/src/session_base.hpp b/src/session_base.hpp index 175a11d..e388d42 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -48,8 +48,8 @@ namespace zmq void attach_pipe (class pipe_t *pipe_); // Following functions are the interface exposed towards the engine. - bool read (msg_t *msg_); - bool write (msg_t *msg_); + virtual int read (msg_t *msg_); + virtual int write (msg_t *msg_); void flush (); void detach (); |