diff options
Diffstat (limited to 'src')
46 files changed, 528 insertions, 266 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 446b1e2..4146f68 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \ atomic_bitmap.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ + blob.hpp \ command.hpp \ config.hpp \ decoder.hpp \ @@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \ zmq_init.hpp \ zmq_listener.hpp \ app_thread.cpp \ + command.cpp \ devpoll.cpp \ dispatcher.cpp \ downstream.cpp \ diff --git a/src/blob.hpp b/src/blob.hpp new file mode 100644 index 0000000..a4fa8cd --- /dev/null +++ b/src/blob.hpp @@ -0,0 +1,33 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_BLOB_HPP_INCLUDED__ +#define __ZMQ_BLOB_HPP_INCLUDED__ + +#include <string> + +namespace zmq +{ + + // Object to hold dynamically allocated opaque binary data. + typedef std::basic_string <unsigned char> blob_t; + +} + +#endif diff --git a/src/command.cpp b/src/command.cpp new file mode 100644 index 0000000..8bf7ea2 --- /dev/null +++ b/src/command.cpp @@ -0,0 +1,38 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include "command.hpp" + +void zmq::deallocate_command (command_t *cmd_) +{ + switch (cmd_->type) { + case command_t::attach: + if (cmd_->args.attach.peer_identity) + free (cmd_->args.attach.peer_identity); + break; + case command_t::bind: + if (cmd_->args.bind.peer_identity) + free (cmd_->args.bind.peer_identity); + break; + default: + /* noop */; + } +} diff --git a/src/command.hpp b/src/command.hpp index 469d6ec..150cad1 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -66,6 +66,8 @@ namespace zmq // Attach the engine to the session. struct { struct i_engine *engine; + unsigned char peer_identity_size; + unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. @@ -73,6 +75,8 @@ namespace zmq struct { class reader_t *in_pipe; class writer_t *out_pipe; + unsigned char peer_identity_size; + unsigned char *peer_identity; } bind; // Sent by pipe writer to inform dormant pipe reader that there @@ -107,6 +111,9 @@ namespace zmq } args; }; + // Function to deallocate dynamically allocated components of the command. + void deallocate_command (command_t *cmd_); + } #endif diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 8aafcf8..4233278 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t () while (!pipes.empty ()) delete *pipes.begin (); + // TODO: Deallocate any commands still in the pipes. Keep in mind that + // simple reading from a pipe and deallocating commands won't do as + // command pipe has template parameter D set to true, meaning that + // read may return false even if there are still commands in the pipe. delete [] command_pipes; #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/downstream.cpp b/src/downstream.cpp index 29b0689..3431264 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -26,7 +26,6 @@ zmq::downstream_t::downstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_DOWNSTREAM; options.requires_in = false; options.requires_out = true; } @@ -36,7 +35,7 @@ zmq::downstream_t::~downstream_t () } void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_ && outpipe_); lb.attach (outpipe_); diff --git a/src/downstream.hpp b/src/downstream.hpp index 35dec95..dbd79a5 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -34,7 +34,8 @@ namespace zmq ~downstream_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp index d60b39e..ddab6a4 100644 --- a/src/i_endpoint.hpp +++ b/src/i_endpoint.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__ #define __ZMQ_I_ENDPOINT_HPP_INCLUDED__ +#include "blob.hpp" + namespace zmq { @@ -28,7 +30,7 @@ namespace zmq virtual ~i_endpoint () {} virtual void attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) = 0; + class writer_t *outpipe_, const blob_t &peer_identity_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0; virtual void kill (class reader_t *pipe_) = 0; diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bcb4297..81b56df 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -22,6 +22,8 @@ #include <stddef.h> +#include "blob.hpp" + namespace zmq { @@ -39,11 +41,11 @@ namespace zmq // are messages to send available. virtual void revive () = 0; - // Start tracing the message route. Engine should add the identity - // supplied to all inbound messages and trim identity from all the - // outbound messages. - virtual void traceroute (unsigned char *identity_, - size_t identity_size_) = 0; + // Engine should add the prefix supplied to all inbound messages. + virtual void add_prefix (const blob_t &identity_) = 0; + + // Engine should trim prefix from all the outbound messages. + virtual void trim_prefix () = 0; }; } diff --git a/src/object.cpp b/src/object.cpp index a977f39..356fcd1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "object.hpp" #include "dispatcher.hpp" #include "err.hpp" @@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::own: process_own (cmd_.args.own.object); - return; + break; case command_t::attach: - process_attach (cmd_.args.attach.engine); + process_attach (cmd_.args.attach.engine, + blob_t (cmd_.args.attach.peer_identity, + cmd_.args.attach.peer_identity_size)); process_seqnum (); - return; + break; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + blob_t (cmd_.args.bind.peer_identity, + cmd_.args.bind.peer_identity_size)); process_seqnum (); - return; + break; case command_t::pipe_term: process_pipe_term (); @@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::pipe_term_ack: process_pipe_term_ack (); - return; + break; case command_t::term_req: process_term_req (cmd_.args.term_req.object); - return; + break; case command_t::term: process_term (); - return; + break; case command_t::term_ack: process_term_ack (); - return; + break; default: zmq_assert (false); } + + // The assumption here is that each command is processed once only, + // so deallocating it after processing is all right. + deallocate_command (&cmd_); } void zmq::object_t::register_pipe (class pipe_t *pipe_) @@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - bool inc_seqnum_) + const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; + if (peer_identity_.empty ()) { + cmd.args.attach.peer_identity_size = 0; + cmd.args.attach.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.attach.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.attach.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.attach.peer_identity_size); + memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) + reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_, cmd.type = command_t::bind; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + if (peer_identity_.empty ()) { + cmd.args.bind.peer_identity_size = 0; + cmd.args.bind.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.bind.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.bind.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.bind.peer_identity_size); + memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } @@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_) +void zmq::object_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index e6b2379..1544109 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -21,6 +21,7 @@ #define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -64,10 +65,11 @@ namespace zmq void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, bool inc_seqnum_ = true); + struct i_engine *engine_, const blob_t &peer_identity_, + bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, - bool inc_seqnum_ = true); + const blob_t &peer_identity_, bool inc_seqnum_ = true); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -81,9 +83,10 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class owned_t *object_); - virtual void process_attach (struct i_engine *engine_); + virtual void process_attach (struct i_engine *engine_, + const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_); + class writer_t *out_pipe_, const blob_t &peer_identity_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/options.cpp b/src/options.cpp index f9d93d6..f78d8de 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,7 +23,6 @@ #include "err.hpp" zmq::options_t::options_t () : - type (-1), hwm (0), lwm (0), swap (0), @@ -34,7 +33,9 @@ zmq::options_t::options_t () : sndbuf (0), rcvbuf (0), requires_in (false), - requires_out (false) + requires_out (false), + immediate_connect (true), + traceroute (false) { } @@ -76,7 +77,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; case ZMQ_IDENTITY: - identity.assign ((const char*) optval_, optvallen_); + + // Empty identity is invalid as well as identity longer than + // 255 bytes. Identity starting with binary zero is invalid + // as these are used for auto-generated identities. + if (optvallen_ < 1 || optvallen_ > 255 || + *((const unsigned char*) optval_) == 0) { + errno = EINVAL; + return -1; + } + identity.assign ((const unsigned char*) optval_, optvallen_); return 0; case ZMQ_RATE: diff --git a/src/options.hpp b/src/options.hpp index dbe3701..6d9be4d 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -20,10 +20,9 @@ #ifndef __ZMQ_OPTIONS_HPP_INCLUDED__ #define __ZMQ_OPTIONS_HPP_INCLUDED__ -#include <string> - #include "stddef.h" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -34,14 +33,11 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); - // Type of the associated socket. One of the constants defined in zmq.h - int type; - int64_t hwm; int64_t lwm; int64_t swap; uint64_t affinity; - std::string identity; + blob_t identity; // Maximum tranfer rate [kb/s]. Default 100kb/s. uint32_t rate; @@ -59,6 +55,15 @@ namespace zmq // provided by the specific socket type. bool requires_in; bool requires_out; + + // If true, when connecting, pipes are created immediately without + // waiting for the connection to be established. That way the socket + // is not aware of the peer's identity, however, it is able to send + // messages straight away. + bool immediate_connect; + + // If true, socket requires tracerouting the messages. + bool traceroute; }; } diff --git a/src/p2p.cpp b/src/p2p.cpp index 46bbd0b..ca7a8f5 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) : outpipe (NULL), alive (true) { - options.type = ZMQ_P2P; options.requires_in = true; options.requires_out = true; } @@ -43,7 +42,7 @@ zmq::p2p_t::~p2p_t () } void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe && !outpipe); inpipe = inpipe_; diff --git a/src/p2p.hpp b/src/p2p.hpp index 2ff1047..bca0eab 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -33,7 +33,8 @@ namespace zmq ~p2p_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index a2ba9c6..e708229 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,8 +88,13 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + +void zmq::pgm_receiver_t::trim_prefix () { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f03551f..3f0ef81 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index fa7d7e0..27b4d0c 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,8 +102,13 @@ void zmq::pgm_sender_t::revive () out_event (); } -void zmq::pgm_sender_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + +void zmq::pgm_sender_t::trim_prefix () { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 89357f5..951c417 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1eeb34f..462a3a9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (options.identity.size () > 0) { - // Create gsi from identity string. - gsi_base = options.identity; + // Create gsi from identity. + // TODO: We assume that identity is standard C string here. + // What if it contains binary zeroes? + gsi_base.assign ((const char*) options.identity.data (), + options.identity.size ()); } else { // Generate random gsi. diff --git a/src/pub.cpp b/src/pub.cpp index 9a2dcc6..5b9d48c 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -27,7 +27,6 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_PUB; options.requires_in = false; options.requires_out = true; } @@ -40,7 +39,7 @@ zmq::pub_t::~pub_t () } void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_); out_pipes.push_back (outpipe_); diff --git a/src/pub.hpp b/src/pub.hpp index 5b2f348..26142a4 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -34,7 +34,8 @@ namespace zmq ~pub_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/rep.cpp b/src/rep.cpp index 1649e83..755d78e 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -30,9 +30,13 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : waiting_for_reply (false), reply_pipe (NULL) { - options.type = ZMQ_REP; options.requires_in = true; options.requires_out = true; + + // We don't need immediate connect. We'll be able to send messages + // (replies) only when connection is established and thus requests + // can arrive anyway. + options.immediate_connect = false; } zmq::rep_t::~rep_t () @@ -40,7 +44,7 @@ zmq::rep_t::~rep_t () } void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); diff --git a/src/rep.hpp b/src/rep.hpp index 7170da7..7ead321 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -34,7 +34,8 @@ namespace zmq ~rep_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/req.cpp b/src/req.cpp index 9b1766e..735f0aa 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -30,7 +30,6 @@ zmq::req_t::req_t (class app_thread_t *parent_) : reply_pipe_active (false), reply_pipe (NULL) { - options.type = ZMQ_REQ; options.requires_in = true; options.requires_out = true; } @@ -40,7 +39,7 @@ zmq::req_t::~req_t () } void zmq::req_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); zmq_assert (in_pipes.size () == out_pipes.size ()); diff --git a/src/req.hpp b/src/req.hpp index 60ee5e7..da8e61a 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -34,7 +34,8 @@ namespace zmq ~req_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/session.cpp b/src/session.cpp index 1aece4d..05f319c 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, out_pipe (NULL), engine (NULL), options (options_) -{ - type = unnamed; - +{ // It's possible to register the session at this point as it will be // searched for only on reconnect, i.e. no race condition (session found // before it is plugged into it's I/O thread) is possible. @@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, } zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *name_) : + const options_t &options_, const blob_t &peer_identity_) : owned_t (parent_, owner_), in_pipe (NULL), active (true), out_pipe (NULL), engine (NULL), + ordinal (0), + peer_identity (peer_identity_), options (options_) { - if (name_) { - type = named; - name = name_; - ordinal = 0; - } - else { - type = transient; - // TODO: Generate unique name here. - ordinal = 0; + if (!peer_identity.empty () && peer_identity [0] != 0) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } } } @@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_) engine = NULL; // Terminate transient session. - if (type == transient) + if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) term (); } @@ -120,13 +119,12 @@ class zmq::socket_base_t *zmq::session_t::get_owner () uint64_t zmq::session_t::get_ordinal () { - zmq_assert (type == unnamed); zmq_assert (ordinal); return ordinal; } void zmq::session_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { if (inpipe_) { zmq_assert (!in_pipe); @@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_) void zmq::session_t::process_plug () { - // Register the session with the socket. - if (!name.empty ()) { - bool ok = owner->register_session (name.c_str (), this); - - // There's already a session with the specified identity. - // We should syslog it and drop the session. TODO - zmq_assert (ok); - } - - // If session is created by 'connect' function, it has the pipes set - // already. Otherwise, it's being created by the listener and the pipes - // are yet to be created. - if (!in_pipe && !out_pipe) { - - pipe_t *inbound = NULL; - pipe_t *outbound = NULL; - - if (options.requires_out) { - inbound = new (std::nothrow) pipe_t (this, owner, - options.hwm, options.lwm); - zmq_assert (inbound); - in_pipe = &inbound->reader; - in_pipe->set_endpoint (this); - } - - if (options.requires_in) { - outbound = new (std::nothrow) pipe_t (owner, this, - options.hwm, options.lwm); - zmq_assert (outbound); - out_pipe = &outbound->writer; - out_pipe->set_endpoint (this); - } - - send_bind (owner, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); - } } void zmq::session_t::process_unplug () { - // Unregister the session from the socket. There's nothing to do here - // for transient sessions. - if (type == unnamed) + // Unregister the session from the socket. + if (ordinal) owner->unregister_session (ordinal); - else if (type == named) - owner->unregister_session (name.c_str ()); + else if (!peer_identity.empty () && peer_identity [0] != 0) + owner->unregister_session (peer_identity); // Ask associated pipes to terminate. if (in_pipe) { @@ -232,10 +193,67 @@ void zmq::session_t::process_unplug () } } -void zmq::session_t::process_attach (i_engine *engine_) +void zmq::session_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { + if (!peer_identity.empty ()) { + + // If we already know the peer name do nothing, just check whether + // it haven't changed. + zmq_assert (peer_identity == peer_identity_); + } + else if (!peer_identity_.empty ()) { + + // Store the peer identity. + peer_identity = peer_identity_; + + // If the session is not registered with the ordinal, let's register + // it using the peer name. + if (!ordinal) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } + } + } + + // Check whether the required pipes already exist. If not so, we'll + // create them and bind them to the socket object. + reader_t *socket_reader = NULL; + writer_t *socket_writer = NULL; + + if (options.requires_in && !out_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, + options.hwm, options.lwm); + zmq_assert (pipe); + out_pipe = &pipe->writer; + out_pipe->set_endpoint (this); + socket_reader = &pipe->reader; + } + + if (options.requires_out && !in_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, + options.hwm, options.lwm); + zmq_assert (pipe); + in_pipe = &pipe->reader; + in_pipe->set_endpoint (this); + socket_writer = &pipe->writer; + } + + if (socket_reader || socket_writer) + send_bind (owner, socket_reader, socket_writer, peer_identity); + + // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; engine->plug (this); + + // Once the initial handshaking is over tracerouting should trim prefixes + // from outbound messages. + if (options.traceroute) + engine->trim_prefix (); } diff --git a/src/session.hpp b/src/session.hpp index 375d095..872748c 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,12 +20,11 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ -#include <string> - #include "i_inout.hpp" #include "i_endpoint.hpp" #include "owned.hpp" #include "options.hpp" +#include "blob.hpp" namespace zmq { @@ -38,10 +37,9 @@ namespace zmq session_t (object_t *parent_, socket_base_t *owner_, const options_t &options_); - // Creates named session. If name is NULL, transient session with - // auto-generated name is created. + // Creates named session. session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *name_); + const options_t &options_, const blob_t &peer_identity_); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -53,7 +51,8 @@ namespace zmq uint64_t get_ordinal (); // i_endpoint interface implementation. - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); void kill (class reader_t *pipe_); @@ -66,7 +65,8 @@ namespace zmq // Handlers for incoming commands. void process_plug (); void process_unplug (); - void process_attach (struct i_engine *engine_); + void process_attach (struct i_engine *engine_, + const blob_t &peer_identity_); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -79,18 +79,13 @@ namespace zmq struct i_engine *engine; - enum { - transient, - named, - unnamed - } type; - - // Ordinal of the session (if any). + // Session is identified by ordinal in the case when it was created + // before connection to the peer was established and thus we are + // unaware of peer's identity. uint64_t ordinal; - // The name of the session. One that is used to register it with - // socket-level repository of sessions. - std::string name; + // Identity of the peer. + blob_t peer_identity; // Inherited socket options. options_t options; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ef563e5..871f9e9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "inproc") { + // TODO: inproc connect is specific with respect to creating pipes + // as there's no 'reconnect' functionality implemented. Once that + // is in place we should follow generic pipe creation algorithm. + // Find the peer socket. socket_base_t *peer = find_endpoint (addr_args.c_str ()); if (!peer) @@ -165,13 +169,13 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach the pipes to this socket object. attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + out_pipe ? &out_pipe->writer : NULL, blob_t ()); // Attach the pipes to the peer socket. Note that peer's seqnum // was incremented in find_endpoint function. The callee is notified // about the fact via the last parameter. send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, false); + in_pipe ? &in_pipe->writer : NULL, options.identity, false); return 0; } @@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_) this, options); zmq_assert (session); - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + // If 'immediate connect' feature is required, we'll created the pipes + // to the session straight away. Otherwise, they'll be created by the + // session once the connection is established. + if (options.immediate_connect) { - // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, - options.hwm, options.lwm); - zmq_assert (in_pipe); + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; - } + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new (std::nothrow) pipe_t (this, session, + options.hwm, options.lwm); + zmq_assert (in_pipe); - // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, - options.hwm, options.lwm); - zmq_assert (out_pipe); - } + } - // Attach the pipes to the socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new (std::nothrow) pipe_t (session, this, + options.hwm, options.lwm); + zmq_assert (out_pipe); + } - // Attach the pipes to the session object. - session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + // Attach the pipes to the socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL, blob_t ()); + + // Attach the pipes to the session object. + session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL, blob_t ()); + } // Activate the session. send_plug (session); @@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "tcp" || addr_type == "ipc") { #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + // Windows named pipes are not compatible with Winsock API. + // There's no UNIX domain socket implementation on OpenVMS. if (addr_type == "ipc") { errno = EPROTONOSUPPORT; return -1; @@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "udp") udp_encapsulation = true; + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. if (options.requires_out) { // PGM sender. @@ -267,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_sender); + send_attach (session, pgm_sender, blob_t ()); } else if (options.requires_in) { @@ -282,7 +297,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_receiver); + send_attach (session, pgm_receiver, blob_t ()); } else zmq_assert (false); @@ -456,30 +471,29 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (const char *name_, +bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, session_t *session_) { sessions_sync.lock (); - bool registered = - named_sessions.insert (std::make_pair (name_, session_)).second; + bool registered = named_sessions.insert ( + std::make_pair (peer_identity_, session_)).second; sessions_sync.unlock (); return registered; } -void zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); zmq_assert (it != named_sessions.end ()); named_sessions.erase (it); sessions_sync.unlock (); } -zmq::session_t *zmq::socket_base_t::find_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) { sessions_sync.lock (); - - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); if (it == named_sessions.end ()) { sessions_sync.unlock (); return NULL; @@ -541,13 +555,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_) } void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { if (inpipe_) inpipe_->set_endpoint (this); if (outpipe_) outpipe_->set_endpoint (this); - xattach_pipes (inpipe_, outpipe_); + xattach_pipes (inpipe_, outpipe_, peer_identity_); } void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) @@ -567,9 +581,10 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { - attach_pipes (in_pipe_, out_pipe_); + attach_pipes (in_pipe_, out_pipe_, peer_identity_); } void zmq::socket_base_t::process_term_req (owned_t *object_) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1ad9ed1..5327acc 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -23,7 +23,6 @@ #include <set> #include <map> #include <vector> -#include <string> #include "../bindings/c/zmq.h" @@ -35,6 +34,7 @@ #include "stdint.hpp" #include "atomic_counter.hpp" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -78,15 +78,17 @@ namespace zmq // There are two distinct types of sessions: those identified by name // and those identified by ordinal number. Thus two sets of session // management functions. - bool register_session (const char *name_, class session_t *session_); - void unregister_session (const char *name_); - class session_t *find_session (const char *name_); + bool register_session (const blob_t &peer_identity_, + class session_t *session_); + void unregister_session (const blob_t &peer_identity_); + class session_t *find_session (const blob_t &peer_identity_); uint64_t register_session (class session_t *session_); void unregister_session (uint64_t ordinal_); class session_t *find_session (uint64_t ordinal_); // i_endpoint interface implementation. - void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void detach_inpipe (class reader_t *pipe_); void detach_outpipe (class writer_t *pipe_); void kill (class reader_t *pipe_); @@ -99,7 +101,7 @@ namespace zmq // Pipe management is done by individual socket types. virtual void xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) = 0; + class writer_t *outpipe_, const blob_t &peer_identity_) = 0; virtual void xdetach_inpipe (class reader_t *pipe_) = 0; virtual void xdetach_outpipe (class writer_t *pipe_) = 0; virtual void xkill (class reader_t *pipe_) = 0; @@ -121,7 +123,8 @@ namespace zmq // Handlers for incoming commands. void process_own (class owned_t *object_); - void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); + void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, + const blob_t &peer_identity_); void process_term_req (class owned_t *object_); void process_term_ack (); void process_seqnum (); @@ -155,7 +158,7 @@ namespace zmq // within the socket, instead they are used by I/O objects owned by // the socket. As those objects can live in different threads, // the access is synchronised by mutex. - typedef std::map <std::string, session_t*> named_sessions_t; + typedef std::map <blob_t, session_t*> named_sessions_t; named_sessions_t named_sessions; typedef std::map <uint64_t, session_t*> unnamed_sessions_t; unnamed_sessions_t unnamed_sessions; diff --git a/src/sub.cpp b/src/sub.cpp index 31ee222..29ac951 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -28,7 +28,6 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_), has_message (false) { - options.type = ZMQ_SUB; options.requires_in = true; options.requires_out = false; zmq_msg_init (&message); @@ -40,7 +39,7 @@ zmq::sub_t::~sub_t () } void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && !outpipe_); fq.attach (inpipe_); diff --git a/src/sub.hpp b/src/sub.hpp index 9e7d6cc..8234b77 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -39,7 +39,8 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/upstream.cpp b/src/upstream.cpp index 390dcbe..d7238b9 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -25,7 +25,6 @@ zmq::upstream_t::upstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_UPSTREAM; options.requires_in = true; options.requires_out = false; } @@ -35,7 +34,7 @@ zmq::upstream_t::~upstream_t () } void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && !outpipe_); fq.attach (inpipe_); diff --git a/src/upstream.hpp b/src/upstream.hpp index 1e6914b..d1ee7b1 100644 --- a/src/upstream.hpp +++ b/src/upstream.hpp @@ -34,7 +34,8 @@ namespace zmq ~upstream_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/uuid.hpp b/src/uuid.hpp index 00424ed..03bb69d 100644 --- a/src/uuid.hpp +++ b/src/uuid.hpp @@ -44,6 +44,9 @@ namespace zmq uuid_t (); ~uuid_t (); + // The length of textual representation of UUID. + enum { uuid_string_len = 36 }; + // Returns a pointer to buffer containing the textual // representation of the UUID. The caller is reponsible to // free the allocated memory. @@ -51,9 +54,6 @@ namespace zmq private: - // The length of textual representation of UUID. - enum { uuid_string_len = 36 }; - #if defined ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_MINGW32 typedef unsigned char* RPC_CSTR; diff --git a/src/xrep.cpp b/src/xrep.cpp index 67a9a39..6fa6bfa 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -21,13 +21,21 @@ #include "xrep.hpp" #include "err.hpp" +#include "pipe.hpp" zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_XREP; options.requires_in = true; options.requires_out = true; + + // On connect, pipes are created only after initial handshaking. + // That way we are aware of the peer's identity when binding to the pipes. + options.immediate_connect = false; + + // XREP socket adds identity to inbound messages and strips identity + // from the outbound messages. + options.traceroute = true; } zmq::xrep_t::~xrep_t () @@ -35,12 +43,15 @@ zmq::xrep_t::~xrep_t () } void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); fq.attach (inpipe_); - zmq_assert (false); + // TODO: What if new connection has same peer identity as the old one? + bool ok = outpipes.insert (std::make_pair ( + peer_identity_, outpipe_)).second; + zmq_assert (ok); } void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) @@ -51,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) { + for (outpipes_t::iterator it = outpipes.begin (); + it != outpipes.end (); ++it) + if (it->second == pipe_) { + outpipes.erase (it); + return; + } zmq_assert (false); } @@ -73,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) { - zmq_assert (false); - return -1; + unsigned char *data = (unsigned char*) zmq_msg_data (msg_); + size_t size = zmq_msg_size (msg_); + + // Check whether the message is well-formed. + zmq_assert (size >= 1); + zmq_assert (size_t (*data + 1) <= size); + + // Find the corresponding outbound pipe. If there's none, just drop the + // message. + // TODO: There's an allocation here! It's the critical path! Get rid of it! + blob_t identity (data + 1, *data); + outpipes_t::iterator it = outpipes.find (identity); + if (it == outpipes.end ()) { + int rc = zmq_msg_close (msg_); + zmq_assert (rc == 0); + rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + return 0; + } + + // Push message to the selected pipe. + it->second->write (msg_); + it->second->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; } int zmq::xrep_t::xflush () @@ -95,8 +139,10 @@ bool zmq::xrep_t::xhas_in () bool zmq::xrep_t::xhas_out () { - zmq_assert (false); - return false; + // In theory, XREP socket is always ready for writing. Whether actual + // attempt to write succeeds depends on whitch pipe the message is going + // to be routed to. + return true; } diff --git a/src/xrep.hpp b/src/xrep.hpp index 67ab02d..4534463 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -20,7 +20,10 @@ #ifndef __ZMQ_XREP_HPP_INCLUDED__ #define __ZMQ_XREP_HPP_INCLUDED__ +#include <map> + #include "socket_base.hpp" +#include "blob.hpp" #include "fq.hpp" namespace zmq @@ -34,7 +37,8 @@ namespace zmq ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); @@ -51,6 +55,10 @@ namespace zmq // Inbound messages are fair-queued. fq_t fq; + // Outbound pipes indexed by the peer names. + typedef std::map <blob_t, class writer_t*> outpipes_t; + outpipes_t outpipes; + xrep_t (const xrep_t&); void operator = (const xrep_t&); }; diff --git a/src/xreq.cpp b/src/xreq.cpp index 691b66e..dda924c 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -25,7 +25,6 @@ zmq::xreq_t::xreq_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_REQ; options.requires_in = true; options.requires_out = true; } @@ -35,7 +34,7 @@ zmq::xreq_t::~xreq_t () } void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && outpipe_); fq.attach (inpipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index d0cbb4f..e23e832 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -35,7 +35,8 @@ namespace zmq ~xreq_t (); // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); void xdetach_inpipe (class reader_t *pipe_); void xdetach_outpipe (class writer_t *pipe_); void xkill (class reader_t *pipe_); diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index f502ffd..b1776df 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -27,9 +27,7 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : decoder_t <zmq_decoder_t> (bufsize_), - destination (NULL), - prefix (NULL), - prefix_size (0) + destination (NULL) { zmq_msg_init (&in_progress); @@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : zmq::zmq_decoder_t::~zmq_decoder_t () { - if (prefix) - free (prefix); - zmq_msg_close (&in_progress); } @@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_) destination = destination_; } -void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_, - size_t prefix_size_) +void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_) { - prefix = malloc (prefix_size_); - zmq_assert (prefix); - memcpy (prefix, prefix_, prefix_size_); - prefix_size = prefix_size_; + prefix = prefix_; } bool zmq::zmq_decoder_t::one_byte_size_ready () @@ -72,15 +63,22 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf); - errno_assert (rc == 0); - - // Fill in the message prefix if any. - if (prefix) - memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); - - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, - *tmpbuf, &zmq_decoder_t::message_ready); + if (prefix.empty ()) { + int rc = zmq_msg_init_size (&in_progress, *tmpbuf); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), *tmpbuf, + &zmq_decoder_t::message_ready); + } + else { + int rc = zmq_msg_init_size (&in_progress, + *tmpbuf + 1 + prefix.size ()); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); + *data = (unsigned char) prefix.size (); + memcpy (data + 1, prefix.data (), *data); + next_step (data + *data + 1, *tmpbuf, + &zmq_decoder_t::message_ready); + } } return true; } @@ -95,15 +93,21 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix_size + size); - errno_assert (rc == 0); - - // Fill in the message prefix if any. - if (prefix) - memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + if (prefix.empty ()) { + int rc = zmq_msg_init_size (&in_progress, size); + errno_assert (rc == 0); + next_step (zmq_msg_data (&in_progress), size, + &zmq_decoder_t::message_ready); + } + else { + int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ()); + errno_assert (rc == 0); + unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); + *data = (unsigned char) prefix.size (); + memcpy (data + 1, prefix.data (), *data); + next_step (data + *data + 1, size, &zmq_decoder_t::message_ready); + } - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size , - size, &zmq_decoder_t::message_ready); return true; } diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index dfabece..11ee6c2 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -23,6 +23,7 @@ #include "../bindings/c/zmq.h" #include "decoder.hpp" +#include "blob.hpp" namespace zmq { @@ -41,7 +42,7 @@ namespace zmq // Once called, all decoded messages will be prefixed by the specified // prefix. - void add_prefix (unsigned char *prefix_, size_t prefix_size_); + void add_prefix (const blob_t &prefix_); private: @@ -53,8 +54,7 @@ namespace zmq unsigned char tmpbuf [8]; ::zmq_msg_t in_progress; - void *prefix; - size_t prefix_size; + blob_t prefix; zmq_decoder_t (const zmq_decoder_t&); void operator = (const zmq_decoder_t&); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index a60fe5e..68626fa 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -56,8 +56,9 @@ bool zmq::zmq_encoder_t::size_ready () } else { size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress); - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, - zmq_msg_size (&in_progress) - prefix_size, + next_step ( + (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1, + zmq_msg_size (&in_progress) - prefix_size - 1, &zmq_encoder_t::message_ready, false); } return true; @@ -80,8 +81,13 @@ bool zmq::zmq_encoder_t::message_ready () // Get the message size. If the prefix is not to be sent, adjust the // size accordingly. size_t size = zmq_msg_size (&in_progress); - if (trim) - size -= *(unsigned char*) zmq_msg_data (&in_progress); + if (trim) { + zmq_assert (size); + size_t prefix_size = + (*(unsigned char*) zmq_msg_data (&in_progress)) + 1; + zmq_assert (prefix_size <= size); + size -= prefix_size; + } // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index bda098c..623ca63 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include <new> #include "zmq_engine.hpp" @@ -160,11 +162,14 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::zmq_engine_t::add_prefix (const blob_t &identity_) +{ + decoder.add_prefix (identity_); +} + +void zmq::zmq_engine_t::trim_prefix () { encoder.trim_prefix (); - decoder.add_prefix (identity_, identity_size_); } void zmq::zmq_engine_t::error () diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 174dd1a..dc90a98 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,7 +47,8 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void add_prefix (const blob_t &identity_); + void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 9492caa..3e76cb9 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -17,10 +17,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "zmq_init.hpp" #include "zmq_engine.hpp" #include "io_thread.hpp" #include "session.hpp" +#include "uuid.hpp" #include "err.hpp" zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, @@ -71,17 +74,21 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) if (received) return false; - // Retreieve the remote identity. - peer_identity.assign ((const char*) zmq_msg_data (msg_), - zmq_msg_size (msg_)); + // Retreieve the remote identity. If it's empty, generate a unique name. + if (!zmq_msg_size (msg_)) { + unsigned char identity [uuid_t::uuid_string_len + 1]; + identity [0] = 0; + memcpy (identity + 1, uuid_t ().to_string (), uuid_t::uuid_string_len); + peer_identity.assign (identity, uuid_t::uuid_string_len + 1); + } + else { + peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), + zmq_msg_size (msg_)); + } + if (options.traceroute) + engine->add_prefix (peer_identity); received = true; - // Once the initial handshaking is over, XREP sockets should start - // tracerouting individual messages. - if (options.type == ZMQ_XREP) - engine->traceroute ((unsigned char*) peer_identity.data (), - peer_identity.size ()); - return true; } @@ -160,15 +167,16 @@ void zmq::zmq_init_t::finalise () return; } } + else { - // If the peer has a unique name, find the associated session. If it - // doesn't exist, create it. - else if (!peer_identity.empty ()) { - session = owner->find_session (peer_identity.c_str ()); + // If the peer has a unique name, find the associated session. + // If it does not exist, create it. + zmq_assert (!peer_identity.empty ()); + session = owner->find_session (peer_identity); if (!session) { session = new (std::nothrow) session_t ( choose_io_thread (options.affinity), owner, options, - peer_identity.c_str ()); + peer_identity); zmq_assert (session); send_plug (session); send_own (owner, session); @@ -178,21 +186,8 @@ void zmq::zmq_init_t::finalise () } } - // If the other party has no specific identity, let's create a - // transient session. - else { - session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, NULL); - zmq_assert (session); - send_plug (session); - send_own (owner, session); - - // Reserve a sequence number for following 'attach' command. - session->inc_seqnum (); - } - - // No need to increment seqnum as it was laready incremented above. - send_attach (session, engine, false); + // No need to increment seqnum as it was already incremented above. + send_attach (session, engine, peer_identity, false); // Destroy the init object. engine = NULL; diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index df14293..6f935c2 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -20,8 +20,6 @@ #ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ #define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ -#include <string> - #include "i_inout.hpp" #include "i_engine.hpp" #include "owned.hpp" @@ -29,6 +27,7 @@ #include "stdint.hpp" #include "options.hpp" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -72,7 +71,7 @@ namespace zmq bool received; // Identity of the peer socket. - std::string peer_identity; + blob_t peer_identity; // TCP connecter creates session before the name of the peer is known. // Thus we know only its ordinal number. |