summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/i_engine.hpp10
-rw-r--r--src/options.cpp3
-rw-r--r--src/options.hpp3
-rw-r--r--src/pgm_receiver.cpp12
-rw-r--r--src/pgm_receiver.hpp2
-rw-r--r--src/pgm_sender.cpp12
-rw-r--r--src/pgm_sender.hpp2
-rw-r--r--src/session.cpp32
-rw-r--r--src/session.hpp4
-rw-r--r--src/xrep.cpp5
-rw-r--r--src/zmq_decoder.cpp48
-rw-r--r--src/zmq_decoder.hpp8
-rw-r--r--src/zmq_encoder.cpp31
-rw-r--r--src/zmq_encoder.hpp6
-rw-r--r--src/zmq_engine.cpp10
-rw-r--r--src/zmq_engine.hpp2
-rw-r--r--src/zmq_init.cpp3
17 files changed, 47 insertions, 146 deletions
diff --git a/src/i_engine.hpp b/src/i_engine.hpp
index bb5f391..ea6b850 100644
--- a/src/i_engine.hpp
+++ b/src/i_engine.hpp
@@ -22,8 +22,6 @@
#include <stddef.h>
-#include "blob.hpp"
-
namespace zmq
{
@@ -41,13 +39,9 @@ namespace zmq
// are messages to send available.
virtual void revive () = 0;
+ // This method is called by the session to signalise that more
+ // messages can be written to the pipe.
virtual void resume_input () = 0;
-
- // Engine should add the prefix supplied to all inbound messages.
- virtual void add_prefix (const blob_t &identity_) = 0;
-
- // Engine should trim prefix from all the outbound messages.
- virtual void trim_prefix () = 0;
};
}
diff --git a/src/options.cpp b/src/options.cpp
index a713ede..6d12944 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -34,8 +34,7 @@ zmq::options_t::options_t () :
rcvbuf (0),
requires_in (false),
requires_out (false),
- immediate_connect (true),
- traceroute (false)
+ immediate_connect (true)
{
}
diff --git a/src/options.hpp b/src/options.hpp
index eba8ab8..0dd2e18 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -61,9 +61,6 @@ namespace zmq
// is not aware of the peer's identity, however, it is able to send
// messages straight away.
bool immediate_connect;
-
- // If true, socket requires tracerouting the messages.
- bool traceroute;
};
}
diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp
index 286fcc5..88b59d3 100644
--- a/src/pgm_receiver.cpp
+++ b/src/pgm_receiver.cpp
@@ -121,18 +121,6 @@ void zmq::pgm_receiver_t::resume_input ()
in_event ();
}
-void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
-{
- // No need for tracerouting functionality in PGM socket at the moment.
- zmq_assert (false);
-}
-
-void zmq::pgm_receiver_t::trim_prefix ()
-{
- // No need for tracerouting functionality in PGM socket at the moment.
- zmq_assert (false);
-}
-
void zmq::pgm_receiver_t::in_event ()
{
// Read data from the underlying pgm_socket.
diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp
index becdfce..1b367bf 100644
--- a/src/pgm_receiver.hpp
+++ b/src/pgm_receiver.hpp
@@ -55,8 +55,6 @@ namespace zmq
void unplug ();
void revive ();
void resume_input ();
- void add_prefix (const blob_t &identity_);
- void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 01eac2b..9aeb7a9 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -107,18 +107,6 @@ void zmq::pgm_sender_t::resume_input ()
zmq_assert (false);
}
-void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
-{
- // No need for tracerouting functionality in PGM socket at the moment.
- zmq_assert (false);
-}
-
-void zmq::pgm_sender_t::trim_prefix ()
-{
- // No need for tracerouting functionality in PGM socket at the moment.
- zmq_assert (false);
-}
-
zmq::pgm_sender_t::~pgm_sender_t ()
{
if (out_buffer) {
diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp
index 4b232b1..23a53bc 100644
--- a/src/pgm_sender.hpp
+++ b/src/pgm_sender.hpp
@@ -53,8 +53,6 @@ namespace zmq
void unplug ();
void revive ();
void resume_input ();
- void add_prefix (const blob_t &identity_);
- void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/session.cpp b/src/session.cpp
index b99a370..9af03c8 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -28,6 +28,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_) :
owned_t (parent_, owner_),
in_pipe (NULL),
+ incomplete_in (false),
active (true),
out_pipe (NULL),
engine (NULL),
@@ -72,7 +73,11 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!in_pipe || !active)
return false;
- return in_pipe->read (msg_);
+ if (!in_pipe->read (msg_))
+ return false;
+
+ incomplete_in = msg_->flags & ZMQ_MSG_TBC;
+ return true;
}
bool zmq::session_t::write (::zmq_msg_t *msg_)
@@ -102,6 +107,26 @@ void zmq::session_t::detach (owned_t *reconnecter_)
// Engine is terminating itself. No need to deallocate it from here.
engine = NULL;
+ // Get rid of half-processed messages in the out pipe. Flush any
+ // unflushed messages upstream.
+ if (out_pipe) {
+ out_pipe->rollback ();
+ out_pipe->flush ();
+ }
+
+ // Remove any half-read message from the in pipe.
+ if (in_pipe) {
+ while (incomplete_in) {
+ zmq_msg_t msg;
+ zmq_msg_init (&msg);
+ if (!read (&msg)) {
+ zmq_assert (!incomplete_in);
+ break;
+ }
+ zmq_msg_close (&msg);
+ }
+ }
+
// Terminate transient session.
if (!ordinal && (peer_identity.empty () || peer_identity [0] == 0))
term ();
@@ -264,9 +289,4 @@ void zmq::session_t::process_attach (i_engine *engine_,
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
-
- // Once the initial handshaking is over tracerouting should trim prefixes
- // from outbound messages.
- if (options.traceroute)
- engine->trim_prefix ();
}
diff --git a/src/session.hpp b/src/session.hpp
index 25a0d12..9bda1ad 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -72,6 +72,10 @@ namespace zmq
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
+ // This flag is true if the remainder of the message being processed
+ // is still in the in pipe.
+ bool incomplete_in;
+
// If true, in_pipe is active. Otherwise there are no messages to get.
bool active;
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 33b89bd..c70c3ac 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -33,9 +33,8 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
// That way we are aware of the peer's identity when binding to the pipes.
options.immediate_connect = false;
- // XREP socket adds identity to inbound messages and strips identity
- // from the outbound messages.
- options.traceroute = true;
+ // XREP is unfunctional at the moment. Crash here!
+ zmq_assert (false);
}
zmq::xrep_t::~xrep_t ()
diff --git a/src/zmq_decoder.cpp b/src/zmq_decoder.cpp
index c7e20e1..8e335c9 100644
--- a/src/zmq_decoder.cpp
+++ b/src/zmq_decoder.cpp
@@ -45,11 +45,6 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
-void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_)
-{
- prefix = prefix_;
-}
-
bool zmq::zmq_decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
@@ -64,19 +59,8 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- if (prefix.empty ()) {
- int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
- errno_assert (rc == 0);
-
- }
- else {
- int rc = zmq_msg_init_size (&in_progress,
- 1 + prefix.size () + *tmpbuf - 1);
- errno_assert (rc == 0);
- unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
- *data = (unsigned char) prefix.size ();
- memcpy (data + 1, prefix.data (), *data);
- }
+ int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
+ errno_assert (rc == 0);
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready);
}
return true;
@@ -93,18 +77,8 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
- if (prefix.empty ()) {
- int rc = zmq_msg_init_size (&in_progress, size - 1);
- errno_assert (rc == 0);
- }
- else {
- int rc = zmq_msg_init_size (&in_progress,
- 1 + prefix.size () + size - 1);
- errno_assert (rc == 0);
- unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
- *data = (unsigned char) prefix.size ();
- memcpy (data + 1, prefix.data (), *data);
- }
+ int rc = zmq_msg_init_size (&in_progress, size - 1);
+ errno_assert (rc == 0);
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready);
return true;
@@ -115,17 +89,9 @@ bool zmq::zmq_decoder_t::flags_ready ()
// Store the flags from the wire into the message structure.
in_progress.flags = tmpbuf [0];
- if (prefix.empty ()) {
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
- &zmq_decoder_t::message_ready);
- }
- else {
- next_step ((unsigned char*) zmq_msg_data (&in_progress) +
- prefix.size () + 1,
- zmq_msg_size (&in_progress) - prefix.size () - 1,
- &zmq_decoder_t::message_ready);
- }
-
+ next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ &zmq_decoder_t::message_ready);
+
return true;
}
diff --git a/src/zmq_decoder.hpp b/src/zmq_decoder.hpp
index 2efed2a..c1e3e3e 100644
--- a/src/zmq_decoder.hpp
+++ b/src/zmq_decoder.hpp
@@ -33,17 +33,11 @@ namespace zmq
{
public:
- // If prefix is not NULL, it will be glued to the beginning of every
- // decoded message.
zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
- // Once called, all decoded messages will be prefixed by the specified
- // prefix.
- void add_prefix (const blob_t &prefix_);
-
private:
bool one_byte_size_ready ();
@@ -55,8 +49,6 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
- blob_t prefix;
-
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
};
diff --git a/src/zmq_encoder.cpp b/src/zmq_encoder.cpp
index 774532d..dc28299 100644
--- a/src/zmq_encoder.cpp
+++ b/src/zmq_encoder.cpp
@@ -23,8 +23,7 @@
zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
encoder_t <zmq_encoder_t> (bufsize_),
- source (NULL),
- trim (false)
+ source (NULL)
{
zmq_msg_init (&in_progress);
@@ -42,25 +41,11 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)
source = source_;
}
-void zmq::zmq_encoder_t::trim_prefix ()
-{
- trim = true;
-}
-
bool zmq::zmq_encoder_t::size_ready ()
{
// Write message body into the buffer.
- if (!trim) {
- next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
- &zmq_encoder_t::message_ready, false);
- }
- else {
- size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
- next_step (
- (unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1,
- zmq_msg_size (&in_progress) - prefix_size - 1,
- &zmq_encoder_t::message_ready, false);
- }
+ next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
+ &zmq_encoder_t::message_ready, false);
return true;
}
@@ -78,16 +63,8 @@ bool zmq::zmq_encoder_t::message_ready ()
return false;
}
- // Get the message size. If the prefix is not to be sent, adjust the
- // size accordingly.
+ // Get the message size.
size_t size = zmq_msg_size (&in_progress);
- if (trim) {
- zmq_assert (size);
- size_t prefix_size =
- (*(unsigned char*) zmq_msg_data (&in_progress)) + 1;
- zmq_assert (prefix_size <= size);
- size -= prefix_size;
- }
// Account for the 'flags' byte.
size++;
diff --git a/src/zmq_encoder.hpp b/src/zmq_encoder.hpp
index a3bc4ac..61899f4 100644
--- a/src/zmq_encoder.hpp
+++ b/src/zmq_encoder.hpp
@@ -37,10 +37,6 @@ namespace zmq
void set_inout (struct i_inout *source_);
- // Once called, encoder will start trimming frefixes from outbound
- // messages.
- void trim_prefix ();
-
private:
bool size_ready ();
@@ -50,8 +46,6 @@ namespace zmq
::zmq_msg_t in_progress;
unsigned char tmpbuf [10];
- bool trim;
-
zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&);
};
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 8e0392c..8990b48 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -169,16 +169,6 @@ void zmq::zmq_engine_t::resume_input ()
in_event ();
}
-void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
-{
- decoder.add_prefix (identity_);
-}
-
-void zmq::zmq_engine_t::trim_prefix ()
-{
- encoder.trim_prefix ();
-}
-
void zmq::zmq_engine_t::error ()
{
zmq_assert (inout);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index c4ef756..d89dccc 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -48,8 +48,6 @@ namespace zmq
void unplug ();
void revive ();
void resume_input ();
- void add_prefix (const blob_t &identity_);
- void trim_prefix ();
// i_poll_events interface implementation.
void in_event ();
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index 840de85..5824f5c 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -85,8 +85,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
}
- if (options.traceroute)
- engine->add_prefix (peer_identity);
+
received = true;
return true;