summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_setsockopt.txt10
-rw-r--r--src/Makefile.am2
-rw-r--r--src/blob.hpp33
-rw-r--r--src/command.cpp38
-rw-r--r--src/command.hpp7
-rw-r--r--src/dispatcher.cpp4
-rw-r--r--src/downstream.cpp3
-rw-r--r--src/downstream.hpp3
-rw-r--r--src/i_endpoint.hpp4
-rw-r--r--src/i_engine.hpp12
-rw-r--r--src/object.cpp67
-rw-r--r--src/object.hpp11
-rw-r--r--src/options.cpp16
-rw-r--r--src/options.hpp17
-rw-r--r--src/p2p.cpp3
-rw-r--r--src/p2p.hpp3
-rw-r--r--src/pgm_receiver.cpp9
-rw-r--r--src/pgm_receiver.hpp3
-rw-r--r--src/pgm_sender.cpp9
-rw-r--r--src/pgm_sender.hpp3
-rw-r--r--src/pgm_socket.cpp7
-rw-r--r--src/pub.cpp3
-rw-r--r--src/pub.hpp3
-rw-r--r--src/rep.cpp8
-rw-r--r--src/rep.hpp3
-rw-r--r--src/req.cpp3
-rw-r--r--src/req.hpp3
-rw-r--r--src/session.cpp134
-rw-r--r--src/session.hpp29
-rw-r--r--src/socket_base.cpp87
-rw-r--r--src/socket_base.hpp19
-rw-r--r--src/sub.cpp3
-rw-r--r--src/sub.hpp3
-rw-r--r--src/upstream.cpp3
-rw-r--r--src/upstream.hpp3
-rw-r--r--src/uuid.hpp6
-rw-r--r--src/xrep.cpp60
-rw-r--r--src/xrep.hpp10
-rw-r--r--src/xreq.cpp3
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/zmq_decoder.cpp62
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_encoder.cpp14
-rw-r--r--src/zmq_engine.cpp11
-rw-r--r--src/zmq_engine.hpp3
-rw-r--r--src/zmq_init.cpp53
-rw-r--r--src/zmq_init.hpp5
47 files changed, 534 insertions, 270 deletions
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 549a2de..629bffc 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -60,9 +60,11 @@ If the socket has no identity, each run of the application is completely
separated from other runs. However, with identity application reconnects to
existing infrastructure left by the previous run. Thus it may receive
messages that were sent in the meantime, it shares pipe limits with the
-previous run etc.
+previous run etc. Identity should be at least one byte and at most 255 bytes
+long. Identities starting with binary zero are reserver for use by 0MQ
+infrastructure.
+
-Type: string Unit: N/A Default: NULL
+Type: BLOB Unit: N/A Default: NULL
*ZMQ_SUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. It establishes new message filter.
@@ -72,7 +74,7 @@ beginning with specific prefix (e.g. "animals.mammals.dogs."). Multiple
filters can be attached to a single 'sub' socket. In that case message passes
if it matches at least one of the filters.
+
-Type: string Unit: N/A Default: N/A
+Type: BLOB Unit: N/A Default: N/A
*ZMQ_UNSUBSCRIBE*::
Applicable only to ZMQ_SUB socket type. Removes existing message filter.
@@ -81,7 +83,7 @@ exactly. If there were several instances of the same filter created,
this options removes only one of them, leaving the rest in place
and functional.
+
-Type: string Unit: N/A Default: N/A
+Type: BLOB Unit: N/A Default: N/A
*ZMQ_RATE*::
This option applies only to sending side of multicast transports (pgm & udp).
diff --git a/src/Makefile.am b/src/Makefile.am
index 446b1e2..4146f68 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
@@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
+ command.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..a4fa8cd
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
diff --git a/src/command.cpp b/src/command.cpp
new file mode 100644
index 0000000..8bf7ea2
--- /dev/null
+++ b/src/command.cpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdlib.h>
+
+#include "command.hpp"
+
+void zmq::deallocate_command (command_t *cmd_)
+{
+ switch (cmd_->type) {
+ case command_t::attach:
+ if (cmd_->args.attach.peer_identity)
+ free (cmd_->args.attach.peer_identity);
+ break;
+ case command_t::bind:
+ if (cmd_->args.bind.peer_identity)
+ free (cmd_->args.bind.peer_identity);
+ break;
+ default:
+ /* noop */;
+ }
+}
diff --git a/src/command.hpp b/src/command.hpp
index 469d6ec..150cad1 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session.
struct {
struct i_engine *engine;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
@@ -73,6 +75,8 @@ namespace zmq
struct {
class reader_t *in_pipe;
class writer_t *out_pipe;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
@@ -107,6 +111,9 @@ namespace zmq
} args;
};
+ // Function to deallocate dynamically allocated components of the command.
+ void deallocate_command (command_t *cmd_);
+
}
#endif
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 8aafcf8..4233278 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t ()
while (!pipes.empty ())
delete *pipes.begin ();
+ // TODO: Deallocate any commands still in the pipes. Keep in mind that
+ // simple reading from a pipe and deallocating commands won't do as
+ // command pipe has template parameter D set to true, meaning that
+ // read may return false even if there are still commands in the pipe.
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 29b0689..3431264 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -26,7 +26,6 @@
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_DOWNSTREAM;
options.requires_in = false;
options.requires_out = true;
}
@@ -36,7 +35,7 @@ zmq::downstream_t::~downstream_t ()
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);
diff --git a/src/downstream.hpp b/src/downstream.hpp
index 35dec95..dbd79a5 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~downstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index d60b39e..ddab6a4 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#include "blob.hpp"
+
namespace zmq
{
@@ -28,7 +30,7 @@ namespace zmq
virtual ~i_endpoint () {}
virtual void attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bcb4297..81b56df 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include "blob.hpp"
+
namespace zmq
{
@@ -39,11 +41,11 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
- // Start tracing the message route. Engine should add the identity
- // supplied to all inbound messages and trim identity from all the
- // outbound messages.
- virtual void traceroute (unsigned char *identity_,
- size_t identity_size_) = 0;
+ // Engine should add the prefix supplied to all inbound messages.
+ virtual void add_prefix (const blob_t &identity_) = 0;
+
+ // Engine should trim prefix from all the outbound messages.
+ virtual void trim_prefix () = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index a977f39..356fcd1 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
@@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::own:
process_own (cmd_.args.own.object);
- return;
+ break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine);
+ process_attach (cmd_.args.attach.engine,
+ blob_t (cmd_.args.attach.peer_identity,
+ cmd_.args.attach.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::bind:
- process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ blob_t (cmd_.args.bind.peer_identity,
+ cmd_.args.bind.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::pipe_term:
process_pipe_term ();
@@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::pipe_term_ack:
process_pipe_term_ack ();
- return;
+ break;
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
- return;
+ break;
case command_t::term:
process_term ();
- return;
+ break;
case command_t::term_ack:
process_term_ack ();
- return;
+ break;
default:
zmq_assert (false);
}
+
+ // The assumption here is that each command is processed once only,
+ // so deallocating it after processing is all right.
+ deallocate_command (&cmd_);
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
@@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- bool inc_seqnum_)
+ const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
+ if (peer_identity_.empty ()) {
+ cmd.args.attach.peer_identity_size = 0;
+ cmd.args.attach.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.attach.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.attach.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.attach.peer_identity_size);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
+ reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,
cmd.type = command_t::bind;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ if (peer_identity_.empty ()) {
+ cmd.args.bind.peer_identity_size = 0;
+ cmd.args.bind.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.bind.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.bind.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.bind.peer_identity_size);
+ memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
@@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_)
+void zmq::object_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
-void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index e6b2379..1544109 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -64,10 +65,11 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, const blob_t &peer_identity_,
+ bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
- bool inc_seqnum_ = true);
+ const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -81,9 +83,10 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
- virtual void process_attach (struct i_engine *engine_);
+ virtual void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
- class writer_t *out_pipe_);
+ class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/options.cpp b/src/options.cpp
index f9d93d6..f78d8de 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,7 +23,6 @@
#include "err.hpp"
zmq::options_t::options_t () :
- type (-1),
hwm (0),
lwm (0),
swap (0),
@@ -34,7 +33,9 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
requires_in (false),
- requires_out (false)
+ requires_out (false),
+ immediate_connect (true),
+ traceroute (false)
{
}
@@ -76,7 +77,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
- identity.assign ((const char*) optval_, optvallen_);
+
+ // Empty identity is invalid as well as identity longer than
+ // 255 bytes. Identity starting with binary zero is invalid
+ // as these are used for auto-generated identities.
+ if (optvallen_ < 1 || optvallen_ > 255 ||
+ *((const unsigned char*) optval_) == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
diff --git a/src/options.hpp b/src/options.hpp
index dbe3701..6d9be4d 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -20,10 +20,9 @@
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
-#include <string>
-
#include "stddef.h"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -34,14 +33,11 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- // Type of the associated socket. One of the constants defined in zmq.h
- int type;
-
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t affinity;
- std::string identity;
+ blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
@@ -59,6 +55,15 @@ namespace zmq
// provided by the specific socket type.
bool requires_in;
bool requires_out;
+
+ // If true, when connecting, pipes are created immediately without
+ // waiting for the connection to be established. That way the socket
+ // is not aware of the peer's identity, however, it is able to send
+ // messages straight away.
+ bool immediate_connect;
+
+ // If true, socket requires tracerouting the messages.
+ bool traceroute;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 46bbd0b..ca7a8f5 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
outpipe (NULL),
alive (true)
{
- options.type = ZMQ_P2P;
options.requires_in = true;
options.requires_out = true;
}
@@ -43,7 +42,7 @@ zmq::p2p_t::~p2p_t ()
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 2ff1047..bca0eab 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -33,7 +33,8 @@ namespace zmq
~p2p_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index a2ba9c6..e708229 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,8 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_receiver_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f03551f..3f0ef81 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index fa7d7e0..27b4d0c 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,8 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_sender_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89357f5..951c417 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1eeb34f..462a3a9 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (options.identity.size () > 0) {
- // Create gsi from identity string.
- gsi_base = options.identity;
+ // Create gsi from identity.
+ // TODO: We assume that identity is standard C string here.
+ // What if it contains binary zeroes?
+ gsi_base.assign ((const char*) options.identity.data (),
+ options.identity.size ());
} else {
// Generate random gsi.
diff --git a/src/pub.cpp b/src/pub.cpp
index 9a2dcc6..5b9d48c 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -27,7 +27,6 @@
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
@@ -40,7 +39,7 @@ zmq::pub_t::~pub_t ()
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
out_pipes.push_back (outpipe_);
diff --git a/src/pub.hpp b/src/pub.hpp
index 5b2f348..26142a4 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -34,7 +34,8 @@ namespace zmq
~pub_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/rep.cpp b/src/rep.cpp
index 1649e83..755d78e 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -30,9 +30,13 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
waiting_for_reply (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REP;
options.requires_in = true;
options.requires_out = true;
+
+ // We don't need immediate connect. We'll be able to send messages
+ // (replies) only when connection is established and thus requests
+ // can arrive anyway.
+ options.immediate_connect = false;
}
zmq::rep_t::~rep_t ()
@@ -40,7 +44,7 @@ zmq::rep_t::~rep_t ()
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
diff --git a/src/rep.hpp b/src/rep.hpp
index 7170da7..7ead321 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -34,7 +34,8 @@ namespace zmq
~rep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/req.cpp b/src/req.cpp
index 9b1766e..735f0aa 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -30,7 +30,6 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
reply_pipe_active (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
@@ -40,7 +39,7 @@ zmq::req_t::~req_t ()
}
void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
diff --git a/src/req.hpp b/src/req.hpp
index 60ee5e7..da8e61a 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -34,7 +34,8 @@ namespace zmq
~req_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/session.cpp b/src/session.cpp
index 1aece4d..05f319c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
-{
- type = unnamed;
-
+{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
@@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_) :
+ const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
+ ordinal (0),
+ peer_identity (peer_identity_),
options (options_)
{
- if (name_) {
- type = named;
- name = name_;
- ordinal = 0;
- }
- else {
- type = transient;
- // TODO: Generate unique name here.
- ordinal = 0;
+ if (!peer_identity.empty () && peer_identity [0] != 0) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
}
}
@@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (type == transient)
+ if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
}
@@ -120,13 +119,12 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
- zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_) {
zmq_assert (!in_pipe);
@@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
- // Register the session with the socket.
- if (!name.empty ()) {
- bool ok = owner->register_session (name.c_str (), this);
-
- // There's already a session with the specified identity.
- // We should syslog it and drop the session. TODO
- zmq_assert (ok);
- }
-
- // If session is created by 'connect' function, it has the pipes set
- // already. Otherwise, it's being created by the listener and the pipes
- // are yet to be created.
- if (!in_pipe && !out_pipe) {
-
- pipe_t *inbound = NULL;
- pipe_t *outbound = NULL;
-
- if (options.requires_out) {
- inbound = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- }
-
- if (options.requires_in) {
- outbound = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- }
-
- send_bind (owner, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
- }
}
void zmq::session_t::process_unplug ()
{
- // Unregister the session from the socket. There's nothing to do here
- // for transient sessions.
- if (type == unnamed)
+ // Unregister the session from the socket.
+ if (ordinal)
owner->unregister_session (ordinal);
- else if (type == named)
- owner->unregister_session (name.c_str ());
+ else if (!peer_identity.empty () && peer_identity [0] != 0)
+ owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
@@ -232,10 +193,67 @@ void zmq::session_t::process_unplug ()
}
}
-void zmq::session_t::process_attach (i_engine *engine_)
+void zmq::session_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
+ if (!peer_identity.empty ()) {
+
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity == peer_identity_);
+ }
+ else if (!peer_identity_.empty ()) {
+
+ // Store the peer identity.
+ peer_identity = peer_identity_;
+
+ // If the session is not registered with the ordinal, let's register
+ // it using the peer name.
+ if (!ordinal) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
+ }
+
+ // Check whether the required pipes already exist. If not so, we'll
+ // create them and bind them to the socket object.
+ reader_t *socket_reader = NULL;
+ writer_t *socket_writer = NULL;
+
+ if (options.requires_in && !out_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ out_pipe = &pipe->writer;
+ out_pipe->set_endpoint (this);
+ socket_reader = &pipe->reader;
+ }
+
+ if (options.requires_out && !in_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ in_pipe = &pipe->reader;
+ in_pipe->set_endpoint (this);
+ socket_writer = &pipe->writer;
+ }
+
+ if (socket_reader || socket_writer)
+ send_bind (owner, socket_reader, socket_writer, peer_identity);
+
+ // Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
+
+ // Once the initial handshaking is over tracerouting should trim prefixes
+ // from outbound messages.
+ if (options.traceroute)
+ engine->trim_prefix ();
}
diff --git a/src/session.hpp b/src/session.hpp
index 375d095..872748c 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,12 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -38,10 +37,9 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_);
- // Creates named session. If name is NULL, transient session with
- // auto-generated name is created.
+ // Creates named session.
session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_);
+ const options_t &options_, const blob_t &peer_identity_);
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -53,7 +51,8 @@ namespace zmq
uint64_t get_ordinal ();
// i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
@@ -66,7 +65,8 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
- void process_attach (struct i_engine *engine_);
+ void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
@@ -79,18 +79,13 @@ namespace zmq
struct i_engine *engine;
- enum {
- transient,
- named,
- unnamed
- } type;
-
- // Ordinal of the session (if any).
+ // Session is identified by ordinal in the case when it was created
+ // before connection to the peer was established and thus we are
+ // unaware of peer's identity.
uint64_t ordinal;
- // The name of the session. One that is used to register it with
- // socket-level repository of sessions.
- std::string name;
+ // Identity of the peer.
+ blob_t peer_identity;
// Inherited socket options.
options_t options;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ef563e5..871f9e9 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "inproc") {
+ // TODO: inproc connect is specific with respect to creating pipes
+ // as there's no 'reconnect' functionality implemented. Once that
+ // is in place we should follow generic pipe creation algorithm.
+
// Find the peer socket.
socket_base_t *peer = find_endpoint (addr_args.c_str ());
if (!peer)
@@ -165,13 +169,13 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to this socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ out_pipe ? &out_pipe->writer : NULL, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL, false);
+ in_pipe ? &in_pipe->writer : NULL, options.identity, false);
return 0;
}
@@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options);
zmq_assert (session);
- pipe_t *in_pipe = NULL;
- pipe_t *out_pipe = NULL;
+ // If 'immediate connect' feature is required, we'll created the pipes
+ // to the session straight away. Otherwise, they'll be created by the
+ // session once the connection is established.
+ if (options.immediate_connect) {
- // Create inbound pipe, if required.
- if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session,
- options.hwm, options.lwm);
- zmq_assert (in_pipe);
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
- }
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new (std::nothrow) pipe_t (this, session,
+ options.hwm, options.lwm);
+ zmq_assert (in_pipe);
- // Create outbound pipe, if required.
- if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this,
- options.hwm, options.lwm);
- zmq_assert (out_pipe);
- }
+ }
- // Attach the pipes to the socket object.
- attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new (std::nothrow) pipe_t (session, this,
+ options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
- // Attach the pipes to the session object.
- session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL);
+ // Attach the pipes to the socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL, blob_t ());
+
+ // Attach the pipes to the session object.
+ session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL, blob_t ());
+ }
// Activate the session.
send_plug (session);
@@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ // Windows named pipes are not compatible with Winsock API.
+ // There's no UNIX domain socket implementation on OpenVMS.
if (addr_type == "ipc") {
errno = EPROTONOSUPPORT;
return -1;
@@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp")
udp_encapsulation = true;
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
if (options.requires_out) {
// PGM sender.
@@ -267,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_sender);
+ send_attach (session, pgm_sender, blob_t ());
}
else if (options.requires_in) {
@@ -282,7 +297,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_receiver);
+ send_attach (session, pgm_receiver, blob_t ());
}
else
zmq_assert (false);
@@ -456,30 +471,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (const char *name_,
+bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered =
- named_sessions.insert (std::make_pair (name_, session_)).second;
+ bool registered = named_sessions.insert (
+ std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-void zmq::socket_base_t::unregister_session (const char *name_)
+void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
-zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
+zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
-
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
@@ -541,13 +555,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
}
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_)
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
- xattach_pipes (inpipe_, outpipe_);
+ xattach_pipes (inpipe_, outpipe_, peer_identity_);
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
@@ -567,9 +581,10 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects.insert (object_);
}
-void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
- attach_pipes (in_pipe_, out_pipe_);
+ attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1ad9ed1..5327acc 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -23,7 +23,6 @@
#include <set>
#include <map>
#include <vector>
-#include <string>
#include "../bindings/c/zmq.h"
@@ -35,6 +34,7 @@
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -78,15 +78,17 @@ namespace zmq
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
- bool register_session (const char *name_, class session_t *session_);
- void unregister_session (const char *name_);
- class session_t *find_session (const char *name_);
+ bool register_session (const blob_t &peer_identity_,
+ class session_t *session_);
+ void unregister_session (const blob_t &peer_identity_);
+ class session_t *find_session (const blob_t &peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
// i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
@@ -99,7 +101,7 @@ namespace zmq
// Pipe management is done by individual socket types.
virtual void xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;
@@ -121,7 +123,8 @@ namespace zmq
// Handlers for incoming commands.
void process_own (class owned_t *object_);
- void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
+ const blob_t &peer_identity_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
void process_seqnum ();
@@ -155,7 +158,7 @@ namespace zmq
// within the socket, instead they are used by I/O objects owned by
// the socket. As those objects can live in different threads,
// the access is synchronised by mutex.
- typedef std::map <std::string, session_t*> named_sessions_t;
+ typedef std::map <blob_t, session_t*> named_sessions_t;
named_sessions_t named_sessions;
typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
unnamed_sessions_t unnamed_sessions;
diff --git a/src/sub.cpp b/src/sub.cpp
index 31ee222..29ac951 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -28,7 +28,6 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
has_message (false)
{
- options.type = ZMQ_SUB;
options.requires_in = true;
options.requires_out = false;
zmq_msg_init (&message);
@@ -40,7 +39,7 @@ zmq::sub_t::~sub_t ()
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
diff --git a/src/sub.hpp b/src/sub.hpp
index 9e7d6cc..8234b77 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -39,7 +39,8 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/upstream.cpp b/src/upstream.cpp
index 390dcbe..d7238b9 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -25,7 +25,6 @@
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_UPSTREAM;
options.requires_in = true;
options.requires_out = false;
}
@@ -35,7 +34,7 @@ zmq::upstream_t::~upstream_t ()
}
void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
diff --git a/src/upstream.hpp b/src/upstream.hpp
index 1e6914b..d1ee7b1 100644
--- a/src/upstream.hpp
+++ b/src/upstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~upstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/uuid.hpp b/src/uuid.hpp
index 00424ed..03bb69d 100644
--- a/src/uuid.hpp
+++ b/src/uuid.hpp
@@ -44,6 +44,9 @@ namespace zmq
uuid_t ();
~uuid_t ();
+ // The length of textual representation of UUID.
+ enum { uuid_string_len = 36 };
+
// Returns a pointer to buffer containing the textual
// representation of the UUID. The caller is reponsible to
// free the allocated memory.
@@ -51,9 +54,6 @@ namespace zmq
private:
- // The length of textual representation of UUID.
- enum { uuid_string_len = 36 };
-
#if defined ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_MINGW32
typedef unsigned char* RPC_CSTR;
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 67a9a39..6fa6bfa 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -21,13 +21,21 @@
#include "xrep.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_XREP;
options.requires_in = true;
options.requires_out = true;
+
+ // On connect, pipes are created only after initial handshaking.
+ // That way we are aware of the peer's identity when binding to the pipes.
+ options.immediate_connect = false;
+
+ // XREP socket adds identity to inbound messages and strips identity
+ // from the outbound messages.
+ options.traceroute = true;
}
zmq::xrep_t::~xrep_t ()
@@ -35,12 +43,15 @@ zmq::xrep_t::~xrep_t ()
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
- zmq_assert (false);
+ // TODO: What if new connection has same peer identity as the old one?
+ bool ok = outpipes.insert (std::make_pair (
+ peer_identity_, outpipe_)).second;
+ zmq_assert (ok);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
@@ -51,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it)
+ if (it->second == pipe_) {
+ outpipes.erase (it);
+ return;
+ }
zmq_assert (false);
}
@@ -73,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
- return -1;
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ size_t size = zmq_msg_size (msg_);
+
+ // Check whether the message is well-formed.
+ zmq_assert (size >= 1);
+ zmq_assert (size_t (*data + 1) <= size);
+
+ // Find the corresponding outbound pipe. If there's none, just drop the
+ // message.
+ // TODO: There's an allocation here! It's the critical path! Get rid of it!
+ blob_t identity (data + 1, *data);
+ outpipes_t::iterator it = outpipes.find (identity);
+ if (it == outpipes.end ()) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Push message to the selected pipe.
+ it->second->write (msg_);
+ it->second->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
}
int zmq::xrep_t::xflush ()
@@ -95,8 +139,10 @@ bool zmq::xrep_t::xhas_in ()
bool zmq::xrep_t::xhas_out ()
{
- zmq_assert (false);
- return false;
+ // In theory, XREP socket is always ready for writing. Whether actual
+ // attempt to write succeeds depends on whitch pipe the message is going
+ // to be routed to.
+ return true;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 67ab02d..4534463 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -20,7 +20,10 @@
#ifndef __ZMQ_XREP_HPP_INCLUDED__
#define __ZMQ_XREP_HPP_INCLUDED__
+#include <map>
+
#include "socket_base.hpp"
+#include "blob.hpp"
#include "fq.hpp"
namespace zmq
@@ -34,7 +37,8 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
@@ -51,6 +55,10 @@ namespace zmq
// Inbound messages are fair-queued.
fq_t fq;
+ // Outbound pipes indexed by the peer names.
+ typedef std::map <blob_t, class writer_t*> outpipes_t;
+ outpipes_t outpipes;
+
xrep_t (const xrep_t&);
void operator = (const xrep_t&);
};
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 691b66e..dda924c 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -25,7 +25,6 @@
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
@@ -35,7 +34,7 @@ zmq::xreq_t::~xreq_t ()
}
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index d0cbb4f..e23e832 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -35,7 +35,8 @@ namespace zmq
~xreq_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index f502ffd..b1776df 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -27,9 +27,7 @@
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
- destination (NULL),
- prefix (NULL),
- prefix_size (0)
+ destination (NULL)
{
zmq_msg_init (&in_progress);
@@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
- if (prefix)
- free (prefix);
-
zmq_msg_close (&in_progress);
}
@@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
-void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
- size_t prefix_size_)
+void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_)
{
- prefix = malloc (prefix_size_);
- zmq_assert (prefix);
- memcpy (prefix, prefix_, prefix_size_);
- prefix_size = prefix_size_;
+ prefix = prefix_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
@@ -72,15 +63,22 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
- errno_assert (rc == 0);
-
- // Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
-
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
- *tmpbuf, &zmq_decoder_t::message_ready);
+ if (prefix.empty ()) {
+ int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
+ errno_assert (rc == 0);
+ next_step (zmq_msg_data (&in_progress), *tmpbuf,
+ &zmq_decoder_t::message_ready);
+ }
+ else {
+ int rc = zmq_msg_init_size (&in_progress,
+ *tmpbuf + 1 + prefix.size ());
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
+ *data = (unsigned char) prefix.size ();
+ memcpy (data + 1, prefix.data (), *data);
+ next_step (data + *data + 1, *tmpbuf,
+ &zmq_decoder_t::message_ready);
+ }
}
return true;
}
@@ -95,15 +93,21 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
- errno_assert (rc == 0);
-
- // Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+ if (prefix.empty ()) {
+ int rc = zmq_msg_init_size (&in_progress, size);
+ errno_assert (rc == 0);
+ next_step (zmq_msg_data (&in_progress), size,
+ &zmq_decoder_t::message_ready);
+ }
+ else {
+ int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ());
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
+ *data = (unsigned char) prefix.size ();
+ memcpy (data + 1, prefix.data (), *data);
+ next_step (data + *data + 1, size, &zmq_decoder_t::message_ready);
+ }
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
- size, &zmq_decoder_t::message_ready);
return true;
}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index dfabece..11ee6c2 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -23,6 +23,7 @@
#include "../bindings/c/zmq.h"
#include "decoder.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -41,7 +42,7 @@ namespace zmq
// Once called, all decoded messages will be prefixed by the specified
// prefix.
- void add_prefix (unsigned char *prefix_, size_t prefix_size_);
+ void add_prefix (const blob_t &prefix_);
private:
@@ -53,8 +54,7 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
- void *prefix;
- size_t prefix_size;
+ blob_t prefix;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index a60fe5e..68626fa 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -56,8 +56,9 @@ bool zmq::zmq_encoder_t::size_ready ()
}
else {
size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
- zmq_msg_size (&in_progress) - prefix_size,
+ next_step (
+ (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1,
+ zmq_msg_size (&in_progress) - prefix_size - 1,
&zmq_encoder_t::message_ready, false);
}
return true;
@@ -80,8 +81,13 @@ bool zmq::zmq_encoder_t::message_ready ()
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress);
- if (trim)
- size -= *(unsigned char*) zmq_msg_data (&in_progress);
+ if (trim) {
+ zmq_assert (size);
+ size_t prefix_size =
+ (*(unsigned char*) zmq_msg_data (&in_progress)) + 1;
+ zmq_assert (prefix_size <= size);
+ size -= prefix_size;
+ }
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index bda098c..623ca63 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include <new>
#include "zmq_engine.hpp"
@@ -160,11 +162,14 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
-void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
+{
+ decoder.add_prefix (identity_);
+}
+
+void zmq::zmq_engine_t::trim_prefix ()
{
encoder.trim_prefix ();
- decoder.add_prefix (identity_, identity_size_);
}
void zmq::zmq_engine_t::error ()
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 174dd1a..dc90a98 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,7 +47,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 9492caa..3e76cb9 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -17,10 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "zmq_init.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "session.hpp"
+#include "uuid.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
@@ -71,17 +74,21 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received)
return false;
- // Retreieve the remote identity.
- peer_identity.assign ((const char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
+ // Retreieve the remote identity. If it's empty, generate a unique name.
+ if (!zmq_msg_size (msg_)) {
+ unsigned char identity [uuid_t::uuid_string_len + 1];
+ identity [0] = 0;
+ memcpy (identity + 1, uuid_t ().to_string (), uuid_t::uuid_string_len);
+ peer_identity.assign (identity, uuid_t::uuid_string_len + 1);
+ }
+ else {
+ peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+ }
+ if (options.traceroute)
+ engine->add_prefix (peer_identity);
received = true;
- // Once the initial handshaking is over, XREP sockets should start
- // tracerouting individual messages.
- if (options.type == ZMQ_XREP)
- engine->traceroute ((unsigned char*) peer_identity.data (),
- peer_identity.size ());
-
return true;
}
@@ -160,15 +167,16 @@ void zmq::zmq_init_t::finalise ()
return;
}
}
+ else {
- // If the peer has a unique name, find the associated session. If it
- // doesn't exist, create it.
- else if (!peer_identity.empty ()) {
- session = owner->find_session (peer_identity.c_str ());
+ // If the peer has a unique name, find the associated session.
+ // If it does not exist, create it.
+ zmq_assert (!peer_identity.empty ());
+ session = owner->find_session (peer_identity);
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,
- peer_identity.c_str ());
+ peer_identity);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
@@ -178,21 +186,8 @@ void zmq::zmq_init_t::finalise ()
}
}
- // If the other party has no specific identity, let's create a
- // transient session.
- else {
- session = new (std::nothrow) session_t (
- choose_io_thread (options.affinity), owner, options, NULL);
- zmq_assert (session);
- send_plug (session);
- send_own (owner, session);
-
- // Reserve a sequence number for following 'attach' command.
- session->inc_seqnum ();
- }
-
- // No need to increment seqnum as it was laready incremented above.
- send_attach (session, engine, false);
+ // No need to increment seqnum as it was already incremented above.
+ send_attach (session, engine, peer_identity, false);
// Destroy the init object.
engine = NULL;
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index df14293..6f935c2 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -20,8 +20,6 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_engine.hpp"
#include "owned.hpp"
@@ -29,6 +27,7 @@
#include "stdint.hpp"
#include "options.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -72,7 +71,7 @@ namespace zmq
bool received;
// Identity of the peer socket.
- std::string peer_identity;
+ blob_t peer_identity;
// TCP connecter creates session before the name of the peer is known.
// Thus we know only its ordinal number.