diff options
Diffstat (limited to 'src')
46 files changed, 528 insertions, 266 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 446b1e2..4146f68 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \      atomic_bitmap.hpp \      atomic_counter.hpp \      atomic_ptr.hpp \ +    blob.hpp \      command.hpp \      config.hpp \      decoder.hpp \ @@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \      zmq_init.hpp \      zmq_listener.hpp \      app_thread.cpp \ +    command.cpp \      devpoll.cpp \      dispatcher.cpp \      downstream.cpp \ diff --git a/src/blob.hpp b/src/blob.hpp new file mode 100644 index 0000000..a4fa8cd --- /dev/null +++ b/src/blob.hpp @@ -0,0 +1,33 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_BLOB_HPP_INCLUDED__ +#define __ZMQ_BLOB_HPP_INCLUDED__ + +#include <string> + +namespace zmq +{ + +    //  Object to hold dynamically allocated opaque binary data. +    typedef std::basic_string <unsigned char> blob_t; + +} + +#endif diff --git a/src/command.cpp b/src/command.cpp new file mode 100644 index 0000000..8bf7ea2 --- /dev/null +++ b/src/command.cpp @@ -0,0 +1,38 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include "command.hpp" + +void zmq::deallocate_command (command_t *cmd_) +{ +    switch (cmd_->type) { +    case command_t::attach: +        if (cmd_->args.attach.peer_identity) +            free (cmd_->args.attach.peer_identity); +        break; +    case command_t::bind: +        if (cmd_->args.bind.peer_identity) +            free (cmd_->args.bind.peer_identity); +        break; +    default: +        /*  noop  */; +    } +} diff --git a/src/command.hpp b/src/command.hpp index 469d6ec..150cad1 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -66,6 +66,8 @@ namespace zmq              //  Attach the engine to the session.              struct {                  struct i_engine *engine; +                unsigned char peer_identity_size; +                unsigned char *peer_identity;              } attach;              //  Sent from session to socket to establish pipe(s) between them. @@ -73,6 +75,8 @@ namespace zmq              struct {                  class reader_t *in_pipe;                  class writer_t *out_pipe; +                unsigned char peer_identity_size; +                unsigned char *peer_identity;              } bind;              //  Sent by pipe writer to inform dormant pipe reader that there @@ -107,6 +111,9 @@ namespace zmq          } args;      }; +    //  Function to deallocate dynamically allocated components of the command. +    void deallocate_command (command_t *cmd_); +  }      #endif diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 8aafcf8..4233278 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t ()      while (!pipes.empty ())          delete *pipes.begin (); +    //  TODO: Deallocate any commands still in the pipes. Keep in mind that +    //  simple reading from a pipe and deallocating commands won't do as +    //  command pipe has template parameter D set to true, meaning that +    //  read may return false even if there are still commands in the pipe.      delete [] command_pipes;  #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/downstream.cpp b/src/downstream.cpp index 29b0689..3431264 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -26,7 +26,6 @@  zmq::downstream_t::downstream_t (class app_thread_t *parent_) :      socket_base_t (parent_)  { -    options.type = ZMQ_DOWNSTREAM;      options.requires_in = false;      options.requires_out = true;  } @@ -36,7 +35,7 @@ zmq::downstream_t::~downstream_t ()  }  void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (!inpipe_ && outpipe_);      lb.attach (outpipe_); diff --git a/src/downstream.hpp b/src/downstream.hpp index 35dec95..dbd79a5 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -34,7 +34,8 @@ namespace zmq          ~downstream_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index d60b39e..ddab6a4 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -20,6 +20,8 @@  #ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__  #define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ +#include "blob.hpp" +  namespace zmq  { @@ -28,7 +30,7 @@ namespace zmq          virtual ~i_endpoint () {}          virtual void attach_pipes (class reader_t *inpipe_, -            class writer_t *outpipe_) = 0; +            class writer_t *outpipe_, const blob_t &peer_identity_) = 0;          virtual void detach_inpipe (class reader_t *pipe_) = 0;          virtual void detach_outpipe (class writer_t *pipe_) = 0;          virtual void kill (class reader_t *pipe_) = 0; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bcb4297..81b56df 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -22,6 +22,8 @@  #include <stddef.h> +#include "blob.hpp" +  namespace zmq  { @@ -39,11 +41,11 @@ namespace zmq          //  are messages to send available.          virtual void revive () = 0; -        //  Start tracing the message route. Engine should add the identity -        //  supplied to all inbound messages and trim identity from all the -        //  outbound messages. -        virtual void traceroute (unsigned char *identity_, -            size_t identity_size_) = 0; +        //  Engine should add the prefix supplied to all inbound messages. +        virtual void add_prefix (const blob_t &identity_) = 0; + +        //  Engine should trim prefix from all the outbound messages. +        virtual void trim_prefix () = 0;      };  } diff --git a/src/object.cpp b/src/object.cpp index a977f39..356fcd1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -17,6 +17,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <string.h> +  #include "object.hpp"  #include "dispatcher.hpp"  #include "err.hpp" @@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_)      case command_t::own:          process_own (cmd_.args.own.object); -        return; +        break;      case command_t::attach: -        process_attach (cmd_.args.attach.engine); +        process_attach (cmd_.args.attach.engine, +            blob_t (cmd_.args.attach.peer_identity, +            cmd_.args.attach.peer_identity_size));          process_seqnum (); -        return; +        break;      case command_t::bind: -        process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); +        process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, +            blob_t (cmd_.args.bind.peer_identity, +            cmd_.args.bind.peer_identity_size));          process_seqnum (); -        return; +        break;      case command_t::pipe_term:          process_pipe_term (); @@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_)      case command_t::pipe_term_ack:          process_pipe_term_ack (); -        return; +        break;      case command_t::term_req:          process_term_req (cmd_.args.term_req.object); -        return; +        break;      case command_t::term:          process_term (); -        return; +        break;      case command_t::term_ack:          process_term_ack (); -        return; +        break;      default:          zmq_assert (false);      } + +    //  The assumption here is that each command is processed once only, +    //  so deallocating it after processing is all right. +    deallocate_command (&cmd_);  }  void zmq::object_t::register_pipe (class pipe_t *pipe_) @@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)  }  void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, -    bool inc_seqnum_) +    const blob_t &peer_identity_, bool inc_seqnum_)  {      if (inc_seqnum_)          destination_->inc_seqnum (); @@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,      cmd.destination = destination_;      cmd.type = command_t::attach;      cmd.args.attach.engine = engine_; +    if (peer_identity_.empty ()) { +        cmd.args.attach.peer_identity_size = 0; +        cmd.args.attach.peer_identity = NULL; +    } +    else { +        zmq_assert (peer_identity_.size () <= 0xff); +        cmd.args.attach.peer_identity_size = +            (unsigned char) peer_identity_.size (); +        cmd.args.attach.peer_identity = +            (unsigned char*) malloc (peer_identity_.size ()); +        zmq_assert (cmd.args.attach.peer_identity_size); +        memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), +            peer_identity_.size ()); +    }      send_command (cmd);  }  void zmq::object_t::send_bind (socket_base_t *destination_, -    reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) +    reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, +    bool inc_seqnum_)  {      if (inc_seqnum_)          destination_->inc_seqnum (); @@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,      cmd.type = command_t::bind;      cmd.args.bind.in_pipe = in_pipe_;      cmd.args.bind.out_pipe = out_pipe_; +    if (peer_identity_.empty ()) { +        cmd.args.bind.peer_identity_size = 0; +        cmd.args.bind.peer_identity = NULL; +    } +    else { +        zmq_assert (peer_identity_.size () <= 0xff); +        cmd.args.bind.peer_identity_size = +            (unsigned char) peer_identity_.size (); +        cmd.args.bind.peer_identity = +            (unsigned char*) malloc (peer_identity_.size ()); +        zmq_assert (cmd.args.bind.peer_identity_size); +        memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), +            peer_identity_.size ()); +    }      send_command (cmd);  } @@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_)      zmq_assert (false);  } -void zmq::object_t::process_attach (i_engine *engine_) +void zmq::object_t::process_attach (i_engine *engine_, +    const blob_t &peer_identity_)  {      zmq_assert (false);  } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, +    const blob_t &peer_identity_)  {      zmq_assert (false);  } diff --git a/src/object.hpp b/src/object.hpp index e6b2379..1544109 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -21,6 +21,7 @@  #define __ZMQ_OBJECT_HPP_INCLUDED__  #include "stdint.hpp" +#include "blob.hpp"  namespace zmq  { @@ -64,10 +65,11 @@ namespace zmq          void send_own (class socket_base_t *destination_,              class owned_t *object_);          void send_attach (class session_t *destination_, -            struct i_engine *engine_, bool inc_seqnum_ = true); +             struct i_engine *engine_, const blob_t &peer_identity_, +             bool inc_seqnum_ = true);          void send_bind (class socket_base_t *destination_,               class reader_t *in_pipe_, class writer_t *out_pipe_, -             bool inc_seqnum_ = true); +             const blob_t &peer_identity_, bool inc_seqnum_ = true);          void send_revive (class object_t *destination_);          void send_pipe_term (class writer_t *destination_);          void send_pipe_term_ack (class reader_t *destination_); @@ -81,9 +83,10 @@ namespace zmq          virtual void process_stop ();          virtual void process_plug ();          virtual void process_own (class owned_t *object_); -        virtual void process_attach (struct i_engine *engine_); +        virtual void process_attach (struct i_engine *engine_, +            const blob_t &peer_identity_);          virtual void process_bind (class reader_t *in_pipe_, -            class writer_t *out_pipe_); +            class writer_t *out_pipe_, const blob_t &peer_identity_);          virtual void process_revive ();          virtual void process_pipe_term ();          virtual void process_pipe_term_ack (); diff --git a/src/options.cpp b/src/options.cpp index f9d93d6..f78d8de 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,7 +23,6 @@  #include "err.hpp"  zmq::options_t::options_t () : -    type (-1),      hwm (0),      lwm (0),      swap (0), @@ -34,7 +33,9 @@ zmq::options_t::options_t () :      sndbuf (0),      rcvbuf (0),      requires_in (false), -    requires_out (false) +    requires_out (false), +    immediate_connect (true), +    traceroute (false)  {  } @@ -76,7 +77,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,          return 0;      case ZMQ_IDENTITY: -        identity.assign ((const char*) optval_, optvallen_); + +        //  Empty identity is invalid as well as identity longer than +        //  255 bytes. Identity starting with binary zero is invalid +        //  as these are used for auto-generated identities. +        if (optvallen_ < 1 || optvallen_ > 255 || +              *((const unsigned char*) optval_) == 0) { +            errno = EINVAL; +            return -1; +        } +        identity.assign ((const unsigned char*) optval_, optvallen_);          return 0;      case ZMQ_RATE: diff --git a/src/options.hpp b/src/options.hpp index dbe3701..6d9be4d 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -20,10 +20,9 @@  #ifndef __ZMQ_OPTIONS_HPP_INCLUDED__  #define __ZMQ_OPTIONS_HPP_INCLUDED__ -#include <string> -  #include "stddef.h"  #include "stdint.hpp" +#include "blob.hpp"  namespace zmq  { @@ -34,14 +33,11 @@ namespace zmq          int setsockopt (int option_, const void *optval_, size_t optvallen_); -        //  Type of the associated socket. One of the constants defined in zmq.h -        int type; -          int64_t hwm;          int64_t lwm;          int64_t swap;          uint64_t affinity; -        std::string identity; +        blob_t identity;          //  Maximum tranfer rate [kb/s]. Default 100kb/s.          uint32_t rate; @@ -59,6 +55,15 @@ namespace zmq          //  provided by the specific socket type.          bool requires_in;          bool requires_out; + +        //  If true, when connecting, pipes are created immediately without +        //  waiting for the connection to be established. That way the socket +        //  is not aware of the peer's identity, however, it is able to send +        //  messages straight away. +        bool immediate_connect; + +        //  If true, socket requires tracerouting the messages. +        bool traceroute;      };  } diff --git a/src/p2p.cpp b/src/p2p.cpp index 46bbd0b..ca7a8f5 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :      outpipe (NULL),      alive (true)  { -    options.type = ZMQ_P2P;      options.requires_in = true;      options.requires_out = true;  } @@ -43,7 +42,7 @@ zmq::p2p_t::~p2p_t ()  }  void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (!inpipe && !outpipe);      inpipe = inpipe_; diff --git a/src/p2p.hpp b/src/p2p.hpp index 2ff1047..bca0eab 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -33,7 +33,8 @@ namespace zmq          ~p2p_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class reader_t *pipe_); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index a2ba9c6..e708229 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,8 +88,13 @@ void zmq::pgm_receiver_t::revive ()      zmq_assert (false);  } -void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, -    size_t identity_size_) +void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) +{ +    //  No need for tracerouting functionality in PGM socket at the moment. +    zmq_assert (false); +} + +void zmq::pgm_receiver_t::trim_prefix ()  {      //  No need for tracerouting functionality in PGM socket at the moment.      zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f03551f..3f0ef81 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,8 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); -        void traceroute (unsigned char *identity_, size_t identity_size_); +        void add_prefix (const blob_t &identity_); +        void trim_prefix ();          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index fa7d7e0..27b4d0c 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,8 +102,13 @@ void zmq::pgm_sender_t::revive ()      out_event ();  } -void zmq::pgm_sender_t::traceroute (unsigned char *identity_, -    size_t identity_size_) +void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) +{ +    //  No need for tracerouting functionality in PGM socket at the moment. +    zmq_assert (false); +} + +void zmq::pgm_sender_t::trim_prefix ()  {      //  No need for tracerouting functionality in PGM socket at the moment.      zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 89357f5..951c417 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,8 @@ namespace zmq          void plug (struct i_inout *inout_);          void unplug ();          void revive (); -        void traceroute (unsigned char *identity_, size_t identity_size_); +        void add_prefix (const blob_t &identity_); +        void trim_prefix ();          //  i_poll_events interface implementation.          void in_event (); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1eeb34f..462a3a9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)      if (options.identity.size () > 0) { -        //  Create gsi from identity string. -        gsi_base = options.identity; +        //  Create gsi from identity. +        //  TODO: We assume that identity is standard C string here. +        //  What if it contains binary zeroes? +        gsi_base.assign ((const char*) options.identity.data (), +            options.identity.size ());      } else {          //  Generate random gsi. diff --git a/src/pub.cpp b/src/pub.cpp index 9a2dcc6..5b9d48c 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -27,7 +27,6 @@  zmq::pub_t::pub_t (class app_thread_t *parent_) :      socket_base_t (parent_)  { -    options.type = ZMQ_PUB;      options.requires_in = false;      options.requires_out = true;  } @@ -40,7 +39,7 @@ zmq::pub_t::~pub_t ()  }  void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, -    class writer_t *outpipe_) +    class writer_t *outpipe_, const blob_t &peer_identity_)  {      zmq_assert (!inpipe_);      out_pipes.push_back (outpipe_); diff --git a/src/pub.hpp b/src/pub.hpp index 5b2f348..26142a4 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -34,7 +34,8 @@ namespace zmq          ~pub_t ();          //  Overloads of functions from socket_base_t. -        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); +        void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, +            const blob_t &peer_identity_);          void xdetach_inpipe (class reader_t *pipe_);          void xdetach_outpipe (class writer_t *pipe_);          void xkill (class  | 
