diff options
43 files changed, 112 insertions, 963 deletions
diff --git a/configure.in b/configure.in index c45e5a7..cd8f6a8 100644 --- a/configure.in +++ b/configure.in @@ -90,8 +90,6 @@ case "${host_os}" in fi AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS]) AC_CHECK_LIB(rt, sem_init) - AC_CHECK_LIB(uuid, uuid_generate, , - [AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])]) ;; *solaris*) # Define on Solaris to enable all library features @@ -100,8 +98,6 @@ case "${host_os}" in AC_CHECK_LIB(socket, socket) AC_CHECK_LIB(nsl, gethostbyname) AC_CHECK_LIB(rt, sem_init) - AC_CHECK_LIB(uuid, uuid_generate, , - [AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])]) AC_MSG_CHECKING([whether atomic operations can be used]) AC_COMPILE_IFELSE([AC_LANG_PROGRAM( [[#include <atomic.h>]], @@ -163,18 +159,15 @@ case "${host_os}" in libzmq_pedantic="no" AC_DEFINE(ZMQ_HAVE_QNXNTO, 1, [Have QNX Neutrino OS]) AC_CHECK_LIB(socket, socket) - AC_CHECK_LIB(crypto, RAND_bytes) ;; *aix*) AC_DEFINE(ZMQ_HAVE_AIX, 1, [Have AIX OS]) - AC_CHECK_LIB(crypto,RAND_bytes) ;; *hpux*) # Define on HP-UX to enable all library features CPPFLAGS="-D_POSIX_C_SOURCE=200112L $CPPFLAGS" AC_DEFINE(ZMQ_HAVE_HPUX, 1, [Have HPUX OS]) AC_CHECK_LIB(rt, sem_init) - AC_CHECK_LIB(crypto, RAND_bytes) ;; *mingw32*) AC_DEFINE(ZMQ_HAVE_WINDOWS, 1, [Have Windows OS]) @@ -201,12 +194,6 @@ case "${host_os}" in # Define on Cygwin to enable all library features CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS" AC_DEFINE(ZMQ_HAVE_CYGWIN, 1, [Have Cygwin]) - # Cygwin provides libuuid as part of the e2fsprogs package, and somewhat - # uselessly installs the library in /usr/lib/e2fsprogs - LDFLAGS="-L/usr/lib/e2fsprogs ${LDFLAGS}" - AC_CHECK_LIB(uuid, uuid_generate, , - [AC_MSG_ERROR([cannot link with -luuid, install the e2fsprogs package.])]) - if test "x$enable_static" = "xyes"; then AC_MSG_ERROR([Building static libraries is not supported under Cygwin]) fi diff --git a/include/zmq.h b/include/zmq.h index 450bed9..2385040 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -161,7 +161,6 @@ ZMQ_EXPORT int zmq_term (void *context); /* Socket options. */ #define ZMQ_AFFINITY 4 -#define ZMQ_IDENTITY 5 #define ZMQ_SUBSCRIBE 6 #define ZMQ_UNSUBSCRIBE 7 #define ZMQ_RATE 8 diff --git a/src/Makefile.am b/src/Makefile.am index e78eb1c..5049ad8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,7 +35,6 @@ libzmq_la_SOURCES = \ msg.hpp \ mtrie.hpp \ mutex.hpp \ - named_session.hpp \ object.hpp \ options.hpp \ own.hpp \ @@ -68,7 +67,6 @@ libzmq_la_SOURCES = \ thread.hpp \ transient_session.hpp \ trie.hpp \ - uuid.hpp \ windows.hpp \ wire.hpp \ xpub.hpp \ @@ -79,10 +77,8 @@ libzmq_la_SOURCES = \ yqueue.hpp \ zmq_connecter.hpp \ zmq_engine.hpp \ - zmq_init.hpp \ zmq_listener.hpp \ clock.cpp \ - command.cpp \ ctx.cpp \ connect_session.cpp \ decoder.cpp \ @@ -100,7 +96,6 @@ libzmq_la_SOURCES = \ mailbox.cpp \ msg.cpp \ mtrie.cpp \ - named_session.cpp \ object.cpp \ options.cpp \ own.cpp \ @@ -129,7 +124,6 @@ libzmq_la_SOURCES = \ thread.cpp \ transient_session.cpp \ trie.cpp \ - uuid.cpp \ xpub.cpp \ xrep.cpp \ xreq.cpp \ @@ -137,7 +131,6 @@ libzmq_la_SOURCES = \ zmq.cpp \ zmq_connecter.cpp \ zmq_engine.cpp \ - zmq_init.cpp \ zmq_listener.cpp \ zmq_utils.cpp diff --git a/src/command.cpp b/src/command.cpp deleted file mode 100644 index e3f3d59..0000000 --- a/src/command.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <stdlib.h> - -#include "command.hpp" - -void zmq::deallocate_command (command_t *cmd_) -{ - switch (cmd_->type) { - case command_t::attach: - if (cmd_->args.attach.peer_identity) - free (cmd_->args.attach.peer_identity); - break; - case command_t::bind: - if (cmd_->args.bind.peer_identity) - free (cmd_->args.bind.peer_identity); - break; - default: - /* noop */; - } -} diff --git a/src/command.hpp b/src/command.hpp index 15cee0a..1513ca8 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -73,16 +73,12 @@ namespace zmq // session that the connection have failed. struct { struct i_engine *engine; - unsigned char peer_identity_size; - unsigned char *peer_identity; } attach; // Sent from session to socket to establish pipe(s) between them. // Caller have used inc_seqnum beforehand sending the command. struct { class pipe_t *pipe; - unsigned char peer_identity_size; - unsigned char *peer_identity; } bind; // Sent by pipe writer to inform dormant pipe reader that there @@ -146,9 +142,6 @@ namespace zmq } args; }; - // Function to deallocate dynamically allocated components of the command. - void deallocate_command (command_t *cmd_); - } #endif diff --git a/src/connect_session.cpp b/src/connect_session.cpp index fe7332a..14666a6 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -29,15 +29,12 @@ zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, const char *protocol_, const char *address_) : session_t (io_thread_, socket_, options_), protocol (protocol_), - address (address_), - connected (false) + address (address_) { } zmq::connect_session_t::~connect_session_t () { - if (connected && !peer_identity.empty ()) - unregister_session (peer_identity); } void zmq::connect_session_t::process_plug () @@ -87,7 +84,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); zmq_assert (rc == 0); - send_attach (this, pgm_sender, blob_t ()); + send_attach (this, pgm_sender); } else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { @@ -99,7 +96,7 @@ void zmq::connect_session_t::start_connecting (bool wait_) int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); zmq_assert (rc == 0); - send_attach (this, pgm_receiver, blob_t ()); + send_attach (this, pgm_receiver); } else zmq_assert (false); @@ -111,45 +108,8 @@ void zmq::connect_session_t::start_connecting (bool wait_) zmq_assert (false); } -bool zmq::connect_session_t::xattached (const blob_t &peer_identity_) +bool zmq::connect_session_t::xattached () { - // If there was no previous connection... - if (!connected) { - - // Peer has transient identity. - if (peer_identity_.empty () || peer_identity_ [0] == 0) { - connected = true; - return true; - } - - // Peer has strong identity. Let's register it and check whether noone - // else is using the same identity. - if (!register_session (peer_identity_, this)) { - log ("DPID: duplicate peer identity - disconnecting peer"); - return false; - } - connected = true; - peer_identity = peer_identity_; - return true; - } - - // New engine from listener can conflict with existing engine. - // Alternatively, new engine created by reconnection process can - // conflict with engine supplied by listener in the meantime. - if (has_engine ()) { - log ("DPID: duplicate peer identity - disconnecting peer"); - return false; - } - - // If there have been a connection before, we have to check whether - // peer's identity haven't changed in the meantime. - if ((peer_identity_.empty () || peer_identity_ [0] == 0) && - peer_identity.empty ()) - return true; - if (peer_identity != peer_identity_) { - log ("CHID: peer have changed identity - disconnecting peer"); - return false; - } return true; } diff --git a/src/connect_session.hpp b/src/connect_session.hpp index 3b8a26b..bc25d26 100644 --- a/src/connect_session.hpp +++ b/src/connect_session.hpp @@ -44,7 +44,7 @@ namespace zmq private: // Handlers for events from session base class. - bool xattached (const blob_t &peer_identity_); + bool xattached (); bool xdetached (); // Start the connection process. @@ -57,13 +57,6 @@ namespace zmq std::string protocol; std::string address; - // If true, the session was already connected to the peer. - bool connected; - - // Identity of the peer. If 'connected' is false, it has no meaning. - // Otherwise, if it's empty, the peer has transient identity. - blob_t peer_identity; - connect_session_t (const connect_session_t&); const connect_session_t &operator = (const connect_session_t&); }; diff --git a/src/named_session.cpp b/src/named_session.cpp deleted file mode 100644 index 8e43fb0..0000000 --- a/src/named_session.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include "named_session.hpp" -#include "socket_base.hpp" - -zmq::named_session_t::named_session_t (class io_thread_t *io_thread_, - socket_base_t *socket_, const options_t &options_, - const blob_t &peer_identity_) : - session_t (io_thread_, socket_, options_), - peer_identity (peer_identity_) -{ - // Make double sure that the peer's identity is not transient. - zmq_assert (!peer_identity.empty ()); - zmq_assert (peer_identity [0] != 0); - - bool ok = socket_->register_session (peer_identity, this); - - // If new session is being created, the caller should have already - // checked that the session for specified identity doesn't exist yet. - // Thus, register_session should not fail. - zmq_assert (ok); -} - -zmq::named_session_t::~named_session_t () -{ - // Unregister the session from the global list of named sessions. - unregister_session (peer_identity); -} - -bool zmq::named_session_t::xattached (const blob_t &peer_identity_) -{ - // Double check that identities match. - zmq_assert (peer_identity == peer_identity_); - - // If the session already has an engine attached, destroy new one. - if (has_engine ()) { - log ("DPID: duplicate peer identity - disconnecting peer"); - return false; - } - return true; -} - -bool zmq::named_session_t::xdetached () -{ - // Do nothing. Named sessions are never destroyed because of disconnection. - // Neither they have to actively reconnect. - return true; -} - diff --git a/src/named_session.hpp b/src/named_session.hpp deleted file mode 100644 index 10af8cc..0000000 --- a/src/named_session.hpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#ifndef __ZMQ_NAMED_SESSION_HPP_INCLUDED__ -#define __ZMQ_NAMED_SESSION_HPP_INCLUDED__ - -#include "session.hpp" -#include "blob.hpp" - -namespace zmq -{ - - // Named session is created by listener object when the peer identifies - // itself by a strong name. Named session survives reconnections. - - class named_session_t : public session_t - { - public: - - named_session_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_, - const blob_t &peer_identity_); - ~named_session_t (); - - // Handlers for events from session base class. - bool xattached (const blob_t &peer_identity_); - bool xdetached (); - - private: - - blob_t peer_identity; - - named_session_t (const named_session_t&); - const named_session_t &operator = (const named_session_t&); - }; - -} - -#endif diff --git a/src/object.cpp b/src/object.cpp index ba77bf0..7f7d7f8 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -82,17 +82,12 @@ void zmq::object_t::process_command (command_t &cmd_) break; case command_t::attach: - process_attach (cmd_.args.attach.engine, - cmd_.args.attach.peer_identity ? - blob_t (cmd_.args.attach.peer_identity, - cmd_.args.attach.peer_identity_size) : blob_t ()); + process_attach (cmd_.args.attach.engine); process_seqnum (); break; case command_t::bind: - process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ? - blob_t (cmd_.args.bind.peer_identity, - cmd_.args.bind.peer_identity_size) : blob_t ()); + process_bind (cmd_.args.bind.pipe); process_seqnum (); break; @@ -131,10 +126,6 @@ void zmq::object_t::process_command (command_t &cmd_) default: zmq_assert (false); } - - // The assumption here is that each command is processed once only, - // so deallocating it after processing is all right. - deallocate_command (&cmd_); } int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) @@ -211,7 +202,7 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_) } void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, - const blob_t &peer_identity_, bool inc_seqnum_) + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -223,25 +214,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, cmd.destination = destination_; cmd.type = command_t::attach; cmd.args.attach.engine = engine_; - if (peer_identity_.empty ()) { - cmd.args.attach.peer_identity_size = 0; - cmd.args.attach.peer_identity = NULL; - } - else { - zmq_assert (peer_identity_.size () <= 0xff); - cmd.args.attach.peer_identity_size = - (unsigned char) peer_identity_.size (); - cmd.args.attach.peer_identity = - (unsigned char*) malloc (peer_identity_.size ()); - alloc_assert (cmd.args.attach.peer_identity_size); - memcpy (cmd.args.attach.peer_identity, peer_identity_.data (), - peer_identity_.size ()); - } send_command (cmd); } void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, - const blob_t &peer_identity_, bool inc_seqnum_) + bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); @@ -253,20 +230,6 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, cmd.destination = destination_; cmd.type = command_t::bind; cmd.args.bind.pipe = pipe_; - if (peer_identity_.empty ()) { - cmd.args.bind.peer_identity_size = 0; - cmd.args.bind.peer_identity = NULL; - } - else { - zmq_assert (peer_identity_.size () <= 0xff); - cmd.args.bind.peer_identity_size = - (unsigned char) peer_identity_.size (); - cmd.args.bind.peer_identity = - (unsigned char*) malloc (peer_identity_.size ()); - alloc_assert (cmd.args.bind.peer_identity_size); - memcpy (cmd.args.bind.peer_identity, peer_identity_.data (), - peer_identity_.size ()); - } send_command (cmd); } @@ -413,13 +376,12 @@ void zmq::object_t::process_own (own_t *object_) zmq_assert (false); } -void zmq::object_t::process_attach (i_engine *engine_, - const blob_t &peer_identity_) +void zmq::object_t::process_attach (i_engine *engine_) { zmq_assert (false); } -void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::object_t::process_bind (pipe_t *pipe_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index fbad0ea..e05b958 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -22,7 +22,6 @@ #define __ZMQ_OBJECT_HPP_INCLUDED__ #include "stdint.hpp" -#include "blob.hpp" namespace zmq { @@ -64,10 +63,9 @@ namespace zmq void send_own (class own_t *destination_, class own_t *object_); void send_attach (class session_t *destination_, - struct i_engine *engine_, const blob_t &peer_identity_, - bool inc_seqnum_ = true); + struct i_engine *engine_, bool inc_seqnum_ = true); void send_bind (class own_t *destination_, class pipe_t *pipe_, - const blob_t &peer_identity_, bool inc_seqnum_ = true); + bool inc_seqnum_ = true); void send_activate_read (class pipe_t *destination_); void send_activate_write (class pipe_t *destination_, uint64_t msgs_read_); @@ -87,10 +85,8 @@ namespace zmq virtual void process_stop (); virtual void process_plug (); virtual void process_own (class own_t *object_); - virtual void process_attach (struct i_engine *engine_, - const blob_t &peer_identity_); - virtual void process_bind (class pipe_t *pipe_, - const blob_t &peer_identity_); + virtual void process_attach (struct i_engine *engine_); + virtual void process_bind (class pipe_t *pipe_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); diff --git a/src/options.cpp b/src/options.cpp index 63a1d91..45eb4aa 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -75,19 +75,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, affinity = *((uint64_t*) optval_); return 0; - case ZMQ_IDENTITY: - - // Empty identity is invalid as well as identity longer than - // 255 bytes. Identity starting with binary zero is invalid - // as these are used for auto-generated identities. - if (optvallen_ < 1 || optvallen_ > 255 || - *((const unsigned char*) optval_) == 0) { - errno = EINVAL; - return -1; - } - identity.assign ((const unsigned char*) optval_, optvallen_); - return 0; - case ZMQ_RATE: if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) { errno = EINVAL; @@ -229,15 +216,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *optvallen_ = sizeof (uint64_t); return 0; - case ZMQ_IDENTITY: - if (*optvallen_ < identity.size ()) { - errno = EINVAL; - return -1; - } - memcpy (optval_, identity.data (), identity.size ()); - *optvallen_ = identity.size (); - return 0; - case ZMQ_RATE: if (*optvallen_ < sizeof (int)) { errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index f1982da..d8847f5 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -23,7 +23,6 @@ #include "stddef.h" #include "stdint.hpp" -#include "blob.hpp" namespace zmq { @@ -39,8 +38,8 @@ namespace zmq int sndhwm; int rcvhwm; + // I/O thread affinity. uint64_t affinity; - blob_t identity; // Maximum tranfer rate [kb/s]. Default 100kb/s. int rate; diff --git a/src/pair.cpp b/src/pair.cpp index 30b56e6..12a1881 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -35,7 +35,7 @@ zmq::pair_t::~pair_t () zmq_assert (!pipe); } -void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (!pipe); pipe = pipe_; diff --git a/src/pair.hpp b/src/pair.hpp index 1ddf50e..59300ae 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -35,7 +35,7 @@ namespace zmq ~pair_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xsend (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 90a265c..98aeeb9 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -38,7 +38,7 @@ #include "pgm_socket.hpp" #include "config.hpp" #include "err.hpp" -#include "uuid.hpp" +#include "random.hpp" #include "stdint.hpp" #ifndef MSG_ERRQUEUE @@ -253,20 +253,13 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) addr.sa_port = port_number; addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; - if (options.identity.size () > 0) { - - // Create gsi from identity. - if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, - options.identity.data (), options.identity.size ())) - goto err_abort; - } else { + // Create random GSI. + uint32_t buf [2]; + buf [0] = generate_random (); + buf [1] = generate_random (); + if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8)) + goto err_abort; - // Generate GSI from UUID. - unsigned char buf [16]; - generate_uuid (buf); - if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, buf, 16)) - goto err_abort; - } // Bind a transport to the specified network devices. struct pgm_interface_req_t if_req; diff --git a/src/pull.cpp b/src/pull.cpp index 5e48777..afde236 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -33,7 +33,7 @@ zmq::pull_t::~pull_t () { } -void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_) +void zmq::pull_t::xattach_pipe (pipe_t *pipe_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index cbcf05a..be82af9 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -38,7 +38,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_); + void xattach_pipe (class pipe_t *pipe_); int xrecv (class msg_t *msg_, int flags_); |