summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--configure.in13
-rw-r--r--include/zmq.h1
-rw-r--r--src/Makefile.am7
-rw-r--r--src/command.cpp39
-rw-r--r--src/command.hpp7
-rw-r--r--src/connect_session.cpp48
-rw-r--r--src/connect_session.hpp9
-rw-r--r--src/named_session.cpp67
-rw-r--r--src/named_session.hpp56
-rw-r--r--src/object.cpp50
-rw-r--r--src/object.hpp12
-rw-r--r--src/options.cpp22
-rw-r--r--src/options.hpp3
-rw-r--r--src/pair.cpp2
-rw-r--r--src/pair.hpp2
-rw-r--r--src/pgm_socket.cpp21
-rw-r--r--src/pull.cpp2
-rw-r--r--src/pull.hpp2
-rw-r--r--src/push.cpp2
-rw-r--r--src/push.hpp2
-rw-r--r--src/random.cpp42
-rw-r--r--src/random.hpp9
-rw-r--r--src/req.cpp6
-rw-r--r--src/session.cpp26
-rw-r--r--src/session.hpp15
-rw-r--r--src/socket_base.cpp70
-rw-r--r--src/socket_base.hpp26
-rw-r--r--src/transient_session.cpp2
-rw-r--r--src/transient_session.hpp2
-rw-r--r--src/uuid.cpp90
-rw-r--r--src/uuid.hpp33
-rw-r--r--src/xpub.cpp2
-rw-r--r--src/xpub.hpp2
-rw-r--r--src/xrep.cpp8
-rw-r--r--src/xrep.hpp3
-rw-r--r--src/xreq.cpp2
-rw-r--r--src/xreq.hpp2
-rw-r--r--src/xsub.cpp2
-rw-r--r--src/xsub.hpp2
-rw-r--r--src/zmq_connecter.cpp15
-rw-r--r--src/zmq_init.cpp233
-rw-r--r--src/zmq_init.hpp98
-rw-r--r--src/zmq_listener.cpp18
43 files changed, 112 insertions, 963 deletions
diff --git a/configure.in b/configure.in
index c45e5a7..cd8f6a8 100644
--- a/configure.in
+++ b/configure.in
@@ -90,8 +90,6 @@ case "${host_os}" in
fi
AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS])
AC_CHECK_LIB(rt, sem_init)
- AC_CHECK_LIB(uuid, uuid_generate, ,
- [AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])])
;;
*solaris*)
# Define on Solaris to enable all library features
@@ -100,8 +98,6 @@ case "${host_os}" in
AC_CHECK_LIB(socket, socket)
AC_CHECK_LIB(nsl, gethostbyname)
AC_CHECK_LIB(rt, sem_init)
- AC_CHECK_LIB(uuid, uuid_generate, ,
- [AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])])
AC_MSG_CHECKING([whether atomic operations can be used])
AC_COMPILE_IFELSE([AC_LANG_PROGRAM(
[[#include <atomic.h>]],
@@ -163,18 +159,15 @@ case "${host_os}" in
libzmq_pedantic="no"
AC_DEFINE(ZMQ_HAVE_QNXNTO, 1, [Have QNX Neutrino OS])
AC_CHECK_LIB(socket, socket)
- AC_CHECK_LIB(crypto, RAND_bytes)
;;
*aix*)
AC_DEFINE(ZMQ_HAVE_AIX, 1, [Have AIX OS])
- AC_CHECK_LIB(crypto,RAND_bytes)
;;
*hpux*)
# Define on HP-UX to enable all library features
CPPFLAGS="-D_POSIX_C_SOURCE=200112L $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_HPUX, 1, [Have HPUX OS])
AC_CHECK_LIB(rt, sem_init)
- AC_CHECK_LIB(crypto, RAND_bytes)
;;
*mingw32*)
AC_DEFINE(ZMQ_HAVE_WINDOWS, 1, [Have Windows OS])
@@ -201,12 +194,6 @@ case "${host_os}" in
# Define on Cygwin to enable all library features
CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_CYGWIN, 1, [Have Cygwin])
- # Cygwin provides libuuid as part of the e2fsprogs package, and somewhat
- # uselessly installs the library in /usr/lib/e2fsprogs
- LDFLAGS="-L/usr/lib/e2fsprogs ${LDFLAGS}"
- AC_CHECK_LIB(uuid, uuid_generate, ,
- [AC_MSG_ERROR([cannot link with -luuid, install the e2fsprogs package.])])
-
if test "x$enable_static" = "xyes"; then
AC_MSG_ERROR([Building static libraries is not supported under Cygwin])
fi
diff --git a/include/zmq.h b/include/zmq.h
index 450bed9..2385040 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -161,7 +161,6 @@ ZMQ_EXPORT int zmq_term (void *context);
/* Socket options. */
#define ZMQ_AFFINITY 4
-#define ZMQ_IDENTITY 5
#define ZMQ_SUBSCRIBE 6
#define ZMQ_UNSUBSCRIBE 7
#define ZMQ_RATE 8
diff --git a/src/Makefile.am b/src/Makefile.am
index e78eb1c..5049ad8 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -35,7 +35,6 @@ libzmq_la_SOURCES = \
msg.hpp \
mtrie.hpp \
mutex.hpp \
- named_session.hpp \
object.hpp \
options.hpp \
own.hpp \
@@ -68,7 +67,6 @@ libzmq_la_SOURCES = \
thread.hpp \
transient_session.hpp \
trie.hpp \
- uuid.hpp \
windows.hpp \
wire.hpp \
xpub.hpp \
@@ -79,10 +77,8 @@ libzmq_la_SOURCES = \
yqueue.hpp \
zmq_connecter.hpp \
zmq_engine.hpp \
- zmq_init.hpp \
zmq_listener.hpp \
clock.cpp \
- command.cpp \
ctx.cpp \
connect_session.cpp \
decoder.cpp \
@@ -100,7 +96,6 @@ libzmq_la_SOURCES = \
mailbox.cpp \
msg.cpp \
mtrie.cpp \
- named_session.cpp \
object.cpp \
options.cpp \
own.cpp \
@@ -129,7 +124,6 @@ libzmq_la_SOURCES = \
thread.cpp \
transient_session.cpp \
trie.cpp \
- uuid.cpp \
xpub.cpp \
xrep.cpp \
xreq.cpp \
@@ -137,7 +131,6 @@ libzmq_la_SOURCES = \
zmq.cpp \
zmq_connecter.cpp \
zmq_engine.cpp \
- zmq_init.cpp \
zmq_listener.cpp \
zmq_utils.cpp
diff --git a/src/command.cpp b/src/command.cpp
deleted file mode 100644
index e3f3d59..0000000
--- a/src/command.cpp
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <stdlib.h>
-
-#include "command.hpp"
-
-void zmq::deallocate_command (command_t *cmd_)
-{
- switch (cmd_->type) {
- case command_t::attach:
- if (cmd_->args.attach.peer_identity)
- free (cmd_->args.attach.peer_identity);
- break;
- case command_t::bind:
- if (cmd_->args.bind.peer_identity)
- free (cmd_->args.bind.peer_identity);
- break;
- default:
- /* noop */;
- }
-}
diff --git a/src/command.hpp b/src/command.hpp
index 15cee0a..1513ca8 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -73,16 +73,12 @@ namespace zmq
// session that the connection have failed.
struct {
struct i_engine *engine;
- unsigned char peer_identity_size;
- unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
// Caller have used inc_seqnum beforehand sending the command.
struct {
class pipe_t *pipe;
- unsigned char peer_identity_size;
- unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
@@ -146,9 +142,6 @@ namespace zmq
} args;
};
- // Function to deallocate dynamically allocated components of the command.
- void deallocate_command (command_t *cmd_);
-
}
#endif
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index fe7332a..14666a6 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -29,15 +29,12 @@ zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
const char *protocol_, const char *address_) :
session_t (io_thread_, socket_, options_),
protocol (protocol_),
- address (address_),
- connected (false)
+ address (address_)
{
}
zmq::connect_session_t::~connect_session_t ()
{
- if (connected && !peer_identity.empty ())
- unregister_session (peer_identity);
}
void zmq::connect_session_t::process_plug ()
@@ -87,7 +84,7 @@ void zmq::connect_session_t::start_connecting (bool wait_)
int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
- send_attach (this, pgm_sender, blob_t ());
+ send_attach (this, pgm_sender);
}
else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
@@ -99,7 +96,7 @@ void zmq::connect_session_t::start_connecting (bool wait_)
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
- send_attach (this, pgm_receiver, blob_t ());
+ send_attach (this, pgm_receiver);
}
else
zmq_assert (false);
@@ -111,45 +108,8 @@ void zmq::connect_session_t::start_connecting (bool wait_)
zmq_assert (false);
}
-bool zmq::connect_session_t::xattached (const blob_t &peer_identity_)
+bool zmq::connect_session_t::xattached ()
{
- // If there was no previous connection...
- if (!connected) {
-
- // Peer has transient identity.
- if (peer_identity_.empty () || peer_identity_ [0] == 0) {
- connected = true;
- return true;
- }
-
- // Peer has strong identity. Let's register it and check whether noone
- // else is using the same identity.
- if (!register_session (peer_identity_, this)) {
- log ("DPID: duplicate peer identity - disconnecting peer");
- return false;
- }
- connected = true;
- peer_identity = peer_identity_;
- return true;
- }
-
- // New engine from listener can conflict with existing engine.
- // Alternatively, new engine created by reconnection process can
- // conflict with engine supplied by listener in the meantime.
- if (has_engine ()) {
- log ("DPID: duplicate peer identity - disconnecting peer");
- return false;
- }
-
- // If there have been a connection before, we have to check whether
- // peer's identity haven't changed in the meantime.
- if ((peer_identity_.empty () || peer_identity_ [0] == 0) &&
- peer_identity.empty ())
- return true;
- if (peer_identity != peer_identity_) {
- log ("CHID: peer have changed identity - disconnecting peer");
- return false;
- }
return true;
}
diff --git a/src/connect_session.hpp b/src/connect_session.hpp
index 3b8a26b..bc25d26 100644
--- a/src/connect_session.hpp
+++ b/src/connect_session.hpp
@@ -44,7 +44,7 @@ namespace zmq
private:
// Handlers for events from session base class.
- bool xattached (const blob_t &peer_identity_);
+ bool xattached ();
bool xdetached ();
// Start the connection process.
@@ -57,13 +57,6 @@ namespace zmq
std::string protocol;
std::string address;
- // If true, the session was already connected to the peer.
- bool connected;
-
- // Identity of the peer. If 'connected' is false, it has no meaning.
- // Otherwise, if it's empty, the peer has transient identity.
- blob_t peer_identity;
-
connect_session_t (const connect_session_t&);
const connect_session_t &operator = (const connect_session_t&);
};
diff --git a/src/named_session.cpp b/src/named_session.cpp
deleted file mode 100644
index 8e43fb0..0000000
--- a/src/named_session.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include "named_session.hpp"
-#include "socket_base.hpp"
-
-zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
- socket_base_t *socket_, const options_t &options_,
- const blob_t &peer_identity_) :
- session_t (io_thread_, socket_, options_),
- peer_identity (peer_identity_)
-{
- // Make double sure that the peer's identity is not transient.
- zmq_assert (!peer_identity.empty ());
- zmq_assert (peer_identity [0] != 0);
-
- bool ok = socket_->register_session (peer_identity, this);
-
- // If new session is being created, the caller should have already
- // checked that the session for specified identity doesn't exist yet.
- // Thus, register_session should not fail.
- zmq_assert (ok);
-}
-
-zmq::named_session_t::~named_session_t ()
-{
- // Unregister the session from the global list of named sessions.
- unregister_session (peer_identity);
-}
-
-bool zmq::named_session_t::xattached (const blob_t &peer_identity_)
-{
- // Double check that identities match.
- zmq_assert (peer_identity == peer_identity_);
-
- // If the session already has an engine attached, destroy new one.
- if (has_engine ()) {
- log ("DPID: duplicate peer identity - disconnecting peer");
- return false;
- }
- return true;
-}
-
-bool zmq::named_session_t::xdetached ()
-{
- // Do nothing. Named sessions are never destroyed because of disconnection.
- // Neither they have to actively reconnect.
- return true;
-}
-
diff --git a/src/named_session.hpp b/src/named_session.hpp
deleted file mode 100644
index 10af8cc..0000000
--- a/src/named_session.hpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_NAMED_SESSION_HPP_INCLUDED__
-#define __ZMQ_NAMED_SESSION_HPP_INCLUDED__
-
-#include "session.hpp"
-#include "blob.hpp"
-
-namespace zmq
-{
-
- // Named session is created by listener object when the peer identifies
- // itself by a strong name. Named session survives reconnections.
-
- class named_session_t : public session_t
- {
- public:
-
- named_session_t (class io_thread_t *io_thread_,
- class socket_base_t *socket_, const options_t &options_,
- const blob_t &peer_identity_);
- ~named_session_t ();
-
- // Handlers for events from session base class.
- bool xattached (const blob_t &peer_identity_);
- bool xdetached ();
-
- private:
-
- blob_t peer_identity;
-
- named_session_t (const named_session_t&);
- const named_session_t &operator = (const named_session_t&);
- };
-
-}
-
-#endif
diff --git a/src/object.cpp b/src/object.cpp
index ba77bf0..7f7d7f8 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -82,17 +82,12 @@ void zmq::object_t::process_command (command_t &cmd_)
break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine,
- cmd_.args.attach.peer_identity ?
- blob_t (cmd_.args.attach.peer_identity,
- cmd_.args.attach.peer_identity_size) : blob_t ());
+ process_attach (cmd_.args.attach.engine);
process_seqnum ();
break;
case command_t::bind:
- process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ?
- blob_t (cmd_.args.bind.peer_identity,
- cmd_.args.bind.peer_identity_size) : blob_t ());
+ process_bind (cmd_.args.bind.pipe);
process_seqnum ();
break;
@@ -131,10 +126,6 @@ void zmq::object_t::process_command (command_t &cmd_)
default:
zmq_assert (false);
}
-
- // The assumption here is that each command is processed once only,
- // so deallocating it after processing is all right.
- deallocate_command (&cmd_);
}
int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
@@ -211,7 +202,7 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- const blob_t &peer_identity_, bool inc_seqnum_)
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -223,25 +214,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
- if (peer_identity_.empty ()) {
- cmd.args.attach.peer_identity_size = 0;
- cmd.args.attach.peer_identity = NULL;
- }
- else {
- zmq_assert (peer_identity_.size () <= 0xff);
- cmd.args.attach.peer_identity_size =
- (unsigned char) peer_identity_.size ();
- cmd.args.attach.peer_identity =
- (unsigned char*) malloc (peer_identity_.size ());
- alloc_assert (cmd.args.attach.peer_identity_size);
- memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
- peer_identity_.size ());
- }
send_command (cmd);
}
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
- const blob_t &peer_identity_, bool inc_seqnum_)
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -253,20 +230,6 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
cmd.destination = destination_;
cmd.type = command_t::bind;
cmd.args.bind.pipe = pipe_;
- if (peer_identity_.empty ()) {
- cmd.args.bind.peer_identity_size = 0;
- cmd.args.bind.peer_identity = NULL;
- }
- else {
- zmq_assert (peer_identity_.size () <= 0xff);
- cmd.args.bind.peer_identity_size =
- (unsigned char) peer_identity_.size ();
- cmd.args.bind.peer_identity =
- (unsigned char*) malloc (peer_identity_.size ());
- alloc_assert (cmd.args.bind.peer_identity_size);
- memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
- peer_identity_.size ());
- }
send_command (cmd);
}
@@ -413,13 +376,12 @@ void zmq::object_t::process_own (own_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_,
- const blob_t &peer_identity_)
+void zmq::object_t::process_attach (i_engine *engine_)
{
zmq_assert (false);
}
-void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::object_t::process_bind (pipe_t *pipe_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index fbad0ea..e05b958 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -22,7 +22,6 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
-#include "blob.hpp"
namespace zmq
{
@@ -64,10 +63,9 @@ namespace zmq
void send_own (class own_t *destination_,
class own_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, const blob_t &peer_identity_,
- bool inc_seqnum_ = true);
+ struct i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (class own_t *destination_, class pipe_t *pipe_,
- const blob_t &peer_identity_, bool inc_seqnum_ = true);
+ bool inc_seqnum_ = true);
void send_activate_read (class pipe_t *destination_);
void send_activate_write (class pipe_t *destination_,
uint64_t msgs_read_);
@@ -87,10 +85,8 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class own_t *object_);
- virtual void process_attach (struct i_engine *engine_,
- const blob_t &peer_identity_);
- virtual void process_bind (class pipe_t *pipe_,
- const blob_t &peer_identity_);
+ virtual void process_attach (struct i_engine *engine_);
+ virtual void process_bind (class pipe_t *pipe_);
virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_);
diff --git a/src/options.cpp b/src/options.cpp
index 63a1d91..45eb4aa 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -75,19 +75,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
affinity = *((uint64_t*) optval_);
return 0;
- case ZMQ_IDENTITY:
-
- // Empty identity is invalid as well as identity longer than
- // 255 bytes. Identity starting with binary zero is invalid
- // as these are used for auto-generated identities.
- if (optvallen_ < 1 || optvallen_ > 255 ||
- *((const unsigned char*) optval_) == 0) {
- errno = EINVAL;
- return -1;
- }
- identity.assign ((const unsigned char*) optval_, optvallen_);
- return 0;
-
case ZMQ_RATE:
if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) {
errno = EINVAL;
@@ -229,15 +216,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
- case ZMQ_IDENTITY:
- if (*optvallen_ < identity.size ()) {
- errno = EINVAL;
- return -1;
- }
- memcpy (optval_, identity.data (), identity.size ());
- *optvallen_ = identity.size ();
- return 0;
-
case ZMQ_RATE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
diff --git a/src/options.hpp b/src/options.hpp
index f1982da..d8847f5 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -23,7 +23,6 @@
#include "stddef.h"
#include "stdint.hpp"
-#include "blob.hpp"
namespace zmq
{
@@ -39,8 +38,8 @@ namespace zmq
int sndhwm;
int rcvhwm;
+ // I/O thread affinity.
uint64_t affinity;
- blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
int rate;
diff --git a/src/pair.cpp b/src/pair.cpp
index 30b56e6..12a1881 100644
--- a/src/pair.cpp
+++ b/src/pair.cpp
@@ -35,7 +35,7 @@ zmq::pair_t::~pair_t ()
zmq_assert (!pipe);
}
-void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::pair_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (!pipe);
pipe = pipe_;
diff --git a/src/pair.hpp b/src/pair.hpp
index 1ddf50e..59300ae 100644
--- a/src/pair.hpp
+++ b/src/pair.hpp
@@ -35,7 +35,7 @@ namespace zmq
~pair_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 90a265c..98aeeb9 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -38,7 +38,7 @@
#include "pgm_socket.hpp"
#include "config.hpp"
#include "err.hpp"
-#include "uuid.hpp"
+#include "random.hpp"
#include "stdint.hpp"
#ifndef MSG_ERRQUEUE
@@ -253,20 +253,13 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
addr.sa_port = port_number;
addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
- if (options.identity.size () > 0) {
-
- // Create gsi from identity.
- if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi,
- options.identity.data (), options.identity.size ()))
- goto err_abort;
- } else {
+ // Create random GSI.
+ uint32_t buf [2];
+ buf [0] = generate_random ();
+ buf [1] = generate_random ();
+ if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
+ goto err_abort;
- // Generate GSI from UUID.
- unsigned char buf [16];
- generate_uuid (buf);
- if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, buf, 16))
- goto err_abort;
- }
// Bind a transport to the specified network devices.
struct pgm_interface_req_t if_req;
diff --git a/src/pull.cpp b/src/pull.cpp
index 5e48777..afde236 100644
--- a/src/pull.cpp
+++ b/src/pull.cpp
@@ -33,7 +33,7 @@ zmq::pull_t::~pull_t ()
{
}
-void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::pull_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
diff --git a/src/pull.hpp b/src/pull.hpp
index cbcf05a..be82af9 100644
--- a/src/pull.hpp
+++ b/src/pull.hpp
@@ -38,7 +38,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
void xread_activated (class pipe_t *pipe_);
diff --git a/src/push.cpp b/src/push.cpp
index 44ebc07..77cc9d8 100644
--- a/src/push.cpp
+++ b/src/push.cpp
@@ -33,7 +33,7 @@ zmq::push_t::~push_t ()
{
}
-void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::push_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
lb.attach (pipe_);
diff --git a/src/push.hpp b/src/push.hpp
index 5dabe14..222a62d 100644
--- a/src/push.hpp
+++ b/src/push.hpp
@@ -38,7 +38,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
void xwrite_activated (class pipe_t *pipe_);
diff --git a/src/random.cpp b/src/random.cpp
index 2a1d7d6..9f7768c 100644
--- a/src/random.cpp
+++ b/src/random.cpp
@@ -18,23 +18,35 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stdlib.h>
+
+#include "platform.hpp"
+#if defined ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include <unistd.h>
+#endif
+
#include "random.hpp"
#include "stdint.hpp"
-#include "uuid.hpp"
-#include "err.hpp"
+#include "clock.hpp"
+
+void zmq::seed_random ()
+{
+#if defined ZMQ_HAVE_WINDOWS
+ int pid = (int) GetCurrentProcessId ();
+#else
+ int pid = (int) getpid ();
+#endif
+ srand ((unsigned int) (clock_t::now_us () + pid));
+}
-// Here we can use different ways of generating random data, as avialable
-// on different platforms. At the moment, we'll assume the UUID is random
-// enough to use for that purpose.
-void zmq::generate_random (void *buf_, size_t size_)
+uint32_t zmq::generate_random ()
{
- // Collapsing an UUID into 4 bytes.
- zmq_assert (size_ == 4);
- uint32_t buff [4];
- generate_uuid ((void*) buff);
- uint32_t result = buff [0];
- result ^= buff [1];
- result ^= buff [2];
- result ^= buff [3];
- *((uint32_t*) buf_) = result;
+ // Compensate for the fact that rand() returns signed integer.
+ uint32_t low = (uint32_t) rand ();
+ uint32_t high = (uint32_t) rand ();
+ high <<= (sizeof (int) * 8 - 1);
+ return high | low;
}
+
diff --git a/src/random.hpp b/src/random.hpp
index 0b99bbd..d88b5ee 100644
--- a/src/random.hpp
+++ b/src/random.hpp
@@ -21,13 +21,16 @@
#ifndef __ZMQ_RANDOM_HPP_INCLUDED__
#define __ZMQ_RANDOM_HPP_INCLUDED__
-#include <stddef.h>
+#include "stdint.hpp"
namespace zmq
{
- // Generates truly random bytes (not pseudo-random).
- void generate_random (void *buf_, size_t size_);
+ // Seeds the random number generator.
+ void seed_random ();
+
+ // Generates random value.
+ uint32_t generate_random ();
}
diff --git a/src/req.cpp b/src/req.cpp
index e0e3321..7831672 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -21,7 +21,6 @@
#include "req.hpp"
#include "err.hpp"
#include "msg.hpp"
-#include "uuid.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
@@ -30,12 +29,9 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
message_begins (true),
- request_id (0)
+ request_id (generate_random ())
{
options.type = ZMQ_REQ;
-
- // Start the request ID sequence at an random point.
- generate_random (&request_id, sizeof (request_id));
}
zmq::req_t::~req_t ()
diff --git a/src/session.cpp b/src/session.cpp
index 1ca69cf..0dd0e34 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -159,8 +159,7 @@ void zmq::session_t::process_plug ()
{
}
-void zmq::session_t::process_attach (i_engine *engine_,
- const blob_t &peer_identity_)
+void zmq::session_t::process_attach (i_engine *engine_)
{
// If some other object (e.g. init) notifies us that the connection failed
// without creating an engine we need to start the reconnection process.
@@ -171,7 +170,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
// Trigger the notfication event about the attachment.
- if (!attached (peer_identity_)) {
+ if (!attached ()) {
delete engine_;
return;
}
@@ -193,7 +192,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
pipe = pipes [0];
// Ask socket to plug into the remote end of the pipe.
- send_bind (socket, pipes [1], peer_identity_);
+ send_bind (socket, pipes [1]);
}
// Plug in the engine.
@@ -272,24 +271,9 @@ void zmq::session_t::timer_event (int id_)
pipe->terminate (false);
}
-bool zmq::session_t::has_engine ()
+bool zmq::session_t::attached ()
{
- return engine != NULL;
-}
-
-bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
-{
- return socket->register_session (name_, session_);
-}
-
-void zmq::session_t::unregister_session (const blob_t &name_)
-{
- socket->unregister_session (name_);
-}
-
-bool zmq::session_t::attached (const blob_t &peer_identity_)
-{
- return xattached (peer_identity_);
+ return xattached ();
}
void zmq::session_t::detached ()
diff --git a/src/session.hpp b/src/session.hpp
index 4c17930..60aa7c5 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -24,7 +24,6 @@
#include "own.hpp"
#include "i_engine.hpp"
#include "io_object.hpp"
-#include "blob.hpp"
#include "pipe.hpp"
namespace zmq
@@ -64,27 +63,19 @@ namespace zmq
// the termination process when session is about to be detached from
// the peer. If it returns false, session will be terminated.
// To be overloaded by the derived session type.
- virtual bool xattached (const blob_t &peer_identity_) = 0;
+ virtual bool xattached () = 0;
virtual bool xdetached () = 0;
- // Returns true if there is an engine attached to the session.
- bool has_engine ();
-
- // Allows derives session types to (un)register session names.
- bool register_session (const blob_t &name_, class session_t *session_);
- void unregister_session (const blob_t &name_);
-
~session_t ();
private:
- bool attached (const blob_t &peer_identity_);
+ bool attached ();
void detached ();
// Handlers for incoming commands.
void process_plug ();
- void process_attach (struct i_engine *engine_,
- const blob_t &peer_identity_);
+ void process_attach (struct i_engine *engine_);
void process_term (int linger_);
// i_poll_events handlers.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 1bc80f1..bc58e8b 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -45,7 +45,6 @@
#include "ctx.hpp"
#include "platform.hpp"
#include "likely.hpp"
-#include "uuid.hpp"
#include "msg.hpp"
#include "pair.hpp"
@@ -128,11 +127,6 @@ zmq::socket_base_t::~socket_base_t ()
{
zmq_assert (destroyed);
- // Check whether there are no session leaks.
- sessions_sync.lock ();
- zmq_assert (sessions.empty ());
- sessions_sync.unlock ();
-
// Mark the socket as dead.
tag = 0xdeadbeef;
}
@@ -212,23 +206,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
-void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
- const blob_t &peer_identity_)
+void zmq::socket_base_t::attach_pipe (pipe_t *pipe_)
{
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
- // Then, pass the pipe to the specific socket type.
- // If the peer haven't specified it's identity, let's generate one.
- if (peer_identity_.size ()) {
- xattach_pipe (pipe_, peer_identity_);
- }
- else {
- blob_t identity (17, 0);
- generate_uuid ((unsigned char*) identity.data () + 1);
- xattach_pipe (pipe_, identity);
- }
+ // Let the derived socket type know about new pipe.
+ xattach_pipe (pipe_);
// If the socket is already being closed, ask any new pipes to terminate
// straight away.
@@ -423,12 +408,12 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
- attach_pipe (pipes [0], peer.options.identity);
+ attach_pipe (pipes [0]);
// Attach remote end of the pipe to the peer socket. Note that peer's
// seqnum was incremented in find_endpoint function. We don't need it
// increased here.
- send_bind (peer.socket, pipes [1], options.identity, false);
+ send_bind (peer.socket, pipes [1], false);
return 0;
}
@@ -454,7 +439,7 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object.
- attach_pipe (pipes [0], blob_t ());
+ attach_pipe (pipes [0]);
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
@@ -654,44 +639,6 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (const blob_t &name_,
- session_t *session_)
-{
- sessions_sync.lock ();
- bool registered = sessions.insert (
- sessions_t::value_type (name_, session_)).second;
- sessions_sync.unlock ();
- return registered;
-}
-
-void zmq::socket_base_t::unregister_session (const blob_t &name_)
-{
- sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (name_);
- zmq_assert (it != sessions.end ());
- sessions.erase (it);
- sessions_sync.unlock ();
-}
-
-zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
-{
- sessions_sync.lock ();
- sessions_t::iterator it = sessions.find (name_);
- if (it == sessions.end ()) {
- sessions_sync.unlock ();
- return NULL;
- }
- session_t *session = it->second;
-
- // Prepare the session for subsequent attach command.
- // Note the connect sessions have NULL pointers registered here.
- if (session)
- session->inc_seqnum ();
-
- sessions_sync.unlock ();
- return session;
-}
-
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
// Plug the socket to the reaper thread.
@@ -770,10 +717,9 @@ void zmq::socket_base_t::process_stop ()
ctx_terminated = true;
}
-void zmq::socket_base_t::process_bind (pipe_t *pipe_,
- const blob_t &peer_identity_)
+void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
- attach_pipe (pipe_, peer_identity_);
+ attach_pipe (pipe_);
}
void zmq::socket_base_t::process_unplug ()
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index f114e9d..fb60bbe 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -21,21 +21,17 @@
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
-#include <map>
-#include <vector>
+#include <string>
#include "own.hpp"
#include "array.hpp"
-#include "mutex.hpp"
#include "stdint.hpp"
#include "poller.hpp"
#include "atomic_counter.hpp"
#include "i_poll_events.hpp"
#include "mailbox.hpp"
#include "stdint.hpp"
-#include "blob.hpp"
#include "pipe.hpp"
-#include "own.hpp"
namespace zmq
{
@@ -78,11 +74,6 @@ namespace zmq
bool has_in ();
bool has_out ();
- // Registry of named sessions.
- bool register_session (const blob_t &name_, class session_t *session_);
- void unregister_session (const blob_t &name_);
- class session_t *find_session (const blob_t &name_);
-
// Using this function reaper thread ask the socket to regiter with
// its poller.
void start_reaping (poller_t *poller_);
@@ -106,8 +97,7 @@ namespace zmq
// Concrete algorithms for the x- methods are to be defined by
// individual socket types.
- virtual void xattach_pipe (class pipe_t *pipe_,
- const blob_t &peer_identity_) = 0;
+ virtual void xattach_pipe (class pipe_t *pipe_) = 0;
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, overload this
@@ -158,7 +148,7 @@ namespace zmq
int check_protocol (const std::string &protocol_);
// Register the pipe with this socket.
- void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void attach_pipe (class pipe_t *pipe_);
// Processes commands sent to this socket (if any). If timeout is -1,
// returns only after at least one command was processed.
@@ -168,7 +158,7 @@ namespace zmq
// Handlers for incoming commands.
void process_stop ();
- void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void process_bind (class pipe_t *pipe_);
void process_unplug ();
void process_term (int linger_);
@@ -195,14 +185,6 @@ namespace zmq
// True if the last message received had MORE flag set.
bool rcvmore;
- // Lists of existing sessions. This list is never referenced from
- // within the socket, instead it is used by objects owned by
- // the socket. As those objects can live in different threads,
- // the access is synchronised by mutex.
- typedef std::map <blob_t, session_t*> sessions_t;
- sessions_t sessions;
- mutex_t sessions_sync;
-
socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&);
};
diff --git a/src/transient_session.cpp b/src/transient_session.cpp
index b3c80b0..d389ff4 100644
--- a/src/transient_session.cpp
+++ b/src/transient_session.cpp
@@ -30,7 +30,7 @@ zmq::transient_session_t::~transient_session_t ()
{
}
-bool zmq::transient_session_t::xattached (const blob_t &peer_identity_)
+bool zmq::transient_session_t::xattached ()
{
// Transient session is always valid.
return true;
diff --git a/src/transient_session.hpp b/src/transient_session.hpp
index 55c2b8a..eff6b65 100644
--- a/src/transient_session.hpp
+++ b/src/transient_session.hpp
@@ -40,7 +40,7 @@ namespace zmq
private:
// Handlers for events from session base class.
- bool xattached (const blob_t &peer_identity_);
+ bool xattached ();
bool xdetached ();
transient_session_t (const transient_session_t&);
diff --git a/src/uuid.cpp b/src/uuid.cpp
deleted file mode 100644
index 02f716e..0000000
--- a/src/uuid.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <string.h>
-
-#include "uuid.hpp"
-#include "err.hpp"
-#include "stdint.hpp"
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_WINDOWS
-
-#include <rpc.h>
-
-void zmq::generate_uuid (void *buf_)
-{
- RPC_STATUS ret = UuidCreate ((::UUID*) buf_);
- zmq_assert (ret == RPC_S_OK);
-}
-
-#elif defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD
-
-#include <uuid.h>
-
-void zmq::generate_uuid (void *buf_)
-{
- uint32_t status;
- uuid_create ((::uuid_t*) buf_, &status);
- zmq_assert (status == uuid_s_ok);
-}
-
-#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS ||\
- defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_CYGWIN
-
-#include <uuid/uuid.h>
-
-void zmq::generate_uuid (void *buf_)
-{
- uuid_generate ((unsigned char*) buf_);
-}
-
-#elif defined ZMQ_HAVE_OPENVMS
-
-#include <starlet.h>
-
-void zmq::generate_uuid (void *buf_)
-{
- sys$create_uid(buf_);
-}
-
-#else
-
-#include <openssl/rand.h>
-
-void zmq::generate_uuid (void *buf_)
-{
- unsigned char *buf = (unsigned char*) buf_;
-
- // Generate random value.
- int ret = RAND_bytes (buf, 16);
- zmq_assert (ret == 1);
-
- // Set UUID variant to 2 (UUID as specified in RFC4122).
- const unsigned char variant = 2;
- buf [8] = (buf [8] & 0x3f) | (variant << 6);
-
- // Set UUID version to 4 (randomly or pseudo-randomly generated UUID).
- const unsigned char version = 4;
- buf [6] = (buf [6] & 0x0f) | (version << 4);
-}
-
-#endif
-
diff --git a/src/uuid.hpp b/src/uuid.hpp
deleted file mode 100644
index c338208..0000000
--- a/src/uuid.hpp
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_UUID_HPP_INCLUDED__
-#define __ZMQ_UUID_HPP_INCLUDED__
-
-namespace zmq
-{
-
- // This function provides RFC 4122 (a Universally Unique IDentifier)
- // implementation. The resulting UUID will be 16 bytes long.
- void generate_uuid (void *buf_);
-
-}
-
-#endif
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 7e4eecc..8da9cf9 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -36,7 +36,7 @@ zmq::xpub_t::~xpub_t ()
{
}
-void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::xpub_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
dist.attach (pipe_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index a2e7335..320a1fe 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -41,7 +41,7 @@ namespace zmq
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 153be46..9b57435 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
prefetched (false),
more_in (false),
current_out (NULL),
- more_out (false)
+ more_out (false),
+ next_peer_id (generate_random ())
{
options.type = ZMQ_XREP;
@@ -38,9 +39,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
options.delay_on_disconnect = false;
prefetched_msg.init ();
-
- // Start the peer ID sequence from a random point.
- generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
@@ -49,7 +47,7 @@ zmq::xrep_t::~xrep_t ()
prefetched_msg.close ();
}
-void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 41c06b8..07f10ba 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -41,8 +41,7 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_,
- const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 85e2238..7b66137 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -36,7 +36,7 @@ zmq::xreq_t::~xreq_t ()
{
}
-void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::xreq_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 5bf1a03..a427ba9 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -40,7 +40,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 98abc87..cf51cdb 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -39,7 +39,7 @@ zmq::xsub_t::~xsub_t ()
errno_assert (rc == 0);
}
-void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
+void zmq::xsub_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 03b3178..ea59cdb 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -41,7 +41,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
+ void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index ca9bb77..0512c3c 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -30,7 +30,6 @@
#include "zmq_connecter.hpp"
#include "zmq_engine.hpp"
-#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
@@ -86,16 +85,12 @@ void zmq::zmq_connecter_t::out_event ()
return;
}
- // Choose I/O thread to run connecter in. Given that we are already
- // running in an I/O thread, there must be at least one available.
- io_thread_t *io_thread = choose_io_thread (options.affinity);
- zmq_assert (io_thread);
+ // Create the engine object for this connection.
+ zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options);
+ alloc_assert (engine);
- // Create an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
- session, fd, options);
- alloc_assert (init);
- launch_sibling (init);
+ // Attach the engine to the corresponding session object.
+ send_attach (session, engine);
// Shut the connecter down.
terminate ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
deleted file mode 100644
index 8352719..0000000
--- a/src/zmq_init.cpp
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <string.h>
-
-#include "zmq_init.hpp"
-#include "transient_session.hpp"
-#include "named_session.hpp"
-#include "socket_base.hpp"
-#include "zmq_engine.hpp"
-#include "io_thread.hpp"
-#include "session.hpp"
-#include "uuid.hpp"
-#include "blob.hpp"
-#include "wire.hpp"
-#include "err.hpp"
-
-zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
- socket_base_t *socket_, session_t *session_, fd_t fd_,
- const options_t &options_) :
- own_t (io_thread_, options_),
- ephemeral_engine (NULL),
- received (false),
- socket (socket_),
- session (session_),
- io_thread (io_thread_)
-{
- // Create the engine object for this connection.
- engine = new (std::nothrow) zmq_engine_t (fd_, options);
- alloc_assert (engine);
-
- // Generate an unique identity.
- peer_identity.resize (17);
- peer_identity [0] = 0;
- generate_uuid (&peer_identity [1]);
-
- // Create a list of messages to send on connection initialisation.
- if (!options.identity.empty ()) {
- msg_t msg;
- int rc = msg.init_size (options.identity.size ());
- errno_assert (rc == 0);
- memcpy (msg.data () , options.identity.data (), msg.size ());
- to_send.push_back (msg);
- }
- else {
- msg_t msg;
- int rc = msg.init ();
- errno_assert (rc == 0);
- to_send.push_back (msg);
- }
-}
-
-zmq::zmq_init_t::~zmq_init_t ()
-{
- if (engine)
- engine->terminate ();
-
- // If there are unsent props still queued deallocate them.
- for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
- ++it) {
- int rc = it->close ();
- errno_assert (rc == 0);
- }
- to_send.clear ();
-}
-
-bool zmq::zmq_init_t::read (msg_t *msg_)
-{
- // If the identity was already sent, do nothing.
- if (to_send.empty ())
- return false;
-
- // Pass next property to the engine.
- *msg_ = to_send.front ();
- to_send.erase (to_send.begin ());
-
- // Try finalize initialization.
- finalise_initialisation ();
-
- return true;
-}
-
-bool zmq::zmq_init_t::write (msg_t *msg_)
-{
- // If identity was already received, we are not interested
- // in subsequent messages.
- if (received)
- return false;
-
- // Retrieve the peer's identity, if any.
- zmq_assert (!(msg_->flags () & msg_t::more));
- size_t size = msg_->size ();
- if (size) {
- unsigned char *data = (unsigned char*) msg_->data ();
- peer_identity.assign (data, size);
- }
-
- received = true;
- finalise_initialisation ();
-
- return true;
-}
-
-void zmq::zmq_init_t::flush ()
-{
- // Check if there's anything to flush.
- if (!received)
- return;
-
- // Initialization is done, dispatch engine.
- if (ephemeral_engine)
- dispatch_engine ();
-}
-
-void zmq::zmq_init_t::detach ()
-{
- // This function is called by engine when disconnection occurs.
-
- // If there is an associated session, send it a null engine to let it know
- // that connection process was unsuccesful.
- if (session)
- send_attach (session, NULL, blob_t (), true);
-
- // The engine will destroy itself, so let's just drop the pointer here and
- // start termination of the init object.
- engine = NULL;
- terminate ();
-}
-
-void zmq::zmq_init_t::process_plug ()
-{
- zmq_assert (engine);
- engine->plug (io_thread, this);
-}
-
-void zmq::zmq_init_t::process_unplug ()
-{
- if (engine)
- engine->unplug ();
-}
-
-void zmq::zmq_init_t::finalise_initialisation ()
-{
- // Unplug and prepare to dispatch engine.
- if (to_send.empty () && received) {
- ephemeral_engine = engine;
- engine = NULL;
- ephemeral_engine->unplug ();
- return;
- }
-}
-
-void zmq::zmq_init_t::dispatch_engine ()
-{
- if (to_send.empty () && received) {
-
- // Engine must be detached.
- zmq_assert (!engine);
- zmq_assert (ephemeral_engine);
-
- // If we know what session we belong to, it's easy, just send the
- // engine to that session and destroy the init object. Note that we
- // know about the session only if this object is owned by it. Thus,
- // lifetime of this object in contained in the lifetime of the session
- // so the pointer cannot become invalid without notice.
- if (session) {
- send_attach (session, ephemeral_engine, peer_identity, true);
- terminate ();
- return;
- }
-
- // All the cases below are listener-based. Therefore we need the socket
- // reference so that new sessions can bind to that socket.
- zmq_assert (socket);
-
- // We have no associated session. If the peer has no identity we'll
- // create a transient session for the connection. Note that
- // seqnum is incremented to account for attach command before the
- // session is launched. That way we are sure it won't terminate before
- // being attached.
- if (peer_identity [0] == 0) {
- session = new (std::nothrow) transient_session_t (io_thread,
- socket, options);
- alloc_assert (session);
- session->inc_seqnum ();
- launch_sibling (session);
- send_attach (session, ephemeral_engine, peer_identity, false);
- terminate ();
- return;
- }
-
- // Try to find the session corresponding to the peer's identity.
- // If found, send the engine to that session and destroy this object.
- // Note that session's seqnum is incremented by find_session rather
- // than by send_attach.
- session = socket->find_session (peer_identity);
- if (session) {
- send_attach (session, ephemeral_engine, peer_identity, false);
- terminate ();
- return;
- }
-
- // There's no such named session. We have to create one. Note that
- // seqnum is incremented to account for attach command before the
- // session is launched. That way we are sure it won't terminate before
- // being attached.
- session = new (std::nothrow) named_session_t (io_thread, socket,
- options, peer_identity);
- alloc_assert (session);
- session->inc_seqnum ();
- launch_sibling (session);
- send_attach (session, ephemeral_engine, peer_identity, false);
- terminate ();
- return;
- }
-}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
deleted file mode 100644
index e220d2b..0000000
--- a/src/zmq_init.hpp
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- Copyright (c) 2007-2011 iMatix Corporation
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the GNU Lesser General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-
-#include <vector>
-
-#include "i_engine.hpp"
-#include "stdint.hpp"
-#include "blob.hpp"
-#include "msg.hpp"
-#include "own.hpp"
-#include "fd.hpp"
-
-namespace zmq
-{
-
- // The class handles initialisation phase of 0MQ wire-level protocol.
-
- class zmq_init_t :
- public own_t,
- public i_engine_sink
- {
- public:
-
- zmq_init_t (class io_thread_t *io_thread_, class socket_base_t *socket_,
- class session_t *session_, fd_t fd_, const options_t &options_);
- ~zmq_init_t ();
-
- private:
-
- void finalise_initialisation ();
- void dispatch_engine ();
-
- // i_engine_sink interface implementation.
- bool read (class msg_t *msg_);
- bool write (class msg_t *msg_);
- void flush ();
- void detach ();
-
- // Handlers for incoming commands.
- void process_plug ();
- void process_unplug ();
-
- // Associated wire-protocol engine.
- i_engine *engine;
-
- // Detached transient engine.
- i_engine *ephemeral_engine;
-
- // List of messages to send to the peer during the connection
- // initiation phase.
- typedef std::vector <msg_t> to_send_t;
- to_send_t to_send;
-
- // True if peer's identity was already received.
- bool received;
-
- // Socket the object belongs to.
- class socket_base_t *socket;
-
- // Reference to the session the init object belongs to.
- // If the associated session is unknown and should be found
- // depending on peer identity this value is NULL.
- class session_t *session;
-
- // Identity of the peer socket.
- blob_t peer_identity;
-
- // I/O thread the object is living in. It will be used to plug
- // the engine into the same I/O thread.
- class io_thread_t *io_thread;
-
- zmq_init_t (const zmq_init_t&);
- const zmq_init_t &operator = (const zmq_init_t&);
- };
-
-}
-
-#endif
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index e3f3bd8..2d5e7bb 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -21,7 +21,8 @@
#include <new>
#include "zmq_listener.hpp"
-#include "zmq_init.hpp"
+#include "transient_session.hpp"
+#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "err.hpp"
@@ -63,16 +64,21 @@ void zmq::zmq_listener_t::in_event ()
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
return;
+ // Create the engine object for this connection.
+ zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options);
+ alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
- // Create and launch an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
- NULL, fd, options);
- alloc_assert (init);
- launch_child (init);
+ // Create and launch a session object.
+ transient_session_t *session = new (std::nothrow)
+ transient_session_t (io_thread, socket, options);
+ alloc_assert (session);
+ session->inc_seqnum ();
+ launch_child (session);
+ send_attach (session, engine, false);
}