summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-12 15:57:54 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-12 15:57:54 +0100
commit36a576370ccfed3c104850b5b95a6ed3870edbea (patch)
treedbe228c0a27b7f75cc47a5b46da09f965e3ec5d3 /src
parent7b4cf2a4d040057f6f378cac2cd125513a859c1b (diff)
Multi-hop REQ/REP, part I., tracerouting switched on on XREP socket
Diffstat (limited to 'src')
-rw-r--r--src/i_engine.hpp8
-rw-r--r--src/pgm_receiver.cpp9
-rw-r--r--src/pgm_receiver.hpp1
-rw-r--r--src/pgm_sender.cpp9
-rw-r--r--src/pgm_sender.hpp1
-rw-r--r--src/zmq_decoder.cpp27
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_encoder.cpp13
-rw-r--r--src/zmq_encoder.hpp8
-rw-r--r--src/zmq_engine.cpp11
-rw-r--r--src/zmq_engine.hpp1
-rw-r--r--src/zmq_init.cpp6
12 files changed, 75 insertions, 25 deletions
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index d5d7811..bcb4297 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
+#include <stddef.h>
+
namespace zmq
{
@@ -36,6 +38,12 @@ namespace zmq
// This method is called by the session to signalise that there
// are messages to send available.
virtual void revive () = 0;
+
+ // Start tracing the message route. Engine should add the identity
+ // supplied to all inbound messages and trim identity from all the
+ // outbound messages.
+ virtual void traceroute (unsigned char *identity_,
+ size_t identity_size_) = 0;
};
}
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index d0310cc..a2ba9c6 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,6 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
+void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
+ size_t identity_size_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
void zmq::pgm_receiver_t::in_event ()
{
// Read data from the underlying pgm_socket.
@@ -151,7 +158,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for the peer.
- it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
+ it->second.decoder = new (std::nothrow) zmq_decoder_t (0);
it->second.decoder->set_inout (inout);
}
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 3d6a212..f03551f 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,6 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 3f08d8e..fa7d7e0 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -36,7 +36,7 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
- encoder (0, false),
+ encoder (0),
pgm_socket (false, options_),
options (options_),
out_buffer (NULL),
@@ -102,6 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
+void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
+ size_t identity_size_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
zmq::pgm_sender_t::~pgm_sender_t ()
{
if (out_buffer) {
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 43adb8b..89357f5 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,6 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index cc4f846..f502ffd 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -25,24 +25,14 @@
#include "wire.hpp"
#include "err.hpp"
-zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_,
- void *prefix_, size_t prefix_size_) :
+zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
- destination (NULL)
+ destination (NULL),
+ prefix (NULL),
+ prefix_size (0)
{
zmq_msg_init (&in_progress);
- if (!prefix_) {
- prefix = NULL;
- prefix_size = 0;
- }
- else {
- prefix = malloc (prefix_size_);
- zmq_assert (prefix);
- memcpy (prefix, prefix_, prefix_size_);
- prefix_size = prefix_size_;
- }
-
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
@@ -60,6 +50,15 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
+void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
+ size_t prefix_size_)
+{
+ prefix = malloc (prefix_size_);
+ zmq_assert (prefix);
+ memcpy (prefix, prefix_, prefix_size_);
+ prefix_size = prefix_size_;
+}
+
bool zmq::zmq_decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index 5d9133b..dfabece 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -34,11 +34,15 @@ namespace zmq
// If prefix is not NULL, it will be glued to the beginning of every
// decoded message.
- zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_);
+ zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
+ // Once called, all decoded messages will be prefixed by the specified
+ // prefix.
+ void add_prefix (unsigned char *prefix_, size_t prefix_size_);
+
private:
bool one_byte_size_ready ();
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 89649ef..a60fe5e 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -21,10 +21,10 @@
#include "i_inout.hpp"
#include "wire.hpp"
-zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) :
+zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
encoder_t <zmq_encoder_t> (bufsize_),
source (NULL),
- trim_prefix (trim_prefix_)
+ trim (false)
{
zmq_msg_init (&in_progress);
@@ -42,10 +42,15 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)
source = source_;
}
+void zmq::zmq_encoder_t::trim_prefix ()
+{
+ trim = true;
+}
+
bool zmq::zmq_encoder_t::size_ready ()
{
// Write message body into the buffer.
- if (!trim_prefix) {
+ if (!trim) {
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_encoder_t::message_ready, false);
}
@@ -75,7 +80,7 @@ bool zmq::zmq_encoder_t::message_ready ()
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress);
- if (trim_prefix)
+ if (trim)
size -= *(unsigned char*) zmq_msg_data (&in_progress);
// For messages less than 255 bytes long, write one byte of message size.
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
index 8eedef8..0e23fcd 100644
--- a/src/zmq_encoder.hpp
+++ b/src/zmq_encoder.hpp
@@ -32,11 +32,15 @@ namespace zmq
{
public:
- zmq_encoder_t (size_t bufsize_, bool trim_prefix_);
+ zmq_encoder_t (size_t bufsize_);
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
+ // Once called, encoder will start trimming frefixes from outbound
+ // messages.
+ void trim_prefix ();
+
private:
bool size_ready ();
@@ -46,7 +50,7 @@ namespace zmq
::zmq_msg_t in_progress;
unsigned char tmpbuf [9];
- bool trim_prefix;
+ bool trim;
zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 01fe98c..bda098c 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -32,10 +32,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
io_object_t (parent_),
inpos (NULL),
insize (0),
- decoder (in_batch_size, NULL, 0),
+ decoder (in_batch_size),
outpos (NULL),
outsize (0),
- encoder (out_batch_size, false),
+ encoder (out_batch_size),
inout (NULL),
options (options_),
reconnect (reconnect_)
@@ -160,6 +160,13 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
+void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
+ size_t identity_size_)
+{
+ encoder.trim_prefix ();
+ decoder.add_prefix (identity_, identity_size_);
+}
+
void zmq::zmq_engine_t::error ()
{
zmq_assert (inout);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index ddd0931..174dd1a 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,6 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 37aafe6..9492caa 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -76,6 +76,12 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
zmq_msg_size (msg_));
received = true;
+ // Once the initial handshaking is over, XREP sockets should start
+ // tracerouting individual messages.
+ if (options.type == ZMQ_XREP)
+ engine->traceroute ((unsigned char*) peer_identity.data (),
+ peer_identity.size ());
+
return true;
}