diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/downstream.cpp | 1 | ||||
-rw-r--r-- | src/i_engine.hpp | 8 | ||||
-rw-r--r-- | src/options.cpp | 1 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/p2p.cpp | 1 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 9 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 1 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 9 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 1 | ||||
-rw-r--r-- | src/pub.cpp | 1 | ||||
-rw-r--r-- | src/rep.cpp | 1 | ||||
-rw-r--r-- | src/req.cpp | 1 | ||||
-rw-r--r-- | src/sub.cpp | 1 | ||||
-rw-r--r-- | src/upstream.cpp | 1 | ||||
-rw-r--r-- | src/xrep.cpp | 1 | ||||
-rw-r--r-- | src/xreq.cpp | 1 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 27 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 6 | ||||
-rw-r--r-- | src/zmq_encoder.cpp | 13 | ||||
-rw-r--r-- | src/zmq_encoder.hpp | 8 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 11 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 1 | ||||
-rw-r--r-- | src/zmq_init.cpp | 6 |
23 files changed, 88 insertions, 25 deletions
diff --git a/src/downstream.cpp b/src/downstream.cpp index 2da08e3..29b0689 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -26,6 +26,7 @@ zmq::downstream_t::downstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { + options.type = ZMQ_DOWNSTREAM; options.requires_in = false; options.requires_out = true; } diff --git a/src/i_engine.hpp b/src/i_engine.hpp index d5d7811..bcb4297 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__ #define __ZMQ_I_ENGINE_HPP_INCLUDED__ +#include <stddef.h> + namespace zmq { @@ -36,6 +38,12 @@ namespace zmq // This method is called by the session to signalise that there // are messages to send available. virtual void revive () = 0; + + // Start tracing the message route. Engine should add the identity + // supplied to all inbound messages and trim identity from all the + // outbound messages. + virtual void traceroute (unsigned char *identity_, + size_t identity_size_) = 0; }; } diff --git a/src/options.cpp b/src/options.cpp index cc02798..f9d93d6 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -23,6 +23,7 @@ #include "err.hpp" zmq::options_t::options_t () : + type (-1), hwm (0), lwm (0), swap (0), diff --git a/src/options.hpp b/src/options.hpp index b066d48..dbe3701 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -34,6 +34,9 @@ namespace zmq int setsockopt (int option_, const void *optval_, size_t optvallen_); + // Type of the associated socket. One of the constants defined in zmq.h + int type; + int64_t hwm; int64_t lwm; int64_t swap; diff --git a/src/p2p.cpp b/src/p2p.cpp index 72bc26b..46bbd0b 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -29,6 +29,7 @@ zmq::p2p_t::p2p_t (class app_thread_t *parent_) : outpipe (NULL), alive (true) { + options.type = ZMQ_P2P; options.requires_in = true; options.requires_out = true; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index d0310cc..a2ba9c6 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -88,6 +88,13 @@ void zmq::pgm_receiver_t::revive () zmq_assert (false); } +void zmq::pgm_receiver_t::traceroute (unsigned char *identity_, + size_t identity_size_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + void zmq::pgm_receiver_t::in_event () { // Read data from the underlying pgm_socket. @@ -151,7 +158,7 @@ void zmq::pgm_receiver_t::in_event () it->second.joined = true; // Create and connect decoder for the peer. - it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0); + it->second.decoder = new (std::nothrow) zmq_decoder_t (0); it->second.decoder->set_inout (inout); } diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 3d6a212..f03551f 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,6 +54,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void traceroute (unsigned char *identity_, size_t identity_size_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 3f08d8e..fa7d7e0 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -36,7 +36,7 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, const options_t &options_) : io_object_t (parent_), - encoder (0, false), + encoder (0), pgm_socket (false, options_), options (options_), out_buffer (NULL), @@ -102,6 +102,13 @@ void zmq::pgm_sender_t::revive () out_event (); } +void zmq::pgm_sender_t::traceroute (unsigned char *identity_, + size_t identity_size_) +{ + // No need for tracerouting functionality in PGM socket at the moment. + zmq_assert (false); +} + zmq::pgm_sender_t::~pgm_sender_t () { if (out_buffer) { diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 43adb8b..89357f5 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -52,6 +52,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void traceroute (unsigned char *identity_, size_t identity_size_); // i_poll_events interface implementation. void in_event (); diff --git a/src/pub.cpp b/src/pub.cpp index 05bfdcf..9a2dcc6 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -27,6 +27,7 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) : socket_base_t (parent_) { + options.type = ZMQ_PUB; options.requires_in = false; options.requires_out = true; } diff --git a/src/rep.cpp b/src/rep.cpp index b6bffae..b7685b4 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -30,6 +30,7 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) : waiting_for_reply (false), reply_pipe (NULL) { + options.type = ZMQ_REP; options.requires_in = true; options.requires_out = true; } diff --git a/src/req.cpp b/src/req.cpp index c9240e0..9b1766e 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -30,6 +30,7 @@ zmq::req_t::req_t (class app_thread_t *parent_) : reply_pipe_active (false), reply_pipe (NULL) { + options.type = ZMQ_REQ; options.requires_in = true; options.requires_out = true; } diff --git a/src/sub.cpp b/src/sub.cpp index 06ed896..31ee222 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -28,6 +28,7 @@ zmq::sub_t::sub_t (class app_thread_t *parent_) : socket_base_t (parent_), has_message (false) { + options.type = ZMQ_SUB; options.requires_in = true; options.requires_out = false; zmq_msg_init (&message); diff --git a/src/upstream.cpp b/src/upstream.cpp index bdcd5ef..390dcbe 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -25,6 +25,7 @@ zmq::upstream_t::upstream_t (class app_thread_t *parent_) : socket_base_t (parent_) { + options.type = ZMQ_UPSTREAM; options.requires_in = true; options.requires_out = false; } diff --git a/src/xrep.cpp b/src/xrep.cpp index 4cba21c..67a9a39 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -25,6 +25,7 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : socket_base_t (parent_) { + options.type = ZMQ_XREP; options.requires_in = true; options.requires_out = true; } diff --git a/src/xreq.cpp b/src/xreq.cpp index a4310f8..691b66e 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -25,6 +25,7 @@ zmq::xreq_t::xreq_t (class app_thread_t *parent_) : socket_base_t (parent_) { + options.type = ZMQ_REQ; options.requires_in = true; options.requires_out = true; } diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index cc4f846..f502ffd 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -25,24 +25,14 @@ #include "wire.hpp" #include "err.hpp" -zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_, - void *prefix_, size_t prefix_size_) : +zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : decoder_t <zmq_decoder_t> (bufsize_), - destination (NULL) + destination (NULL), + prefix (NULL), + prefix_size (0) { zmq_msg_init (&in_progress); - if (!prefix_) { - prefix = NULL; - prefix_size = 0; - } - else { - prefix = malloc (prefix_size_); - zmq_assert (prefix); - memcpy (prefix, prefix_, prefix_size_); - prefix_size = prefix_size_; - } - // At the beginning, read one byte and go to one_byte_size_ready state. next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); } @@ -60,6 +50,15 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_) destination = destination_; } +void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_, + size_t prefix_size_) +{ + prefix = malloc (prefix_size_); + zmq_assert (prefix); + memcpy (prefix, prefix_, prefix_size_); + prefix_size = prefix_size_; +} + bool zmq::zmq_decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 5d9133b..dfabece 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -34,11 +34,15 @@ namespace zmq // If prefix is not NULL, it will be glued to the beginning of every // decoded message. - zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_); + zmq_decoder_t (size_t bufsize_); ~zmq_decoder_t (); void set_inout (struct i_inout *destination_); + // Once called, all decoded messages will be prefixed by the specified + // prefix. + void add_prefix (unsigned char *prefix_, size_t prefix_size_); + private: bool one_byte_size_ready (); diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 89649ef..a60fe5e 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -21,10 +21,10 @@ #include "i_inout.hpp" #include "wire.hpp" -zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) : +zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : encoder_t <zmq_encoder_t> (bufsize_), source (NULL), - trim_prefix (trim_prefix_) + trim (false) { zmq_msg_init (&in_progress); @@ -42,10 +42,15 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_) source = source_; } +void zmq::zmq_encoder_t::trim_prefix () +{ + trim = true; +} + bool zmq::zmq_encoder_t::size_ready () { // Write message body into the buffer. - if (!trim_prefix) { + if (!trim) { next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), &zmq_encoder_t::message_ready, false); } @@ -75,7 +80,7 @@ bool zmq::zmq_encoder_t::message_ready () // Get the message size. If the prefix is not to be sent, adjust the // size accordingly. size_t size = zmq_msg_size (&in_progress); - if (trim_prefix) + if (trim) size -= *(unsigned char*) zmq_msg_data (&in_progress); // For messages less than 255 bytes long, write one byte of message size. diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index 8eedef8..0e23fcd 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -32,11 +32,15 @@ namespace zmq { public: - zmq_encoder_t (size_t bufsize_, bool trim_prefix_); + zmq_encoder_t (size_t bufsize_); ~zmq_encoder_t (); void set_inout (struct i_inout *source_); + // Once called, encoder will start trimming frefixes from outbound + // messages. + void trim_prefix (); + private: bool size_ready (); @@ -46,7 +50,7 @@ namespace zmq ::zmq_msg_t in_progress; unsigned char tmpbuf [9]; - bool trim_prefix; + bool trim; zmq_encoder_t (const zmq_encoder_t&); void operator = (const zmq_encoder_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 01fe98c..bda098c 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -32,10 +32,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, io_object_t (parent_), inpos (NULL), insize (0), - decoder (in_batch_size, NULL, 0), + decoder (in_batch_size), outpos (NULL), outsize (0), - encoder (out_batch_size, false), + encoder (out_batch_size), inout (NULL), options (options_), reconnect (reconnect_) @@ -160,6 +160,13 @@ void zmq::zmq_engine_t::revive () out_event (); } +void zmq::zmq_engine_t::traceroute (unsigned char *identity_, + size_t identity_size_) +{ + encoder.trim_prefix (); + decoder.add_prefix (identity_, identity_size_); +} + void zmq::zmq_engine_t::error () { zmq_assert (inout); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index ddd0931..174dd1a 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -47,6 +47,7 @@ namespace zmq void plug (struct i_inout *inout_); void unplug (); void revive (); + void traceroute (unsigned char *identity_, size_t identity_size_); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 37aafe6..9492caa 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -76,6 +76,12 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) zmq_msg_size (msg_)); received = true; + // Once the initial handshaking is over, XREP sockets should start + // tracerouting individual messages. + if (options.type == ZMQ_XREP) + engine->traceroute ((unsigned char*) peer_identity.data (), + peer_identity.size ()); + return true; } |