summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-08-07 11:24:07 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-08-25 15:39:20 +0200
commiteb7b8a413a99b2e43e8feee410f2b860e99e7056 (patch)
treec1631bb9a48b1e2a8d31e610e75761ab303ffa99 /src
parent3e97c0fef49e511dcae400e134876581cdae43f3 (diff)
REP socket layered on top of XREP socket
Diffstat (limited to 'src')
-rw-r--r--src/rep.cpp263
-rw-r--r--src/rep.hpp49
2 files changed, 45 insertions, 267 deletions
diff --git a/src/rep.cpp b/src/rep.cpp
index 7636d13..b2ada66 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -21,175 +21,36 @@
#include "rep.hpp"
#include "err.hpp"
-#include "pipe.hpp"
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t slot_) :
- socket_base_t (parent_, slot_),
- active (0),
- current (0),
+ xrep_t (parent_, slot_),
sending_reply (false),
- more (false),
- reply_pipe (NULL)
+ request_begins (true)
{
- options.requires_in = true;
- options.requires_out = true;
-
- // We don't need immediate connect. We'll be able to send messages
- // (replies) only when connection is established and thus requests
- // can arrive anyway.
- options.immediate_connect = false;
}
zmq::rep_t::~rep_t ()
{
- zmq_assert (in_pipes.empty ());
- zmq_assert (out_pipes.empty ());
-}
-
-void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
- class writer_t *outpipe_, const blob_t &peer_identity_)
-{
- zmq_assert (inpipe_ && outpipe_);
- zmq_assert (in_pipes.size () == out_pipes.size ());
-
- inpipe_->set_event_sink (this);
- in_pipes.push_back (inpipe_);
- in_pipes.swap (active, in_pipes.size () - 1);
-
- outpipe_->set_event_sink (this);
- out_pipes.push_back (outpipe_);
- out_pipes.swap (active, out_pipes.size () - 1);
-
- active++;
-}
-
-void zmq::rep_t::xterm_pipes ()
-{
- for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
- in_pipes [i]->terminate ();
- for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
- out_pipes [i]->terminate ();
-}
-
-void zmq::rep_t::terminated (reader_t *pipe_)
-{
- // ???
- zmq_assert (sending_reply || !more || in_pipes [current] != pipe_);
-
- zmq_assert (pipe_);
- zmq_assert (in_pipes.size () == out_pipes.size ());
-
- in_pipes_t::size_type index = in_pipes.index (pipe_);
-
- if (index < active) {
- active--;
- if (current == active)
- current = 0;
- }
- in_pipes.erase (index);
-
- // ???
- if (!zombie) {
- if (out_pipes [index])
- out_pipes [index]->terminate ();
- out_pipes.erase (index);
- }
-}
-
-void zmq::rep_t::terminated (writer_t *pipe_)
-{
- zmq_assert (pipe_);
- zmq_assert (in_pipes.size () == out_pipes.size ());
-
- out_pipes_t::size_type index = out_pipes.index (pipe_);
-
- // If the connection we've got the request from disconnects,
- // there's nowhere to send the reply. Forget about the reply pipe.
- // Once the reply is sent it will be dropped.
- if (sending_reply && pipe_ == reply_pipe)
- reply_pipe = NULL;
-
- if (out_pipes.index (pipe_) < active) {
- active--;
- if (current == active)
- current = 0;
- }
-
- out_pipes.erase (index);
-
- // ???
- if (!zombie) {
- if (in_pipes [index])
- in_pipes [index]->terminate ();
- in_pipes.erase (index);
- }
-}
-
-bool zmq::rep_t::xhas_pipes ()
-{
- return !in_pipes.empty () || !out_pipes.empty ();
-}
-
-void zmq::rep_t::activated (reader_t *pipe_)
-{
- // Move the pipe to the list of active pipes.
- in_pipes_t::size_type index = in_pipes.index (pipe_);
- in_pipes.swap (index, active);
- out_pipes.swap (index, active);
- active++;
-}
-
-void zmq::rep_t::activated (writer_t *pipe_)
-{
- // TODO: What here?
- zmq_assert (false);
}
int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
{
+ // If we are in the middle of receiving a request, we cannot send reply.
if (!sending_reply) {
errno = EFSM;
return -1;
}
- if (reply_pipe) {
-
- // Push message to the reply pipe.
- bool written = reply_pipe->write (msg_);
- zmq_assert (!more || written);
+ bool more = (msg_->flags & ZMQ_MSG_MORE);
- // The pipe is full...
- // When this happens, we simply return an error.
- // This makes REP sockets vulnerable to DoS attack when
- // misbehaving requesters stop collecting replies.
- // TODO: Tear down the underlying connection (?)
- if (!written) {
+ // Push message to the reply pipe.
+ int rc = xrep_t::xsend (msg_, flags_);
+ if (rc != 0)
+ return rc;
- // TODO: The reply socket becomes deactivated here...
- errno = EAGAIN;
- return -1;
- }
-
- more = msg_->flags & ZMQ_MSG_MORE;
- }
- else {
-
- // If the requester have disconnected in the meantime, drop the reply.
- more = msg_->flags & ZMQ_MSG_MORE;
- zmq_msg_close (msg_);
- }
-
- // Flush the reply to the requester.
- if (!more) {
- if (reply_pipe)
- reply_pipe->flush ();
+ // If the reply is complete flip the FSM back to request receiving state.
+ if (!more)
sending_reply = false;
- reply_pipe = NULL;
- }
-
- // Detach the message from the data buffer.
- int rc = zmq_msg_init (msg_);
- zmq_assert (rc == 0);
return 0;
}
@@ -202,70 +63,44 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}
- // Deallocate old content of the message.
- zmq_msg_close (msg_);
+ if (request_begins) {
- // We haven't started reading a request yet...
- if (!more) {
+ // Copy the backtrace stack to the reply pipe.
+ bool bottom = false;
+ while (!bottom) {
- // Round-robin over the pipes to get next message.
- int count;
- for (count = active; count != 0; count--) {
- if (in_pipes [current]->read (msg_))
- break;
+ // TODO: What if request can be read but reply pipe is not
+ // ready for writing?
- // Move the pipe to the list of inactive pipes.
- active--;
- in_pipes.swap (current, active);
- out_pipes.swap (current, active);
+ // Get next part of the backtrace stack.
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ zmq_assert (msg_->flags & ZMQ_MSG_MORE);
- // Move to next pipe.
- current++;
- if (current >= active)
- current = 0;
- }
+ // Empty message part delimits the traceback stack.
+ bottom = (zmq_msg_size (msg_) == 0);
- // No message is available. Initialise the output parameter
- // to be a 0-byte message.
- if (count == 0) {
- zmq_msg_init (msg_);
- errno = EAGAIN;
- return -1;
+ // Push it to the reply pipe.
+ rc = xrep_t::xsend (msg_, flags_);
+ zmq_assert (rc == 0);
}
- // We are aware of a new message now. Setup the reply pipe.
- reply_pipe = out_pipes [current];
-
- // Copy the routing info to the reply pipe.
- while (true) {
-
- // Push message to the reply pipe.
- // TODO: What if the pipe is full?
- // Tear down the underlying connection?
- bool written = reply_pipe->write (msg_);
- zmq_assert (written);
-
- // Message part of zero size delimits the traceback stack.
- if (zmq_msg_size (msg_) == 0)
- break;
-
- // Get next part of the message.
- bool fetched = in_pipes [current]->read (msg_);
- zmq_assert (fetched);
- }
+ request_begins = false;
}
- // Now the routing info is processed. Get the first part
+ // Now the routing info is safely stored. Get the first part
// of the message payload and exit.
- bool fetched = in_pipes [current]->read (msg_);
- zmq_assert (fetched);
- more = msg_->flags & ZMQ_MSG_MORE;
- if (!more) {
- current++;
- if (current >= active)
- current = 0;
+ int rc = xrep_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // If whole request is read, flip the FSM to reply-sending state.
+ if (!(msg_->flags & ZMQ_MSG_MORE)) {
sending_reply = true;
+ request_begins = true;
}
+
return 0;
}
@@ -274,25 +109,7 @@ bool zmq::rep_t::xhas_in ()
if (sending_reply)
return false;
- if (more)
- return true;
-
- for (int count = active; count != 0; count--) {
- if (in_pipes [current]->check_read ())
- return !sending_reply;
-
- // Move the pipe to the list of inactive pipes.
- active--;
- in_pipes.swap (current, active);
- out_pipes.swap (current, active);
-
- // Move to the next pipe.
- current++;
- if (current >= active)
- current = 0;
- }
-
- return false;
+ return xrep_t::xhas_in ();
}
bool zmq::rep_t::xhas_out ()
@@ -300,10 +117,6 @@ bool zmq::rep_t::xhas_out ()
if (!sending_reply)
return false;
- if (more)
- return true;
-
- // TODO: No check for write here...
- return sending_reply;
+ return xrep_t::xhas_out ();
}
diff --git a/src/rep.hpp b/src/rep.hpp
index 7d82a28..09eda02 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -20,17 +20,12 @@
#ifndef __ZMQ_REP_HPP_INCLUDED__
#define __ZMQ_REP_HPP_INCLUDED__
-#include "socket_base.hpp"
-#include "yarray.hpp"
-#include "pipe.hpp"
+#include "xrep.hpp"
namespace zmq
{
- class rep_t :
- public socket_base_t,
- public i_reader_events,
- public i_writer_events
+ class rep_t : public xrep_t
{
public:
@@ -38,50 +33,20 @@ namespace zmq
~rep_t ();
// Overloads of functions from socket_base_t.
- void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
- const blob_t &peer_identity_);
- void xterm_pipes ();
- bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
- // i_reader_events interface implementation.
- void activated (reader_t *pipe_);
- void terminated (reader_t *pipe_);
-
- // i_writer_events interface implementation.
- void activated (writer_t *pipe_);
- void terminated (writer_t *pipe_);
-
private:
- // List in outbound and inbound pipes. Note that the two lists are
- // always in sync. I.e. outpipe with index N communicates with the
- // same session as inpipe with index N.
- typedef yarray_t <writer_t> out_pipes_t;
- out_pipes_t out_pipes;
- typedef yarray_t <reader_t> in_pipes_t;
- in_pipes_t in_pipes;
-
- // Number of active inpipes. All the active inpipes are located at the
- // beginning of the in_pipes array.
- in_pipes_t::size_type active;
-
- // Index of the next inbound pipe to read a request from.
- in_pipes_t::size_type current;
-
- // If true, request was already received and reply wasn't completely
- // sent yet.
+ // If true, we are in process of sending the reply. If false we are
+ // in process of receiving a request.
bool sending_reply;
- // True, if message processed at the moment (either sent or received)
- // is processed only partially.
- bool more;
-
- // Pipe we are going to send reply to.
- writer_t *reply_pipe;
+ // If true, we are starting to receive a request. The beginning
+ // of the request is the backtrace stack.
+ bool request_begins;
rep_t (const rep_t&);
void operator = (const rep_t&);