diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 2 | ||||
| -rw-r--r-- | src/random.cpp | 39 | ||||
| -rw-r--r-- | src/random.hpp | 34 | ||||
| -rw-r--r-- | src/rep.cpp | 52 | ||||
| -rw-r--r-- | src/req.cpp | 38 | ||||
| -rw-r--r-- | src/req.hpp | 5 | ||||
| -rw-r--r-- | src/socket_base.cpp | 18 | ||||
| -rw-r--r-- | src/xrep.cpp | 45 | ||||
| -rw-r--r-- | src/xrep.hpp | 15 | 
9 files changed, 179 insertions, 69 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 92ceb20..ae20d33 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -53,6 +53,7 @@ libzmq_la_SOURCES = \      pub.hpp \      pull.hpp \      push.hpp \ +    random.hpp \      reaper.hpp \      rep.hpp \      req.hpp \ @@ -117,6 +118,7 @@ libzmq_la_SOURCES = \      push.cpp \      reaper.cpp \      pub.cpp \ +    random.cpp \      rep.cpp \      req.cpp \      router.cpp \ diff --git a/src/random.cpp b/src/random.cpp new file mode 100644 index 0000000..ee7a7fb --- /dev/null +++ b/src/random.cpp @@ -0,0 +1,39 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "random.hpp" +#include "uuid.hpp" +#include "err.hpp" + +//  Here we can use different ways of generating random data, as avialable +//  on different platforms. At the moment, we'll assume the UUID is random +//  enough to use for that purpose. +void zmq::generate_random (void *buf_, size_t size_) +{ +    //  Collapsing an UUID into 4 bytes. +    zmq_assert (size_ == 4); +    uint32_t buff [4]; +    generate_uuid ((void*) buff); +    uint32_t result = buff [0]; +    result ^= buff [1]; +    result ^= buff [2]; +    result ^= buff [3]; +    *((uint32_t*) buf_) = result; +} diff --git a/src/random.hpp b/src/random.hpp new file mode 100644 index 0000000..0b99bbd --- /dev/null +++ b/src/random.hpp @@ -0,0 +1,34 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_RANDOM_HPP_INCLUDED__ +#define __ZMQ_RANDOM_HPP_INCLUDED__ + +#include <stddef.h> + +namespace zmq +{ + +    //  Generates truly random bytes (not pseudo-random). +    void generate_random (void *buf_, size_t size_); + +} + +#endif diff --git a/src/rep.cpp b/src/rep.cpp index b987d9c..a5d1462 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -64,54 +64,32 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)          return -1;      } +    //  First thing to do when receiving a request is to copy all the labels +    //  to the reply pipe.      if (request_begins) { - -        //  Copy the backtrace stack to the reply pipe.          while (true) { - -            //  TODO: If request can be read but reply pipe is not -            //  ready for writing, we should drop the reply. - -            //  Get next part of the backtrace stack.              int rc = xrep_t::xrecv (msg_, flags_);              if (rc != 0)                  return rc; +            if (!(msg_->flags () & msg_t::label)) +                break; -            if (msg_->flags () & (msg_t::more | msg_t::label)) { - -                //  Empty message part delimits the traceback stack. -                bool bottom = (msg_->size () == 0); - -                //  Push it to the reply pipe. -                rc = xrep_t::xsend (msg_, flags_); -                zmq_assert (rc == 0); - -                //  The end of the traceback, move to processing message body. -                if (bottom) -                    break; -            } -            else { - -                //  If the traceback stack is malformed, discard anything -                //  already sent to pipe (we're at end of invalid message) -                //  and continue reading -- that'll switch us to the next pipe -                //  and next request. -                rc = xrep_t::rollback (); -                zmq_assert (rc == 0); -            } +            //  TODO: If the reply cannot be sent to the peer because +            //  od congestion, we should drop it. +            rc = xrep_t::xsend (msg_, flags_); +            zmq_assert (rc == 0);          } -          request_begins = false;      } - -    //  Now the routing info is safely stored. Get the first part -    //  of the message payload and exit. -    int rc = xrep_t::xrecv (msg_, flags_); -    if (rc != 0) -        return rc; +    else { +        int rc = xrep_t::xrecv (msg_, flags_); +        if (rc != 0) +            return rc; +    } +    zmq_assert (!(msg_->flags () & msg_t::label));      //  If whole request is read, flip the FSM to reply-sending state. -    if (!(msg_->flags () & (msg_t::more | msg_t::label))) { +    if (!(msg_->flags () & msg_t::more)) {          sending_reply = true;          request_begins = true;      } diff --git a/src/req.cpp b/src/req.cpp index b0e58dc..e51b853 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -21,13 +21,21 @@  #include "req.hpp"  #include "err.hpp"  #include "msg.hpp" +#include "uuid.hpp" +#include "wire.hpp" +#include "random.hpp" +#include "likely.hpp"  zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :      xreq_t (parent_, tid_),      receiving_reply (false), -    message_begins (true) +    message_begins (true), +    request_id (0)  {      options.type = ZMQ_REQ; + +    //  Start the request ID sequence at an random point. +    generate_random (&request_id, sizeof (request_id));  }  zmq::req_t::~req_t () @@ -43,12 +51,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)          return -1;      } -    //  First part of the request is empty message part (stack bottom). +    //  First part of the request is the request identity.      if (message_begins) {          msg_t prefix; -        int rc = prefix.init (); +        int rc = prefix.init_size (4);          errno_assert (rc == 0);          prefix.set_flags (msg_t::label); +        unsigned char *data = (unsigned char*) prefix.data (); +        put_uint32 (data, request_id);          rc = xreq_t::xsend (&prefix, flags_);          if (rc != 0)              return rc; @@ -78,13 +88,28 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)          return -1;      } -    //  First part of the reply should be empty message part (stack bottom). +    //  First part of the reply should be the original request ID.      if (message_begins) {          int rc = xreq_t::xrecv (msg_, flags_);          if (rc != 0)              return rc;          zmq_assert (msg_->flags () & msg_t::label); -        zmq_assert (msg_->size () == 0); +        zmq_assert (msg_->size () == 4); +        unsigned char *data = (unsigned char*) msg_->data (); +        if (unlikely (get_uint32 (data) != request_id)) { + +            //  The request ID does not match. Drop the entire message. +            while (true) { +                int rc = xreq_t::xrecv (msg_, flags_); +                errno_assert (rc == 0); +                if (!(msg_->flags () & (msg_t::label | msg_t::more))) +                    break; +            } +            msg_->close (); +            msg_->init (); +            errno = EAGAIN; +            return -1; +        }          message_begins = false;      } @@ -94,6 +119,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)      //  If the reply is fully received, flip the FSM into request-sending state.      if (!(msg_->flags () & (msg_t::more | msg_t::label))) { +        request_id++;          receiving_reply = false;          message_begins = true;      } @@ -103,6 +129,8 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)  bool zmq::req_t::xhas_in ()  { +    //  TODO: Duplicates should be removed here. +      if (!receiving_reply)          return false; diff --git a/src/req.hpp b/src/req.hpp index e0554ac..50dcb44 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -22,6 +22,7 @@  #define __ZMQ_REQ_HPP_INCLUDED__  #include "xreq.hpp" +#include "stdint.hpp"  namespace zmq  { @@ -49,6 +50,10 @@ namespace zmq          //  of the message must be empty message part (backtrace stack bottom).          bool message_begins; +        //  Request ID. Request numbers gradually increase (and wrap over) +        //  so that we don't have to generate random ID for each request. +        uint32_t request_id; +          req_t (const req_t&);          const req_t &operator = (const req_t&);      }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eaf1776..804ec46 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -598,15 +598,15 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)          ticks = 0;          rc = xrecv (msg_, flags_); -        if (rc == 0) { -            rcvlabel = msg_->flags () & msg_t::label; -            if (rcvlabel) -                msg_->reset_flags (msg_t::label); -            rcvmore = msg_->flags () & msg_t::more; -            if (rcvmore) -                msg_->reset_flags (msg_t::more); -        } -        return rc; +        if (rc < 0) +            return rc; +        rcvlabel = msg_->flags () & msg_t::label; +        if (rcvlabel) +            msg_->reset_flags (msg_t::label); +        rcvmore = msg_->flags () & msg_t::more; +        if (rcvmore) +            msg_->reset_flags (msg_t::more); +        return 0;      }      //  Compute the time when the timeout should occur. diff --git a/src/xrep.cpp b/src/xrep.cpp index b935c06..ab2f0a8 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -20,6 +20,8 @@  #include "xrep.hpp"  #include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp"  #include "err.hpp"  zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : @@ -32,9 +34,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :  {      options.type = ZMQ_XREP; -    //  On connect, pipes are created only after initial handshaking. -    //  That way we are aware of the peer's identity when binding to the pipes. -    options.immediate_connect = false; +    //  Start the peer ID sequence from a random point. +    generate_random (&next_peer_id, sizeof (next_peer_id));  }  zmq::xrep_t::~xrep_t () @@ -47,16 +48,33 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)  {      zmq_assert (pipe_); +    //  Generate a new peer ID. Take care to avoid duplicates. +    outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); +    if (!outpipes.empty ()) { +        while (true) { +            if (it == outpipes.end ()) +                it = outpipes.begin (); +            if (it->first != next_peer_id) +                break; +            ++next_peer_id; +            ++it; +        } +    } +      //  Add the pipe to the map out outbound pipes. -    //  TODO: What if new connection has same peer identity as the old one?      outpipe_t outpipe = {pipe_, true};      bool ok = outpipes.insert (outpipes_t::value_type ( -        peer_identity_, outpipe)).second; +        next_peer_id, outpipe)).second;      zmq_assert (ok);      //  Add the pipe to the list of inbound pipes. -    inpipe_t inpipe = {pipe_, peer_identity_, true}; +    inpipe_t inpipe = {pipe_, next_peer_id, true};      inpipes.push_back (inpipe); + +    //  Advance next peer ID so that if new connection is dropped shortly after +    //  its creation we don't accidentally get two subsequent peers with +    //  the same ID. +    ++next_peer_id;  }  void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -115,7 +133,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)  int zmq::xrep_t::xsend (msg_t *msg_, int flags_)  { -    //  If this is the first part of the message it's the identity of the +    //  If this is the first part of the message it's the ID of the      //  peer to send the message to.      if (!more_out) {          zmq_assert (!current_out); @@ -127,10 +145,11 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)              more_out = true; -            //  Find the pipe associated with the identity stored in the prefix. +            //  Find the pipe associated with the peer ID stored in the prefix.              //  If there's no such pipe just silently ignore the message. -            blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); -            outpipes_t::iterator it = outpipes.find (identity); +            zmq_assert (msg_->size () == 4); +            uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); +            outpipes_t::iterator it = outpipes.find (peer_id);              if (it != outpipes.end ()) {                  current_out = it->second.pipe; @@ -220,10 +239,10 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)          //  If we have a message, create a prefix and return it to the caller.          if (prefetched) { -            int rc = msg_->init_size (inpipes [current_in].identity.size ()); +            int rc = msg_->init_size (4);              errno_assert (rc == 0); -            memcpy (msg_->data (), inpipes [current_in].identity.data (), -                msg_->size ()); +            put_uint32 ((unsigned char*) msg_->data (), +                inpipes [current_in].peer_id);              msg_->set_flags (msg_t::label);              return 0;          } diff --git a/src/xrep.hpp b/src/xrep.hpp index fbc7385..d5014e0 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -25,7 +25,7 @@  #include <vector>  #include "socket_base.hpp" -#include "blob.hpp" +#include "stdint.hpp"  #include "msg.hpp"  namespace zmq @@ -41,7 +41,8 @@ namespace zmq          ~xrep_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); +        void xattach_pipe (class pipe_t *pipe_, +            const blob_t &peer_identity_);          int xsend (class msg_t *msg_, int flags_);          int xrecv (class msg_t *msg_, int flags_);          bool xhas_in (); @@ -60,7 +61,7 @@ namespace zmq          struct inpipe_t          {              class pipe_t *pipe; -            blob_t identity; +            uint32_t peer_id;              bool active;          }; @@ -86,8 +87,8 @@ namespace zmq              bool active;          }; -        //  Outbound pipes indexed by the peer names. -        typedef std::map <blob_t, outpipe_t> outpipes_t; +        //  Outbound pipes indexed by the peer IDs. +        typedef std::map <uint32_t, outpipe_t> outpipes_t;          outpipes_t outpipes;          //  The pipe we are currently writing to. @@ -96,6 +97,10 @@ namespace zmq          //  If true, more outgoing message parts are expected.          bool more_out; +        //  Peer ID are generated. It's a simple increment and wrap-over +        //  algorithm. This value is the next ID to use (if not used already). +        uint32_t next_peer_id; +          xrep_t (const xrep_t&);          const xrep_t &operator = (const xrep_t&);      }; | 
