diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rep.cpp | 71 | ||||
| -rw-r--r-- | src/rep.hpp | 9 | ||||
| -rw-r--r-- | src/req.cpp | 61 | ||||
| -rw-r--r-- | src/req.hpp | 9 | 
4 files changed, 101 insertions, 49 deletions
diff --git a/src/rep.cpp b/src/rep.cpp index eaeff41..881f39a 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -27,7 +27,8 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :      socket_base_t (parent_),      active (0),      current (0), -    waiting_for_reply (false), +    sending_reply (false), +    tbc (false),      reply_pipe (NULL)  {      options.requires_in = true; @@ -58,6 +59,8 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,  void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)  { +    zmq_assert (sending_reply || !tbc || in_pipes [current] != pipe_); +      zmq_assert (pipe_);      zmq_assert (in_pipes.size () == out_pipes.size ()); @@ -90,6 +93,8 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)  void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)  { +    zmq_assert (!sending_reply || !tbc || reply_pipe != pipe_); +      zmq_assert (pipe_);      zmq_assert (in_pipes.size () == out_pipes.size ()); @@ -98,7 +103,7 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)      //  If the connection we've got the request from disconnects,      //  there's nowhere to send the reply. Forget about the reply pipe.      //  Once the reply is sent it will be dropped. -    if (waiting_for_reply && pipe_ == reply_pipe) +    if (sending_reply && pipe_ == reply_pipe)          reply_pipe = NULL;      //  If corresponding inpipe is still in place simply nullify the pointer @@ -157,29 +162,36 @@ int zmq::rep_t::xsetsockopt (int option_, const void *optval_,  int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)  { -    if (!waiting_for_reply) { +    if (!sending_reply) {          errno = EFSM;          return -1;      } -    //  Push message to the selected pipe. If requester have disconnected -    //  in the meantime, drop the reply. +    //  Check whether it's last part of the reply. +    tbc = msg_->flags & ZMQ_MSG_TBC; +      if (reply_pipe) { + +        //  Push message to the reply pipe.          bool written = reply_pipe->write (msg_); -        if (written) -            reply_pipe->flush (); -        else -            //  The pipe is full; just drop the reference to -            //  the message content. -            //  TODO: Tear down the underlying connection. -            zmq_msg_close (msg_); +        zmq_assert (!tbc || written); + +        //  The pipe is full... +        //  TODO: Tear down the underlying connection (?) +        zmq_assert (written);           }      else { + +        //  If the requester have disconnected in the meantime, drop the reply.          zmq_msg_close (msg_);      } -    waiting_for_reply = false; -    reply_pipe = NULL; +    //  Flush the reply to the requester. +    if (!tbc) { +        reply_pipe->flush (); +        sending_reply = false; +        reply_pipe = NULL; +    }      //  Detach the message from the data buffer.      int rc = zmq_msg_init (msg_); @@ -193,7 +205,7 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)      //  Deallocate old content of the message.      zmq_msg_close (msg_); -    if (waiting_for_reply) { +    if (sending_reply) {          errno = EFSM;          return -1;      } @@ -201,15 +213,19 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)      //  Round-robin over the pipes to get next message.      for (int count = active; count != 0; count--) {          bool fetched = in_pipes [current]->read (msg_); +        zmq_assert (!(tbc && !fetched)); +                  if (fetched) { -            reply_pipe = out_pipes [current]; -            waiting_for_reply = true; -        } -        current++; -        if (current >= active) -            current = 0; -        if (fetched) +            tbc = msg_->flags & ZMQ_MSG_TBC; +            if (!tbc) { +                reply_pipe = out_pipes [current]; +                sending_reply = true; +                current++; +                if (current >= active) +                    current = 0; +            }              return 0; +        }      }      //  No message is available. Initialise the output parameter @@ -221,9 +237,12 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)  bool zmq::rep_t::xhas_in ()  { +    if (!sending_reply && tbc) +        return true; +      for (int count = active; count != 0; count--) {          if (in_pipes [current]->check_read ()) -            return !waiting_for_reply; +            return !sending_reply;          current++;          if (current >= active)              current = 0; @@ -234,6 +253,10 @@ bool zmq::rep_t::xhas_in ()  bool zmq::rep_t::xhas_out ()  { -    return waiting_for_reply; +    if (sending_reply && tbc) +        return true; + +    //  TODO: No check for write here... +    return sending_reply;  } diff --git a/src/rep.hpp b/src/rep.hpp index 51a49a9..3ec2b53 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -64,8 +64,13 @@ namespace zmq          //  Index of the next inbound pipe to read a request from.          in_pipes_t::size_type current; -        //  If true, request was already received and reply wasn't sent yet. -        bool waiting_for_reply; +        //  If true, request was already received and reply wasn't completely +        //  sent yet. +        bool sending_reply; + +        //  True, if message processed at the moment (either sent or received) +        //  is processed only partially. +        bool tbc;          //  Pipe we are going to send reply to.          class writer_t *reply_pipe; diff --git a/src/req.cpp b/src/req.cpp index 0dfe14e..4d77de4 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -27,8 +27,9 @@ zmq::req_t::req_t (class app_thread_t *parent_) :      socket_base_t (parent_),      active (0),      current (0), -    waiting_for_reply (false), +    receiving_reply (false),      reply_pipe_active (false), +    tbc (false),      reply_pipe (NULL)  {      options.requires_in = true; @@ -56,12 +57,14 @@ void zmq::req_t::xattach_pipes (class reader_t *inpipe_,  void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)  { +    zmq_assert (!receiving_reply || !tbc || reply_pipe != pipe_); +      zmq_assert (pipe_);      zmq_assert (in_pipes.size () == out_pipes.size ());      //  TODO: The pipe we are awaiting the reply from is detached. What now?      //  Return ECONNRESET from subsequent recv? -    if (waiting_for_reply && pipe_ == reply_pipe) { +    if (receiving_reply && pipe_ == reply_pipe) {          zmq_assert (false);      } @@ -93,6 +96,8 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)  void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)  { +    zmq_assert (receiving_reply || !tbc || out_pipes [current] != pipe_); +      zmq_assert (pipe_);      zmq_assert (in_pipes.size () == out_pipes.size ()); @@ -124,7 +129,7 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)  void zmq::req_t::xkill (class reader_t *pipe_)  { -    zmq_assert (waiting_for_reply); +    zmq_assert (receiving_reply);      zmq_assert (pipe_ == reply_pipe);      reply_pipe_active = false; @@ -161,7 +166,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)  {      //  If we've sent a request and we still haven't got the reply,      //  we can't send another request. -    if (waiting_for_reply) { +    if (receiving_reply) {          errno = EFSM;          return -1;      } @@ -170,6 +175,7 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)          if (out_pipes [current]->check_write ())              break; +        zmq_assert (!tbc);          active--;          if (current < active) {              in_pipes.swap (current, active); @@ -187,23 +193,25 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)      //  Push message to the selected pipe.      bool written = out_pipes [current]->write (msg_);      zmq_assert (written); -    out_pipes [current]->flush (); - -    waiting_for_reply = true; -    reply_pipe = in_pipes [current]; - -    //  We can safely assume that the reply pipe is active as the last time -    //  we've used it we've read the reply and haven't tried to read from it -    //  anymore. -    reply_pipe_active = true; +    tbc = msg_->flags & ZMQ_MSG_TBC; +    if (!tbc) { +        out_pipes [current]->flush (); +        receiving_reply = true; +        reply_pipe = in_pipes [current]; + +        //  We can safely assume that the reply pipe is active as the last time +        //  we've used it we've read the reply and haven't tried to read from it +        //  anymore. +        reply_pipe_active = true; + +        //  Move to the next pipe (load-balancing). +        current = (current + 1) % active; +    }      //  Detach the message from the data buffer.      int rc = zmq_msg_init (msg_);      zmq_assert (rc == 0); -    //  Move to the next pipe (load-balancing). -    current = (current + 1) % active; -      return 0;  } @@ -213,7 +221,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)      zmq_msg_close (msg_);      //  If request wasn't send, we can't wait for reply. -    if (!waiting_for_reply) { +    if (!receiving_reply) {          zmq_msg_init (msg_);          errno = EFSM;          return -1; @@ -226,14 +234,22 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)          return -1;      } -    waiting_for_reply = false; -    reply_pipe = NULL; +    //  If this was last part of the reply, switch to request phase. +    tbc = msg_->flags & ZMQ_MSG_TBC; +    if (!tbc) { +        receiving_reply = false; +        reply_pipe = NULL; +    } +      return 0;  }  bool zmq::req_t::xhas_in ()  { -    if (!waiting_for_reply || !reply_pipe_active) +    if (receiving_reply && tbc) +        return true; + +    if (!receiving_reply || !reply_pipe_active)          return false;      zmq_assert (reply_pipe);     @@ -247,7 +263,10 @@ bool zmq::req_t::xhas_in ()  bool zmq::req_t::xhas_out ()  { -    if (waiting_for_reply) +    if (!receiving_reply && tbc) +        return true; + +    if (receiving_reply)          return false;      while (active > 0) { diff --git a/src/req.hpp b/src/req.hpp index d3e12b5..93dc745 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -70,12 +70,17 @@ namespace zmq          //  that's processing the request at the moment.          out_pipes_t::size_type current; -        //  If true, request was already sent and reply wasn't received yet. -        bool waiting_for_reply; +        //  If true, request was already sent and reply wasn't received yet or +        //  was raceived partially. +        bool receiving_reply;          //  True, if read can be attempted from the reply pipe.          bool reply_pipe_active; +        //  True, if message processed at the moment (either sent or received) +        //  is processed only partially. +        bool tbc; +          //  Pipe we are awaiting the reply from.          class reader_t *reply_pipe;  | 
