diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/i_engine.hpp | 10 | ||||
-rw-r--r-- | src/options.cpp | 3 | ||||
-rw-r--r-- | src/options.hpp | 3 | ||||
-rw-r--r-- | src/pgm_receiver.cpp | 12 | ||||
-rw-r--r-- | src/pgm_receiver.hpp | 2 | ||||
-rw-r--r-- | src/pgm_sender.cpp | 12 | ||||
-rw-r--r-- | src/pgm_sender.hpp | 2 | ||||
-rw-r--r-- | src/session.cpp | 32 | ||||
-rw-r--r-- | src/session.hpp | 4 | ||||
-rw-r--r-- | src/xrep.cpp | 5 | ||||
-rw-r--r-- | src/zmq_decoder.cpp | 48 | ||||
-rw-r--r-- | src/zmq_decoder.hpp | 8 | ||||
-rw-r--r-- | src/zmq_encoder.cpp | 31 | ||||
-rw-r--r-- | src/zmq_encoder.hpp | 6 | ||||
-rw-r--r-- | src/zmq_engine.cpp | 10 | ||||
-rw-r--r-- | src/zmq_engine.hpp | 2 | ||||
-rw-r--r-- | src/zmq_init.cpp | 3 |
17 files changed, 47 insertions, 146 deletions
diff --git a/src/i_engine.hpp b/src/i_engine.hpp index bb5f391..ea6b850 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -22,8 +22,6 @@ #include <stddef.h> -#include "blob.hpp" - namespace zmq { @@ -41,13 +39,9 @@ namespace zmq // are messages to send available. virtual void revive () = 0; + // This method is called by the session to signalise that more + // messages can be written to the pipe. virtual void resume_input () = 0; - - // Engine should add the prefix supplied to all inbound messages. - virtual void add_prefix (const blob_t &identity_) = 0; - - // Engine should trim prefix from all the outbound messages. - virtual void trim_prefix () = 0; }; } diff --git a/src/options.cpp b/src/options.cpp index a713ede..6d12944 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -34,8 +34,7 @@ zmq::options_t::options_t () : rcvbuf (0), requires_in (false), requires_out (false), - immediate_connect (true), - traceroute (false) + immediate_connect (true) { } diff --git a/src/options.hpp b/src/options.hpp index eba8ab8..0dd2e18 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -61,9 +61,6 @@ namespace zmq // is not aware of the peer's identity, however, it is able to send // messages straight away. bool immediate_connect; - - // If true, socket requires tracerouting the messages. - bool traceroute; }; } diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 286fcc5..88b59d3 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -121,18 +121,6 @@ void zmq::pgm_receiver_t::resume_input () in_event (); } -void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_) -{ - // No need for tracerouting functionality in PGM socket at the moment. - zmq_assert (false); -} - -void zmq::pgm_receiver_t::trim_prefix () -{ - // No need for tracerouting functionality in PGM socket at the moment. - zmq_assert (false); -} - void zmq::pgm_receiver_t::in_event () { // Read data from the underlying pgm_socket. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index becdfce..1b367bf 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -55,8 +55,6 @@ namespace zmq void unplug (); void revive (); void resume_input (); - void add_prefix (const blob_t &identity_); - void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 01eac2b..9aeb7a9 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -107,18 +107,6 @@ void zmq::pgm_sender_t::resume_input () zmq_assert (false); } -void zmq::pgm_sender_t::add_prefix (const blob_t &identity_) -{ - // No need for tracerouting functionality in PGM socket at the moment. - zmq_assert (false); -} - -void zmq::pgm_sender_t::trim_prefix () -{ - // No need for tracerouting functionality in PGM socket at the moment. - zmq_assert (false); -} - zmq::pgm_sender_t::~pgm_sender_t () { if (out_buffer) { diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 4b232b1..23a53bc 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -53,8 +53,6 @@ namespace zmq void unplug (); void revive (); void resume_input (); - void add_prefix (const blob_t &identity_); - void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/session.cpp b/src/session.cpp index b99a370..9af03c8 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -28,6 +28,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_, const options_t &options_) : owned_t (parent_, owner_), in_pipe (NULL), + incomplete_in (false), active (true), out_pipe (NULL), engine (NULL), @@ -72,7 +73,11 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) if (!in_pipe || !active) return false; - return in_pipe->read (msg_); + if (!in_pipe->read (msg_)) + return false; + + incomplete_in = msg_->flags & ZMQ_MSG_TBC; + return true; } bool zmq::session_t::write (::zmq_msg_t *msg_) @@ -102,6 +107,26 @@ void zmq::session_t::detach (owned_t *reconnecter_) // Engine is terminating itself. No need to deallocate it from here. engine = NULL; + // Get rid of half-processed messages in the out pipe. Flush any + // unflushed messages upstream. + if (out_pipe) { + out_pipe->rollback (); + out_pipe->flush (); + } + + // Remove any half-read message from the in pipe. + if (in_pipe) { + while (incomplete_in) { + zmq_msg_t msg; + zmq_msg_init (&msg); + if (!read (&msg)) { + zmq_assert (!incomplete_in); + break; + } + zmq_msg_close (&msg); + } + } + // Terminate transient session. if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0)) term (); @@ -264,9 +289,4 @@ void zmq::session_t::process_attach (i_engine *engine_, zmq_assert (engine_); engine = engine_; engine->plug (this); - - // Once the initial handshaking is over tracerouting should trim prefixes - // from outbound messages. - if (options.traceroute) - engine->trim_prefix (); } diff --git a/src/session.hpp b/src/session.hpp index 25a0d12..9bda1ad 100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -72,6 +72,10 @@ namespace zmq // Inbound pipe, i.e. one the session is getting messages from. class reader_t *in_pipe; + // This flag is true if the remainder of the message being processed + // is still in the in pipe. + bool incomplete_in; + // If true, in_pipe is active. Otherwise there are no messages to get. bool active; diff --git a/src/xrep.cpp b/src/xrep.cpp index 33b89bd..c70c3ac 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -33,9 +33,8 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : // That way we are aware of the peer's identity when binding to the pipes. options.immediate_connect = false; - // XREP socket adds identity to inbound messages and strips identity - // from the outbound messages. - options.traceroute = true; + // XREP is unfunctional at the moment. Crash here! + zmq_assert (false); } zmq::xrep_t::~xrep_t () diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp index c7e20e1..8e335c9 100644 --- a/src/zmq_decoder.cpp +++ b/src/zmq_decoder.cpp @@ -45,11 +45,6 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_) destination = destination_; } -void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_) -{ - prefix = prefix_; -} - bool zmq::zmq_decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. @@ -64,19 +59,8 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - if (prefix.empty ()) { - int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); - errno_assert (rc == 0); - - } - else { - int rc = zmq_msg_init_size (&in_progress, - 1 + prefix.size () + *tmpbuf - 1); - errno_assert (rc == 0); - unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); - *data = (unsigned char) prefix.size (); - memcpy (data + 1, prefix.data (), *data); - } + int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); + errno_assert (rc == 0); next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); } return true; @@ -93,18 +77,8 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () // in_progress is initialised at this point so in theory we should // close it before calling zmq_msg_init_size, however, it's a 0-byte // message and thus we can treat it as uninitialised... - if (prefix.empty ()) { - int rc = zmq_msg_init_size (&in_progress, size - 1); - errno_assert (rc == 0); - } - else { - int rc = zmq_msg_init_size (&in_progress, - 1 + prefix.size () + size - 1); - errno_assert (rc == 0); - unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress); - *data = (unsigned char) prefix.size (); - memcpy (data + 1, prefix.data (), *data); - } + int rc = zmq_msg_init_size (&in_progress, size - 1); + errno_assert (rc == 0); next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); return true; @@ -115,17 +89,9 @@ bool zmq::zmq_decoder_t::flags_ready () // Store the flags from the wire into the message structure. in_progress.flags = tmpbuf [0]; - if (prefix.empty ()) { - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), - &zmq_decoder_t::message_ready); - } - else { - next_step ((unsigned char*) zmq_msg_data (&in_progress) + - prefix.size () + 1, - zmq_msg_size (&in_progress) - prefix.size () - 1, - &zmq_decoder_t::message_ready); - } - + next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + &zmq_decoder_t::message_ready); + return true; } diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp index 2efed2a..c1e3e3e 100644 --- a/src/zmq_decoder.hpp +++ b/src/zmq_decoder.hpp @@ -33,17 +33,11 @@ namespace zmq { public: - // If prefix is not NULL, it will be glued to the beginning of every - // decoded message. zmq_decoder_t (size_t bufsize_); ~zmq_decoder_t (); void set_inout (struct i_inout *destination_); - // Once called, all decoded messages will be prefixed by the specified - // prefix. - void add_prefix (const blob_t &prefix_); - private: bool one_byte_size_ready (); @@ -55,8 +49,6 @@ namespace zmq unsigned char tmpbuf [8]; ::zmq_msg_t in_progress; - blob_t prefix; - zmq_decoder_t (const zmq_decoder_t&); void operator = (const zmq_decoder_t&); }; diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp index 774532d..dc28299 100644 --- a/src/zmq_encoder.cpp +++ b/src/zmq_encoder.cpp @@ -23,8 +23,7 @@ zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : encoder_t <zmq_encoder_t> (bufsize_), - source (NULL), - trim (false) + source (NULL) { zmq_msg_init (&in_progress); @@ -42,25 +41,11 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_) source = source_; } -void zmq::zmq_encoder_t::trim_prefix () -{ - trim = true; -} - bool zmq::zmq_encoder_t::size_ready () { // Write message body into the buffer. - if (!trim) { - next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), - &zmq_encoder_t::message_ready, false); - } - else { - size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress); - next_step ( - (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1, - zmq_msg_size (&in_progress) - prefix_size - 1, - &zmq_encoder_t::message_ready, false); - } + next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), + &zmq_encoder_t::message_ready, false); return true; } @@ -78,16 +63,8 @@ bool zmq::zmq_encoder_t::message_ready () return false; } - // Get the message size. If the prefix is not to be sent, adjust the - // size accordingly. + // Get the message size. size_t size = zmq_msg_size (&in_progress); - if (trim) { - zmq_assert (size); - size_t prefix_size = - (*(unsigned char*) zmq_msg_data (&in_progress)) + 1; - zmq_assert (prefix_size <= size); - size -= prefix_size; - } // Account for the 'flags' byte. size++; diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp index a3bc4ac..61899f4 100644 --- a/src/zmq_encoder.hpp +++ b/src/zmq_encoder.hpp @@ -37,10 +37,6 @@ namespace zmq void set_inout (struct i_inout *source_); - // Once called, encoder will start trimming frefixes from outbound - // messages. - void trim_prefix (); - private: bool size_ready (); @@ -50,8 +46,6 @@ namespace zmq ::zmq_msg_t in_progress; unsigned char tmpbuf [10]; - bool trim; - zmq_encoder_t (const zmq_encoder_t&); void operator = (const zmq_encoder_t&); }; diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 8e0392c..8990b48 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -169,16 +169,6 @@ void zmq::zmq_engine_t::resume_input () in_event (); } -void zmq::zmq_engine_t::add_prefix (const blob_t &identity_) -{ - decoder.add_prefix (identity_); -} - -void zmq::zmq_engine_t::trim_prefix () -{ - encoder.trim_prefix (); -} - void zmq::zmq_engine_t::error () { zmq_assert (inout); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index c4ef756..d89dccc 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -48,8 +48,6 @@ namespace zmq void unplug (); void revive (); void resume_input (); - void add_prefix (const blob_t &identity_); - void trim_prefix (); // i_poll_events interface implementation. void in_event (); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index 840de85..5824f5c 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -85,8 +85,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), zmq_msg_size (msg_)); } - if (options.traceroute) - engine->add_prefix (peer_identity); + received = true; return true; |