summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/downstream.cpp1
-rw-r--r--src/i_engine.hpp8
-rw-r--r--src/options.cpp1
-rw-r--r--src/options.hpp3
-rw-r--r--src/p2p.cpp1
-rw-r--r--src/pgm_receiver.cpp9
-rw-r--r--src/pgm_receiver.hpp1
-rw-r--r--src/pgm_sender.cpp9
-rw-r--r--src/pgm_sender.hpp1
-rw-r--r--src/pub.cpp1
-rw-r--r--src/rep.cpp1
-rw-r--r--src/req.cpp1
-rw-r--r--src/sub.cpp1
-rw-r--r--src/upstream.cpp1
-rw-r--r--src/xrep.cpp1
-rw-r--r--src/xreq.cpp1
-rw-r--r--src/zmq_decoder.cpp27
-rw-r--r--src/zmq_decoder.hpp6
-rw-r--r--src/zmq_encoder.cpp13
-rw-r--r--src/zmq_encoder.hpp8
-rw-r--r--src/zmq_engine.cpp11
-rw-r--r--src/zmq_engine.hpp1
-rw-r--r--src/zmq_init.cpp6
23 files changed, 88 insertions, 25 deletions
diff --git a/src/downstream.cpp b/src/downstream.cpp
index 2da08e3..29b0689 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -26,6 +26,7 @@
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
+ options.type = ZMQ_DOWNSTREAM;
options.requires_in = false;
options.requires_out = true;
}
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index d5d7811..bcb4297 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
+#include <stddef.h>
+
namespace zmq
{
@@ -36,6 +38,12 @@ namespace zmq
// This method is called by the session to signalise that there
// are messages to send available.
virtual void revive () = 0;
+
+ // Start tracing the message route. Engine should add the identity
+ // supplied to all inbound messages and trim identity from all the
+ // outbound messages.
+ virtual void traceroute (unsigned char *identity_,
+ size_t identity_size_) = 0;
};
}
diff --git a/src/options.cpp b/src/options.cpp
index cc02798..f9d93d6 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -23,6 +23,7 @@
#include "err.hpp"
zmq::options_t::options_t () :
+ type (-1),
hwm (0),
lwm (0),
swap (0),
diff --git a/src/options.hpp b/src/options.hpp
index b066d48..dbe3701 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -34,6 +34,9 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_);
+ // Type of the associated socket. One of the constants defined in zmq.h
+ int type;
+
int64_t hwm;
int64_t lwm;
int64_t swap;
diff --git a/src/p2p.cpp b/src/p2p.cpp
index 72bc26b..46bbd0b 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -29,6 +29,7 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
outpipe (NULL),
alive (true)
{
+ options.type = ZMQ_P2P;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index d0310cc..a2ba9c6 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -88,6 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
+void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
+ size_t identity_size_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
void zmq::pgm_receiver_t::in_event ()
{
// Read data from the underlying pgm_socket.
@@ -151,7 +158,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for the peer.
- it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
+ it->second.decoder = new (std::nothrow) zmq_decoder_t (0);
it->second.decoder->set_inout (inout);
}
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index 3d6a212..f03551f 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -54,6 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 3f08d8e..fa7d7e0 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -36,7 +36,7 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
- encoder (0, false),
+ encoder (0),
pgm_socket (false, options_),
options (options_),
out_buffer (NULL),
@@ -102,6 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
+void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
+ size_t identity_size_)
+{
+ // No need for tracerouting functionality in PGM socket at the moment.
+ zmq_assert (false);
+}
+
zmq::pgm_sender_t::~pgm_sender_t ()
{
if (out_buffer) {
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 43adb8b..89357f5 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -52,6 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pub.cpp b/src/pub.cpp
index 05bfdcf..9a2dcc6 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -27,6 +27,7 @@
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
+ options.type = ZMQ_PUB;
options.requires_in = false;
options.requires_out = true;
}
diff --git a/src/rep.cpp b/src/rep.cpp
index b6bffae..b7685b4 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -30,6 +30,7 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
waiting_for_reply (false),
reply_pipe (NULL)
{
+ options.type = ZMQ_REP;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/req.cpp b/src/req.cpp
index c9240e0..9b1766e 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -30,6 +30,7 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
reply_pipe_active (false),
reply_pipe (NULL)
{
+ options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/sub.cpp b/src/sub.cpp
index 06ed896..31ee222 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -28,6 +28,7 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
has_message (false)
{
+ options.type = ZMQ_SUB;
options.requires_in = true;
options.requires_out = false;
zmq_msg_init (&message);
diff --git a/src/upstream.cpp b/src/upstream.cpp
index bdcd5ef..390dcbe 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -25,6 +25,7 @@
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
+ options.type = ZMQ_UPSTREAM;
options.requires_in = true;
options.requires_out = false;
}
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 4cba21c..67a9a39 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -25,6 +25,7 @@
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
+ options.type = ZMQ_XREP;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/xreq.cpp b/src/xreq.cpp
index a4310f8..691b66e 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -25,6 +25,7 @@
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
+ options.type = ZMQ_REQ;
options.requires_in = true;
options.requires_out = true;
}
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index cc4f846..f502ffd 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -25,24 +25,14 @@
#include "wire.hpp"
#include "err.hpp"
-zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_,
- void *prefix_, size_t prefix_size_) :
+zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
- destination (NULL)
+ destination (NULL),
+ prefix (NULL),
+ prefix_size (0)
{
zmq_msg_init (&in_progress);
- if (!prefix_) {
- prefix = NULL;
- prefix_size = 0;
- }
- else {
- prefix = malloc (prefix_size_);
- zmq_assert (prefix);
- memcpy (prefix, prefix_, prefix_size_);
- prefix_size = prefix_size_;
- }
-
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
@@ -60,6 +50,15 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
+void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
+ size_t prefix_size_)
+{
+ prefix = malloc (prefix_size_);
+ zmq_assert (prefix);
+ memcpy (prefix, prefix_, prefix_size_);
+ prefix_size = prefix_size_;
+}
+
bool zmq::zmq_decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index 5d9133b..dfabece 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -34,11 +34,15 @@ namespace zmq
// If prefix is not NULL, it will be glued to the beginning of every
// decoded message.
- zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_);
+ zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
+ // Once called, all decoded messages will be prefixed by the specified
+ // prefix.
+ void add_prefix (unsigned char *prefix_, size_t prefix_size_);
+
private:
bool one_byte_size_ready ();
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 89649ef..a60fe5e 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -21,10 +21,10 @@
#include "i_inout.hpp"
#include "wire.hpp"
-zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) :
+zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
encoder_t <zmq_encoder_t> (bufsize_),
source (NULL),
- trim_prefix (trim_prefix_)
+ trim (false)
{
zmq_msg_init (&in_progress);
@@ -42,10 +42,15 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)
source = source_;
}
+void zmq::zmq_encoder_t::trim_prefix ()
+{
+ trim = true;
+}
+
bool zmq::zmq_encoder_t::size_ready ()
{
// Write message body into the buffer.
- if (!trim_prefix) {
+ if (!trim) {
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_encoder_t::message_ready, false);
}
@@ -75,7 +80,7 @@ bool zmq::zmq_encoder_t::message_ready ()
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress);
- if (trim_prefix)
+ if (trim)
size -= *(unsigned char*) zmq_msg_data (&in_progress);
// For messages less than 255 bytes long, write one byte of message size.
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
index 8eedef8..0e23fcd 100644
--- a/src/zmq_encoder.hpp
+++ b/src/zmq_encoder.hpp
@@ -32,11 +32,15 @@ namespace zmq
{
public:
- zmq_encoder_t (size_t bufsize_, bool trim_prefix_);
+ zmq_encoder_t (size_t bufsize_);
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
+ // Once called, encoder will start trimming frefixes from outbound
+ // messages.
+ void trim_prefix ();
+
private:
bool size_ready ();
@@ -46,7 +50,7 @@ namespace zmq
::zmq_msg_t in_progress;
unsigned char tmpbuf [9];
- bool trim_prefix;
+ bool trim;
zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 01fe98c..bda098c 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -32,10 +32,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
io_object_t (parent_),
inpos (NULL),
insize (0),
- decoder (in_batch_size, NULL, 0),
+ decoder (in_batch_size),
outpos (NULL),
outsize (0),
- encoder (out_batch_size, false),
+ encoder (out_batch_size),
inout (NULL),
options (options_),
reconnect (reconnect_)
@@ -160,6 +160,13 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
+void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
+ size_t identity_size_)
+{
+ encoder.trim_prefix ();
+ decoder.add_prefix (identity_, identity_size_);
+}
+
void zmq::zmq_engine_t::error ()
{
zmq_assert (inout);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index ddd0931..174dd1a 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -47,6 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
+ void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 37aafe6..9492caa 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -76,6 +76,12 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
zmq_msg_size (msg_));
received = true;
+ // Once the initial handshaking is over, XREP sockets should start
+ // tracerouting individual messages.
+ if (options.type == ZMQ_XREP)
+ engine->traceroute ((unsigned char*) peer_identity.data (),
+ peer_identity.size ());
+
return true;
}