summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/blob.hpp33
-rw-r--r--src/command.cpp38
-rw-r--r--src/command.hpp7
-rw-r--r--src/dispatcher.cpp4
-rw-r--r--src/downstream.cpp3
-rw-r--r--src/downstream.hpp3
-rw-r--r--src/i_endpoint.hpp4
-rw-r--r--src/i_engine.hpp12
-rw-r--r--src/object.cpp67
-rw-r--r--src/object.hpp11
-rw-r--r--src/options.cpp16
-rw-r--r--src/options.hpp17
-rw-r--r--src/p2p.cpp3
-rw-r--r--src/p2p.hpp3
-rw-r--r--src/pgm_receiver.cpp9
-rw-r--r--src/pgm_receiver.hpp3
-rw-r--r--src/pgm_sender.cpp9
-rw-r--r--src/pgm_sender.hpp3
-rw-r--r--src/pgm_socket.cpp7
-rw-r--r--src/pub.cpp3
-rw-r--r--src/pub.hpp3
-rw-r--r--src/rep.cpp8
-rw-r--r--src/rep.hpp3
-rw-r--r--src/req.cpp3
-rw-r--r--src/req.hpp3
-rw-r--r--src/session.cpp134
-rw-r--r--src/session.hpp29
-rw-r--r--src/socket_base.cpp87
-rw-r--r--src/socket_base.hpp19
-rw-r--r--src/sub.cpp3
-rw-r--r--src/sub.hpp3
-rw-r--r--src/upstream.cpp3
-rw-r--r--src/upstream.hpp3
-rw-r--r--src/uuid.hpp6
-rw-r--r--src/xrep.cpp60
-rw-r--r--src/xrep.hpp10
-rw-r--r--src/xreq.cpp3
-rw-r--r--src/xreq.hpp3
-rw-r--r--src/zmq_decoder.cpp62
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_encoder.cpp14
-rw-r--r--src/zmq_engine.cpp11
-rw-r--r--src/zmq_engine.hpp3
-rw-r--r--src/zmq_init.cpp53
-rw-r--r--src/zmq_init.hpp5
46 files changed, 528 insertions, 266 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 446b1e2..4146f68 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -63,6 +63,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
+ blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
@@ -131,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
+ command.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
diff --git a/src/blob.hpp b/src/blob.hpp
new file mode 100644
index 0000000..a4fa8cd
--- /dev/null
+++ b/src/blob.hpp
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_BLOB_HPP_INCLUDED__
+#define __ZMQ_BLOB_HPP_INCLUDED__
+
+#include <string>
+
+namespace zmq
+{
+
+ // Object to hold dynamically allocated opaque binary data.
+ typedef std::basic_string <unsigned char> blob_t;
+
+}
+
+#endif
diff --git a/src/command.cpp b/src/command.cpp
new file mode 100644
index 0000000..8bf7ea2
--- /dev/null
+++ b/src/command.cpp
@@ -0,0 +1,38 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <stdlib.h>
+
+#include "command.hpp"
+
+void zmq::deallocate_command (command_t *cmd_)
+{
+ switch (cmd_->type) {
+ case command_t::attach:
+ if (cmd_->args.attach.peer_identity)
+ free (cmd_->args.attach.peer_identity);
+ break;
+ case command_t::bind:
+ if (cmd_->args.bind.peer_identity)
+ free (cmd_->args.bind.peer_identity);
+ break;
+ default:
+ /* noop */;
+ }
+}
diff --git a/src/command.hpp b/src/command.hpp
index 469d6ec..150cad1 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session.
struct {
struct i_engine *engine;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
@@ -73,6 +75,8 @@ namespace zmq
struct {
class reader_t *in_pipe;
class writer_t *out_pipe;
+ unsigned char peer_identity_size;
+ unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
@@ -107,6 +111,9 @@ namespace zmq
} args;
};
+ // Function to deallocate dynamically allocated components of the command.
+ void deallocate_command (command_t *cmd_);
+
}
#endif
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 8aafcf8..4233278 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -117,6 +117,10 @@ zmq::dispatcher_t::~dispatcher_t ()
while (!pipes.empty ())
delete *pipes.begin ();
+ // TODO: Deallocate any commands still in the pipes. Keep in mind that
+ // simple reading from a pipe and deallocating commands won't do as
+ // command pipe has template parameter D set to true, meaning that
+ // read may return false even if there are still commands in the pipe.
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 29b0689..3431264 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -26,7 +26,6 @@
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_DOWNSTREAM;
options.requires_in = false;
options.requires_out = true;
}
@@ -36,7 +35,7 @@ zmq::downstream_t::~downstream_t ()
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);
diff --git a/src/downstream.hpp b/src/downstream.hpp
index 35dec95..dbd79a5 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~downstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/i_endpoint.hpp b/src/i_endpoint.hpp
index d60b39e..ddab6a4 100644
--- a/src/i_endpoint.hpp
+++ b/src/i_endpoint.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
+#include "blob.hpp"
+
namespace zmq
{
@@ -28,7 +30,7 @@ namespace zmq
virtual ~i_endpoint () {}
virtual void attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bcb4297..81b56df 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include "blob.hpp"
+
namespace zmq
{
@@ -39,11 +41,11 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
- // Start tracing the message route. Engine should add the identity
- // supplied to all inbound messages and trim identity from all the
- // outbound messages.
- virtual void traceroute (unsigned char *identity_,
- size_t identity_size_) = 0;
+ // Engine should add the prefix supplied to all inbound messages.
+ virtual void add_prefix (const blob_t &identity_) = 0;
+
+ // Engine should trim prefix from all the outbound messages.
+ virtual void trim_prefix () = 0;
};
}
diff --git a/src/object.cpp b/src/object.cpp
index a977f39..356fcd1 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
@@ -77,17 +79,21 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::own:
process_own (cmd_.args.own.object);
- return;
+ break;
case command_t::attach:
- process_attach (cmd_.args.attach.engine);
+ process_attach (cmd_.args.attach.engine,
+ blob_t (cmd_.args.attach.peer_identity,
+ cmd_.args.attach.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::bind:
- process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
+ process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
+ blob_t (cmd_.args.bind.peer_identity,
+ cmd_.args.bind.peer_identity_size));
process_seqnum ();
- return;
+ break;
case command_t::pipe_term:
process_pipe_term ();
@@ -95,23 +101,27 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::pipe_term_ack:
process_pipe_term_ack ();
- return;
+ break;
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
- return;
+ break;
case command_t::term:
process_term ();
- return;
+ break;
case command_t::term_ack:
process_term_ack ();
- return;
+ break;
default:
zmq_assert (false);
}
+
+ // The assumption here is that each command is processed once only,
+ // so deallocating it after processing is all right.
+ deallocate_command (&cmd_);
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
@@ -176,7 +186,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
- bool inc_seqnum_)
+ const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -185,11 +195,26 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
+ if (peer_identity_.empty ()) {
+ cmd.args.attach.peer_identity_size = 0;
+ cmd.args.attach.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.attach.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.attach.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.attach.peer_identity_size);
+ memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
void zmq::object_t::send_bind (socket_base_t *destination_,
- reader_t *in_pipe_, writer_t *out_pipe_, bool inc_seqnum_)
+ reader_t *in_pipe_, writer_t *out_pipe_, const blob_t &peer_identity_,
+ bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
@@ -199,6 +224,20 @@ void zmq::object_t::send_bind (socket_base_t *destination_,
cmd.type = command_t::bind;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
+ if (peer_identity_.empty ()) {
+ cmd.args.bind.peer_identity_size = 0;
+ cmd.args.bind.peer_identity = NULL;
+ }
+ else {
+ zmq_assert (peer_identity_.size () <= 0xff);
+ cmd.args.bind.peer_identity_size =
+ (unsigned char) peer_identity_.size ();
+ cmd.args.bind.peer_identity =
+ (unsigned char*) malloc (peer_identity_.size ());
+ zmq_assert (cmd.args.bind.peer_identity_size);
+ memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
+ peer_identity_.size ());
+ }
send_command (cmd);
}
@@ -267,12 +306,14 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false);
}
-void zmq::object_t::process_attach (i_engine *engine_)
+void zmq::object_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
-void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
zmq_assert (false);
}
diff --git a/src/object.hpp b/src/object.hpp
index e6b2379..1544109 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -64,10 +65,11 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
- struct i_engine *engine_, bool inc_seqnum_ = true);
+ struct i_engine *engine_, const blob_t &peer_identity_,
+ bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
- bool inc_seqnum_ = true);
+ const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
@@ -81,9 +83,10 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
- virtual void process_attach (struct i_engine *engine_);
+ virtual void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
- class writer_t *out_pipe_);
+ class writer_t *out_pipe_, const blob_t &peer_identity_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
diff --git a/src/options.cpp b/src/options.cpp
index f9d93d6..f78d8de 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,7 +23,6 @@
#include "err.hpp"
zmq::options_t::options_t () :
- type (-1),
hwm (0),
lwm (0),
swap (0),
@@ -34,7 +33,9 @@ zmq::options_t::options_t () :
sndbuf (0),
rcvbuf (0),
requires_in (false),
- requires_out (false)
+ requires_out (false),
+ immediate_connect (true),
+ traceroute (false)
{
}
@@ -76,7 +77,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_IDENTITY:
- identity.assign ((const char*) optval_, optvallen_);
+
+ // Empty identity is invalid as well as identity longer than
+ // 255 bytes. Identity starting with binary zero is invalid
+ // as these are used for auto-generated identities.
+ if (optvallen_ < 1 || optvallen_ > 255 ||
+ *((const unsigned char*) optval_) == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
diff --git a/src/options.hpp b/src/options.hpp
index dbe3701..6d9be4d 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -20,10 +20,9 @@
#ifndef __ZMQ_OPTIONS_HPP_INCLUDED__
#define __ZMQ_OPTIONS_HPP_INCLUDED__
-#include <string>
-
#include "stddef.h"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -34,14 +33,11 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
- // Type of the associated socket. One of the constants defined in zmq.h
- int type;
-
int64_t hwm;
int64_t lwm;
int64_t swap;
uint64_t affinity;
- std::string identity;
+ blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
@@ -59,6 +55,15 @@ namespace zmq
// provided by the specific socket type.
bool requires_in;
bool requires_out;
+
+ // If true, when connecting, pipes are created immediately without
+ // waiting for the connection to be established. That way the socket
+ // is not aware of the peer's identity, however, it is able to send
+ // messages straight away.
+ bool immediate_connect;
+
+ // If true, socket requires tracerouting the messages.
+ bool traceroute;
};
}
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 46bbd0b..ca7a8f5 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -29,7 +29,6 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
outpipe (NULL),
alive (true)
{
- options.type = ZMQ_P2P;
options.requires_in = true;
options.requires_out = true;
}
@@ -43,7 +42,7 @@ zmq::p2p_t::~p2p_t ()
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 2ff1047..bca0eab 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -33,7 +33,8 @@ namespace zmq
~p2p_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index a2ba9c6..e708229 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,8 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
-void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_receiver_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index f03551f..3f0ef81 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,7 +54,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index fa7d7e0..27b4d0c 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -102,8 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
-void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
+void zmq::pgm_sender_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 89357f5..951c417 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,7 +52,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 1eeb34f..462a3a9 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -89,8 +89,11 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (options.identity.size () > 0) {
- // Create gsi from identity string.
- gsi_base = options.identity;
+ // Create gsi from identity.
+ // TODO: We assume that identity is standard C string here.
+ // What if it contains binary zeroes?
+ gsi_base.assign ((const char*) options.identity.data (),
+ options.identity.size ());
} else {
// Generate random gsi.
diff --git a/src/pub.cpp b/src/pub.cpp
index 9a2dcc6..5b9d48c 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -27,7 +27,6 @@
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
@@ -40,7 +39,7 @@ zmq::pub_t::~pub_t ()
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
out_pipes.push_back (outpipe_);
diff --git a/src/pub.hpp b/src/pub.hpp
index 5b2f348..26142a4 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -34,7 +34,8 @@ namespace zmq
~pub_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/rep.cpp b/src/rep.cpp
index 1649e83..755d78e 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -30,9 +30,13 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
waiting_for_reply (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REP;
options.requires_in = true;
options.requires_out = true;
+
+ // We don't need immediate connect. We'll be able to send messages
+ // (replies) only when connection is established and thus requests
+ // can arrive anyway.
+ options.immediate_connect = false;
}
zmq::rep_t::~rep_t ()
@@ -40,7 +44,7 @@ zmq::rep_t::~rep_t ()
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
diff --git a/src/rep.hpp b/src/rep.hpp
index 7170da7..7ead321 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -34,7 +34,8 @@ namespace zmq
~rep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/req.cpp b/src/req.cpp
index 9b1766e..735f0aa 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -30,7 +30,6 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
reply_pipe_active (false),
reply_pipe (NULL)
{
- options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
@@ -40,7 +39,7 @@ zmq::req_t::~req_t ()
}
void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
diff --git a/src/req.hpp b/src/req.hpp
index 60ee5e7..da8e61a 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -34,7 +34,8 @@ namespace zmq
~req_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/session.cpp b/src/session.cpp
index 1aece4d..05f319c 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -32,9 +32,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
out_pipe (NULL),
engine (NULL),
options (options_)
-{
- type = unnamed;
-
+{
// It's possible to register the session at this point as it will be
// searched for only on reconnect, i.e. no race condition (session found
// before it is plugged into it's I/O thread) is possible.
@@ -42,23 +40,24 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_) :
+ const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
+ ordinal (0),
+ peer_identity (peer_identity_),
options (options_)
{
- if (name_) {
- type = named;
- name = name_;
- ordinal = 0;
- }
- else {
- type = transient;
- // TODO: Generate unique name here.
- ordinal = 0;
+ if (!peer_identity.empty () && peer_identity [0] != 0) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
}
}
@@ -104,7 +103,7 @@ void zmq::session_t::detach (owned_t *reconnecter_)
engine = NULL;
// Terminate transient session.
- if (type == transient)
+ if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
}
@@ -120,13 +119,12 @@ class zmq::socket_base_t *zmq::session_t::get_owner ()
uint64_t zmq::session_t::get_ordinal ()
{
- zmq_assert (type == unnamed);
zmq_assert (ordinal);
return ordinal;
}
void zmq::session_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_) {
zmq_assert (!in_pipe);
@@ -168,52 +166,15 @@ void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::process_plug ()
{
- // Register the session with the socket.
- if (!name.empty ()) {
- bool ok = owner->register_session (name.c_str (), this);
-
- // There's already a session with the specified identity.
- // We should syslog it and drop the session. TODO
- zmq_assert (ok);
- }
-
- // If session is created by 'connect' function, it has the pipes set
- // already. Otherwise, it's being created by the listener and the pipes
- // are yet to be created.
- if (!in_pipe && !out_pipe) {
-
- pipe_t *inbound = NULL;
- pipe_t *outbound = NULL;
-
- if (options.requires_out) {
- inbound = new (std::nothrow) pipe_t (this, owner,
- options.hwm, options.lwm);
- zmq_assert (inbound);
- in_pipe = &inbound->reader;
- in_pipe->set_endpoint (this);
- }
-
- if (options.requires_in) {
- outbound = new (std::nothrow) pipe_t (owner, this,
- options.hwm, options.lwm);
- zmq_assert (outbound);
- out_pipe = &outbound->writer;
- out_pipe->set_endpoint (this);
- }
-
- send_bind (owner, outbound ? &outbound->reader : NULL,
- inbound ? &inbound->writer : NULL);
- }
}
void zmq::session_t::process_unplug ()
{
- // Unregister the session from the socket. There's nothing to do here
- // for transient sessions.
- if (type == unnamed)
+ // Unregister the session from the socket.
+ if (ordinal)
owner->unregister_session (ordinal);
- else if (type == named)
- owner->unregister_session (name.c_str ());
+ else if (!peer_identity.empty () && peer_identity [0] != 0)
+ owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
@@ -232,10 +193,67 @@ void zmq::session_t::process_unplug ()
}
}
-void zmq::session_t::process_attach (i_engine *engine_)
+void zmq::session_t::process_attach (i_engine *engine_,
+ const blob_t &peer_identity_)
{
+ if (!peer_identity.empty ()) {
+
+ // If we already know the peer name do nothing, just check whether
+ // it haven't changed.
+ zmq_assert (peer_identity == peer_identity_);
+ }
+ else if (!peer_identity_.empty ()) {
+
+ // Store the peer identity.
+ peer_identity = peer_identity_;
+
+ // If the session is not registered with the ordinal, let's register
+ // it using the peer name.
+ if (!ordinal) {
+ if (!owner->register_session (peer_identity, this)) {
+
+ // TODO: There's already a session with the specified
+ // identity. We should presumably syslog it and drop the
+ // session.
+ zmq_assert (false);
+ }
+ }
+ }
+
+ // Check whether the required pipes already exist. If not so, we'll
+ // create them and bind them to the socket object.
+ reader_t *socket_reader = NULL;
+ writer_t *socket_writer = NULL;
+
+ if (options.requires_in && !out_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (owner, this,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ out_pipe = &pipe->writer;
+ out_pipe->set_endpoint (this);
+ socket_reader = &pipe->reader;
+ }
+
+ if (options.requires_out && !in_pipe) {
+ pipe_t *pipe = new (std::nothrow) pipe_t (this, owner,
+ options.hwm, options.lwm);
+ zmq_assert (pipe);
+ in_pipe = &pipe->reader;
+ in_pipe->set_endpoint (this);
+ socket_writer = &pipe->writer;
+ }
+
+ if (socket_reader || socket_writer)
+ send_bind (owner, socket_reader, socket_writer, peer_identity);
+
+ // Plug in the engine.
zmq_assert (!engine);
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
+
+ // Once the initial handshaking is over tracerouting should trim prefixes
+ // from outbound messages.
+ if (options.traceroute)
+ engine->trim_prefix ();
}
diff --git a/src/session.hpp b/src/session.hpp
index 375d095..872748c 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -20,12 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -38,10 +37,9 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_);
- // Creates named session. If name is NULL, transient session with
- // auto-generated name is created.
+ // Creates named session.
session_t (object_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *name_);
+ const options_t &options_, const blob_t &peer_identity_);
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -53,7 +51,8 @@ namespace zmq
uint64_t get_ordinal ();
// i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
@@ -66,7 +65,8 @@ namespace zmq
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
- void process_attach (struct i_engine *engine_);
+ void process_attach (struct i_engine *engine_,
+ const blob_t &peer_identity_);
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
@@ -79,18 +79,13 @@ namespace zmq
struct i_engine *engine;
- enum {
- transient,
- named,
- unnamed
- } type;
-
- // Ordinal of the session (if any).
+ // Session is identified by ordinal in the case when it was created
+ // before connection to the peer was established and thus we are
+ // unaware of peer's identity.
uint64_t ordinal;
- // The name of the session. One that is used to register it with
- // socket-level repository of sessions.
- std::string name;
+ // Identity of the peer.
+ blob_t peer_identity;
// Inherited socket options.
options_t options;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ef563e5..871f9e9 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "inproc") {
+ // TODO: inproc connect is specific with respect to creating pipes
+ // as there's no 'reconnect' functionality implemented. Once that
+ // is in place we should follow generic pipe creation algorithm.
+
// Find the peer socket.
socket_base_t *peer = find_endpoint (addr_args.c_str ());
if (!peer)
@@ -165,13 +169,13 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to this socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ out_pipe ? &out_pipe->writer : NULL, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL, false);
+ in_pipe ? &in_pipe->writer : NULL, options.identity, false);
return 0;
}
@@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options);
zmq_assert (session);
- pipe_t *in_pipe = NULL;
- pipe_t *out_pipe = NULL;
+ // If 'immediate connect' feature is required, we'll created the pipes
+ // to the session straight away. Otherwise, they'll be created by the
+ // session once the connection is established.
+ if (options.immediate_connect) {
- // Create inbound pipe, if required.
- if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session,
- options.hwm, options.lwm);
- zmq_assert (in_pipe);
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
- }
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new (std::nothrow) pipe_t (this, session,
+ options.hwm, options.lwm);
+ zmq_assert (in_pipe);
- // Create outbound pipe, if required.
- if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this,
- options.hwm, options.lwm);
- zmq_assert (out_pipe);
- }
+ }
- // Attach the pipes to the socket object.
- attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new (std::nothrow) pipe_t (session, this,
+ options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
- // Attach the pipes to the session object.
- session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL);
+ // Attach the pipes to the socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL, blob_t ());
+
+ // Attach the pipes to the session object.
+ session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL, blob_t ());
+ }
// Activate the session.
send_plug (session);
@@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ // Windows named pipes are not compatible with Winsock API.
+ // There's no UNIX domain socket implementation on OpenVMS.
if (addr_type == "ipc") {
errno = EPROTONOSUPPORT;
return -1;
@@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp")
udp_encapsulation = true;
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
if (options.requires_out) {
// PGM sender.
@@ -267,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_sender);
+ send_attach (session, pgm_sender, blob_t ());
}
else if (options.requires_in) {
@@ -282,7 +297,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_receiver);
+ send_attach (session, pgm_receiver, blob_t ());
}
else
zmq_assert (false);
@@ -456,30 +471,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (const char *name_,
+bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered =
- named_sessions.insert (std::make_pair (name_, session_)).second;
+ bool registered = named_sessions.insert (
+ std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-void zmq::socket_base_t::unregister_session (const char *name_)
+void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
-zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
+zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
-
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
@@ -541,13 +555,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
}
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
if (inpipe_)
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
- xattach_pipes (inpipe_, outpipe_);
+ xattach_pipes (inpipe_, outpipe_, peer_identity_);
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
@@ -567,9 +581,10 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects.insert (object_);
}
-void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
- attach_pipes (in_pipe_, out_pipe_);
+ attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 1ad9ed1..5327acc 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -23,7 +23,6 @@
#include <set>
#include <map>
#include <vector>
-#include <string>
#include "../bindings/c/zmq.h"
@@ -35,6 +34,7 @@
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -78,15 +78,17 @@ namespace zmq
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
- bool register_session (const char *name_, class session_t *session_);
- void unregister_session (const char *name_);
- class session_t *find_session (const char *name_);
+ bool register_session (const blob_t &peer_identity_,
+ class session_t *session_);
+ void unregister_session (const blob_t &peer_identity_);
+ class session_t *find_session (const blob_t &peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
// i_endpoint interface implementation.
- void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
@@ -99,7 +101,7 @@ namespace zmq
// Pipe management is done by individual socket types.
virtual void xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_) = 0;
+ class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;
@@ -121,7 +123,8 @@ namespace zmq
// Handlers for incoming commands.
void process_own (class owned_t *object_);
- void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_);
+ void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
+ const blob_t &peer_identity_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
void process_seqnum ();
@@ -155,7 +158,7 @@ namespace zmq
// within the socket, instead they are used by I/O objects owned by
// the socket. As those objects can live in different threads,
// the access is synchronised by mutex.
- typedef std::map <std::string, session_t*> named_sessions_t;
+ typedef std::map <blob_t, session_t*> named_sessions_t;
named_sessions_t named_sessions;
typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
unnamed_sessions_t unnamed_sessions;
diff --git a/src/sub.cpp b/src/sub.cpp
index 31ee222..29ac951 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -28,7 +28,6 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
has_message (false)
{
- options.type = ZMQ_SUB;
options.requires_in = true;
options.requires_out = false;
zmq_msg_init (&message);
@@ -40,7 +39,7 @@ zmq::sub_t::~sub_t ()
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
diff --git a/src/sub.hpp b/src/sub.hpp
index 9e7d6cc..8234b77 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -39,7 +39,8 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/upstream.cpp b/src/upstream.cpp
index 390dcbe..d7238b9 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -25,7 +25,6 @@
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_UPSTREAM;
options.requires_in = true;
options.requires_out = false;
}
@@ -35,7 +34,7 @@ zmq::upstream_t::~upstream_t ()
}
void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
diff --git a/src/upstream.hpp b/src/upstream.hpp
index 1e6914b..d1ee7b1 100644
--- a/src/upstream.hpp
+++ b/src/upstream.hpp
@@ -34,7 +34,8 @@ namespace zmq
~upstream_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/uuid.hpp b/src/uuid.hpp
index 00424ed..03bb69d 100644
--- a/src/uuid.hpp
+++ b/src/uuid.hpp
@@ -44,6 +44,9 @@ namespace zmq
uuid_t ();
~uuid_t ();
+ // The length of textual representation of UUID.
+ enum { uuid_string_len = 36 };
+
// Returns a pointer to buffer containing the textual
// representation of the UUID. The caller is reponsible to
// free the allocated memory.
@@ -51,9 +54,6 @@ namespace zmq
private:
- // The length of textual representation of UUID.
- enum { uuid_string_len = 36 };
-
#if defined ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_MINGW32
typedef unsigned char* RPC_CSTR;
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 67a9a39..6fa6bfa 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -21,13 +21,21 @@
#include "xrep.hpp"
#include "err.hpp"
+#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_XREP;
options.requires_in = true;
options.requires_out = true;
+
+ // On connect, pipes are created only after initial handshaking.
+ // That way we are aware of the peer's identity when binding to the pipes.
+ options.immediate_connect = false;
+
+ // XREP socket adds identity to inbound messages and strips identity
+ // from the outbound messages.
+ options.traceroute = true;
}
zmq::xrep_t::~xrep_t ()
@@ -35,12 +43,15 @@ zmq::xrep_t::~xrep_t ()
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
- zmq_assert (false);
+ // TODO: What if new connection has same peer identity as the old one?
+ bool ok = outpipes.insert (std::make_pair (
+ peer_identity_, outpipe_)).second;
+ zmq_assert (ok);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
@@ -51,6 +62,12 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it)
+ if (it->second == pipe_) {
+ outpipes.erase (it);
+ return;
+ }
zmq_assert (false);
}
@@ -73,8 +90,35 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
- zmq_assert (false);
- return -1;
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
+ size_t size = zmq_msg_size (msg_);
+
+ // Check whether the message is well-formed.
+ zmq_assert (size >= 1);
+ zmq_assert (size_t (*data + 1) <= size);
+
+ // Find the corresponding outbound pipe. If there's none, just drop the
+ // message.
+ // TODO: There's an allocation here! It's the critical path! Get rid of it!
+ blob_t identity (data + 1, *data);
+ outpipes_t::iterator it = outpipes.find (identity);
+ if (it == outpipes.end ()) {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+ return 0;
+ }
+
+ // Push message to the selected pipe.
+ it->second->write (msg_);
+ it->second->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
}
int zmq::xrep_t::xflush ()
@@ -95,8 +139,10 @@ bool zmq::xrep_t::xhas_in ()
bool zmq::xrep_t::xhas_out ()
{
- zmq_assert (false);
- return false;
+ // In theory, XREP socket is always ready for writing. Whether actual
+ // attempt to write succeeds depends on whitch pipe the message is going
+ // to be routed to.
+ return true;
}
diff --git a/src/xrep.hpp b/src/xrep.hpp
index 67ab02d..4534463 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -20,7 +20,10 @@
#ifndef __ZMQ_XREP_HPP_INCLUDED__
#define __ZMQ_XREP_HPP_INCLUDED__
+#include <map>
+
#include "socket_base.hpp"
+#include "blob.hpp"
#include "fq.hpp"
namespace zmq
@@ -34,7 +37,8 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
@@ -51,6 +55,10 @@ namespace zmq
// Inbound messages are fair-queued.
fq_t fq;
+ // Outbound pipes indexed by the peer names.
+ typedef std::map <blob_t, class writer_t*> outpipes_t;
+ outpipes_t outpipes;
+
xrep_t (const xrep_t&);
void operator = (const xrep_t&);
};
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 691b66e..dda924c 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -25,7 +25,6 @@
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
- options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
@@ -35,7 +34,7 @@ zmq::xreq_t::~xreq_t ()
}
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_)
+ class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
fq.attach (inpipe_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index d0cbb4f..e23e832 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -35,7 +35,8 @@ namespace zmq
~xreq_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
+ const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index f502ffd..b1776df 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -27,9 +27,7 @@
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
- destination (NULL),
- prefix (NULL),
- prefix_size (0)
+ destination (NULL)
{
zmq_msg_init (&in_progress);
@@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
- if (prefix)
- free (prefix);
-
zmq_msg_close (&in_progress);
}
@@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
-void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
- size_t prefix_size_)
+void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_)
{
- prefix = malloc (prefix_size_);
- zmq_assert (prefix);
- memcpy (prefix, prefix_, prefix_size_);
- prefix_size = prefix_size_;
+ prefix = prefix_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
@@ -72,15 +63,22 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
- errno_assert (rc == 0);
-
- // Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
-
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
- *tmpbuf, &zmq_decoder_t::message_ready);
+ if (prefix.empty ()) {
+ int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
+ errno_assert (rc == 0);
+ next_step (zmq_msg_data (&in_progress), *tmpbuf,
+ &zmq_decoder_t::message_ready);
+ }
+ else {
+ int rc = zmq_msg_init_size (&in_progress,
+ *tmpbuf + 1 + prefix.size ());
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
+ *data = (unsigned char) prefix.size ();
+ memcpy (data + 1, prefix.data (), *data);
+ next_step (data + *data + 1, *tmpbuf,
+ &zmq_decoder_t::message_ready);
+ }
}
return true;
}
@@ -95,15 +93,21 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
- errno_assert (rc == 0);
-
- // Fill in the message prefix if any.
- if (prefix)
- memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
+ if (prefix.empty ()) {
+ int rc = zmq_msg_init_size (&in_progress, size);
+ errno_assert (rc == 0);
+ next_step (zmq_msg_data (&in_progress), size,
+ &zmq_decoder_t::message_ready);
+ }
+ else {
+ int rc = zmq_msg_init_size (&in_progress, size + 1 + prefix.size ());
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
+ *data = (unsigned char) prefix.size ();
+ memcpy (data + 1, prefix.data (), *data);
+ next_step (data + *data + 1, size, &zmq_decoder_t::message_ready);
+ }
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
- size, &zmq_decoder_t::message_ready);
return true;
}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index dfabece..11ee6c2 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -23,6 +23,7 @@
#include "../bindings/c/zmq.h"
#include "decoder.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -41,7 +42,7 @@ namespace zmq
// Once called, all decoded messages will be prefixed by the specified
// prefix.
- void add_prefix (unsigned char *prefix_, size_t prefix_size_);
+ void add_prefix (const blob_t &prefix_);
private:
@@ -53,8 +54,7 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
- void *prefix;
- size_t prefix_size;
+ blob_t prefix;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index a60fe5e..68626fa 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -56,8 +56,9 @@ bool zmq::zmq_encoder_t::size_ready ()
}
else {
size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
- next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
- zmq_msg_size (&in_progress) - prefix_size,
+ next_step (
+ (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1,
+ zmq_msg_size (&in_progress) - prefix_size - 1,
&zmq_encoder_t::message_ready, false);
}
return true;
@@ -80,8 +81,13 @@ bool zmq::zmq_encoder_t::message_ready ()
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress);
- if (trim)
- size -= *(unsigned char*) zmq_msg_data (&in_progress);
+ if (trim) {
+ zmq_assert (size);
+ size_t prefix_size =
+ (*(unsigned char*) zmq_msg_data (&in_progress)) + 1;
+ zmq_assert (prefix_size <= size);
+ size -= prefix_size;
+ }
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index bda098c..623ca63 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include <new>
#include "zmq_engine.hpp"
@@ -160,11 +162,14 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
-void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
- size_t identity_size_)
+void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
+{
+ decoder.add_prefix (identity_);
+}
+
+void zmq::zmq_engine_t::trim_prefix ()
{
encoder.trim_prefix ();
- decoder.add_prefix (identity_, identity_size_);
}
void zmq::zmq_engine_t::error ()
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 174dd1a..dc90a98 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,7 +47,8 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
- void traceroute (unsigned char *identity_, size_t identity_size_);
+ void add_prefix (const blob_t &identity_);
+ void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 9492caa..3e76cb9 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -17,10 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include "zmq_init.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "session.hpp"
+#include "uuid.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
@@ -71,17 +74,21 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received)
return false;
- // Retreieve the remote identity.
- peer_identity.assign ((const char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
+ // Retreieve the remote identity. If it's empty, generate a unique name.
+ if (!zmq_msg_size (msg_)) {
+ unsigned char identity [uuid_t::uuid_string_len + 1];
+ identity [0] = 0;
+ memcpy (identity + 1, uuid_t ().to_string (), uuid_t::uuid_string_len);
+ peer_identity.assign (identity, uuid_t::uuid_string_len + 1);
+ }
+ else {
+ peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+ }
+ if (options.traceroute)
+ engine->add_prefix (peer_identity);
received = true;
- // Once the initial handshaking is over, XREP sockets should start
- // tracerouting individual messages.
- if (options.type == ZMQ_XREP)
- engine->traceroute ((unsigned char*) peer_identity.data (),
- peer_identity.size ());
-
return true;
}
@@ -160,15 +167,16 @@ void zmq::zmq_init_t::finalise ()
return;
}
}
+ else {
- // If the peer has a unique name, find the associated session. If it
- // doesn't exist, create it.
- else if (!peer_identity.empty ()) {
- session = owner->find_session (peer_identity.c_str ());
+ // If the peer has a unique name, find the associated session.
+ // If it does not exist, create it.
+ zmq_assert (!peer_identity.empty ());
+ session = owner->find_session (peer_identity);
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,
- peer_identity.c_str ());
+ peer_identity);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
@@ -178,21 +186,8 @@ void zmq::zmq_init_t::finalise ()
}
}
- // If the other party has no specific identity, let's create a
- // transient session.
- else {
- session = new (std::nothrow) session_t (
- choose_io_thread (options.affinity), owner, options, NULL);
- zmq_assert (session);
- send_plug (session);
- send_own (owner, session);
-
- // Reserve a sequence number for following 'attach' command.
- session->inc_seqnum ();
- }
-
- // No need to increment seqnum as it was laready incremented above.
- send_attach (session, engine, false);
+ // No need to increment seqnum as it was already incremented above.
+ send_attach (session, engine, peer_identity, false);
// Destroy the init object.
engine = NULL;
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index df14293..6f935c2 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -20,8 +20,6 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
-#include <string>
-
#include "i_inout.hpp"
#include "i_engine.hpp"
#include "owned.hpp"
@@ -29,6 +27,7 @@
#include "stdint.hpp"
#include "options.hpp"
#include "stdint.hpp"
+#include "blob.hpp"
namespace zmq
{
@@ -72,7 +71,7 @@ namespace zmq
bool received;
// Identity of the peer socket.
- std::string peer_identity;
+ blob_t peer_identity;
// TCP connecter creates session before the name of the peer is known.
// Thus we know only its ordinal number.