From f5ce81f2893ec0707c2f4346740878e68b51e13a Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 13 Feb 2010 14:07:30 +0100 Subject: Multi-hop REQ/REP, part VIII., new blob_t type used for holding identity --- src/Makefile.am | 1 + src/blob.hpp | 33 +++++++++++++++++++++++++++++++++ src/i_engine.hpp | 5 +++-- src/object.cpp | 19 +++++++++---------- src/object.hpp | 7 ++++--- src/pgm_receiver.cpp | 3 +-- src/pgm_receiver.hpp | 2 +- src/pgm_sender.cpp | 3 +-- src/pgm_sender.hpp | 2 +- src/session.cpp | 32 +++++++++++--------------------- src/session.hpp | 13 +++++-------- src/socket_base.cpp | 24 ++++++++++-------------- src/socket_base.hpp | 14 ++++++-------- src/zmq_decoder.cpp | 34 +++++++++++++--------------------- src/zmq_decoder.hpp | 6 +++--- src/zmq_engine.cpp | 5 ++--- src/zmq_engine.hpp | 2 +- src/zmq_init.cpp | 17 ++++++----------- src/zmq_init.hpp | 5 ++--- 19 files changed, 113 insertions(+), 114 deletions(-) create mode 100644 src/blob.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 32f088e..f19c0f6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -57,6 +57,7 @@ libzmq_la_SOURCES = app_thread.hpp \ atomic_bitmap.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ + blob.hpp \ command.hpp \ config.hpp \ decoder.hpp \ diff --git a/src/blob.hpp b/src/blob.hpp new file mode 100644 index 0000000..a4fa8cd --- /dev/null +++ b/src/blob.hpp @@ -0,0 +1,33 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_BLOB_HPP_INCLUDED__ +#define __ZMQ_BLOB_HPP_INCLUDED__ + +#include + +namespace zmq +{ + + // Object to hold dynamically allocated opaque binary data. + typedef std::basic_string blob_t; + +} + +#endif diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bcb4297..d64027d 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -22,6 +22,8 @@ #include +#include "blob.hpp" + namespace zmq { @@ -42,8 +44,7 @@ namespace zmq // Start tracing the message route. Engine should add the identity // supplied to all inbound messages and trim identity from all the // outbound messages. - virtual void traceroute (unsigned char *identity_, - size_t identity_size_) = 0; + virtual void traceroute (const blob_t &identity_) = 0; }; } diff --git a/src/object.cpp b/src/object.cpp index 73a17a3..eaa67c9 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::attach: process_attach (cmd_.args.attach.engine, - cmd_.args.attach.peer_identity_size, - cmd_.args.attach.peer_identity); + blob_t (cmd_.args.attach.peer_identity, + cmd_.args.attach.peer_identity_size)); process_seqnum (); break; @@ -184,8 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - unsigned char peer_identity_size_, unsigned char *peer_identity_, - bool inc_seqnum_) + const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -194,17 +193,17 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; - if (!peer_identity_size_) { + if (peer_identity_.empty ()) { cmd.args.attach.peer_identity_size = 0; cmd.args.attach.peer_identity = NULL; } else { - cmd.args.attach.peer_identity_size = peer_identity_size_; + cmd.args.attach.peer_identity_size = peer_identity_.size (); cmd.args.attach.peer_identity = - (unsigned char*) malloc (peer_identity_size_); + (unsigned char*) malloc (peer_identity_.size ()); zmq_assert (cmd.args.attach.peer_identity_size); - memcpy (cmd.args.attach.peer_identity, peer_identity_, - peer_identity_size_); + memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), + peer_identity_.size ()); } send_command (cmd); } @@ -289,7 +288,7 @@ void zmq::object_t::process_own (owned_t *object_) } void zmq::object_t::process_attach (i_engine *engine_, - unsigned char peer_identity_size_, unsigned char *peer_identity_) + const blob_t &peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index 4c82a0d..d492695 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -21,6 +21,7 @@ #define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -64,8 +65,8 @@ namespace zmq void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, unsigned char peer_identity_size_, - unsigned char *peer_identity_, bool inc_seqnum_ = true); + struct i_engine *engine_, const blob_t &peer_identity_, + bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, bool inc_seqnum_ = true); @@ -83,7 +84,7 @@ namespace zmq virtual void process_plug (); virtual void process_own (class owned_t *object_); virtual void process_attach (struct i_engine *engine_, - unsigned char peer_identity_size_, unsigned char *peer_identity_); + const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); virtual void process_revive (); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index a2ba9c6..b7ca327 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,8 +88,7 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_receiver_t::traceroute (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f03551f..b01e5b0 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index fa7d7e0..acbc3fb 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,8 +102,7 @@ void zmq::pgm_sender_t::revive () out_event (); } -void zmq::pgm_sender_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_sender_t::traceroute (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 89357f5..7041610 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/session.cpp b/src/session.cpp index 1fab3c2..b2393d8 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -40,24 +40,18 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, } zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, unsigned char peer_identity_size_, - unsigned char *peer_identity_) : + const options_t &options_, const blob_t &peer_identity_) : owned_t (parent_, owner_), in_pipe (NULL), active (true), out_pipe (NULL), engine (NULL), ordinal (0), + peer_identity (peer_identity_), options (options_) { - -if (!peer_identity_size_) - - // If peer identity is not supplied, leave it empty. - if (peer_identity_size_) { - peer_identity.assign ((char*) peer_identity_, peer_identity_size_); - if (!owner->register_session (peer_identity_size_, peer_identity_, - this)) { + if (!peer_identity.empty ()) { + if (!owner->register_session (peer_identity, this)) { // TODO: There's already a session with the specified // identity. We should presumably syslog it and drop the @@ -180,8 +174,7 @@ void zmq::session_t::process_unplug () if (ordinal) owner->unregister_session (ordinal); else if (!peer_identity.empty ()) - owner->unregister_session ((unsigned char) peer_identity.size (), - (unsigned char*) peer_identity.data ()); + owner->unregister_session (peer_identity); // Ask associated pipes to terminate. if (in_pipe) { @@ -201,26 +194,23 @@ void zmq::session_t::process_unplug () } void zmq::session_t::process_attach (i_engine *engine_, - unsigned char peer_identity_size_, unsigned char *peer_identity_) + const blob_t &peer_identity_) { if (!peer_identity.empty ()) { // If we already know the peer name do nothing, just check whether // it haven't changed. - zmq_assert (peer_identity.size () == peer_identity_size_); - zmq_assert (memcmp (peer_identity.data (), peer_identity_, - peer_identity_size_) == 0); + zmq_assert (peer_identity == peer_identity_); } - else if (peer_identity_size_) { + else if (!peer_identity_.empty ()) { - // Remember the peer identity. - peer_identity.assign ((char*) peer_identity_, peer_identity_size_); + // Store the peer identity. + peer_identity = peer_identity_; // If the session is not registered with the ordinal, let's register // it using the peer name. if (!ordinal) { - if (!owner->register_session (peer_identity_size_, peer_identity_, - this)) { + if (!owner->register_session (peer_identity, this)) { // TODO: There's already a session with the specified // identity. We should presumably syslog it and drop the diff --git a/src/session.hpp b/src/session.hpp index 7607cfb..d412728 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,12 +20,11 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ -#include - #include "i_inout.hpp" #include "i_endpoint.hpp" #include "owned.hpp" #include "options.hpp" +#include "blob.hpp" namespace zmq { @@ -38,11 +37,9 @@ namespace zmq session_t (object_t *parent_, socket_base_t *owner_, const options_t &options_); - // Creates named session. If name is NULL, transient session with - // auto-generated name is created. + // Creates named session. session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, unsigned char peer_identity_size_, - unsigned char *peer_identity_); + const options_t &options_, const blob_t &peer_identity_); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -68,7 +65,7 @@ namespace zmq void process_plug (); void process_unplug (); void process_attach (struct i_engine *engine_, - unsigned char peer_identity_size_, unsigned char *peer_identity_); + const blob_t &peer_identity_); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -87,7 +84,7 @@ namespace zmq uint64_t ordinal; // Identity of the peer. - std::string peer_identity; + blob_t peer_identity; // Inherited socket options. options_t options; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 4af69a0..7d90236 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_sender, 0, NULL); + send_attach (session, pgm_sender, blob_t ()); } else if (options.requires_in) { @@ -282,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_receiver, 0, NULL); + send_attach (session, pgm_receiver, blob_t ()); } else zmq_assert (false); @@ -454,33 +454,29 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (unsigned char peer_identity_size_, - unsigned char *peer_identity_, session_t *session_) +bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, + session_t *session_) { sessions_sync.lock (); - bool registered = named_sessions.insert (std::make_pair (std::string ( - (char*) peer_identity_, peer_identity_size_), session_)).second; + bool registered = named_sessions.insert ( + std::make_pair (peer_identity_, session_)).second; sessions_sync.unlock (); return registered; } -void zmq::socket_base_t::unregister_session (unsigned char peer_identity_size_, - unsigned char *peer_identity_) +void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (std::string ( - (char*) peer_identity_, peer_identity_size_)); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); zmq_assert (it != named_sessions.end ()); named_sessions.erase (it); sessions_sync.unlock (); } -zmq::session_t *zmq::socket_base_t::find_session ( - unsigned char peer_identity_size_, unsigned char *peer_identity_) +zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (std::string ( - (char*) peer_identity_, peer_identity_size_)); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); if (it == named_sessions.end ()) { sessions_sync.unlock (); return NULL; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index a2878ea..39f09de 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -23,7 +23,6 @@ #include #include #include -#include #include "../bindings/c/zmq.h" @@ -35,6 +34,7 @@ #include "stdint.hpp" #include "atomic_counter.hpp" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -78,12 +78,10 @@ namespace zmq // There are two distinct types of sessions: those identified by name // and those identified by ordinal number. Thus two sets of session // management functions. - bool register_session (unsigned char peer_identity_size_, - unsigned char *peer_identity_, class session_t *session_); - void unregister_session (unsigned char peer_identity_size_, - unsigned char *peer_identity_); - class session_t *find_session (unsigned char peer_identity_size_, - unsigned char *peer_identity_); + bool register_session (const blob_t &peer_identity_, + class session_t *session_); + void unregister_session (const blob_t &peer_identity_); + class session_t *find_session (const blob_t &peer_identity_); uint64_t register_session (class session_t *session_); void unregister_session (uint64_t ordinal_); class session_t *find_session (uint64_t ordinal_); @@ -158,7 +156,7 @@ namespace zmq // within the socket, instead they are used by I/O objects owned by // the socket. As those objects can live in different threads, // the access is synchronised by mutex. - typedef std::map named_sessions_t; + typedef std::map named_sessions_t; named_sessions_t named_sessions; typedef std::map unnamed_sessions_t; unnamed_sessions_t unnamed_sessions; diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index f502ffd..20e07bc 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -27,9 +27,7 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : decoder_t (bufsize_), - destination (NULL), - prefix (NULL), - prefix_size (0) + destination (NULL) { zmq_msg_init (&in_progress); @@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : zmq::zmq_decoder_t::~zmq_decoder_t () { - if (prefix) - free (prefix); - zmq_msg_close (&in_progress); } @@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_) destination = destination_; } -void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_, - size_t prefix_size_) +void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_) { - prefix = malloc (prefix_size_); - zmq_assert (prefix); - memcpy (prefix, prefix_, prefix_size_); - prefix_size = prefix_size_; + prefix = prefix_; } bool zmq::zmq_decoder_t::one_byte_size_ready () @@ -72,15 +63,16 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf); + int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf); errno_assert (rc == 0); // Fill in the message prefix if any. - if (prefix) - memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + if (!prefix.empty ()) + memcpy (zmq_msg_data (&in_progress), prefix.data (), + prefix.size ()); - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, - *tmpbuf, &zmq_decoder_t::message_ready); + next_step ((unsigned char*) zmq_msg_data (&in_progress) + + prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready); } return true; } @@ -95,14 +87,14 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix_size + size); + int rc = zmq_msg_init_size (&in_progress, prefix.size () + size); errno_assert (rc == 0); // Fill in the message prefix if any. - if (prefix) - memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + if (!prefix.empty ()) + memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ()); - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size , + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (), size, &zmq_decoder_t::message_ready); return true; } diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index dfabece..11ee6c2 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -23,6 +23,7 @@ #include "../bindings/c/zmq.h" #include "decoder.hpp" +#include "blob.hpp" namespace zmq { @@ -41,7 +42,7 @@ namespace zmq // Once called, all decoded messages will be prefixed by the specified // prefix. - void add_prefix (unsigned char *prefix_, size_t prefix_size_); + void add_prefix (const blob_t &prefix_); private: @@ -53,8 +54,7 @@ namespace zmq unsigned char tmpbuf [8]; ::zmq_msg_t in_progress; - void *prefix; - size_t prefix_size; + blob_t prefix; zmq_decoder_t (const zmq_decoder_t&); void operator = (const zmq_decoder_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index bda098c..75f3441 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -160,11 +160,10 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::zmq_engine_t::traceroute (const blob_t &identity_) { encoder.trim_prefix (); - decoder.add_prefix (identity_, identity_size_); + decoder.add_prefix (identity_); } void zmq::zmq_engine_t::error () diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 174dd1a..8657e8e 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,7 +47,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index f062ede..9aebad0 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -72,15 +72,14 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) return false; // Retreieve the remote identity. - peer_identity.assign ((const char*) zmq_msg_data (msg_), + peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); received = true; // Once the initial handshaking is over, XREP sockets should start // tracerouting individual messages. if (options.traceroute) - engine->traceroute ((unsigned char*) peer_identity.data (), - peer_identity.size ()); + engine->traceroute (peer_identity); return true; } @@ -164,14 +163,11 @@ void zmq::zmq_init_t::finalise () // If the peer has a unique name, find the associated session. If it // doesn't exist, create it. else if (!peer_identity.empty ()) { - session = owner->find_session ( - (unsigned char) peer_identity.size (), - (unsigned char*) peer_identity.data ()); + session = owner->find_session (peer_identity); if (!session) { session = new (std::nothrow) session_t ( choose_io_thread (options.affinity), owner, options, - (unsigned char) peer_identity.size (), - (unsigned char*) peer_identity.c_str ()); + peer_identity); zmq_assert (session); send_plug (session); send_own (owner, session); @@ -185,7 +181,7 @@ void zmq::zmq_init_t::finalise () // transient session. else { session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, 0, NULL); + choose_io_thread (options.affinity), owner, options, blob_t ()); zmq_assert (session); send_plug (session); send_own (owner, session); @@ -195,8 +191,7 @@ void zmq::zmq_init_t::finalise () } // No need to increment seqnum as it was already incremented above. - send_attach (session, engine, (unsigned char) peer_identity.size (), - (unsigned char*) peer_identity.data (), false); + send_attach (session, engine, peer_identity, false); // Destroy the init object. engine = NULL; diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index df14293..6f935c2 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -20,8 +20,6 @@ #ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ #define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ -#include - #include "i_inout.hpp" #include "i_engine.hpp" #include "owned.hpp" @@ -29,6 +27,7 @@ #include "stdint.hpp" #include "options.hpp" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -72,7 +71,7 @@ namespace zmq bool received; // Identity of the peer socket. - std::string peer_identity; + blob_t peer_identity; // TCP connecter creates session before the name of the peer is known. // Thus we know only its ordinal number. -- cgit v1.2.3