From c8e8f2a24cd339c548e06f75a3cef96454671a85 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 15 Jul 2011 11:24:33 +0200 Subject: ZMQ_IDENTITY socket option removed This patch simplifies the whole codebase significantly, including dropping depedency on libuuid. Signed-off-by: Martin Sustrik --- src/Makefile.am | 7 -- src/command.cpp | 39 -------- src/command.hpp | 7 -- src/connect_session.cpp | 48 +--------- src/connect_session.hpp | 9 +- src/named_session.cpp | 67 ------------- src/named_session.hpp | 56 ----------- src/object.cpp | 50 ++-------- src/object.hpp | 12 +-- src/options.cpp | 22 ----- src/options.hpp | 3 +- src/pair.cpp | 2 +- src/pair.hpp | 2 +- src/pgm_socket.cpp | 21 ++--- src/pull.cpp | 2 +- src/pull.hpp | 2 +- src/push.cpp | 2 +- src/push.hpp | 2 +- src/random.cpp | 42 ++++++--- src/random.hpp | 9 +- src/req.cpp | 6 +- src/session.cpp | 26 +----- src/session.hpp | 15 +-- src/socket_base.cpp | 70 ++------------ src/socket_base.hpp | 26 +----- src/transient_session.cpp | 2 +- src/transient_session.hpp | 2 +- src/uuid.cpp | 90 ------------------ src/uuid.hpp | 33 ------- src/xpub.cpp | 2 +- src/xpub.hpp | 2 +- src/xrep.cpp | 8 +- src/xrep.hpp | 3 +- src/xreq.cpp | 2 +- src/xreq.hpp | 2 +- src/xsub.cpp | 2 +- src/xsub.hpp | 2 +- src/zmq_connecter.cpp | 15 +-- src/zmq_init.cpp | 233 ---------------------------------------------- src/zmq_init.hpp | 98 ------------------- src/zmq_listener.cpp | 18 ++-- 41 files changed, 112 insertions(+), 949 deletions(-) delete mode 100644 src/command.cpp delete mode 100644 src/named_session.cpp delete mode 100644 src/named_session.hpp delete mode 100644 src/uuid.cpp delete mode 100644 src/uuid.hpp delete mode 100644 src/zmq_init.cpp delete mode 100644 src/zmq_init.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index e78eb1c..5049ad8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,7 +35,6 @@ libzmq_la_SOURCES = \ msg.hpp \ mtrie.hpp \ mutex.hpp \ - named_session.hpp \ object.hpp \ options.hpp \ own.hpp \ @@ -68,7 +67,6 @@ libzmq_la_SOURCES = \ thread.hpp \ transient_session.hpp \ trie.hpp \ - uuid.hpp \ windows.hpp \ wire.hpp \ xpub.hpp \ @@ -79,10 +77,8 @@ libzmq_la_SOURCES = \ yqueue.hpp \ zmq_connecter.hpp \ zmq_engine.hpp \ - zmq_init.hpp \ zmq_listener.hpp \ clock.cpp \ - command.cpp \ ctx.cpp \ connect_session.cpp \ decoder.cpp \ @@ -100,7 +96,6 @@ libzmq_la_SOURCES = \ mailbox.cpp \ msg.cpp \ mtrie.cpp \ - named_session.cpp \ object.cpp \ options.cpp \ own.cpp \ @@ -129,7 +124,6 @@ libzmq_la_SOURCES = \ thread.cpp \ transient_session.cpp \ trie.cpp \ - uuid.cpp \ xpub.cpp \ xrep.cpp \ xreq.cpp \ @@ -137,7 +131,6 @@ libzmq_la_SOURCES = \ zmq.cpp \ zmq_connecter.cpp \ zmq_engine.cpp \ - zmq_init.cpp \ zmq_listener.cpp \ zmq_utils.cpp diff --git a/src/command.cpp b/src/command.cpp deleted file mode 100644 index e3f3d59..0000000 --- a/src/command.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include "command.hpp" - -void zmq::deallocate_command (command_t *cmd_) -{ - switch (cmd_->type) { - case command_t::attach: - if (cmd_->args.attach.peer_identity) - free (cmd_->args.attach.peer_identity); - break; - case command_t::bind: - if (cmd_->args.bind.peer_identity) - free (cmd_->args.bind.peer_identity); - break; - default: - /* noop */; - } -} diff --git a/src/command.hpp b/src/command.hpp index 15cee0a..1513ca8 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -73,16 +73,12 @@ namespace zmq // session that the connection have failed. struct { struct i_engine *engine; - unsigned char peer_identity_size; - unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. // Caller have used inc_seqnum beforehand sending the command. struct { class pipe_t *pipe; - unsigned char peer_identity_size; - unsigned char *peer_identity; } bind; // Sent by pipe writer to inform dormant pipe reader that there @@ -146,9 +142,6 @@ namespace zmq } args; }; - // Function to deallocate dynamically allocated components of the command. - void deallocate_command (command_t *cmd_); - } #endif diff --git a/src/connect_session.cpp b/src/connect_session.cpp index fe7332a..14666a6 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -29,15 +29,12 @@ zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, const char *protocol_, const char *address_) : session_t (io_thread_, socket_, options_), protocol (protocol_), - address (address_), - connected (false) + address (address_) { } zmq::connect_session_t::~connect_session_t () { - if (connected && !peer_identity.empty ()) - unregister_session (peer_identity); } void zmq::connect_session_t::process_plug () @@ -87,7 +84,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); zmq_assert (rc == 0); - send_attach (this, pgm_sender, blob_t ()); + send_attach (this, pgm_sender); } else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { @@ -99,7 +96,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); zmq_assert (rc == 0); - send_attach (this, pgm_receiver, blob_t ()); + send_attach (this, pgm_receiver); } else zmq_assert (false); @@ -111,45 +108,8 @@ void zmq::connect_session_t::start_connecting (bool wait_) zmq_assert (false); } -bool zmq::connect_session_t::xattached (const blob_t &peer_identity_) +bool zmq::connect_session_t::xattached () { - // If there was no previous connection... - if (!connected) { - - // Peer has transient identity. - if (peer_identity_.empty () || peer_identity_ [0] == 0) { - connected = true; - return true; - } - - // Peer has strong identity. Let's register it and check whether noone - // else is using the same identity. - if (!register_session (peer_identity_, this)) { - log ("DPID: duplicate peer identity - disconnecting peer"); - return false; - } - connected = true; - peer_identity = peer_identity_; - return true; - } - - // New engine from listener can conflict with existing engine. - // Alternatively, new engine created by reconnection process can - // conflict with engine supplied by listener in the meantime. - if (has_engine ()) { - log ("DPID: duplicate peer identity - disconnecting peer"); - return false; - } - - // If there have been a connection before, we have to check whether - // peer's identity haven't changed in the meantime. - if ((peer_identity_.empty () || peer_identity_ [0] == 0) && - peer_identity.empty ()) - return true; - if (peer_identity != peer_identity_) { - log ("CHID: peer have changed identity - disconnecting peer"); - return false; - } return true; } diff --git a/src/connect_session.hpp b/src/connect_session.hpp index 3b8a26b..bc25d26 100644 --- a/src/connect_session.hpp +++ b/src/connect_session.hpp @@ -44,7 +44,7 @@ namespace zmq private: // Handlers for events from session base class. - bool xattached (const blob_t &peer_identity_); + bool xattached (); bool xdetached (); // Start the connection process. @@ -57,13 +57,6 @@ namespace zmq std::string protocol; std::string address; - // If true, the session was already connected to the peer. - bool connected; - - // Identity of the peer. If 'connected' is false, it has no meaning. - // Otherwise, if it's empty, the peer has transient identity. - blob_t peer_identity; - connect_session_t (const connect_session_t&); const connect_session_t &operator = (const connect_session_t&); }; diff --git a/src/named_session.cpp b/src/named_session.cpp deleted file mode 100644 index 8e43fb0..0000000 --- a/src/named_session.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "named_session.hpp" -#include "socket_base.hpp" - -zmq::named_session_t::named_session_t (class io_thread_t *io_thread_, - socket_base_t *socket_, const options_t &options_, - const blob_t &peer_identity_) : - session_t (io_thread_, socket_, options_), - peer_identity (peer_identity_) -{ - // Make double sure that the peer's identity is not transient. - zmq_assert (!peer_identity.empty ()); - zmq_assert (peer_identity [0] != 0); - - bool ok = socket_->register_session (peer_identity, this); - - // If new session is being created, the caller should have already - // checked that the session for specified identity doesn't exist yet. - // Thus, register_session should not fail. - zmq_assert (ok); -} - -zmq::named_session_t::~named_session_t () -{ - // Unregister the session from the global list of named sessions. - unregister_session (peer_identity); -} - -bool zmq::named_session_t::xattached (const blob_t &peer_identity_) -{ - // Double check that identities match. - zmq_assert (peer_identity == peer_identity_); - - // If the session already has an engine attached, destroy new one. - if (has_engine ()) { - log ("DPID: duplicate peer identity - disconnecting peer"); - return false; - } - return true; -} - -bool zmq::named_session_t::xdetached () -{ - // Do nothing. Named sessions are never destroyed because of disconnection. - // Neither they have to actively reconnect. - return true; -} - diff --git a/src/named_session.hpp b/src/named_session.hpp deleted file mode 100644 index 10af8cc..0000000 --- a/src/named_session.hpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_NAMED_SESSION_HPP_INCLUDED__ -#define __ZMQ_NAMED_SESSION_HPP_INCLUDED__ - -#include "session.hpp" -#include "blob.hpp" - -namespace zmq -{ - - // Named session is created by listener object when the peer identifies - // itself by a strong name. Named session survives reconnections. - - class named_session_t : public session_t - { - public: - - named_session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_, - const blob_t &peer_identity_); - ~named_session_t (); - - // Handlers for events from session base class. - bool xattached (const blob_t &peer_identity_); - bool xdetached (); - - private: - - blob_t peer_identity; - - named_session_t (const named_session_t&); - const named_session_t &operator = (const named_session_t&); - }; - -} - -#endif diff --git a/src/object.cpp b/src/object.cpp index ba77bf0..7f7d7f8 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -82,17 +82,12 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::attach: - process_attach (cmd_.args.attach.engine, - cmd_.args.attach.peer_identity ? - blob_t (cmd_.args.attach.peer_identity, - cmd_.args.attach.peer_identity_size) : blob_t ()); + process_attach (cmd_.args.attach.engine); process_seqnum (); break; case command_t::bind: - process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ? - blob_t (cmd_.args.bind.peer_identity, - cmd_.args.bind.peer_identity_size) : blob_t ()); + process_bind (cmd_.args.bind.pipe); process_seqnum (); break; @@ -131,10 +126,6 @@ void zmq::object_t::process_command (command_t &cmd_) default: zmq_assert (false); } - - // The assumption here is that each command is processed once only, - // so deallocating it after processing is all right. - deallocate_command (&cmd_); } int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) @@ -211,7 +202,7 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - const blob_t &peer_identity_, bool inc_seqnum_) + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -223,25 +214,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; - if (peer_identity_.empty ()) { - cmd.args.attach.peer_identity_size = 0; - cmd.args.attach.peer_identity = NULL; - } - else { - zmq_assert (peer_identity_.size () <= 0xff); - cmd.args.attach.peer_identity_size = - (unsigned char) peer_identity_.size (); - cmd.args.attach.peer_identity = - (unsigned char*) malloc (peer_identity_.size ()); - alloc_assert (cmd.args.attach.peer_identity_size); - memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), - peer_identity_.size ()); - } send_command (cmd); } void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, - const blob_t &peer_identity_, bool inc_seqnum_) + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -253,20 +230,6 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, cmd.destination = destination_; cmd.type = command_t::bind; cmd.args.bind.pipe = pipe_; - if (peer_identity_.empty ()) { - cmd.args.bind.peer_identity_size = 0; - cmd.args.bind.peer_identity = NULL; - } - else { - zmq_assert (peer_identity_.size () <= 0xff); - cmd.args.bind.peer_identity_size = - (unsigned char) peer_identity_.size (); - cmd.args.bind.peer_identity = - (unsigned char*) malloc (peer_identity_.size ()); - alloc_assert (cmd.args.bind.peer_identity_size); - memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), - peer_identity_.size ()); - } send_command (cmd); } @@ -413,13 +376,12 @@ void zmq::object_t::process_own (own_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_, - const blob_t &peer_identity_) +void zmq::object_t::process_attach (i_engine *engine_) { zmq_assert (false); } -void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::object_t::process_bind (pipe_t *pipe_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index fbad0ea..e05b958 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -22,7 +22,6 @@ #define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" -#include "blob.hpp" namespace zmq { @@ -64,10 +63,9 @@ namespace zmq void send_own (class own_t *destination_, class own_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, const blob_t &peer_identity_, - bool inc_seqnum_ = true); + struct i_engine *engine_, bool inc_seqnum_ = true); void send_bind (class own_t *destination_, class pipe_t *pipe_, - const blob_t &peer_identity_, bool inc_seqnum_ = true); + bool inc_seqnum_ = true); void send_activate_read (class pipe_t *destination_); void send_activate_write (class pipe_t *destination_, uint64_t msgs_read_); @@ -87,10 +85,8 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class own_t *object_); - virtual void process_attach (struct i_engine *engine_, - const blob_t &peer_identity_); - virtual void process_bind (class pipe_t *pipe_, - const blob_t &peer_identity_); + virtual void process_attach (struct i_engine *engine_); + virtual void process_bind (class pipe_t *pipe_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); diff --git a/src/options.cpp b/src/options.cpp index 63a1d91..45eb4aa 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -75,19 +75,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, affinity = *((uint64_t*) optval_); return 0; - case ZMQ_IDENTITY: - - // Empty identity is invalid as well as identity longer than - // 255 bytes. Identity starting with binary zero is invalid - // as these are used for auto-generated identities. - if (optvallen_ < 1 || optvallen_ > 255 || - *((const unsigned char*) optval_) == 0) { - errno = EINVAL; - return -1; - } - identity.assign ((const unsigned char*) optval_, optvallen_); - return 0; - case ZMQ_RATE: if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) { errno = EINVAL; @@ -229,15 +216,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (uint64_t); return 0; - case ZMQ_IDENTITY: - if (*optvallen_ < identity.size ()) { - errno = EINVAL; - return -1; - } - memcpy (optval_, identity.data (), identity.size ()); - *optvallen_ = identity.size (); - return 0; - case ZMQ_RATE: if (*optvallen_ < sizeof (int)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index f1982da..d8847f5 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -23,7 +23,6 @@ #include "stddef.h" #include "stdint.hpp" -#include "blob.hpp" namespace zmq { @@ -39,8 +38,8 @@ namespace zmq int sndhwm; int rcvhwm; + // I/O thread affinity. uint64_t affinity; - blob_t identity; // Maximum tranfer rate [kb/s]. Default 100kb/s. int rate; diff --git a/src/pair.cpp b/src/pair.cpp index 30b56e6..12a1881 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -35,7 +35,7 @@ zmq::pair_t::~pair_t () zmq_assert (!pipe); } -void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (!pipe); pipe = pipe_; diff --git a/src/pair.hpp b/src/pair.hpp index 1ddf50e..59300ae 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -35,7 +35,7 @@ namespace zmq ~pair_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 90a265c..98aeeb9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -38,7 +38,7 @@ #include "pgm_socket.hpp" #include "config.hpp" #include "err.hpp" -#include "uuid.hpp" +#include "random.hpp" #include "stdint.hpp" #ifndef MSG_ERRQUEUE @@ -253,20 +253,13 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) addr.sa_port = port_number; addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; - if (options.identity.size () > 0) { - - // Create gsi from identity. - if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, - options.identity.data (), options.identity.size ())) - goto err_abort; - } else { + // Create random GSI. + uint32_t buf [2]; + buf [0] = generate_random (); + buf [1] = generate_random (); + if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8)) + goto err_abort; - // Generate GSI from UUID. - unsigned char buf [16]; - generate_uuid (buf); - if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, buf, 16)) - goto err_abort; - } // Bind a transport to the specified network devices. struct pgm_interface_req_t if_req; diff --git a/src/pull.cpp b/src/pull.cpp index 5e48777..afde236 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -33,7 +33,7 @@ zmq::pull_t::~pull_t () { } -void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::pull_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index cbcf05a..be82af9 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -38,7 +38,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); void xread_activated (class pipe_t *pipe_); diff --git a/src/push.cpp b/src/push.cpp index 44ebc07..77cc9d8 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -33,7 +33,7 @@ zmq::push_t::~push_t () { } -void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::push_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); lb.attach (pipe_); diff --git a/src/push.hpp b/src/push.hpp index 5dabe14..222a62d 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -38,7 +38,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xsend (class msg_t *msg_, int flags_); bool xhas_out (); void xwrite_activated (class pipe_t *pipe_); diff --git a/src/random.cpp b/src/random.cpp index 2a1d7d6..9f7768c 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -18,23 +18,35 @@ along with this program. If not, see . */ +#include + +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#endif + #include "random.hpp" #include "stdint.hpp" -#include "uuid.hpp" -#include "err.hpp" +#include "clock.hpp" + +void zmq::seed_random () +{ +#if defined ZMQ_HAVE_WINDOWS + int pid = (int) GetCurrentProcessId (); +#else + int pid = (int) getpid (); +#endif + srand ((unsigned int) (clock_t::now_us () + pid)); +} -// Here we can use different ways of generating random data, as avialable -// on different platforms. At the moment, we'll assume the UUID is random -// enough to use for that purpose. -void zmq::generate_random (void *buf_, size_t size_) +uint32_t zmq::generate_random () { - // Collapsing an UUID into 4 bytes. - zmq_assert (size_ == 4); - uint32_t buff [4]; - generate_uuid ((void*) buff); - uint32_t result = buff [0]; - result ^= buff [1]; - result ^= buff [2]; - result ^= buff [3]; - *((uint32_t*) buf_) = result; + // Compensate for the fact that rand() returns signed integer. + uint32_t low = (uint32_t) rand (); + uint32_t high = (uint32_t) rand (); + high <<= (sizeof (int) * 8 - 1); + return high | low; } + diff --git a/src/random.hpp b/src/random.hpp index 0b99bbd..d88b5ee 100644 --- a/src/random.hpp +++ b/src/random.hpp @@ -21,13 +21,16 @@ #ifndef __ZMQ_RANDOM_HPP_INCLUDED__ #define __ZMQ_RANDOM_HPP_INCLUDED__ -#include +#include "stdint.hpp" namespace zmq { - // Generates truly random bytes (not pseudo-random). - void generate_random (void *buf_, size_t size_); + // Seeds the random number generator. + void seed_random (); + + // Generates random value. + uint32_t generate_random (); } diff --git a/src/req.cpp b/src/req.cpp index e0e3321..7831672 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -21,7 +21,6 @@ #include "req.hpp" #include "err.hpp" #include "msg.hpp" -#include "uuid.hpp" #include "wire.hpp" #include "random.hpp" #include "likely.hpp" @@ -30,12 +29,9 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : xreq_t (parent_, tid_), receiving_reply (false), message_begins (true), - request_id (0) + request_id (generate_random ()) { options.type = ZMQ_REQ; - - // Start the request ID sequence at an random point. - generate_random (&request_id, sizeof (request_id)); } zmq::req_t::~req_t () diff --git a/src/session.cpp b/src/session.cpp index 1ca69cf..0dd0e34 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -159,8 +159,7 @@ void zmq::session_t::process_plug () { } -void zmq::session_t::process_attach (i_engine *engine_, - const blob_t &peer_identity_) +void zmq::session_t::process_attach (i_engine *engine_) { // If some other object (e.g. init) notifies us that the connection failed // without creating an engine we need to start the reconnection process. @@ -171,7 +170,7 @@ void zmq::session_t::process_attach (i_engine *engine_, } // Trigger the notfication event about the attachment. - if (!attached (peer_identity_)) { + if (!attached ()) { delete engine_; return; } @@ -193,7 +192,7 @@ void zmq::session_t::process_attach (i_engine *engine_, pipe = pipes [0]; // Ask socket to plug into the remote end of the pipe. - send_bind (socket, pipes [1], peer_identity_); + send_bind (socket, pipes [1]); } // Plug in the engine. @@ -272,24 +271,9 @@ void zmq::session_t::timer_event (int id_) pipe->terminate (false); } -bool zmq::session_t::has_engine () +bool zmq::session_t::attached () { - return engine != NULL; -} - -bool zmq::session_t::register_session (const blob_t &name_, session_t *session_) -{ - return socket->register_session (name_, session_); -} - -void zmq::session_t::unregister_session (const blob_t &name_) -{ - socket->unregister_session (name_); -} - -bool zmq::session_t::attached (const blob_t &peer_identity_) -{ - return xattached (peer_identity_); + return xattached (); } void zmq::session_t::detached () diff --git a/src/session.hpp b/src/session.hpp index 4c17930..60aa7c5 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -24,7 +24,6 @@ #include "own.hpp" #include "i_engine.hpp" #include "io_object.hpp" -#include "blob.hpp" #include "pipe.hpp" namespace zmq @@ -64,27 +63,19 @@ namespace zmq // the termination process when session is about to be detached from // the peer. If it returns false, session will be terminated. // To be overloaded by the derived session type. - virtual bool xattached (const blob_t &peer_identity_) = 0; + virtual bool xattached () = 0; virtual bool xdetached () = 0; - // Returns true if there is an engine attached to the session. - bool has_engine (); - - // Allows derives session types to (un)register session names. - bool register_session (const blob_t &name_, class session_t *session_); - void unregister_session (const blob_t &name_); - ~session_t (); private: - bool attached (const blob_t &peer_identity_); + bool attached (); void detached (); // Handlers for incoming commands. void process_plug (); - void process_attach (struct i_engine *engine_, - const blob_t &peer_identity_); + void process_attach (struct i_engine *engine_); void process_term (int linger_); // i_poll_events handlers. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 1bc80f1..bc58e8b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -45,7 +45,6 @@ #include "ctx.hpp" #include "platform.hpp" #include "likely.hpp" -#include "uuid.hpp" #include "msg.hpp" #include "pair.hpp" @@ -128,11 +127,6 @@ zmq::socket_base_t::~socket_base_t () { zmq_assert (destroyed); - // Check whether there are no session leaks. - sessions_sync.lock (); - zmq_assert (sessions.empty ()); - sessions_sync.unlock (); - // Mark the socket as dead. tag = 0xdeadbeef; } @@ -212,23 +206,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, - const blob_t &peer_identity_) +void zmq::socket_base_t::attach_pipe (pipe_t *pipe_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); pipes.push_back (pipe_); - // Then, pass the pipe to the specific socket type. - // If the peer haven't specified it's identity, let's generate one. - if (peer_identity_.size ()) { - xattach_pipe (pipe_, peer_identity_); - } - else { - blob_t identity (17, 0); - generate_uuid ((unsigned char*) identity.data () + 1); - xattach_pipe (pipe_, identity); - } + // Let the derived socket type know about new pipe. + xattach_pipe (pipe_); // If the socket is already being closed, ask any new pipes to terminate // straight away. @@ -423,12 +408,12 @@ int zmq::socket_base_t::connect (const char *addr_) errno_assert (rc == 0); // Attach local end of the pipe to this socket object. - attach_pipe (pipes [0], peer.options.identity); + attach_pipe (pipes [0]); // Attach remote end of the pipe to the peer socket. Note that peer's // seqnum was incremented in find_endpoint function. We don't need it // increased here. - send_bind (peer.socket, pipes [1], options.identity, false); + send_bind (peer.socket, pipes [1], false); return 0; } @@ -454,7 +439,7 @@ int zmq::socket_base_t::connect (const char *addr_) errno_assert (rc == 0); // Attach local end of the pipe to the socket object. - attach_pipe (pipes [0], blob_t ()); + attach_pipe (pipes [0]); // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); @@ -654,44 +639,6 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (const blob_t &name_, - session_t *session_) -{ - sessions_sync.lock (); - bool registered = sessions.insert ( - sessions_t::value_type (name_, session_)).second; - sessions_sync.unlock (); - return registered; -} - -void zmq::socket_base_t::unregister_session (const blob_t &name_) -{ - sessions_sync.lock (); - sessions_t::iterator it = sessions.find (name_); - zmq_assert (it != sessions.end ()); - sessions.erase (it); - sessions_sync.unlock (); -} - -zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) -{ - sessions_sync.lock (); - sessions_t::iterator it = sessions.find (name_); - if (it == sessions.end ()) { - sessions_sync.unlock (); - return NULL; - } - session_t *session = it->second; - - // Prepare the session for subsequent attach command. - // Note the connect sessions have NULL pointers registered here. - if (session) - session->inc_seqnum (); - - sessions_sync.unlock (); - return session; -} - void zmq::socket_base_t::start_reaping (poller_t *poller_) { // Plug the socket to the reaper thread. @@ -770,10 +717,9 @@ void zmq::socket_base_t::process_stop () ctx_terminated = true; } -void zmq::socket_base_t::process_bind (pipe_t *pipe_, - const blob_t &peer_identity_) +void zmq::socket_base_t::process_bind (pipe_t *pipe_) { - attach_pipe (pipe_, peer_identity_); + attach_pipe (pipe_); } void zmq::socket_base_t::process_unplug () diff --git a/src/socket_base.hpp b/src/socket_base.hpp index f114e9d..fb60bbe 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -21,21 +21,17 @@ #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ -#include -#include +#include #include "own.hpp" #include "array.hpp" -#include "mutex.hpp" #include "stdint.hpp" #include "poller.hpp" #include "atomic_counter.hpp" #include "i_poll_events.hpp" #include "mailbox.hpp" #include "stdint.hpp" -#include "blob.hpp" #include "pipe.hpp" -#include "own.hpp" namespace zmq { @@ -78,11 +74,6 @@ namespace zmq bool has_in (); bool has_out (); - // Registry of named sessions. - bool register_session (const blob_t &name_, class session_t *session_); - void unregister_session (const blob_t &name_); - class session_t *find_session (const blob_t &name_); - // Using this function reaper thread ask the socket to regiter with // its poller. void start_reaping (poller_t *poller_); @@ -106,8 +97,7 @@ namespace zmq // Concrete algorithms for the x- methods are to be defined by // individual socket types. - virtual void xattach_pipe (class pipe_t *pipe_, - const blob_t &peer_identity_) = 0; + virtual void xattach_pipe (class pipe_t *pipe_) = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, overload this @@ -158,7 +148,7 @@ namespace zmq int check_protocol (const std::string &protocol_); // Register the pipe with this socket. - void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void attach_pipe (class pipe_t *pipe_); // Processes commands sent to this socket (if any). If timeout is -1, // returns only after at least one command was processed. @@ -168,7 +158,7 @@ namespace zmq // Handlers for incoming commands. void process_stop (); - void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_); + void process_bind (class pipe_t *pipe_); void process_unplug (); void process_term (int linger_); @@ -195,14 +185,6 @@ namespace zmq // True if the last message received had MORE flag set. bool rcvmore; - // Lists of existing sessions. This list is never referenced from - // within the socket, instead it is used by objects owned by - // the socket. As those objects can live in different threads, - // the access is synchronised by mutex. - typedef std::map sessions_t; - sessions_t sessions; - mutex_t sessions_sync; - socket_base_t (const socket_base_t&); const socket_base_t &operator = (const socket_base_t&); }; diff --git a/src/transient_session.cpp b/src/transient_session.cpp index b3c80b0..d389ff4 100644 --- a/src/transient_session.cpp +++ b/src/transient_session.cpp @@ -30,7 +30,7 @@ zmq::transient_session_t::~transient_session_t () { } -bool zmq::transient_session_t::xattached (const blob_t &peer_identity_) +bool zmq::transient_session_t::xattached () { // Transient session is always valid. return true; diff --git a/src/transient_session.hpp b/src/transient_session.hpp index 55c2b8a..eff6b65 100644 --- a/src/transient_session.hpp +++ b/src/transient_session.hpp @@ -40,7 +40,7 @@ namespace zmq private: // Handlers for events from session base class. - bool xattached (const blob_t &peer_identity_); + bool xattached (); bool xdetached (); transient_session_t (const transient_session_t&); diff --git a/src/uuid.cpp b/src/uuid.cpp deleted file mode 100644 index 02f716e..0000000 --- a/src/uuid.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include "uuid.hpp" -#include "err.hpp" -#include "stdint.hpp" -#include "platform.hpp" - -#if defined ZMQ_HAVE_WINDOWS - -#include - -void zmq::generate_uuid (void *buf_) -{ - RPC_STATUS ret = UuidCreate ((::UUID*) buf_); - zmq_assert (ret == RPC_S_OK); -} - -#elif defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD - -#include - -void zmq::generate_uuid (void *buf_) -{ - uint32_t status; - uuid_create ((::uuid_t*) buf_, &status); - zmq_assert (status == uuid_s_ok); -} - -#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS ||\ - defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_CYGWIN - -#include - -void zmq::generate_uuid (void *buf_) -{ - uuid_generate ((unsigned char*) buf_); -} - -#elif defined ZMQ_HAVE_OPENVMS - -#include - -void zmq::generate_uuid (void *buf_) -{ - sys$create_uid(buf_); -} - -#else - -#include - -void zmq::generate_uuid (void *buf_) -{ - unsigned char *buf = (unsigned char*) buf_; - - // Generate random value. - int ret = RAND_bytes (buf, 16); - zmq_assert (ret == 1); - - // Set UUID variant to 2 (UUID as specified in RFC4122). - const unsigned char variant = 2; - buf [8] = (buf [8] & 0x3f) | (variant << 6); - - // Set UUID version to 4 (randomly or pseudo-randomly generated UUID). - const unsigned char version = 4; - buf [6] = (buf [6] & 0x0f) | (version << 4); -} - -#endif - diff --git a/src/uuid.hpp b/src/uuid.hpp deleted file mode 100644 index c338208..0000000 --- a/src/uuid.hpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_UUID_HPP_INCLUDED__ -#define __ZMQ_UUID_HPP_INCLUDED__ - -namespace zmq -{ - - // This function provides RFC 4122 (a Universally Unique IDentifier) - // implementation. The resulting UUID will be 16 bytes long. - void generate_uuid (void *buf_); - -} - -#endif diff --git a/src/xpub.cpp b/src/xpub.cpp index 7e4eecc..8da9cf9 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -36,7 +36,7 @@ zmq::xpub_t::~xpub_t () { } -void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::xpub_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); dist.attach (pipe_); diff --git a/src/xpub.hpp b/src/xpub.hpp index a2e7335..320a1fe 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -41,7 +41,7 @@ namespace zmq ~xpub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xsend (class msg_t *msg_, int flags_); bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); diff --git a/src/xrep.cpp b/src/xrep.cpp index 153be46..9b57435 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : prefetched (false), more_in (false), current_out (NULL), - more_out (false) + more_out (false), + next_peer_id (generate_random ()) { options.type = ZMQ_XREP; @@ -38,9 +39,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : options.delay_on_disconnect = false; prefetched_msg.init (); - - // Start the peer ID sequence from a random point. - generate_random (&next_peer_id, sizeof (next_peer_id)); } zmq::xrep_t::~xrep_t () @@ -49,7 +47,7 @@ zmq::xrep_t::~xrep_t () prefetched_msg.close (); } -void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); diff --git a/src/xrep.hpp b/src/xrep.hpp index 41c06b8..07f10ba 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -41,8 +41,7 @@ namespace zmq ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, - const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xreq.cpp b/src/xreq.cpp index 85e2238..7b66137 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -36,7 +36,7 @@ zmq::xreq_t::~xreq_t () { } -void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::xreq_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index 5bf1a03..a427ba9 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -40,7 +40,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xsub.cpp b/src/xsub.cpp index 98abc87..cf51cdb 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -39,7 +39,7 @@ zmq::xsub_t::~xsub_t () errno_assert (rc == 0); } -void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::xsub_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/xsub.hpp b/src/xsub.hpp index 03b3178..ea59cdb 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -41,7 +41,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xsend (class msg_t *msg_, int flags_); bool xhas_out (); int xrecv (class msg_t *msg_, int flags_); diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index ca9bb77..0512c3c 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -30,7 +30,6 @@ #include "zmq_connecter.hpp" #include "zmq_engine.hpp" -#include "zmq_init.hpp" #include "io_thread.hpp" #include "err.hpp" @@ -86,16 +85,12 @@ void zmq::zmq_connecter_t::out_event () return; } - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); + // Create the engine object for this connection. + zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options); + alloc_assert (engine); - // Create an init object. - zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL, - session, fd, options); - alloc_assert (init); - launch_sibling (init); + // Attach the engine to the corresponding session object. + send_attach (session, engine); // Shut the connecter down. terminate (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp deleted file mode 100644 index 8352719..0000000 --- a/src/zmq_init.cpp +++ /dev/null @@ -1,233 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include "zmq_init.hpp" -#include "transient_session.hpp" -#include "named_session.hpp" -#include "socket_base.hpp" -#include "zmq_engine.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "uuid.hpp" -#include "blob.hpp" -#include "wire.hpp" -#include "err.hpp" - -zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, - socket_base_t *socket_, session_t *session_, fd_t fd_, - const options_t &options_) : - own_t (io_thread_, options_), - ephemeral_engine (NULL), - received (false), - socket (socket_), - session (session_), - io_thread (io_thread_) -{ - // Create the engine object for this connection. - engine = new (std::nothrow) zmq_engine_t (fd_, options); - alloc_assert (engine); - - // Generate an unique identity. - peer_identity.resize (17); - peer_identity [0] = 0; - generate_uuid (&peer_identity [1]); - - // Create a list of messages to send on connection initialisation. - if (!options.identity.empty ()) { - msg_t msg; - int rc = msg.init_size (options.identity.size ()); - errno_assert (rc == 0); - memcpy (msg.data () , options.identity.data (), msg.size ()); - to_send.push_back (msg); - } - else { - msg_t msg; - int rc = msg.init (); - errno_assert (rc == 0); - to_send.push_back (msg); - } -} - -zmq::zmq_init_t::~zmq_init_t () -{ - if (engine) - engine->terminate (); - - // If there are unsent props still queued deallocate them. - for (to_send_t::iterator it = to_send.begin (); it != to_send.end (); - ++it) { - int rc = it->close (); - errno_assert (rc == 0); - } - to_send.clear (); -} - -bool zmq::zmq_init_t::read (msg_t *msg_) -{ - // If the identity was already sent, do nothing. - if (to_send.empty ()) - return false; - - // Pass next property to the engine. - *msg_ = to_send.front (); - to_send.erase (to_send.begin ()); - - // Try finalize initialization. - finalise_initialisation (); - - return true; -} - -bool zmq::zmq_init_t::write (msg_t *msg_) -{ - // If identity was already received, we are not interested - // in subsequent messages. - if (received) - return false; - - // Retrieve the peer's identity, if any. - zmq_assert (!(msg_->flags () & msg_t::more)); - size_t size = msg_->size (); - if (size) { - unsigned char *data = (unsigned char*) msg_->data (); - peer_identity.assign (data, size); - } - - received = true; - finalise_initialisation (); - - return true; -} - -void zmq::zmq_init_t::flush () -{ - // Check if there's anything to flush. - if (!received) - return; - - // Initialization is done, dispatch engine. - if (ephemeral_engine) - dispatch_engine (); -} - -void zmq::zmq_init_t::detach () -{ - // This function is called by engine when disconnection occurs. - - // If there is an associated session, send it a null engine to let it know - // that connection process was unsuccesful. - if (session) - send_attach (session, NULL, blob_t (), true); - - // The engine will destroy itself, so let's just drop the pointer here and - // start termination of the init object. - engine = NULL; - terminate (); -} - -void zmq::zmq_init_t::process_plug () -{ - zmq_assert (engine); - engine->plug (io_thread, this); -} - -void zmq::zmq_init_t::process_unplug () -{ - if (engine) - engine->unplug (); -} - -void zmq::zmq_init_t::finalise_initialisation () -{ - // Unplug and prepare to dispatch engine. - if (to_send.empty () && received) { - ephemeral_engine = engine; - engine = NULL; - ephemeral_engine->unplug (); - return; - } -} - -void zmq::zmq_init_t::dispatch_engine () -{ - if (to_send.empty () && received) { - - // Engine must be detached. - zmq_assert (!engine); - zmq_assert (ephemeral_engine); - - // If we know what session we belong to, it's easy, just send the - // engine to that session and destroy the init object. Note that we - // know about the session only if this object is owned by it. Thus, - // lifetime of this object in contained in the lifetime of the session - // so the pointer cannot become invalid without notice. - if (session) { - send_attach (session, ephemeral_engine, peer_identity, true); - terminate (); - return; - } - - // All the cases below are listener-based. Therefore we need the socket - // reference so that new sessions can bind to that socket. - zmq_assert (socket); - - // We have no associated session. If the peer has no identity we'll - // create a transient session for the connection. Note that - // seqnum is incremented to account for attach command before the - // session is launched. That way we are sure it won't terminate before - // being attached. - if (peer_identity [0] == 0) { - session = new (std::nothrow) transient_session_t (io_thread, - socket, options); - alloc_assert (session); - session->inc_seqnum (); - launch_sibling (session); - send_attach (session, ephemeral_engine, peer_identity, false); - terminate (); - return; - } - - // Try to find the session corresponding to the peer's identity. - // If found, send the engine to that session and destroy this object. - // Note that session's seqnum is incremented by find_session rather - // than by send_attach. - session = socket->find_session (peer_identity); - if (session) { - send_attach (session, ephemeral_engine, peer_identity, false); - terminate (); - return; - } - - // There's no such named session. We have to create one. Note that - // seqnum is incremented to account for attach command before the - // session is launched. That way we are sure it won't terminate before - // being attached. - session = new (std::nothrow) named_session_t (io_thread, socket, - options, peer_identity); - alloc_assert (session); - session->inc_seqnum (); - launch_sibling (session); - send_attach (session, ephemeral_engine, peer_identity, false); - terminate (); - return; - } -} diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp deleted file mode 100644 index e220d2b..0000000 --- a/src/zmq_init.hpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ -#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ - -#include - -#include "i_engine.hpp" -#include "stdint.hpp" -#include "blob.hpp" -#include "msg.hpp" -#include "own.hpp" -#include "fd.hpp" - -namespace zmq -{ - - // The class handles initialisation phase of 0MQ wire-level protocol. - - class zmq_init_t : - public own_t, - public i_engine_sink - { - public: - - zmq_init_t (class io_thread_t *io_thread_, class socket_base_t *socket_, - class session_t *session_, fd_t fd_, const options_t &options_); - ~zmq_init_t (); - - private: - - void finalise_initialisation (); - void dispatch_engine (); - - // i_engine_sink interface implementation. - bool read (class msg_t *msg_); - bool write (class msg_t *msg_); - void flush (); - void detach (); - - // Handlers for incoming commands. - void process_plug (); - void process_unplug (); - - // Associated wire-protocol engine. - i_engine *engine; - - // Detached transient engine. - i_engine *ephemeral_engine; - - // List of messages to send to the peer during the connection - // initiation phase. - typedef std::vector to_send_t; - to_send_t to_send; - - // True if peer's identity was already received. - bool received; - - // Socket the object belongs to. - class socket_base_t *socket; - - // Reference to the session the init object belongs to. - // If the associated session is unknown and should be found - // depending on peer identity this value is NULL. - class session_t *session; - - // Identity of the peer socket. - blob_t peer_identity; - - // I/O thread the object is living in. It will be used to plug - // the engine into the same I/O thread. - class io_thread_t *io_thread; - - zmq_init_t (const zmq_init_t&); - const zmq_init_t &operator = (const zmq_init_t&); - }; - -} - -#endif diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index e3f3bd8..2d5e7bb 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -21,7 +21,8 @@ #include #include "zmq_listener.hpp" -#include "zmq_init.hpp" +#include "transient_session.hpp" +#include "zmq_engine.hpp" #include "io_thread.hpp" #include "err.hpp" @@ -63,16 +64,21 @@ void zmq::zmq_listener_t::in_event () // TODO: Handle specific errors like ENFILE/EMFILE etc. if (fd == retired_fd) return; + // Create the engine object for this connection. + zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options); + alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. io_thread_t *io_thread = choose_io_thread (options.affinity); zmq_assert (io_thread); - // Create and launch an init object. - zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket, - NULL, fd, options); - alloc_assert (init); - launch_child (init); + // Create and launch a session object. + transient_session_t *session = new (std::nothrow) + transient_session_t (io_thread, socket, options); + alloc_assert (session); + session->inc_seqnum (); + launch_child (session); + send_attach (session, engine, false); } -- cgit v1.2.3