diff options
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/blob.hpp | 33 | ||||
-rw-r--r-- | src/command.cpp | 38 | ||||
-rw-r--r-- | src/command.hpp | 7 | ||||
-rw-r--r-- | src/dispatcher.cpp | 4 | ||||
-rw-r--r-- | src/downstream.cpp | 1 | ||||
-rw-r--r-- | src/i_engine.hpp | 5 | ||||
-rw-r--r-- | src/object.cpp | 67 | ||||
-rw-r--r-- | src/object.hpp | 11 | ||||
-rw-r--r-- | src/options.cpp | 7 | ||||
-rw-r--r-- | src/options.hpp | 17 | ||||
-rw-r--r-- | src/p2p.cpp | 1 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 3 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 2 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 3 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
-rw-r--r-- | src/pgm_socket.cpp | 7 | ||||
-rw-r--r-- | src/pub.cpp | 1 | ||||
-rw-r--r-- | src/rep.cpp | 6 | ||||
-rw-r--r-- | src/req.cpp | 1 | ||||
-rw-r--r-- | src/session.cpp | 127 | ||||
-rw-r--r-- | src/session.hpp | 26 | ||||
-rw-r--r-- | src/socket_base.cpp | 79 | ||||
-rw-r--r-- | src/socket_base.hpp | 14 | ||||
-rw-r--r-- | src/sub.cpp | 1 | ||||
-rw-r--r-- | src/upstream.cpp | 1 | ||||
-rw-r--r-- | src/xrep.cpp | 9 | ||||
-rw-r--r-- | src/xreq.cpp | 1 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 34 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 6 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 5 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 2 | ||||
-rw-r--r-- | src/zmq_init.cpp | 17 | ||||
-rw-r--r-- | src/zmq_init.hpp | 5 |
34 files changed, 349 insertions, 196 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 446b1e2..4146f68 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \ atomic_bitmap.hpp \ atomic_counter.hpp \ atomic_ptr.hpp \ + blob.hpp \ command.hpp \ config.hpp \ decoder.hpp \ @@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \ zmq_init.hpp \ zmq_listener.hpp \ app_thread.cpp \ + command.cpp \ devpoll.cpp \ dispatcher.cpp \ downstream.cpp \ diff --git a/src/blob.hpp b/src/blob.hpp new file mode 100644 index 0000000..a4fa8cd --- /dev/null +++ b/src/blob.hpp @@ -0,0 +1,33 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_BLOB_HPP_INCLUDED__ +#define __ZMQ_BLOB_HPP_INCLUDED__ + +#include <string> + +namespace zmq +{ + + // Object to hold dynamically allocated opaque binary data. + typedef std::basic_string <unsigned char> blob_t; + +} + +#endif diff --git a/src/command.cpp b/src/command.cpp new file mode 100644 index 0000000..8bf7ea2 --- /dev/null +++ b/src/command.cpp @@ -0,0 +1,38 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> + +#include "command.hpp" + +void zmq::deallocate_command (command_t *cmd_) +{ + switch (cmd_->type) { + case command_t::attach: + if (cmd_->args.attach.peer_identity) + free (cmd_->args.attach.peer_identity); + break; + case command_t::bind: + if (cmd_->args.bind.peer_identity) + free (cmd_->args.bind.peer_identity); + break; + default: + /* noop */; + } +} diff --git a/src/command.hpp b/src/command.hpp index 469d6ec..150cad1 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -66,6 +66,8 @@ namespace zmq // Attach the engine to the session. struct { struct i_engine *engine; + unsigned char peer_identity_size; + unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. @@ -73,6 +75,8 @@ namespace zmq struct { class reader_t *in_pipe; class writer_t *out_pipe; + unsigned char peer_identity_size; + unsigned char *peer_identity; } bind; // Sent by pipe writer to inform dormant pipe reader that there @@ -107,6 +111,9 @@ namespace zmq } args; }; + // Function to deallocate dynamically allocated components of the command. + void deallocate_command (command_t *cmd_); + } #endif diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 8aafcf8..4233278 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t () while (!pipes.empty ()) delete *pipes.begin (); + // TODO: Deallocate any commands still in the pipes. Keep in mind that + // simple reading from a pipe and deallocating commands won't do as + // command pipe has template parameter D set to true, meaning that + // read may return false even if there are still commands in the pipe. delete [] command_pipes; #ifdef ZMQ_HAVE_WINDOWS diff --git a/src/downstream.cpp b/src/downstream.cpp index 29b0689..2da08e3 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -26,7 +26,6 @@ zmq::downstream_t::downstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_DOWNSTREAM; options.requires_in = false; options.requires_out = true; } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bcb4297..d64027d 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -22,6 +22,8 @@ #include <stddef.h> +#include "blob.hpp" + namespace zmq { @@ -42,8 +44,7 @@ namespace zmq // Start tracing the message route. Engine should add the identity // supplied to all inbound messages and trim identity from all the // outbound messages. - virtual void traceroute (unsigned char *identity_, - size_t identity_size_) = 0; + virtual void traceroute (const blob_t &identity_) = 0; }; } diff --git a/src/object.cpp b/src/object.cpp index a977f39..356fcd1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -17,6 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include <string.h> + #include "object.hpp" #include "dispatcher.hpp" #include "err.hpp" @@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::own: process_own (cmd_.args.own.object); - return; + break; case command_t::attach: - process_attach (cmd_.args.attach.engine); + process_attach (cmd_.args.attach.engine, + blob_t (cmd_.args.attach.peer_identity, + cmd_.args.attach.peer_identity_size)); process_seqnum (); - return; + break; case command_t::bind: - process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); + process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, + blob_t (cmd_.args.bind.peer_identity, + cmd_.args.bind.peer_identity_size)); process_seqnum (); - return; + break; case command_t::pipe_term: process_pipe_term (); @@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_) case command_t::pipe_term_ack: process_pipe_term_ack (); - return; + break; case command_t::term_req: process_term_req (cmd_.args.term_req.object); - return; + break; case command_t::term: process_term (); - return; + break; case command_t::term_ack: process_term_ack (); - return; + break; default: zmq_assert (false); } + + // The assumption here is that each command is processed once only, + // so deallocating it after processing is all right. + deallocate_command (&cmd_); } void zmq::object_t::register_pipe (class pipe_t *pipe_) @@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - bool inc_seqnum_) + const blob_t &peer_identity_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; + if (peer_identity_.empty ()) { + cmd.args.attach.peer_identity_size = 0; + cmd.args.attach.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.attach.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.attach.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.attach.peer_identity_size); + memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } void zmq::object_t::send_bind (socket_base_t *destination_, - reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_) + reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_, + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_, cmd.type = command_t::bind; cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.out_pipe = out_pipe_; + if (peer_identity_.empty ()) { + cmd.args.bind.peer_identity_size = 0; + cmd.args.bind.peer_identity = NULL; + } + else { + zmq_assert (peer_identity_.size () <= 0xff); + cmd.args.bind.peer_identity_size = + (unsigned char) peer_identity_.size (); + cmd.args.bind.peer_identity = + (unsigned char*) malloc (peer_identity_.size ()); + zmq_assert (cmd.args.bind.peer_identity_size); + memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), + peer_identity_.size ()); + } send_command (cmd); } @@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_) +void zmq::object_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { zmq_assert (false); } -void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index e6b2379..1544109 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -21,6 +21,7 @@ #define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -64,10 +65,11 @@ namespace zmq void send_own (class socket_base_t *destination_, class owned_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, bool inc_seqnum_ = true); + struct i_engine *engine_, const blob_t &peer_identity_, + bool inc_seqnum_ = true); void send_bind (class socket_base_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, - bool inc_seqnum_ = true); + const blob_t &peer_identity_, bool inc_seqnum_ = true); void send_revive (class object_t *destination_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -81,9 +83,10 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class owned_t *object_); - virtual void process_attach (struct i_engine *engine_); + virtual void process_attach (struct i_engine *engine_, + const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, - class writer_t *out_pipe_); + class writer_t *out_pipe_, const blob_t &peer_identity_); virtual void process_revive (); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); diff --git a/src/options.cpp b/src/options.cpp index f9d93d6..b77af24 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,7 +23,6 @@ #include "err.hpp" zmq::options_t::options_t () : - type (-1), hwm (0), lwm (0), swap (0), @@ -34,7 +33,9 @@ zmq::options_t::options_t () : sndbuf (0), rcvbuf (0), requires_in (false), - requires_out (false) + requires_out (false), + immediate_connect (true), + traceroute (false) { } @@ -76,7 +77,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; case ZMQ_IDENTITY: - identity.assign ((const char*) optval_, optvallen_); + identity.assign ((const unsigned char*) optval_, optvallen_); return 0; case ZMQ_RATE: diff --git a/src/options.hpp b/src/options.hpp index dbe3701..6d9be4d 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -20,10 +20,9 @@ #ifndef __ZMQ_OPTIONS_HPP_INCLUDED__ #define __ZMQ_OPTIONS_HPP_INCLUDED__ -#include <string> - #include "stddef.h" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -34,14 +33,11 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); - // Type of the associated socket. One of the constants defined in zmq.h - int type; - int64_t hwm; int64_t lwm; int64_t swap; uint64_t affinity; - std::string identity; + blob_t identity; // Maximum tranfer rate [kb/s]. Default 100kb/s. uint32_t rate; @@ -59,6 +55,15 @@ namespace zmq // provided by the specific socket type. bool requires_in; bool requires_out; + + // If true, when connecting, pipes are created immediately without + // waiting for the connection to be established. That way the socket + // is not aware of the peer's identity, however, it is able to send + // messages straight away. + bool immediate_connect; + + // If true, socket requires tracerouting the messages. + bool traceroute; }; } diff --git a/src/p2p.cpp b/src/p2p.cpp index 46bbd0b..72bc26b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) : outpipe (NULL), alive (true) { - options.type = ZMQ_P2P; options.requires_in = true; options.requires_out = true; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index a2ba9c6..b7ca327 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,8 +88,7 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } -void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_receiver_t::traceroute (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index f03551f..b01e5b0 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,7 +54,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index fa7d7e0..acbc3fb 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -102,8 +102,7 @@ void zmq::pgm_sender_t::revive () out_event (); } -void zmq::pgm_sender_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::pgm_sender_t::traceroute (const blob_t &identity_) { // No need for tracerouting functionality in PGM socket at the moment. zmq_assert (false); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 89357f5..7041610 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,7 +52,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 1eeb34f..462a3a9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (options.identity.size () > 0) { - // Create gsi from identity string. - gsi_base = options.identity; + // Create gsi from identity. + // TODO: We assume that identity is standard C string here. + // What if it contains binary zeroes? + gsi_base.assign ((const char*) options.identity.data (), + options.identity.size ()); } else { // Generate random gsi. diff --git a/src/pub.cpp b/src/pub.cpp index 9a2dcc6..05bfdcf 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -27,7 +27,6 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_PUB; options.requires_in = false; options.requires_out = true; } diff --git a/src/rep.cpp b/src/rep.cpp index b7685b4..8c5b86c 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -30,9 +30,13 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : waiting_for_reply (false), reply_pipe (NULL) { - options.type = ZMQ_REP; options.requires_in = true; options.requires_out = true; + + // We don't need immediate connect. We'll be able to send messages + // (replies) only when connection is established and thus requests + // can arrive anyway. + options.immediate_connect = false; } zmq::rep_t::~rep_t () diff --git a/src/req.cpp b/src/req.cpp index 9b1766e..c9240e0 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -30,7 +30,6 @@ zmq::req_t::req_t (class app_thread_t *parent_) : reply_pipe_active (false), reply_pipe (NULL) { - options.type = ZMQ_REQ; options.requires_in = true; options.requires_out = true; } diff --git a/src/session.cpp b/src/session.cpp index 1aece4d..f86327e 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, out_pipe (NULL), engine (NULL), options (options_) -{ - type = unnamed; - +{ // It's possible to register the session at this point as it will be // searched for only on reconnect, i.e. no race condition (session found // before it is plugged into it's I/O thread) is possible. @@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, } zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *name_) : + const options_t &options_, const blob_t &peer_identity_) : owned_t (parent_, owner_), in_pipe (NULL), active (true), out_pipe (NULL), engine (NULL), + ordinal (0), + peer_identity (peer_identity_), options (options_) { - if (name_) { - type = named; - name = name_; - ordinal = 0; - } - else { - type = transient; - // TODO: Generate unique name here. - ordinal = 0; + if (!peer_identity.empty ()) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } } } @@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_) engine = NULL; // Terminate transient session. - if (type == transient) + if (!ordinal && peer_identity.empty ()) term (); } @@ -120,7 +119,6 @@ class zmq::socket_base_t *zmq::session_t::get_owner () uint64_t zmq::session_t::get_ordinal () { - zmq_assert (type == unnamed); zmq_assert (ordinal); return ordinal; } @@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_) void zmq::session_t::process_plug () { - // Register the session with the socket. - if (!name.empty ()) { - bool ok = owner->register_session (name.c_str (), this); - - // There's already a session with the specified identity. - // We should syslog it and drop the session. TODO - zmq_assert (ok); - } - - // If session is created by 'connect' function, it has the pipes set - // already. Otherwise, it's being created by the listener and the pipes - // are yet to be created. - if (!in_pipe && !out_pipe) { - - pipe_t *inbound = NULL; - pipe_t *outbound = NULL; - - if (options.requires_out) { - inbound = new (std::nothrow) pipe_t (this, owner, - options.hwm, options.lwm); - zmq_assert (inbound); - in_pipe = &inbound->reader; - in_pipe->set_endpoint (this); - } - - if (options.requires_in) { - outbound = new (std::nothrow) pipe_t (owner, this, - options.hwm, options.lwm); - zmq_assert (outbound); - out_pipe = &outbound->writer; - out_pipe->set_endpoint (this); - } - - send_bind (owner, outbound ? &outbound->reader : NULL, - inbound ? &inbound->writer : NULL); - } } void zmq::session_t::process_unplug () { - // Unregister the session from the socket. There's nothing to do here - // for transient sessions. - if (type == unnamed) + // Unregister the session from the socket. + if (ordinal) owner->unregister_session (ordinal); - else if (type == named) - owner->unregister_session (name.c_str ()); + else if (!peer_identity.empty ()) + owner->unregister_session (peer_identity); // Ask associated pipes to terminate. if (in_pipe) { @@ -232,8 +193,60 @@ void zmq::session_t::process_unplug () } } -void zmq::session_t::process_attach (i_engine *engine_) +void zmq::session_t::process_attach (i_engine *engine_, + const blob_t &peer_identity_) { + if (!peer_identity.empty ()) { + + // If we already know the peer name do nothing, just check whether + // it haven't changed. + zmq_assert (peer_identity == peer_identity_); + } + else if (!peer_identity_.empty ()) { + + // Store the peer identity. + peer_identity = peer_identity_; + + // If the session is not registered with the ordinal, let's register + // it using the peer name. + if (!ordinal) { + if (!owner->register_session (peer_identity, this)) { + + // TODO: There's already a session with the specified + // identity. We should presumably syslog it and drop the + // session. + zmq_assert (false); + } + } + } + + // Check whether the required pipes already exist. If not so, we'll + // create them and bind them to the socket object. + reader_t *socket_reader = NULL; + writer_t *socket_writer = NULL; + + if (options.requires_in && !out_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, + options.hwm, options.lwm); + zmq_assert (pipe); + out_pipe = &pipe->writer; + out_pipe->set_endpoint (this); + socket_reader = &pipe->reader; + } + + if (options.requires_out && !in_pipe) { + pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, + options.hwm, options.lwm); + zmq_assert (pipe); + in_pipe = &pipe->reader; + in_pipe->set_endpoint (this); + socket_writer = &pipe->writer; + } + + if (socket_reader || socket_writer) + send_bind (owner, socket_reader, socket_writer, peer_identity); + + // Plug in the engine. zmq_assert (!engine); zmq_assert (engine_); engine = engine_; diff --git a/src/session.hpp b/src/session.hpp index 375d095..d412728 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -20,12 +20,11 @@ #ifndef __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__ -#include <string> - #include "i_inout.hpp" #include "i_endpoint.hpp" #include "owned.hpp" #include "options.hpp" +#include "blob.hpp" namespace zmq { @@ -38,10 +37,9 @@ namespace zmq session_t (object_t *parent_, socket_base_t *owner_, const options_t &options_); - // Creates named session. If name is NULL, transient session with - // auto-generated name is created. + // Creates named session. session_t (object_t *parent_, socket_base_t *owner_, - const options_t &options_, const char *name_); + const options_t &options_, const blob_t &peer_identity_); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -66,7 +64,8 @@ namespace zmq // Handlers for incoming commands. void process_plug (); void process_unplug (); - void process_attach (struct i_engine *engine_); + void process_attach (struct i_engine *engine_, + const blob_t &peer_identity_); // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; @@ -79,18 +78,13 @@ namespace zmq struct i_engine *engine; - enum { - transient, - named, - unnamed - } type; - - // Ordinal of the session (if any). + // Session is identified by ordinal in the case when it was created + // before connection to the peer was established and thus we are + // unaware of peer's identity. uint64_t ordinal; - // The name of the session. One that is used to register it with - // socket-level repository of sessions. - std::string name; + // Identity of the peer. + blob_t peer_identity; // Inherited socket options. options_t options; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 720e8cd..222b769 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "inproc") { + // TODO: inproc connect is specific with respect to creating pipes + // as there's no 'reconnect' functionality implemented. Once that + // is in place we should follow generic pipe creation algorithm. + // Find the peer socket. socket_base_t *peer = find_endpoint (addr_args.c_str ()); if (!peer) @@ -171,7 +175,7 @@ int zmq::socket_base_t::connect (const char *addr_) // was incremented in find_endpoint function. The callee is notified // about the fact via the last parameter. send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, false); + in_pipe ? &in_pipe->writer : NULL, options.identity, false); return 0; } @@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_) this, options); zmq_assert (session); - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + // If 'immediate connect' feature is required, we'll created the pipes + // to the session straight away. Otherwise, they'll be created by the + // session once the connection is established. + if (options.immediate_connect) { - // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, - options.hwm, options.lwm); - zmq_assert (in_pipe); + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; - } + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new (std::nothrow) pipe_t (this, session, + options.hwm, options.lwm); + zmq_assert (in_pipe); - // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, - options.hwm, options.lwm); - zmq_assert (out_pipe); - } + } - // Attach the pipes to the socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new (std::nothrow) pipe_t (session, this, + options.hwm, options.lwm); + zmq_assert (out_pipe); + } - // Attach the pipes to the session object. - session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + // Attach the pipes to the socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the session object. + session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL); + } // Activate the session. send_plug (session); @@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "tcp" || addr_type == "ipc") { #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + // Windows named pipes are not compatible with Winsock API. + // There's no UNIX domain socket implementation on OpenVMS. if (addr_type == "ipc") { errno = EPROTONOSUPPORT; return -1; @@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "udp") udp_encapsulation = true; + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. if (options.requires_out) { // PGM sender. @@ -267,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_sender); + send_attach (session, pgm_sender, blob_t ()); } else if (options.requires_in) { @@ -282,7 +297,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_receiver); + send_attach (session, pgm_receiver, blob_t ()); } else zmq_assert (false); @@ -454,30 +469,29 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (const char *name_, +bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, session_t *session_) { sessions_sync.lock (); - bool registered = - named_sessions.insert (std::make_pair (name_, session_)).second; + bool registered = named_sessions.insert ( + std::make_pair (peer_identity_, session_)).second; sessions_sync.unlock (); return registered; } -void zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); zmq_assert (it != named_sessions.end ()); named_sessions.erase (it); sessions_sync.unlock (); } -zmq::session_t *zmq::socket_base_t::find_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) { sessions_sync.lock (); - - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); if (it == named_sessions.end ()) { sessions_sync.unlock (); return NULL; @@ -565,7 +579,8 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { attach_pipes (in_pipe_, out_pipe_); } diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1ad9ed1..a1702a7 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -23,7 +23,6 @@ #include <set> #include <map> #include <vector> -#include <string> #include "../bindings/c/zmq.h" @@ -35,6 +34,7 @@ #include "stdint.hpp" #include "atomic_counter.hpp" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -78,9 +78,10 @@ namespace zmq // There are two distinct types of sessions: those identified by name // and those identified by ordinal number. Thus two sets of session // management functions. - bool register_session (const char *name_, class session_t *session_); - void unregister_session (const char *name_); - class session_t *find_session (const char *name_); + bool register_session (const blob_t &peer_identity_, + class session_t *session_); + void unregister_session (const blob_t &peer_identity_); + class session_t *find_session (const blob_t &peer_identity_); uint64_t register_session (class session_t *session_); void unregister_session (uint64_t ordinal_); class session_t *find_session (uint64_t ordinal_); @@ -121,7 +122,8 @@ namespace zmq // Handlers for incoming commands. void process_own (class owned_t *object_); - void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_); + void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, + const blob_t &peer_identity_); void process_term_req (class owned_t *object_); void process_term_ack (); void process_seqnum (); @@ -155,7 +157,7 @@ namespace zmq // within the socket, instead they are used by I/O objects owned by // the socket. As those objects can live in different threads, // the access is synchronised by mutex. - typedef std::map <std::string, session_t*> named_sessions_t; + typedef std::map <blob_t, session_t*> named_sessions_t; named_sessions_t named_sessions; typedef std::map <uint64_t, session_t*> unnamed_sessions_t; unnamed_sessions_t unnamed_sessions; diff --git a/src/sub.cpp b/src/sub.cpp index 31ee222..06ed896 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -28,7 +28,6 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_), has_message (false) { - options.type = ZMQ_SUB; options.requires_in = true; options.requires_out = false; zmq_msg_init (&message); diff --git a/src/upstream.cpp b/src/upstream.cpp index 390dcbe..bdcd5ef 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -25,7 +25,6 @@ zmq::upstream_t::upstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_UPSTREAM; options.requires_in = true; options.requires_out = false; } diff --git a/src/xrep.cpp b/src/xrep.cpp index 67a9a39..328a832 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -25,9 +25,16 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_XREP; options.requires_in = true; options.requires_out = true; + + // On connect, pipes are created only after initial handshaking. + // That way we are aware of the peer's identity when binding to the pipes. + options.immediate_connect = false; + + // XREP socket adds identity to inbound messages and strips identity + // from the outbound messages. + options.traceroute = true; } zmq::xrep_t::~xrep_t () diff --git a/src/xreq.cpp b/src/xreq.cpp index 691b66e..a4310f8 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -25,7 +25,6 @@ zmq::xreq_t::xreq_t (class app_thread_t *parent_) : socket_base_t (parent_) { - options.type = ZMQ_REQ; options.requires_in = true; options.requires_out = true; } diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index f502ffd..20e07bc 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -27,9 +27,7 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : decoder_t <zmq_decoder_t> (bufsize_), - destination (NULL), - prefix (NULL), - prefix_size (0) + destination (NULL) { zmq_msg_init (&in_progress); @@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : zmq::zmq_decoder_t::~zmq_decoder_t () { - if (prefix) - free (prefix); - zmq_msg_close (&in_progress); } @@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_) destination = destination_; } -void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_, - size_t prefix_size_) +void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_) { - prefix = malloc (prefix_size_); - zmq_assert (prefix); - memcpy (prefix, prefix_, prefix_size_); - prefix_size = prefix_size_; + prefix = prefix_; } bool zmq::zmq_decoder_t::one_byte_size_ready () @@ -72,15 +63,16 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf); + int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf); errno_assert (rc == 0); // Fill in the message prefix if any. - if (prefix) - memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + if (!prefix.empty ()) + memcpy (zmq_msg_data (&in_progress), prefix.data (), + prefix.size ()); - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size, - *tmpbuf, &zmq_decoder_t::message_ready); + next_step ((unsigned char*) zmq_msg_data (&in_progress) + + prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready); } return true; } @@ -95,14 +87,14 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - int rc = zmq_msg_init_size (&in_progress, prefix_size + size); + int rc = zmq_msg_init_size (&in_progress, prefix.size () + size); errno_assert (rc == 0); // Fill in the message prefix if any. - if (prefix) - memcpy (zmq_msg_data (&in_progress), prefix, prefix_size); + if (!prefix.empty ()) + memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ()); - next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size , + next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (), size, &zmq_decoder_t::message_ready); return true; } diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index dfabece..11ee6c2 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -23,6 +23,7 @@ #include "../bindings/c/zmq.h" #include "decoder.hpp" +#include "blob.hpp" namespace zmq { @@ -41,7 +42,7 @@ namespace zmq // Once called, all decoded messages will be prefixed by the specified // prefix. - void add_prefix (unsigned char *prefix_, size_t prefix_size_); + void add_prefix (const blob_t &prefix_); private: @@ -53,8 +54,7 @@ namespace zmq unsigned char tmpbuf [8]; ::zmq_msg_t in_progress; - void *prefix; - size_t prefix_size; + blob_t prefix; zmq_decoder_t (const zmq_decoder_t&); void operator = (const zmq_decoder_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index bda098c..75f3441 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -160,11 +160,10 @@ void zmq::zmq_engine_t::revive () out_event (); } -void zmq::zmq_engine_t::traceroute (unsigned char *identity_, - size_t identity_size_) +void zmq::zmq_engine_t::traceroute (const blob_t &identity_) { encoder.trim_prefix (); - decoder.add_prefix (identity_, identity_size_); + decoder.add_prefix (identity_); } void zmq::zmq_engine_t::error () diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index 174dd1a..8657e8e 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,7 +47,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); - void traceroute (unsigned char *identity_, size_t identity_size_); + void traceroute (const blob_t &identity_); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 9492caa..9aebad0 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -72,15 +72,14 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) return false; // Retreieve the remote identity. - peer_identity.assign ((const char*) zmq_msg_data (msg_), + peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); received = true; // Once the initial handshaking is over, XREP sockets should start // tracerouting individual messages. - if (options.type == ZMQ_XREP) - engine->traceroute ((unsigned char*) peer_identity.data (), - peer_identity.size ()); + if (options.traceroute) + engine->traceroute (peer_identity); return true; } @@ -164,11 +163,11 @@ void zmq::zmq_init_t::finalise () // If the peer has a unique name, find the associated session. If it // doesn't exist, create it. else if (!peer_identity.empty ()) { - session = owner->find_session (peer_identity.c_str ()); + session = owner->find_session (peer_identity); if (!session) { session = new (std::nothrow) session_t ( choose_io_thread (options.affinity), owner, options, - peer_identity.c_str ()); + peer_identity); zmq_assert (session); send_plug (session); send_own (owner, session); @@ -182,7 +181,7 @@ void zmq::zmq_init_t::finalise () // transient session. else { session = new (std::nothrow) session_t ( - choose_io_thread (options.affinity), owner, options, NULL); + choose_io_thread (options.affinity), owner, options, blob_t ()); zmq_assert (session); send_plug (session); send_own (owner, session); @@ -191,8 +190,8 @@ void zmq::zmq_init_t::finalise () session->inc_seqnum (); } - // No need to increment seqnum as it was laready incremented above. - send_attach (session, engine, false); + // No need to increment seqnum as it was already incremented above. + send_attach (session, engine, peer_identity, false); // Destroy the init object. engine = NULL; diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index df14293..6f935c2 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -20,8 +20,6 @@ #ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ #define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ -#include <string> - #include "i_inout.hpp" #include "i_engine.hpp" #include "owned.hpp" @@ -29,6 +27,7 @@ #include "stdint.hpp" #include "options.hpp" #include "stdint.hpp" +#include "blob.hpp" namespace zmq { @@ -72,7 +71,7 @@ namespace zmq bool received; // Identity of the peer socket. - std::string peer_identity; + blob_t peer_identity; // TCP connecter creates session before the name of the peer is known. // Thus we know only its ordinal number. |