diff options
Diffstat (limited to 'src')
46 files changed, 528 insertions, 266 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 446b1e2..4146f68 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \ atomic_bitmap.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ + blob.hpp \ command.hpp \ config.hpp \ decoder.hpp \ @@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \ zmq_init.hpp \ zmq_listener.hpp \ app_thread.cpp \ + command.cpp \ devpoll.cpp \ dispatcher.cpp \ downstream.cpp \ diff --git a/src/blob.hpp b/src/blob.hpp new file mode 100644 index 0000000..a4fa8cd --- /dev/null +++ b/src/blob.hpp @@ -0,0 +1,33 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_BLOB_HPP_INCLUDED__ +#define __ZMQ_BLOB_HPP_INCLUDED__ + +#include <string> + +namespace zmq +{ + + // Object to hold dynamically allocated opaque binary data. + typedef std::basic_string <unsigned char> blob_t; + +} + +#endif diff --git a/src/command.cpp b/src/command.cpp new file mode 100644 index 0000000..8bf7ea2 --- /dev/null +++ b/src/command.cpp @@ -0,0 +1,38 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include "command.hpp" + +void zmq::deallocate_command (command_t *cmd_) +{ + switch (cmd_->type) { + case command_t::attach: + if (cmd_->args.attach.peer_identity) + free (cmd_->args.attach.peer_identity); + break; + case command_t::bind: + if (cmd_->args.bind.peer_identity) + free (cmd_->args.bind.peer_identity); + break; + default: + /* noop */; + } +} diff --git a/src/command.hpp b/src/command.hpp index 469d6ec..150cad1 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -66,6 +66,8 @@ namespace zmq // Attach the engine to the session. struct { struct i_engine *engine; + unsigned char peer_identity_size; + unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. @@ -73,6 +75,8 @@ namespace zmq struct { class reader_t *in_pipe; class writer_t *out_pipe; + unsigned char peer_identity_size; + unsigned char *peer_identity; } bind; // Sent by pipe writer to inform dormant pipe reader that there @@ -107,6 +111,9 @@ namespace zmq } args; }; + // Function to deallocate dynamically allocated components of the command. + void deallocate_command (command_t *cmd_); + } #endif diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 8aafcf8..4233278 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t () while (!pipes.empty ()) delete *pipes.begin (); + // TODO: Deallocate any commands still in the pipes. Keep in mind that + // simple reading from a pipe and deallocating commands won't do as + // command pipe has template parameter D set to true, meaning that + // read may return false even if there are still commands in the pipe. delete [] command_pipes; #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/downstream.cpp b/src/downstream.cpp index 29b0689..3431264 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -26,7 +26,6 @@ zmq::downstream_t::downstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_DOWNSTREAM; options.requires_in = false; options.requires_out = true; } @@ -36,7 +35,7 @@ zmq::downstream_t::~downstream_t () } void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_ && outpipe_); lb.attach (outpipe_); diff --git a/src/downstream.hpp b/src/downstream.hpp index 35dec95..dbd79a5 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -34,7 +34,8 @@ namespace zmq ~downstream_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index d60b39e..ddab6a4 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__ #define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ +#include "blob.hpp" + namespace zmq { @@ -28,7 +30,7 @@ namespace zmq virtual ~i_endpoint () {} virtual void attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) = 0; + class writer_t *outpipe_, const blob_t &peer_identity_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0; virtual void kill (class reader_t *pipe_) = 0; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bcb4297..81b56df 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -22,6 +22,8 @@ #include <stddef.h> +#include "blob.hpp" + namespace zmq { @@ -39,11 +41,11 @@ namespace zmq // are messages to send available. virtual void revive () = 0; - // Start tracing the message route. Engine should add the identity - // supplied to all inbound messages and trim identity from all the - // outbound messages. - virtual void traceroute (unsigned char *identity_, - size_t identity_size_) = 0; + // Engine should add the prefix supplied to all inbound messages. + virtual void add_prefix (const blob_t &identity_) = 0; + + // Engine should trim prefix from all the outbound messages. + virtual void trim_prefix () = 0; }; } diff --git a/src/object.cpp b/src/object.cpp index a977f39..356fcd1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "object.hpp" #include "dispatcher.hpp" #include "err.hpp" @@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::own: process_own (cmd_.args.own.object); - return; + break; case command_t::attach: - process_attach (cmd_.args.attach.engine); + process_attach (cmd_.args.attach.engine, + blob_t (cmd_.args.attach.peer_identity, + cmd_.args.attach.peer_identity_size)); process_seqnum (); - return; + break; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + blob_t (cmd_.args.bind.peer_identity, + cmd_.args.bind.peer_identity_size)); process_seqnum (); - return; + break; case command_t::pipe_term: process_pipe_term (); @@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::pipe_term_ack: process_pipe_term_ack (); - return; + break; case command_t::term_req: process_term_req (cmd_.args.term_req.object); - return; + break; case command_t::term: process_term (); - return; + break; case command_t::term_ack: process_term_ack (); - return; + break; default: zmq_assert (false); } + + // The assumption here is that each command is processed once only, + // so deallocating it after processing is all right. + deallocate_command (&cmd_); } void zmq::object_t::register_pipe (class pipe_t *pipe_) @@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - bool inc_seqnum_) + const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; + if (peer_identity_.empty ()) { + cmd.args.attach.peer_identity_size = 0; + cmd.args.attach.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.attach.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.attach.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.attach.peer_identity_size); + memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) + reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_, cmd.type = command_t::bind; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + if (peer_identity_.empty ()) { + cmd.args.bind.peer_identity_size = 0; + cmd.args.bind.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.bind.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.bind.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.bind.peer_identity_size); + memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } @@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_) +void zmq::object_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index e6b2379..1544109 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -21,6 +21,7 @@ #define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -64,10 +65,11 @@ namespace zmq void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, bool inc_seqnum_ = true); + struct i_engine *engine_, const blob_t &peer_identity_, + bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, - bool inc_seqnum_ = true); + const blob_t &peer_identity_, bool inc_seqnum_ = true); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -81,9 +83,10 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class owned_t *object_); - virtual void process_attach (struct i_engine *engine_); + virtual void process_attach (struct i_engine *engine_, + const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_); + class writer_t *out_pipe_, const blob_t &peer_identity_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/options.cpp b/src/options.cpp index f9d93d6..f78d8de 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,7 +23,6 @@ #include "err.hpp" zmq::options_t::options_t () : - type (-1), hwm (0), lwm (0), swap (0), @@ -34,7 +33,9 @@ zmq::options_t::options_t () : sndbuf (0), rcvbuf (0), requires_in (false), - requires_out (false) + requires_out (false), + immediate_connect (true), + traceroute (false) { } @@ -76,7 +77,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; case ZMQ_IDENTITY: - identity.assign ((const char*) optval_, optvallen_); + + // Empty identity is invalid as well as identity longer than + // 255 bytes. Identity starting with binary zero is invalid + // as these are used for auto-generated identities. + if (optvallen_ < 1 || optvallen_ > 255 || + *((const unsigned char*) optval_) == 0) { + errno = EINVAL; + return -1; + } + identity.assign ((const unsigned char*) optval_, optvallen_); return 0; case ZMQ_RATE: diff --git a/src/options.hpp b/src/options.hpp index dbe3701..6d9be4d 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -20,10 +20,9 @@ #ifndef __ZMQ_OPTIONS_HPP_INCLUDED__ #define __ZMQ_OPTIONS_HPP_INCLUDED__ -#include <string> - #include "stddef.h" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -34,14 +33,11 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); - // Type of the associated socket. One of the constants defined in zmq.h - int type; - int64_t hwm; int64_t lwm; int64_t swap; uint64_t affinity; - std::string identity; + blob_t identity; // Maximum tranfer rate [kb/s]. Default 100kb/s. uint32_t rate; @@ -59,6 +55,15 @@ namespace zmq // provided by the specific socket type. bool requires_in; bool requires_out; + + // If true, when connecting, pipes are created immediately without + // waiting for the connection to be established. That way the socket + // is not aware of the peer's identity, however, it is able to send + // messages straight away. + bool immediate_connect; + + // If true, socket requires tracerouting the messages. + bool traceroute; }; } diff --git a/src/p2p.cpp b/src/p2p.cpp index 46bbd0b..ca7a8f5 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) : outpipe (NULL), alive (true) { - options.type = ZMQ_P2P; options.requires_in = true; options.requires_out = true; } @@ -43,7 +42,7 @@ zmq::p2p_t::~p2p_t () } void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe && !outpipe); inpipe = inpipe_; diff --git a/src/p2p.hpp b/src/p2p.hpp index 2ff1047..bca0eab 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -33,7 +33,8 @@ namespace zmq ~p2p_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index a2ba9c6..e708229 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,8 +88,13 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + +void zmq::pgm_receiver_t::trim_prefix () { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f03551f..3f0ef81 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index fa7d7e0..27b4d0c 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,8 +102,13 @@ void zmq::pgm_sender_t::revive () out_event (); } -void zmq::pgm_sender_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + +void zmq::pgm_sender_t::trim_prefix () { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 89357f5..951c417 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1eeb34f..462a3a9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (options.identity.size () > 0) { - // Create gsi from identity string. - gsi_base = options.identity; + // Create gsi from identity. + // TODO: We assume that identity is standard C string here. + // What if it contains binary zeroes? + gsi_base.assign ((const char*) options.identity.data (), + options.identity.size ()); } else { // Generate random gsi. diff --git a/src/pub.cpp b/src/pub.cpp index 9a2dcc6..5b9d48c 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -27,7 +27,6 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_PUB; options.requires_in = false; options.requires_out = true; } @@ -40,7 +39,7 @@ zmq::pub_t::~pub_t () } void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_); out_pipes.push_back (outpipe_); diff --git a/src/pub.hpp b/src/pub.hpp index 5b2f348..26142a4 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -34,7 +34,8 @@ namespace zmq ~pub_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); |