diff options
| -rw-r--r-- | src/rep.cpp | 263 | ||||
| -rw-r--r-- | src/rep.hpp | 49 | 
2 files changed, 45 insertions, 267 deletions
diff --git a/src/rep.cpp b/src/rep.cpp index 7636d13..b2ada66 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -21,175 +21,36 @@  #include "rep.hpp"  #include "err.hpp" -#include "pipe.hpp"  zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t slot_) : -    socket_base_t (parent_, slot_), -    active (0), -    current (0), +    xrep_t (parent_, slot_),      sending_reply (false), -    more (false), -    reply_pipe (NULL) +    request_begins (true)  { -    options.requires_in = true; -    options.requires_out = true; - -    //  We don't need immediate connect. We'll be able to send messages -    //  (replies) only when connection is established and thus requests -    //  can arrive anyway. -    options.immediate_connect = false;  }  zmq::rep_t::~rep_t ()  { -    zmq_assert (in_pipes.empty ()); -    zmq_assert (out_pipes.empty ()); -} - -void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_, const blob_t &peer_identity_) -{ -    zmq_assert (inpipe_ && outpipe_); -    zmq_assert (in_pipes.size () == out_pipes.size ()); - -    inpipe_->set_event_sink (this); -    in_pipes.push_back (inpipe_); -    in_pipes.swap (active, in_pipes.size () - 1); - -    outpipe_->set_event_sink (this); -    out_pipes.push_back (outpipe_); -    out_pipes.swap (active, out_pipes.size () - 1); - -    active++; -} - -void zmq::rep_t::xterm_pipes () -{ -    for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++) -        in_pipes [i]->terminate (); -    for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++) -        out_pipes [i]->terminate (); -} - -void zmq::rep_t::terminated (reader_t *pipe_) -{ -    //  ??? -    zmq_assert (sending_reply || !more || in_pipes [current] != pipe_); - -    zmq_assert (pipe_); -    zmq_assert (in_pipes.size () == out_pipes.size ()); - -    in_pipes_t::size_type index = in_pipes.index (pipe_); - -    if (index < active) { -        active--; -        if (current == active) -            current = 0; -    } -    in_pipes.erase (index); - -    //  ??? -    if (!zombie) { -        if (out_pipes [index]) -            out_pipes [index]->terminate (); -        out_pipes.erase (index); -    } -} - -void zmq::rep_t::terminated (writer_t *pipe_) -{ -    zmq_assert (pipe_); -    zmq_assert (in_pipes.size () == out_pipes.size ()); - -    out_pipes_t::size_type index = out_pipes.index (pipe_); - -    //  If the connection we've got the request from disconnects, -    //  there's nowhere to send the reply. Forget about the reply pipe. -    //  Once the reply is sent it will be dropped. -    if (sending_reply && pipe_ == reply_pipe) -        reply_pipe = NULL; - -    if (out_pipes.index (pipe_) < active) { -        active--; -        if (current == active) -            current = 0; -    } - -    out_pipes.erase (index); - -    //  ??? -    if (!zombie) { -        if (in_pipes [index]) -            in_pipes [index]->terminate (); -        in_pipes.erase (index); -    } -} - -bool zmq::rep_t::xhas_pipes () -{ -    return !in_pipes.empty () || !out_pipes.empty (); -} - -void zmq::rep_t::activated (reader_t *pipe_) -{ -    //  Move the pipe to the list of active pipes. -    in_pipes_t::size_type index = in_pipes.index (pipe_); -    in_pipes.swap (index, active); -    out_pipes.swap (index, active); -    active++; -} - -void zmq::rep_t::activated (writer_t *pipe_) -{ -    //  TODO: What here? -    zmq_assert (false);  }  int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)  { +    //  If we are in the middle of receiving a request, we cannot send reply.      if (!sending_reply) {          errno = EFSM;          return -1;      } -    if (reply_pipe) { - -        // Push message to the reply pipe. -        bool written = reply_pipe->write (msg_); -        zmq_assert (!more || written); +    bool more = (msg_->flags & ZMQ_MSG_MORE); -        // The pipe is full... -        // When this happens, we simply return an error. -        // This makes REP sockets vulnerable to DoS attack when -        // misbehaving requesters stop collecting replies. -        // TODO: Tear down the underlying connection (?) -        if (!written) { +    //  Push message to the reply pipe. +    int rc = xrep_t::xsend (msg_, flags_); +    if (rc != 0) +        return rc; -            //  TODO: The reply socket becomes deactivated here... -            errno = EAGAIN; -            return -1; -        } - -        more = msg_->flags & ZMQ_MSG_MORE; -    } -    else { - -        // If the requester have disconnected in the meantime, drop the reply. -        more = msg_->flags & ZMQ_MSG_MORE; -        zmq_msg_close (msg_); -    } - -    // Flush the reply to the requester. -    if (!more) { -        if (reply_pipe) -            reply_pipe->flush (); +    //  If the reply is complete flip the FSM back to request receiving state. +    if (!more)          sending_reply = false; -        reply_pipe = NULL; -    } - -    // Detach the message from the data buffer. -    int rc = zmq_msg_init (msg_); -    zmq_assert (rc == 0);      return 0;  } @@ -202,70 +63,44 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)          return -1;      } -    //  Deallocate old content of the message. -    zmq_msg_close (msg_); +    if (request_begins) { -    //  We haven't started reading a request yet... -    if (!more) { +        //  Copy the backtrace stack to the reply pipe. +        bool bottom = false; +        while (!bottom) { -        //  Round-robin over the pipes to get next message. -        int count; -        for (count = active; count != 0; count--) { -            if (in_pipes [current]->read (msg_)) -                break; +            //  TODO: What if request can be read but reply pipe is not +            //  ready for writing? -            //  Move the pipe to the list of inactive pipes. -            active--; -            in_pipes.swap (current, active); -            out_pipes.swap (current, active); +            //  Get next part of the backtrace stack. +            int rc = xrep_t::xrecv (msg_, flags_); +            if (rc != 0) +                return rc; +            zmq_assert (msg_->flags & ZMQ_MSG_MORE); -            //  Move to next pipe. -            current++; -            if (current >= active) -                current = 0; -        } +            //  Empty message part delimits the traceback stack. +            bottom = (zmq_msg_size (msg_) == 0); -        //  No message is available. Initialise the output parameter -        //  to be a 0-byte message. -        if (count == 0) { -            zmq_msg_init (msg_); -            errno = EAGAIN; -            return -1; +            //  Push it to the reply pipe. +            rc = xrep_t::xsend (msg_, flags_); +            zmq_assert (rc == 0);          } -        //  We are aware of a new message now. Setup the reply pipe. -        reply_pipe = out_pipes [current]; - -        //  Copy the routing info to the reply pipe. -        while (true) { - -            //  Push message to the reply pipe. -            //  TODO: What if the pipe is full? -            //  Tear down the underlying connection? -            bool written = reply_pipe->write (msg_); -            zmq_assert (written); - -            //  Message part of zero size delimits the traceback stack. -            if (zmq_msg_size (msg_) == 0) -                break; - -            //  Get next part of the message. -            bool fetched = in_pipes [current]->read (msg_); -            zmq_assert (fetched); -        } +        request_begins = false;      } -    //  Now the routing info is processed. Get the first part +    //  Now the routing info is safely stored. Get the first part      //  of the message payload and exit. -    bool fetched = in_pipes [current]->read (msg_); -    zmq_assert (fetched); -    more = msg_->flags & ZMQ_MSG_MORE; -    if (!more) { -        current++; -        if (current >= active) -            current = 0; +    int rc = xrep_t::xrecv (msg_, flags_); +    if (rc != 0) +        return rc; + +    //  If whole request is read, flip the FSM to reply-sending state. +    if (!(msg_->flags & ZMQ_MSG_MORE)) {          sending_reply = true; +        request_begins = true;      } +      return 0;  } @@ -274,25 +109,7 @@ bool zmq::rep_t::xhas_in ()      if (sending_reply)          return false; -    if (more) -        return true; - -    for (int count = active; count != 0; count--) { -        if (in_pipes [current]->check_read ()) -            return !sending_reply; - -        //  Move the pipe to the list of inactive pipes. -        active--; -        in_pipes.swap (current, active); -        out_pipes.swap (current, active); - -        //  Move to the next pipe. -        current++; -        if (current >= active) -            current = 0; -    } - -    return false; +    return xrep_t::xhas_in ();  }  bool zmq::rep_t::xhas_out () @@ -300,10 +117,6 @@ bool zmq::rep_t::xhas_out ()      if (!sending_reply)          return false; -    if (more) -        return true; - -    //  TODO: No check for write here... -    return sending_reply; +    return xrep_t::xhas_out ();  } diff --git a/src/rep.hpp b/src/rep.hpp index 7d82a28..09eda02 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -20,17 +20,12 @@  #ifndef __ZMQ_REP_HPP_INCLUDED__  #define __ZMQ_REP_HPP_INCLUDED__ -#include "socket_base.hpp" -#include "yarray.hpp" -#include "pipe.hpp" +#include "xrep.hpp"  namespace zmq  { -    class rep_t : -        public socket_base_t, -        public i_reader_events, -        public i_writer_events +    class rep_t : public xrep_t      {      public: @@ -38,50 +33,20 @@ namespace zmq          ~rep_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, -            const blob_t &peer_identity_); -        void xterm_pipes (); -        bool xhas_pipes ();          int xsend (zmq_msg_t *msg_, int flags_);          int xrecv (zmq_msg_t *msg_, int flags_);          bool xhas_in ();          bool xhas_out (); -        //  i_reader_events interface implementation. -        void activated (reader_t *pipe_); -        void terminated (reader_t *pipe_); - -        //  i_writer_events interface implementation. -        void activated (writer_t *pipe_); -        void terminated (writer_t *pipe_); -      private: -        //  List in outbound and inbound pipes. Note that the two lists are -        //  always in sync. I.e. outpipe with index N communicates with the -        //  same session as inpipe with index N. -        typedef yarray_t <writer_t> out_pipes_t; -        out_pipes_t out_pipes; -        typedef yarray_t <reader_t> in_pipes_t; -        in_pipes_t in_pipes; - -        //  Number of active inpipes. All the active inpipes are located at the -        //  beginning of the in_pipes array. -        in_pipes_t::size_type active; - -        //  Index of the next inbound pipe to read a request from. -        in_pipes_t::size_type current; - -        //  If true, request was already received and reply wasn't completely -        //  sent yet. +        //  If true, we are in process of sending the reply. If false we are +        //  in process of receiving a request.          bool sending_reply; -        //  True, if message processed at the moment (either sent or received) -        //  is processed only partially. -        bool more; - -        //  Pipe we are going to send reply to. -        writer_t *reply_pipe; +        //  If true, we are starting to receive a request. The beginning +        //  of the request is the backtrace stack. +        bool request_begins;          rep_t (const rep_t&);          void operator = (const rep_t&);  | 
