diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/msg.hpp | 1 | ||||
| -rw-r--r-- | src/options.cpp | 4 | ||||
| -rw-r--r-- | src/options.hpp | 6 | ||||
| -rw-r--r-- | src/pipe.cpp | 11 | ||||
| -rw-r--r-- | src/pipe.hpp | 10 | ||||
| -rw-r--r-- | src/req.cpp | 15 | ||||
| -rw-r--r-- | src/req.hpp | 1 | ||||
| -rw-r--r-- | src/session_base.cpp | 20 | ||||
| -rw-r--r-- | src/session_base.hpp | 5 | ||||
| -rw-r--r-- | src/socket_base.cpp | 9 | ||||
| -rw-r--r-- | src/xrep.cpp | 102 | ||||
| -rw-r--r-- | src/xrep.hpp | 4 | ||||
| -rw-r--r-- | src/xreq.cpp | 14 | 
13 files changed, 146 insertions, 56 deletions
diff --git a/src/msg.hpp b/src/msg.hpp index f2f8fcf..8c84670 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -50,6 +50,7 @@ namespace zmq          enum          {              more = 1, +            identity = 64,              shared = 128          }; diff --git a/src/options.cpp b/src/options.cpp index aa94a21..4db1a6c 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -46,7 +46,9 @@ zmq::options_t::options_t () :      ipv4only (1),      delay_on_close (true),      delay_on_disconnect (true), -    filter (false) +    filter (false), +    send_identity (false), +    recv_identity (false)  {  } diff --git a/src/options.hpp b/src/options.hpp index d017c00..bfc9dc7 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -99,6 +99,12 @@ namespace zmq          //  If 1, (X)SUB socket should filter the messages. If 0, it should not.          bool filter; + +        //  Sends identity to all new connections. +        bool send_identity; + +        //  Receivers identity from all new connections. +        bool recv_identity;      };  } diff --git a/src/pipe.cpp b/src/pipe.cpp index 9f44c94..25dd51c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -65,8 +65,7 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,      peer (NULL),      sink (NULL),      state (active), -    delay (delay_), -    pipe_id (0) +    delay (delay_)  {  } @@ -88,14 +87,14 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)      sink = sink_;  } -void zmq::pipe_t::set_pipe_id (uint32_t id_) +void zmq::pipe_t::set_identity (const blob_t &identity_)  { -    pipe_id = id_; +    identity = identity_;  } -uint32_t zmq::pipe_t::get_pipe_id () +zmq::blob_t zmq::pipe_t::get_identity ()  { -    return pipe_id; +    return identity;  }  bool zmq::pipe_t::check_read () diff --git a/src/pipe.hpp b/src/pipe.hpp index 4533e58..75a2021 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -1,6 +1,7 @@  /*      Copyright (c) 2009-2011 250bpm s.r.o.      Copyright (c) 2007-2009 iMatix Corporation +    Copyright (c) 2011 VMware, Inc.      Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ. @@ -28,6 +29,7 @@  #include "object.hpp"  #include "stdint.hpp"  #include "array.hpp" +#include "blob.hpp"  namespace zmq  { @@ -71,8 +73,8 @@ namespace zmq          void set_event_sink (i_pipe_events *sink_);          //  Pipe endpoint can store an opaque ID to be used by its clients. -        void set_pipe_id (uint32_t id_); -        uint32_t get_pipe_id (); +        void set_identity (const blob_t &identity_); +        blob_t get_identity ();          //  Returns true if there is at least one message to read in the pipe.          bool check_read (); @@ -183,8 +185,8 @@ namespace zmq          //  asks us to.          bool delay; -        //  Opaque ID. To be used by the clients, not the pipe itself. -        uint32_t pipe_id; +        //  Identity of the writer. Used uniquely by the reader side. +        blob_t identity;          //  Returns true if the message is delimiter; false otherwise.          static bool is_delimiter (msg_t &msg_); diff --git a/src/req.cpp b/src/req.cpp index 40c4765..3ba1ec0 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -147,23 +147,32 @@ zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,  zmq::req_session_t::~req_session_t ()  { +    state = options.recv_identity ? identity : bottom;  }  int zmq::req_session_t::write (msg_t *msg_)  { -    if (state == bottom) { +    switch (state) { +    case bottom:          if (msg_->flags () == msg_t::more && msg_->size () == 0) {              state = body;              return xreq_session_t::write (msg_);          } -    } -    else { +        break; +    case body:          if (msg_->flags () == msg_t::more)              return xreq_session_t::write (msg_);          if (msg_->flags () == 0) {              state = bottom;              return xreq_session_t::write (msg_);          } +        break; +    case identity: +        if (msg_->flags () == 0) { +            state = bottom; +            return xreq_session_t::write (msg_); +        } +        break;      }      errno = EFAULT;      return -1; diff --git a/src/req.hpp b/src/req.hpp index 61066ca..8fae9d4 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -71,6 +71,7 @@ namespace zmq      private:          enum { +            identity,              bottom,              body          } state; diff --git a/src/session_base.cpp b/src/session_base.cpp index 4c5e512..f2ee713 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -112,7 +112,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,      engine (NULL),      socket (socket_),      io_thread (io_thread_), -    has_linger_timer (false) +    has_linger_timer (false), +    send_identity (options_.send_identity), +    recv_identity (options_.recv_identity)  {      if (protocol_)          protocol = protocol_; @@ -146,6 +148,16 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)  int zmq::session_base_t::read (msg_t *msg_)  { +    //  First message to send is identity (if required). +    if (send_identity) { +        zmq_assert (!(msg_->flags () & msg_t::more)); +        msg_->init_size (options.identity_size); +        memcpy (msg_->data (), options.identity, options.identity_size); +        send_identity = false; +        incomplete_in = false; +        return 0; +    } +      if (!pipe || !pipe->read (msg_)) {          errno = EAGAIN;          return -1; @@ -157,6 +169,12 @@ int zmq::session_base_t::read (msg_t *msg_)  int zmq::session_base_t::write (msg_t *msg_)  { +    //  First message to receive is identity (if required). +    if (recv_identity) { +        msg_->set_flags (msg_t::identity); +        recv_identity = false; +    } +      if (pipe && pipe->write (msg_)) {          int rc = msg_->init ();          errno_assert (rc == 0); diff --git a/src/session_base.hpp b/src/session_base.hpp index 86a670f..c89628f 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -1,6 +1,7 @@  /*      Copyright (c) 2009-2011 250bpm s.r.o.      Copyright (c) 2007-2009 iMatix Corporation +    Copyright (c) 2011 VMware, Inc.      Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ. @@ -119,6 +120,10 @@ namespace zmq          //  True is linger timer is running.          bool has_linger_timer; +        //  If true, identity is to be sent/recvd from the network. +        bool send_identity; +        bool recv_identity; +          //  Protocol and address to use when connecting.          std::string protocol;          std::string address; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 583818b..a59ba69 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -845,7 +845,16 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)  void zmq::socket_base_t::extract_flags (msg_t *msg_)  { +    //  Test whether IDENTITY flag is valid for this socket type. +    if (unlikely (msg_->flags () & msg_t::identity)) { +        zmq_assert (options.recv_identity); +printf ("identity recvd\n"); +    } +   + +    //  Remove MORE flag.      rcvmore = msg_->flags () & msg_t::more ? true : false;      if (rcvmore)          msg_->reset_flags (msg_t::more);  } + diff --git a/src/xrep.cpp b/src/xrep.cpp index 350d752..ea19e56 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -43,6 +43,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :      //  all the outstanding requests from that peer.      //  options.delay_on_disconnect = false; +    options.send_identity = true; +    options.recv_identity = true; +      prefetched_msg.init ();  } @@ -56,33 +59,22 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)  {      zmq_assert (pipe_); -    //  Generate a new peer ID. Take care to avoid duplicates. -    outpipes_t::iterator it = outpipes.lower_bound (next_peer_id); -    if (!outpipes.empty ()) { -        while (true) { -            if (it == outpipes.end ()) -                it = outpipes.begin (); -            if (it->first != next_peer_id) -                break; -            ++next_peer_id; -            ++it; -        } -    } +    //  Generate a new unique peer identity. +    unsigned char buf [5]; +    buf [0] = 0; +    put_uint32 (buf + 1, next_peer_id); +    blob_t identity (buf, 5); +    ++next_peer_id;      //  Add the pipe to the map out outbound pipes.      outpipe_t outpipe = {pipe_, true};      bool ok = outpipes.insert (outpipes_t::value_type ( -        next_peer_id, outpipe)).second; +        identity, outpipe)).second;      zmq_assert (ok);      //  Add the pipe to the list of inbound pipes. -    pipe_->set_pipe_id (next_peer_id); -    fq.attach (pipe_); - -    //  Advance next peer ID so that if new connection is dropped shortly after -    //  its creation we don't accidentally get two subsequent peers with -    //  the same ID. -    ++next_peer_id; +    pipe_->set_identity (identity); +    fq.attach (pipe_);      }  void zmq::xrep_t::xterminated (pipe_t *pipe_) @@ -133,26 +125,25 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)              more_out = true; -            //  Find the pipe associated with the peer ID stored in the prefix. +            //  Find the pipe associated with the identity stored in the prefix.              //  If there's no such pipe just silently ignore the message. -            if (msg_->size () == 4) { -                uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ()); -                outpipes_t::iterator it = outpipes.find (peer_id); - -                if (it != outpipes.end ()) { -                    current_out = it->second.pipe; -                    msg_t empty; -                    int rc = empty.init (); -                    errno_assert (rc == 0); -                    if (!current_out->check_write (&empty)) { -                        it->second.active = false; -                        more_out = false; -                        current_out = NULL; -                    } -                    rc = empty.close (); -                    errno_assert (rc == 0); +            blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); +            outpipes_t::iterator it = outpipes.find (identity); + +            if (it != outpipes.end ()) { +                current_out = it->second.pipe; +                msg_t empty; +                int rc = empty.init (); +                errno_assert (rc == 0); +                if (!current_out->check_write (&empty)) { +                    it->second.active = false; +                    more_out = false; +                    current_out = NULL;                  } +                rc = empty.close (); +                errno_assert (rc == 0);              } +          }          int rc = msg_->close (); @@ -204,6 +195,37 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)      if (rc != 0)          return -1; +    //  If identity is received, change the key assigned to the pipe. +    if (unlikely (msg_->flags () & msg_t::identity)) { +        zmq_assert (!more_in); + +        //  Empty identity means we can preserve the auto-generated identity. +        if (msg_->size () != 0) { + +            //  Actual change of the identity. +            outpipes_t::iterator it = outpipes.begin (); +            while (it != outpipes.end ()) { +                if (it->second.pipe == pipe) { +                    blob_t identity ((unsigned char*) msg_->data (), +                        msg_->size ()); +                    pipe->set_identity (identity); +                    outpipes.erase (it); +                    outpipe_t outpipe = {pipe, true}; +                    outpipes.insert (outpipes_t::value_type (identity, +                        outpipe)); +                    break; +                } +                ++it; +            } +            zmq_assert (it != outpipes.end ()); +        } + +        //  After processing the identity, try to get the next message. +        rc = fq.recvpipe (msg_, flags_, &pipe); +        if (rc != 0) +            return -1; +    } +      //  If we are in the middle of reading a message, just return the next part.      if (more_in) {          more_in = msg_->flags () & msg_t::more ? true : false; @@ -217,9 +239,11 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)      prefetched = true;      rc = msg_->close ();      errno_assert (rc == 0); -    rc = msg_->init_size (4); + +    blob_t identity = pipe->get_identity (); +    rc = msg_->init_size (identity.size ());      errno_assert (rc == 0); -    put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ()); +    memcpy (msg_->data (), identity.data (), identity.size ());      msg_->set_flags (msg_t::more);      return 0;  } diff --git a/src/xrep.hpp b/src/xrep.hpp index 8cec683..fc02b11 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -1,6 +1,7 @@  /*      Copyright (c) 2009-2011 250bpm s.r.o.      Copyright (c) 2011 iMatix Corporation +    Copyright (c) 2011 VMware, Inc.      Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ. @@ -27,6 +28,7 @@  #include "socket_base.hpp"  #include "session_base.hpp"  #include "stdint.hpp" +#include "blob.hpp"  #include "msg.hpp"  #include "fq.hpp" @@ -78,7 +80,7 @@ namespace zmq          };          //  Outbound pipes indexed by the peer IDs. -        typedef std::map <uint32_t, outpipe_t> outpipes_t; +        typedef std::map <blob_t, outpipe_t> outpipes_t;          outpipes_t outpipes;          //  The pipe we are currently writing to. diff --git a/src/xreq.cpp b/src/xreq.cpp index f4f962f..91317f7 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -1,5 +1,6 @@  /*      Copyright (c) 2009-2011 250bpm s.r.o. +    Copyright (c) 2011 VMware, Inc.      Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file      This file is part of 0MQ. @@ -32,6 +33,9 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :      //  If the socket is closing we can drop all the outbound requests. There'll      //  be noone to receive the replies anyway.      //  options.delay_on_close = false; + +    options.send_identity = true; +    options.recv_identity = true;  }  zmq::xreq_t::~xreq_t () @@ -52,7 +56,15 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_)  int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)  { -    return fq.recv (msg_, flags_); +    //  XREQ socket doesn't use identities. We can safely drop it and  +    while (true) { +        int rc = fq.recv (msg_, flags_); +        if (rc != 0) +            return rc; +        if (likely (!(msg_->flags () & msg_t::identity))) +            break; +    } +    return 0;  }  bool zmq::xreq_t::xhas_in ()  | 
