summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-04-27 17:36:00 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-04-27 17:36:00 +0200
commitad6fa9d0d4f1cf29ce63998d7efe337b1a784ef6 (patch)
tree3d938f5e634b8bf140d3717565623517be0a97bf
parent1ad6ade0ed465030716ce720077f3aa31e6cd136 (diff)
initial version of multi-hop REQ/REP
-rw-r--r--src/rep.cpp97
-rw-r--r--src/req.cpp28
-rw-r--r--src/xrep.cpp195
-rw-r--r--src/xrep.hpp35
4 files changed, 280 insertions, 75 deletions
diff --git a/src/rep.cpp b/src/rep.cpp
index f77dfce..6711509 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -167,15 +167,15 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
if (reply_pipe) {
- // Push message to the reply pipe.
+ // Push message to the reply pipe.
bool written = reply_pipe->write (msg_);
zmq_assert (!more || written);
- // The pipe is full...
- // When this happens, we simply return an error.
- // This makes REP sockets vulnerable to DoS attack when
- // misbehaving requesters stop collecting replies.
- // TODO: Tear down the underlying connection (?)
+ // The pipe is full...
+ // When this happens, we simply return an error.
+ // This makes REP sockets vulnerable to DoS attack when
+ // misbehaving requesters stop collecting replies.
+ // TODO: Tear down the underlying connection (?)
if (!written) {
errno = EAGAIN;
return -1;
@@ -185,12 +185,12 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
}
else {
- // If the requester have disconnected in the meantime, drop the reply.
+ // If the requester have disconnected in the meantime, drop the reply.
more = msg_->flags & ZMQ_MSG_MORE;
zmq_msg_close (msg_);
}
- // Flush the reply to the requester.
+ // Flush the reply to the requester.
if (!more) {
if (reply_pipe)
reply_pipe->flush ();
@@ -198,7 +198,7 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
reply_pipe = NULL;
}
- // Detach the message from the data buffer.
+ // Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
@@ -207,37 +207,70 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
-
+ // If we are in middle of sending a reply, we cannot receive next request.
if (sending_reply) {
errno = EFSM;
return -1;
}
- // Round-robin over the pipes to get next message.
- for (int count = active; count != 0; count--) {
- bool fetched = in_pipes [current]->read (msg_);
- zmq_assert (!(more && !fetched));
-
- if (fetched) {
- more = msg_->flags & ZMQ_MSG_MORE;
- if (!more) {
- reply_pipe = out_pipes [current];
- sending_reply = true;
- current++;
- if (current >= active)
- current = 0;
- }
- return 0;
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // We haven't started reading a request yet...
+ if (!more) {
+
+ // Round-robin over the pipes to get next message.
+ int count;
+ for (count = active; count != 0; count--) {
+ if (in_pipes [current]->read (msg_))
+ break;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ if (count == 0) {
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // We are aware of a new message now. Setup the reply pipe.
+ reply_pipe = out_pipes [current];
+
+ // Copy the routing info to the reply pipe.
+ while (true) {
+
+ // Push message to the reply pipe.
+ // TODO: What if the pipe is full?
+ // Tear down the underlying connection?
+ bool written = reply_pipe->write (msg_);
+ zmq_assert (written);
+
+ // Message part of zero size delimits the traceback stack.
+ if (zmq_msg_size (msg_) == 0)
+ break;
+
+ // Get next part of the message.
+ bool fetched = in_pipes [current]->read (msg_);
+ zmq_assert (fetched);
}
}
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- zmq_msg_init (msg_);
- errno = EAGAIN;
- return -1;
+ // Now the routing info is processed. Get the first part
+ // of the message payload and exit.
+ bool fetched = in_pipes [current]->read (msg_);
+ zmq_assert (fetched);
+ more = msg_->flags & ZMQ_MSG_MORE;
+ if (!more) {
+ current++;
+ if (current >= active)
+ current = 0;
+ sending_reply = true;
+ }
+ return 0;
}
bool zmq::rep_t::xhas_in ()
diff --git a/src/req.cpp b/src/req.cpp
index c8b7b98..969755b 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -190,7 +190,17 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- // Push message to the selected pipe.
+ // If we are starting to send the request, generate a prefix.
+ if (!more) {
+ zmq_msg_t prefix;
+ int rc = zmq_msg_init (&prefix);
+ zmq_assert (rc == 0);
+ prefix.flags |= ZMQ_MSG_MORE;
+ bool written = out_pipes [current]->write (&prefix);
+ zmq_assert (written);
+ }
+
+ // Push the message to the selected pipe.
bool written = out_pipes [current]->write (msg_);
zmq_assert (written);
more = msg_->flags & ZMQ_MSG_MORE;
@@ -218,7 +228,8 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
- zmq_msg_close (msg_);
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
// If request wasn't send, we can't wait for reply.
if (!receiving_reply) {
@@ -234,6 +245,19 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
+ // If we are starting to receive new reply, check whether prefix
+ // is well-formed and drop it.
+ if (!more) {
+ zmq_assert (msg_->flags & ZMQ_MSG_MORE);
+ zmq_assert (zmq_msg_size (msg_) == 0);
+ rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ }
+
+ // Get the actual reply.
+ bool recvd = reply_pipe->read (msg_);
+ zmq_assert (recvd);
+
// If this was last part of the reply, switch to request phase.
more = msg_->flags & ZMQ_MSG_MORE;
if (!more) {
diff --git a/src/xrep.cpp b/src/xrep.cpp
index c70c3ac..051a5ce 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -24,7 +24,11 @@
#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
- socket_base_t (parent_)
+ socket_base_t (parent_),
+ current_in (0),
+ more_in (false),
+ current_out (NULL),
+ more_out (false)
{
options.requires_in = true;
options.requires_out = true;
@@ -32,56 +36,96 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
// On connect, pipes are created only after initial handshaking.
// That way we are aware of the peer's identity when binding to the pipes.
options.immediate_connect = false;
-
- // XREP is unfunctional at the moment. Crash here!
- zmq_assert (false);
}
zmq::xrep_t::~xrep_t ()
{
+ for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++)
+ it->reader->term ();
+ for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
+ it++)
+ it->second.writer->term ();
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
- fq.attach (inpipe_);
// TODO: What if new connection has same peer identity as the old one?
+ outpipe_t outpipe = {outpipe_, true};
bool ok = outpipes.insert (std::make_pair (
- peer_identity_, outpipe_)).second;
+ peer_identity_, outpipe)).second;
zmq_assert (ok);
+
+ inpipe_t inpipe = {inpipe_, peer_identity_, true};
+ inpipes.push_back (inpipe);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
{
- zmq_assert (pipe_);
- fq.detach (pipe_);
+// TODO:!
+ for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
+ it++) {
+ if (it->reader == pipe_) {
+ inpipes.erase (it);
+ return;
+ }
+ }
+ zmq_assert (false);
}
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
- it != outpipes.end (); ++it)
- if (it->second == pipe_) {
+ it != outpipes.end (); ++it) {
+ if (it->second.writer == pipe_) {
outpipes.erase (it);
+ if (pipe_ == current_out)
+ current_out = NULL;
return;
}
+ }
zmq_assert (false);
}
void zmq::xrep_t::xkill (class reader_t *pipe_)
{
- fq.kill (pipe_);
+ for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
+ it++) {
+ if (it->reader == pipe_) {
+ zmq_assert (it->active);
+ it->active = false;
+ return;
+ }
+ }
+ zmq_assert (false);
}
void zmq::xrep_t::xrevive (class reader_t *pipe_)
{
- fq.revive (pipe_);
+ for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
+ it++) {
+ if (it->reader == pipe_) {
+ zmq_assert (!it->active);
+ it->active = true;
+ return;
+ }
+ }
+ zmq_assert (false);
}
void zmq::xrep_t::xrevive (class writer_t *pipe_)
{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.writer == pipe_) {
+ zmq_assert (!it->second.active);
+ it->second.active = true;
+ return;
+ }
+ }
+ zmq_assert (false);
}
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
@@ -93,33 +137,45 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
- unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
- size_t size = zmq_msg_size (msg_);
-
- // Check whether the message is well-formed.
- zmq_assert (size >= 1);
- zmq_assert (size_t (*data + 1) <= size);
-
- // Find the corresponding outbound pipe. If there's none, just drop the
- // message.
- // TODO: There's an allocation here! It's the critical path! Get rid of it!
- blob_t identity (data + 1, *data);
- outpipes_t::iterator it = outpipes.find (identity);
- if (it == outpipes.end ()) {
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
- rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
+ // If this is the first part of the message it's the identity of the
+ // peer to send the message to.
+ if (!more_out) {
+ zmq_assert (!current_out);
+
+ // There's no such thing as prefix with no subsequent message.
+ zmq_assert (msg_->flags & ZMQ_MSG_MORE);
+ more_out = true;
+
+ // Find the pipe associated with the identity stored in the prefix.
+ // If there's no such pipe just silently drop the message.
+ blob_t identity ((unsigned char*) zmq_msg_data (msg_),
+ zmq_msg_size (msg_));
+ outpipes_t::iterator it = outpipes.find (identity);
+ if (it == outpipes.end ())
+ return 0;
+
+ // Remember the outgoing pipe.
+ current_out = it->second.writer;
+
return 0;
}
- // Push message to the selected pipe.
- if (!it->second->write (msg_)) {
- errno = EAGAIN;
- return -1;
- }
+ // Check whether this is the last part of the message.
+ more_out = msg_->flags & ZMQ_MSG_MORE;
- it->second->flush ();
+ // Push the message into the pipe. If there's no out pipe, just drop it.
+ if (current_out) {
+ bool ok = current_out->write (msg_);
+ zmq_assert (ok);
+ if (!more_out) {
+ current_out->flush ();
+ current_out = NULL;
+ }
+ }
+ else {
+ int rc = zmq_msg_close (msg_);
+ zmq_assert (rc == 0);
+ }
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
@@ -130,12 +186,77 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
- return fq.recv (msg_, flags_);
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // If we are in the middle of reading a message, just grab next part of it.
+ if (more_in) {
+ zmq_assert (inpipes [current_in].active);
+ bool fetched = inpipes [current_in].reader->read (msg_);
+ zmq_assert (fetched);
+ more_in = msg_->flags & ZMQ_MSG_MORE;
+ if (!more_in) {
+ current_in++;
+ if (current_in >= inpipes.size ())
+ current_in = 0;
+ }
+ return 0;
+ }
+
+ // Round-robin over the pipes to get the next message.
+ for (int count = inpipes.size (); count != 0; count--) {
+
+ // Try to fetch new message.
+ bool fetched;
+ if (!inpipes [current_in].active)
+ fetched = false;
+ else
+ fetched = inpipes [current_in].reader->check_read ();
+
+ // If we have a message, create a prefix and return it to the caller.
+ if (fetched) {
+ int rc = zmq_msg_init_size (msg_,
+ inpipes [current_in].identity.size ());
+ zmq_assert (rc == 0);
+ memcpy (zmq_msg_data (msg_), inpipes [current_in].identity.data (),
+ zmq_msg_size (msg_));
+ more_in = true;
+ return 0;
+ }
+
+ // If me don't have a message, move to next pipe.
+ current_in++;
+ if (current_in >= inpipes.size ())
+ current_in = 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
}
bool zmq::xrep_t::xhas_in ()
{
- return fq.has_in ();
+ // There are subsequent parts of the partly-read message available.
+ if (more_in)
+ return true;
+
+ // Note that messing with current doesn't break the fairness of fair
+ // queueing algorithm. If there are no messages available current will
+ // get back to its original value. Otherwise it'll point to the first
+ // pipe holding messages, skipping only pipes with no messages available.
+ for (int count = inpipes.size (); count != 0; count--) {
+ if (inpipes [current_in].active &&
+ inpipes [current_in].reader->check_read ())
+ return true;
+ current_in++;
+ if (current_in >= inpipes.size ())
+ current_in = 0;
+ }
+
+ return false;
}
bool zmq::xrep_t::xhas_out ()
diff --git a/src/xrep.hpp b/src/xrep.hpp
index c56a8f9..940d288 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -21,14 +21,15 @@
#define __ZMQ_XREP_HPP_INCLUDED__
#include <map>
+#include <vector>
#include "socket_base.hpp"
#include "blob.hpp"
-#include "fq.hpp"
namespace zmq
{
+ // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t : public socket_base_t
{
public:
@@ -52,13 +53,39 @@ namespace zmq
private:
- // Inbound messages are fair-queued.
- fq_t fq;
+ struct inpipe_t
+ {
+ class reader_t *reader;
+ blob_t identity;
+ bool active;
+ };
+
+ // Inbound pipes with the names of corresponging peers.
+ typedef std::vector <inpipe_t> inpipes_t;
+ inpipes_t inpipes;
+
+ // The pipe we are currently reading from.
+ inpipes_t::size_type current_in;
+
+ // If true, more incoming message parts are expected.
+ bool more_in;
+
+ struct outpipe_t
+ {
+ class writer_t *writer;
+ bool active;
+ };
// Outbound pipes indexed by the peer names.
- typedef std::map <blob_t, class writer_t*> outpipes_t;
+ typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
+ // The pipe we are currently writing to.
+ class writer_t *current_out;
+
+ // If true, more outgoing message parts are expected.
+ bool more_out;
+
xrep_t (const xrep_t&);
void operator = (const xrep_t&);
};