diff options
| -rw-r--r-- | src/rep.cpp | 97 | ||||
| -rw-r--r-- | src/req.cpp | 28 | ||||
| -rw-r--r-- | src/xrep.cpp | 195 | ||||
| -rw-r--r-- | src/xrep.hpp | 35 | 
4 files changed, 280 insertions, 75 deletions
diff --git a/src/rep.cpp b/src/rep.cpp index f77dfce..6711509 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -167,15 +167,15 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)      if (reply_pipe) { -        //  Push message to the reply pipe. +        // Push message to the reply pipe.          bool written = reply_pipe->write (msg_);          zmq_assert (!more || written); -        //  The pipe is full... -        //  When this happens, we simply return an error. -        //  This makes REP sockets vulnerable to DoS attack when -        //  misbehaving requesters stop collecting replies. -        //  TODO: Tear down the underlying connection (?) +        // The pipe is full... +        // When this happens, we simply return an error. +        // This makes REP sockets vulnerable to DoS attack when +        // misbehaving requesters stop collecting replies. +        // TODO: Tear down the underlying connection (?)          if (!written) {              errno = EAGAIN;              return -1; @@ -185,12 +185,12 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)      }      else { -        //  If the requester have disconnected in the meantime, drop the reply. +        // If the requester have disconnected in the meantime, drop the reply.          more = msg_->flags & ZMQ_MSG_MORE;          zmq_msg_close (msg_);      } -    //  Flush the reply to the requester. +    // Flush the reply to the requester.      if (!more) {          if (reply_pipe)              reply_pipe->flush (); @@ -198,7 +198,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)          reply_pipe = NULL;      } -    //  Detach the message from the data buffer. +    // Detach the message from the data buffer.      int rc = zmq_msg_init (msg_);      zmq_assert (rc == 0); @@ -207,37 +207,70 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)  int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    //  Deallocate old content of the message. -    zmq_msg_close (msg_); - +    //  If we are in middle of sending a reply, we cannot receive next request.      if (sending_reply) {          errno = EFSM;          return -1;      } -    //  Round-robin over the pipes to get next message. -    for (int count = active; count != 0; count--) { -        bool fetched = in_pipes [current]->read (msg_); -        zmq_assert (!(more && !fetched)); -         -        if (fetched) { -            more = msg_->flags & ZMQ_MSG_MORE; -            if (!more) { -                reply_pipe = out_pipes [current]; -                sending_reply = true; -                current++; -                if (current >= active) -                    current = 0; -            } -            return 0; +    //  Deallocate old content of the message. +    zmq_msg_close (msg_); + +    //  We haven't started reading a request yet... +    if (!more) { + +        //  Round-robin over the pipes to get next message. +        int count; +        for (count = active; count != 0; count--) { +            if (in_pipes [current]->read (msg_)) +                break; +            current++; +            if (current >= active) +                current = 0; +        } + +        //  No message is available. Initialise the output parameter +        //  to be a 0-byte message. +        if (count == 0) { +            zmq_msg_init (msg_); +            errno = EAGAIN; +            return -1; +        } + +        //  We are aware of a new message now. Setup the reply pipe. +        reply_pipe = out_pipes [current]; + +        //  Copy the routing info to the reply pipe. +        while (true) { + +            //  Push message to the reply pipe. +            //  TODO: What if the pipe is full? +            //  Tear down the underlying connection? +            bool written = reply_pipe->write (msg_); +            zmq_assert (written); + +            //  Message part of zero size delimits the traceback stack. +            if (zmq_msg_size (msg_) == 0) +                break; + +            //  Get next part of the message. +            bool fetched = in_pipes [current]->read (msg_); +            zmq_assert (fetched);          }      } -    //  No message is available. Initialise the output parameter -    //  to be a 0-byte message. -    zmq_msg_init (msg_); -    errno = EAGAIN; -    return -1; +    //  Now the routing info is processed. Get the first part +    //  of the message payload and exit. +    bool fetched = in_pipes [current]->read (msg_); +    zmq_assert (fetched); +    more = msg_->flags & ZMQ_MSG_MORE; +    if (!more) { +        current++; +        if (current >= active) +            current = 0; +        sending_reply = true; +    } +    return 0;  }  bool zmq::rep_t::xhas_in () diff --git a/src/req.cpp b/src/req.cpp index c8b7b98..969755b 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -190,7 +190,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)          return -1;      } -    //  Push message to the selected pipe. +    //  If we are starting to send the request, generate a prefix. +    if (!more) { +        zmq_msg_t prefix; +        int rc = zmq_msg_init (&prefix); +        zmq_assert (rc == 0); +        prefix.flags |= ZMQ_MSG_MORE; +        bool written = out_pipes [current]->write (&prefix); +        zmq_assert (written); +    } + +    //  Push the message to the selected pipe.      bool written = out_pipes [current]->write (msg_);      zmq_assert (written);      more = msg_->flags & ZMQ_MSG_MORE; @@ -218,7 +228,8 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)  int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)  {      //  Deallocate old content of the message. -    zmq_msg_close (msg_); +    int rc = zmq_msg_close (msg_); +    zmq_assert (rc == 0);      //  If request wasn't send, we can't wait for reply.      if (!receiving_reply) { @@ -234,6 +245,19 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)          return -1;      } +    //  If we are starting to receive new reply, check whether prefix +    //  is well-formed and drop it. +    if (!more) { +        zmq_assert (msg_->flags & ZMQ_MSG_MORE); +        zmq_assert (zmq_msg_size (msg_) == 0); +        rc = zmq_msg_close (msg_); +        zmq_assert (rc == 0); +    } + +    //  Get the actual reply. +    bool recvd = reply_pipe->read (msg_); +    zmq_assert (recvd); +      //  If this was last part of the reply, switch to request phase.      more = msg_->flags & ZMQ_MSG_MORE;      if (!more) { diff --git a/src/xrep.cpp b/src/xrep.cpp index c70c3ac..051a5ce 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -24,7 +24,11 @@  #include "pipe.hpp"  zmq::xrep_t::xrep_t (class app_thread_t *parent_) : -    socket_base_t (parent_) +    socket_base_t (parent_), +    current_in (0), +    more_in (false), +    current_out (NULL), +    more_out (false)  {      options.requires_in = true;      options.requires_out = true; @@ -32,56 +36,96 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :      //  On connect, pipes are created only after initial handshaking.      //  That way we are aware of the peer's identity when binding to the pipes.      options.immediate_connect = false; - -    //  XREP is unfunctional at the moment. Crash here! -    zmq_assert (false);  }  zmq::xrep_t::~xrep_t ()  { +    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++) +        it->reader->term (); +    for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end (); +          it++) +        it->second.writer->term ();  }  void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,      class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (inpipe_ && outpipe_); -    fq.attach (inpipe_);      //  TODO: What if new connection has same peer identity as the old one? +    outpipe_t outpipe = {outpipe_, true};      bool ok = outpipes.insert (std::make_pair ( -        peer_identity_, outpipe_)).second; +        peer_identity_, outpipe)).second;      zmq_assert (ok); + +    inpipe_t inpipe = {inpipe_, peer_identity_, true}; +    inpipes.push_back (inpipe);  }  void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)  { -    zmq_assert (pipe_); -    fq.detach (pipe_); +// TODO:! +    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); +          it++) { +        if (it->reader == pipe_) { +            inpipes.erase (it); +            return; +        } +    } +    zmq_assert (false);  }  void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)  {      for (outpipes_t::iterator it = outpipes.begin (); -          it != outpipes.end (); ++it) -        if (it->second == pipe_) { +          it != outpipes.end (); ++it) { +        if (it->second.writer == pipe_) {              outpipes.erase (it); +            if (pipe_ == current_out) +                current_out = NULL;              return;          } +    }      zmq_assert (false);  }  void zmq::xrep_t::xkill (class reader_t *pipe_)  { -    fq.kill (pipe_); +    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); +          it++) { +        if (it->reader == pipe_) { +            zmq_assert (it->active); +            it->active = false; +            return; +        } +    } +    zmq_assert (false);  }  void zmq::xrep_t::xrevive (class reader_t *pipe_)  { -    fq.revive (pipe_); +    for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); +          it++) { +        if (it->reader == pipe_) { +            zmq_assert (!it->active); +            it->active = true; +            return; +        } +    } +    zmq_assert (false);  }  void zmq::xrep_t::xrevive (class writer_t *pipe_)  { +    for (outpipes_t::iterator it = outpipes.begin (); +          it != outpipes.end (); ++it) { +        if (it->second.writer == pipe_) { +            zmq_assert (!it->second.active); +            it->second.active = true; +            return; +        } +    } +    zmq_assert (false);  }  int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, @@ -93,33 +137,45 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,  int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)  { -    unsigned char *data = (unsigned char*) zmq_msg_data (msg_); -    size_t size = zmq_msg_size (msg_); - -    //  Check whether the message is well-formed. -    zmq_assert (size >= 1); -    zmq_assert (size_t (*data + 1) <= size); - -    //  Find the corresponding outbound pipe. If there's none, just drop the -    //  message. -    //  TODO: There's an allocation here! It's the critical path! Get rid of it! -    blob_t identity (data + 1, *data); -    outpipes_t::iterator it = outpipes.find (identity); -    if (it == outpipes.end ()) { -        int rc = zmq_msg_close (msg_); -        zmq_assert (rc == 0); -        rc = zmq_msg_init (msg_); -        zmq_assert (rc == 0); +    //  If this is the first part of the message it's the identity of the +    //  peer to send the message to. +    if (!more_out) { +        zmq_assert (!current_out); + +        //  There's no such thing as prefix with no subsequent message. +        zmq_assert (msg_->flags & ZMQ_MSG_MORE); +        more_out = true; + +        //  Find the pipe associated with the identity stored in the prefix. +        //  If there's no such pipe just silently drop the message. +        blob_t identity ((unsigned char*) zmq_msg_data (msg_), +            zmq_msg_size (msg_)); +        outpipes_t::iterator it = outpipes.find (identity); +        if (it == outpipes.end ()) +            return 0; +         +        //  Remember the outgoing pipe. +        current_out = it->second.writer; +          return 0;      } -    //  Push message to the selected pipe. -    if (!it->second->write (msg_)) { -        errno = EAGAIN; -        return -1; -    } +    //  Check whether this is the last part of the message. +    more_out = msg_->flags & ZMQ_MSG_MORE; -    it->second->flush (); +    //  Push the message into the pipe. If there's no out pipe, just drop it. +    if (current_out) { +        bool ok = current_out->write (msg_); +        zmq_assert (ok); +        if (!more_out) { +            current_out->flush (); +            current_out = NULL; +        } +    } +    else { +        int rc = zmq_msg_close (msg_); +        zmq_assert (rc == 0); +    }      //  Detach the message from the data buffer.      int rc = zmq_msg_init (msg_); @@ -130,12 +186,77 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)  int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)  { -    return fq.recv (msg_, flags_); +    //  Deallocate old content of the message. +    zmq_msg_close (msg_); + +    //  If we are in the middle of reading a message, just grab next part of it. +    if (more_in) { +        zmq_assert (inpipes [current_in].active); +        bool fetched = inpipes [current_in].reader->read (msg_); +        zmq_assert (fetched); +        more_in = msg_->flags & ZMQ_MSG_MORE; +        if (!more_in) { +            current_in++; +            if (current_in >= inpipes.size ()) +                current_in = 0; +        } +        return 0; +    } + +    //  Round-robin over the pipes to get the next message. +    for (int count = inpipes.size (); count != 0; count--) { + +        //  Try to fetch new message. +        bool fetched; +        if (!inpipes [current_in].active) +            fetched = false; +        else +            fetched = inpipes [current_in].reader->check_read (); + +        //  If we have a message, create a prefix and return it to the caller. +        if (fetched) { +            int rc = zmq_msg_init_size (msg_, +                inpipes [current_in].identity.size ()); +            zmq_assert (rc == 0); +            memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (), +                zmq_msg_size (msg_)); +            more_in = true; +            return 0; +        } + +        //  If me don't have a message, move to next pipe. +        current_in++; +        if (current_in >= inpipes.size ()) +            current_in = 0; +    } + +    //  No message is available. Initialise the output parameter +    //  to be a 0-byte message. +    zmq_msg_init (msg_); +    errno = EAGAIN; +    return -1;  }  bool zmq::xrep_t::xhas_in ()  { -    return fq.has_in (); +    //  There are subsequent parts of the partly-read message available. +    if (more_in) +        return true; + +    //  Note that messing with current doesn't break the fairness of fair +    //  queueing algorithm. If there are no messages available current will +    //  get back to its original value. Otherwise it'll point to the first +    //  pipe holding messages, skipping only pipes with no messages available. +    for (int count = inpipes.size (); count != 0; count--) { +        if (inpipes [current_in].active && +              inpipes [current_in].reader->check_read ()) +            return true; +        current_in++; +        if (current_in >= inpipes.size ()) +            current_in = 0; +    } + +    return false;  }  bool zmq::xrep_t::xhas_out () diff --git a/src/xrep.hpp b/src/xrep.hpp index c56a8f9..940d288 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -21,14 +21,15 @@  #define __ZMQ_XREP_HPP_INCLUDED__  #include <map> +#include <vector>  #include "socket_base.hpp"  #include "blob.hpp" -#include "fq.hpp"  namespace zmq  { +    //  TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.      class xrep_t : public socket_base_t      {      public: @@ -52,13 +53,39 @@ namespace zmq      private: -        //  Inbound messages are fair-queued. -        fq_t fq; +        struct inpipe_t +        { +            class reader_t *reader; +            blob_t identity; +            bool active; +        }; + +        //  Inbound pipes with the names of corresponging peers. +        typedef std::vector <inpipe_t> inpipes_t; +        inpipes_t inpipes; + +        //  The pipe we are currently reading from. +        inpipes_t::size_type current_in; + +        //  If true, more incoming message parts are expected. +        bool more_in; + +        struct outpipe_t +        { +            class writer_t *writer; +            bool active; +        };          //  Outbound pipes indexed by the peer names. -        typedef std::map <blob_t, class writer_t*> outpipes_t; +        typedef std::map <blob_t, outpipe_t> outpipes_t;          outpipes_t outpipes; +        //  The pipe we are currently writing to. +        class writer_t *current_out; + +        //  If true, more outgoing message parts are expected. +        bool more_out; +          xrep_t (const xrep_t&);          void operator = (const xrep_t&);      };  | 
