summaryrefslogtreecommitdiff
path: root/src/rep.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/rep.cpp')
-rw-r--r--src/rep.cpp280
1 files changed, 280 insertions, 0 deletions
diff --git a/src/rep.cpp b/src/rep.cpp
new file mode 100644
index 0000000..34b77c4
--- /dev/null
+++ b/src/rep.cpp
@@ -0,0 +1,280 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../include/zmq.h"
+
+#include "rep.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::rep_t::rep_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ active (0),
+ current (0),
+ sending_reply (false),
+ more (false),
+ reply_pipe (NULL)
+{
+ options.requires_in = true;
+ options.requires_out = true;
+
+ // We don't need immediate connect. We'll be able to send messages
+ // (replies) only when connection is established and thus requests
+ // can arrive anyway.
+ options.immediate_connect = false;
+}
+
+zmq::rep_t::~rep_t ()
+{
+}
+
+void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_, const blob_t &peer_identity_)
+{
+ zmq_assert (inpipe_ && outpipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ in_pipes.push_back (inpipe_);
+ in_pipes.swap (active, in_pipes.size () - 1);
+ out_pipes.push_back (outpipe_);
+ out_pipes.swap (active, out_pipes.size () - 1);
+ active++;
+}
+
+void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (sending_reply || !more || in_pipes [current] != pipe_);
+
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+
+ if (index < active) {
+ active--;
+ if (current == active)
+ current = 0;
+ }
+
+ if (out_pipes [index])
+ out_pipes [index]->term ();
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ zmq_assert (in_pipes.size () == out_pipes.size ());
+
+ out_pipes_t::size_type index = out_pipes.index (pipe_);
+
+ // If the connection we've got the request from disconnects,
+ // there's nowhere to send the reply. Forget about the reply pipe.
+ // Once the reply is sent it will be dropped.
+ if (sending_reply && pipe_ == reply_pipe)
+ reply_pipe = NULL;
+
+ if (out_pipes.index (pipe_) < active) {
+ active--;
+ if (current == active)
+ current = 0;
+ }
+
+ if (in_pipes [index])
+ in_pipes [index]->term ();
+ in_pipes.erase (index);
+ out_pipes.erase (index);
+}
+
+void zmq::rep_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+ active--;
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+}
+
+void zmq::rep_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ in_pipes_t::size_type index = in_pipes.index (pipe_);
+ in_pipes.swap (index, active);
+ out_pipes.swap (index, active);
+ active++;
+}
+
+void zmq::rep_t::xrevive (class writer_t *pipe_)
+{
+}
+
+int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ if (!sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ if (reply_pipe) {
+
+ // Push message to the reply pipe.
+ bool written = reply_pipe->write (msg_);
+ zmq_assert (!more || written);
+
+ // The pipe is full...
+ // When this happens, we simply return an error.
+ // This makes REP sockets vulnerable to DoS attack when
+ // misbehaving requesters stop collecting replies.
+ // TODO: Tear down the underlying connection (?)
+ if (!written) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ more = msg_->flags & ZMQ_MSG_MORE;
+ }
+ else {
+
+ // If the requester have disconnected in the meantime, drop the reply.
+ more = msg_->flags & ZMQ_MSG_MORE;
+ zmq_msg_close (msg_);
+ }
+
+ // Flush the reply to the requester.
+ if (!more) {
+ if (reply_pipe)
+ reply_pipe->flush ();
+ sending_reply = false;
+ reply_pipe = NULL;
+ }
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ // If we are in middle of sending a reply, we cannot receive next request.
+ if (sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // We haven't started reading a request yet...
+ if (!more) {
+
+ // Round-robin over the pipes to get next message.
+ int count;
+ for (count = active; count != 0; count--) {
+ if (in_pipes [current]->read (msg_))
+ break;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ if (count == 0) {
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // We are aware of a new message now. Setup the reply pipe.
+ reply_pipe = out_pipes [current];
+
+ // Copy the routing info to the reply pipe.
+ while (true) {
+
+ // Push message to the reply pipe.
+ // TODO: What if the pipe is full?
+ // Tear down the underlying connection?
+ bool written = reply_pipe->write (msg_);
+ zmq_assert (written);
+
+ // Message part of zero size delimits the traceback stack.
+ if (zmq_msg_size (msg_) == 0)
+ break;
+
+ // Get next part of the message.
+ bool fetched = in_pipes [current]->read (msg_);
+ zmq_assert (fetched);
+ }
+ }
+
+ // Now the routing info is processed. Get the first part
+ // of the message payload and exit.
+ bool fetched = in_pipes [current]->read (msg_);
+ zmq_assert (fetched);
+ more = msg_->flags & ZMQ_MSG_MORE;
+ if (!more) {
+ current++;
+ if (current >= active)
+ current = 0;
+ sending_reply = true;
+ }
+ return 0;
+}
+
+bool zmq::rep_t::xhas_in ()
+{
+ if (sending_reply)
+ return false;
+
+ if (more)
+ return true;
+
+ for (int count = active; count != 0; count--) {
+ if (in_pipes [current]->check_read ())
+ return !sending_reply;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
+bool zmq::rep_t::xhas_out ()
+{
+ if (!sending_reply)
+ return false;
+
+ if (more)
+ return true;
+
+ // TODO: No check for write here...
+ return sending_reply;
+}
+