diff options
| -rw-r--r-- | src/decoder.cpp | 9 | ||||
| -rw-r--r-- | src/decoder.hpp | 8 | ||||
| -rw-r--r-- | src/encoder.cpp | 10 | ||||
| -rw-r--r-- | src/encoder.hpp | 9 | ||||
| -rw-r--r-- | src/req.cpp | 20 | ||||
| -rw-r--r-- | src/req.hpp | 8 | ||||
| -rw-r--r-- | src/session_base.cpp | 20 | ||||
| -rw-r--r-- | src/session_base.hpp | 4 | 
8 files changed, 74 insertions, 14 deletions
diff --git a/src/decoder.cpp b/src/decoder.cpp index 9e93b73..d57265a 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -23,6 +23,7 @@  #include "decoder.hpp"  #include "session_base.hpp" +#include "likely.hpp"  #include "wire.hpp"  #include "err.hpp" @@ -136,8 +137,14 @@ bool zmq::decoder_t::message_ready ()  {      //  Message is completely read. Push it further and start reading      //  new message. (in_progress is a 0-byte message after this point.) -    if (!session || !session->write (&in_progress)) +    if (unlikely (!session))          return false; +    int rc = session->write (&in_progress); +    if (unlikely (rc != 0)) { +        if (errno != EAGAIN) +            decoding_error (); +        return false; +    }      next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);      return true; diff --git a/src/decoder.hpp b/src/decoder.hpp index 01021c4..de63a09 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -164,10 +164,18 @@ namespace zmq      private: +        //  Where to store the read data.          unsigned char *read_pos; + +        //  How much data to read before taking next step.          size_t to_read; + +        //  Next step. If set to NULL, it means that associated data stream +        //  is dead. Note that there can be still data in the process in such +        //  case.          step_t next; +        //  The duffer for data to decode.          size_t bufsize;          unsigned char *buf; diff --git a/src/encoder.cpp b/src/encoder.cpp index 6d09384..8689e45 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -20,6 +20,7 @@  #include "encoder.hpp"  #include "session_base.hpp" +#include "likely.hpp"  #include "wire.hpp"  zmq::encoder_t::encoder_t (size_t bufsize_) : @@ -62,7 +63,14 @@ bool zmq::encoder_t::message_ready ()      //  Note that new state is set only if write is successful. That way      //  unsuccessful write will cause retry on the next state machine      //  invocation. -    if (!session || !session->read (&in_progress)) { +    if (unlikely (!session)) { +        rc = in_progress.init (); +        errno_assert (rc == 0); +        return false; +    } +    rc = session->read (&in_progress); +    if (unlikely (rc != 0)) { +        errno_assert (errno == EAGAIN);          rc = in_progress.init ();          errno_assert (rc == 0);          return false; diff --git a/src/encoder.hpp b/src/encoder.hpp index f7e3cbc..949cbdc 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -142,11 +142,20 @@ namespace zmq      private: +        //  Where to get the data to write from.          unsigned char *write_pos; + +        //  How much data to write before next step should be executed.          size_t to_write; + +        //  Next step. If set to NULL, it means that associated data stream +        //  is dead.          step_t next; + +        //  If true, first byte of the message is being written.          bool beginning; +        //  The buffer for encoded data.          size_t bufsize;          unsigned char *buf; diff --git a/src/req.cpp b/src/req.cpp index 323e058..04a19fb 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -158,3 +158,23 @@ zmq::req_session_t::~req_session_t ()  {  } +int zmq::req_session_t::write (msg_t *msg_) +{ +    if (state == request_id) { +        if (msg_->flags () == msg_t::label && msg_->size () == 4) { +            state = body; +            return xreq_session_t::write (msg_); +        } +    } +    else { +        if (msg_->flags () == msg_t::more) +            return xreq_session_t::write (msg_); +        if (msg_->flags () == 0) { +            state = request_id; +            return xreq_session_t::write (msg_); +        } +    } +    errno = EFAULT; +    return -1; +} + diff --git a/src/req.hpp b/src/req.hpp index 2c2cbc4..0207a4f 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -67,8 +67,16 @@ namespace zmq              const char *protocol_, const char *address_);          ~req_session_t (); +        //  Overloads of the functions from session_base_t. +        int write (msg_t *msg_); +      private: +        enum { +            request_id, +            body +        } state; +          req_session_t (const req_session_t&);          const req_session_t &operator = (const req_session_t&);      }; diff --git a/src/session_base.cpp b/src/session_base.cpp index 7d4c5ab..32dcd4f 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -148,28 +148,28 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)      pipe->set_event_sink (this);  } -bool zmq::session_base_t::read (msg_t *msg_) +int zmq::session_base_t::read (msg_t *msg_)  { -    if (!pipe) -        return false; - -    if (!pipe->read (msg_)) -        return false; +    if (!pipe || !pipe->read (msg_)) { +        errno = EAGAIN; +        return -1; +    }      incomplete_in =          msg_->flags () & (msg_t::more | msg_t::label) ? true : false; -    return true; +    return 0;  } -bool zmq::session_base_t::write (msg_t *msg_) +int zmq::session_base_t::write (msg_t *msg_)  {      if (pipe && pipe->write (msg_)) {          int rc = msg_->init ();          errno_assert (rc == 0); -        return true; +        return 0;      } -    return false; +    errno = EAGAIN; +    return -1;  }  void zmq::session_base_t::flush () diff --git a/src/session_base.hpp b/src/session_base.hpp index 175a11d..e388d42 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -48,8 +48,8 @@ namespace zmq          void attach_pipe (class pipe_t *pipe_);          //  Following functions are the interface exposed towards the engine. -        bool read (msg_t *msg_); -        bool write (msg_t *msg_); +        virtual int read (msg_t *msg_); +        virtual int write (msg_t *msg_);          void flush ();          void detach ();  | 
