summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-15 13:10:41 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-17 07:30:52 +0200
commit443d06f894751062da6d69238ce09f6fbfc27577 (patch)
tree44d21141df628f1cba965b7466267faa51f9d34c /src
parent692688206d5061de2a2b6a3d3040318dc537f221 (diff)
"Survey" pattern implemented
Survey pattern is "multicast with reply". There are two roles: surveyor and respondent. Surveyor publishes a survey which gets delivered to all connected respondents. Each repondent can send a response to the survey. All the responses are delivered to the original surveyor. Once the surveyor decides that the survey is over (e.g. deadline was reached) it can send initiate survey. Late responses from old surveys are automatically discarded by the surveyor socket. Socket types: SURVEYOR, XSURVEYOR, RESPONDENT, XRESPONDENT Patch also includes a test program with surveoyr, two respondents and an intermediary device. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am8
-rw-r--r--src/respondent.cpp114
-rw-r--r--src/respondent.hpp77
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/socket_base.cpp16
-rw-r--r--src/surveyor.cpp135
-rw-r--r--src/surveyor.hpp77
-rw-r--r--src/xrespondent.cpp278
-rw-r--r--src/xrespondent.hpp123
-rw-r--r--src/xsurveyor.cpp89
-rw-r--r--src/xsurveyor.hpp85
11 files changed, 1022 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 81ece53..3025e13 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -64,6 +64,7 @@ libxs_la_SOURCES = \
reaper.hpp \
rep.hpp \
req.hpp \
+ respondent.hpp \
select.hpp \
session_base.hpp \
signaler.hpp \
@@ -71,6 +72,7 @@ libxs_la_SOURCES = \
stdint.hpp \
stream_engine.hpp \
sub.hpp \
+ surveyor.hpp \
tcp_address.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -81,7 +83,9 @@ libxs_la_SOURCES = \
xpub.hpp \
xrep.hpp \
xreq.hpp \
+ xrespondent.hpp \
xsub.hpp \
+ xsurveyor.hpp \
ypipe.hpp \
yqueue.hpp \
clock.cpp \
@@ -121,12 +125,14 @@ libxs_la_SOURCES = \
random.cpp \
rep.cpp \
req.cpp \
+ respondent.cpp \
select.cpp \
session_base.cpp \
signaler.cpp \
socket_base.cpp \
stream_engine.cpp \
sub.cpp \
+ surveyor.cpp \
tcp_address.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
@@ -135,7 +141,9 @@ libxs_la_SOURCES = \
xpub.cpp \
xrep.cpp \
xreq.cpp \
+ xrespondent.cpp \
xsub.cpp \
+ xsurveyor.cpp \
xs.cpp
if ON_MINGW
diff --git a/src/respondent.cpp b/src/respondent.cpp
new file mode 100644
index 0000000..c2abd84
--- /dev/null
+++ b/src/respondent.cpp
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "respondent.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+
+xs::respondent_t::respondent_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xrespondent_t (parent_, tid_, sid_),
+ sending_reply (false)
+{
+ options.type = XS_RESPONDENT;
+}
+
+xs::respondent_t::~respondent_t ()
+{
+}
+
+int xs::respondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If there's no ongoing survey, we cannot send reply.
+ if (!sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Survey pattern doesn't support multipart messages.
+ if (msg_->flags () & msg_t::more || flags_ & XS_SNDMORE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Push message to the reply pipe.
+ int rc = xrespondent_t::xsend (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Flip the FSM back to request receiving state.
+ sending_reply = false;
+
+ return 0;
+}
+
+int xs::respondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ // If we are in middle of sending a reply, we cannot receive next survey.
+ if (sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // First thing to do when receiving a srvey is to copy all the labels
+ // to the reply pipe.
+ while (true) {
+ int rc = xrespondent_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ rc = xrespondent_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+ }
+
+ // When whole survey is read, flip the FSM to reply-sending state.
+ sending_reply = true;
+
+ return 0;
+}
+
+bool xs::respondent_t::xhas_in ()
+{
+ if (sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_in ();
+}
+
+bool xs::respondent_t::xhas_out ()
+{
+ if (!sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_out ();
+}
+
+xs::respondent_session_t::respondent_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xrespondent_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::respondent_session_t::~respondent_session_t ()
+{
+}
+
diff --git a/src/respondent.hpp b/src/respondent.hpp
new file mode 100644
index 0000000..b315350
--- /dev/null
+++ b/src/respondent.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_RESPONDENT_HPP_INCLUDED__
+#define __XS_RESPONDENT_HPP_INCLUDED__
+
+#include "xrespondent.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class respondent_t : public xrespondent_t
+ {
+ public:
+
+ respondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~respondent_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, we are in process of sending the reply. If false we are
+ // in process of receiving a request.
+ // TODO: Consider whether automatic cancelling of the previous request
+ // when recv is called again is not a better idea than returning EFSM.
+ bool sending_reply;
+
+ respondent_t (const respondent_t&);
+ const respondent_t &operator = (const respondent_t&);
+
+ };
+
+ class respondent_session_t : public xrespondent_session_t
+ {
+ public:
+
+ respondent_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~respondent_session_t ();
+
+ private:
+
+ respondent_session_t (const respondent_session_t&);
+ const respondent_session_t &operator = (const respondent_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 90fa071..35c4a4e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -42,6 +42,10 @@
#include "push.hpp"
#include "pull.hpp"
#include "pair.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
@@ -93,6 +97,22 @@ xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eb9b491..df182f1 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -61,6 +61,10 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
bool xs::socket_base_t::check_tag ()
{
@@ -106,6 +110,18 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
case XS_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_t (parent_, tid_, sid_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_t (parent_, tid_, sid_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/surveyor.cpp b/src/surveyor.cpp
new file mode 100644
index 0000000..c5216bd
--- /dev/null
+++ b/src/surveyor.cpp
@@ -0,0 +1,135 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "surveyor.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+#include "wire.hpp"
+#include "likely.hpp"
+#include "random.hpp"
+
+xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xsurveyor_t (parent_, tid_, sid_),
+ receiving_responses (false),
+ survey_id (generate_random ())
+{
+ options.type = XS_SURVEYOR;
+}
+
+xs::surveyor_t::~surveyor_t ()
+{
+}
+
+int xs::surveyor_t::xsend (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // Survey pattern works only with sigle-part messages.
+ if (flags_ & XS_SNDMORE || msg_->flags () & msg_t::more) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Start the new survey. First, generate new survey ID.
+ ++survey_id;
+ msg_t id;
+ rc = id.init_size (4);
+ errno_assert (rc == 0);
+ put_uint32 ((unsigned char*) id.data (), survey_id);
+ id.set_flags (msg_t::more);
+ rc = xsurveyor_t::xsend (&id, 0);
+ if (rc != 0) {
+ id.close ();
+ return -1;
+ }
+ id.close ();
+
+ // Now send the body of the survey.
+ rc = xsurveyor_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+
+ // Start waiting for responses from the peers.
+ receiving_responses = true;
+
+ return 0;
+}
+
+int xs::surveyor_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // If there's no survey underway, it's an error.
+ if (!receiving_responses) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Get the first part of the response -- the survey ID.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Check whether this is response for the onging survey. If not, we can
+ // drop the response.
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 4 ||
+ get_uint32 ((unsigned char*) msg_->data ()) != survey_id)) {
+ while (true) {
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ }
+ msg_->close ();
+ msg_->init ();
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Get the body of the response.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+bool xs::surveyor_t::xhas_in ()
+{
+ // TODO: We need a prefetched_message here...
+ xs_assert (false);
+ return false;
+}
+
+bool xs::surveyor_t::xhas_out ()
+{
+ return xsurveyor_t::xhas_out ();
+}
+
+xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xsurveyor_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::surveyor_session_t::~surveyor_session_t ()
+{
+}
+
diff --git a/src/surveyor.hpp b/src/surveyor.hpp
new file mode 100644
index 0000000..83fdbb0
--- /dev/null
+++ b/src/surveyor.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_SURVEYOR_HPP_INCLUDED__
+#define __XS_SURVEYOR_HPP_INCLUDED__
+
+#include "xsurveyor.hpp"
+#include "stdint.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class surveyor_t : public xsurveyor_t
+ {
+ public:
+
+ surveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~surveyor_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, survey was already lauched and have no expriered yet.
+ bool receiving_responses;
+
+ // The ID of the ongoing survey.
+ uint32_t survey_id;
+
+ surveyor_t (const surveyor_t&);
+ const surveyor_t &operator = (const surveyor_t&);
+ };
+
+ class surveyor_session_t : public xsurveyor_session_t
+ {
+ public:
+
+ surveyor_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~surveyor_session_t ();
+
+ private:
+
+ surveyor_session_t (const surveyor_session_t&);
+ const surveyor_session_t &operator = (const surveyor_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
new file mode 100644
index 0000000..026fe8d
--- /dev/null
+++ b/src/xrespondent.cpp
@@ -0,0 +1,278 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "xrespondent.hpp"
+#include "pipe.hpp"
+#include "wire.hpp"
+#include "random.hpp"
+#include "likely.hpp"
+#include "err.hpp"
+
+xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
+ int sid_) :
+ socket_base_t (parent_, tid_, sid_),
+ prefetched (0),
+ more_in (false),
+ current_out (NULL),
+ more_out (false),
+ next_peer_id (generate_random ())
+{
+ options.type = XS_XRESPONDENT;
+
+ prefetched_msg.init ();
+}
+
+xs::xrespondent_t::~xrespondent_t ()
+{
+ xs_assert (outpipes.empty ());
+ prefetched_msg.close ();
+}
+
+void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
+{
+ xs_assert (pipe_);
+
+ // Add the pipe to the map out outbound pipes.
+ outpipe_t outpipe = {pipe_, true};
+ bool ok = outpipes.insert (outpipes_t::value_type (
+ next_peer_id, outpipe)).second;
+ xs_assert (ok);
+
+ // Add the pipe to the list of inbound pipes.
+ blob_t identity (4, 0);
+ put_uint32 ((unsigned char*) identity.data (), next_peer_id);
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
+
+ // Generate a new unique peer identity.
+ ++next_peer_id;
+}
+
+void xs::xrespondent_t::xterminated (pipe_t *pipe_)
+{
+ fq.terminated (pipe_);
+
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ outpipes.erase (it);
+ if (pipe_ == current_out)
+ current_out = NULL;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+void xs::xrespondent_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
+}
+
+void xs::xrespondent_t::xwrite_activated (pipe_t *pipe_)
+{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ xs_assert (!it->second.active);
+ it->second.active = true;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+int xs::xrespondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If this is the first part of the message it's the ID of the
+ // peer to send the message to.
+ if (!more_out) {
+ xs_assert (!current_out);
+
+ // If we have malformed message (prefix with no subsequent message)
+ // then just silently ignore it.
+ // TODO: The connections should be killed instead.
+ if (msg_->flags () & msg_t::more && msg_->size () == 4) {
+
+ more_out = true;
+
+ // Find the pipe associated with the identity stored in the prefix.
+ // If there's no such pipe just silently ignore the message.
+ uint32_t identity = get_uint32 ((unsigned char*) msg_->data ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
+ }
+ rc = empty.close ();
+ errno_assert (rc == 0);
+ }
+
+ }
+
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
+ return 0;
+ }
+
+ // Check whether this is the last part of the message.
+ more_out = msg_->flags () & msg_t::more ? true : false;
+
+ // Push the message into the pipe. If there's no out pipe, just drop it.
+ if (current_out) {
+ bool ok = current_out->write (msg_);
+ if (unlikely (!ok))
+ current_out = NULL;
+ else if (!more_out) {
+ current_out->flush ();
+ current_out = NULL;
+ }
+ }
+ else {
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ }
+
+ // Detach the message from the data buffer.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+int xs::xrespondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
+ // If there is a prefetched message, return it.
+ if (prefetched == 1) {
+ rc = msg_->move (prefetched_msg);
+ errno_assert (rc == 0);
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ prefetched = 0;
+ return 0;
+ }
+
+ // Get next message part.
+ pipe_t *pipe = NULL;
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+
+ // If we are in the middle of reading a message, just return the next part.
+ if (more_in) {
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ return 0;
+ }
+
+ // We are at the beginning of a new message. Move the message part we
+ // have to the prefetched and return the ID of the peer instead.
+ rc = prefetched_msg.move (*msg_);
+ errno_assert (rc == 0);
+ prefetched = 1;
+ rc = msg_->close ();
+ errno_assert (rc == 0);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), identity.data (), identity.size ());
+ msg_->set_flags (msg_t::more);
+ return 0;
+}
+
+int xs::xrespondent_t::rollback (void)
+{
+ if (current_out) {
+ current_out->rollback ();
+ current_out = NULL;
+ more_out = false;
+ }
+ return 0;
+}
+
+bool xs::xrespondent_t::xhas_in ()
+{
+ // If we are in the middle of reading the messages, there are
+ // definitely more parts available.
+ if (more_in)
+ return true;
+
+ // We may already have a message pre-fetched.
+ if (prefetched)
+ return true;
+
+ // Try to read the next message to the pre-fetch buffer. If anything,
+ // it will be identity of the peer sending the message.
+ msg_t id;
+ id.init ();
+ int rc = xrespondent_t::xrecv (&id, XS_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN) {
+ id.close ();
+ return false;
+ }
+ xs_assert (rc == 0);
+
+ // We have first part of the message prefetched now. We will store the
+ // prefetched identity as well.
+ prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+ id.close ();
+ prefetched = 2;
+
+ return true;
+}
+
+bool xs::xrespondent_t::xhas_out ()
+{
+ return true;
+}
+
+xs::xrespondent_session_t::xrespondent_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::xrespondent_session_t::~xrespondent_session_t ()
+{
+}
+
diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp
new file mode 100644
index 0000000..eb6ac9a
--- /dev/null
+++ b/src/xrespondent.hpp
@@ -0,0 +1,123 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_XRESPONDENT_HPP_INCLUDED__
+#define __XS_XRESPONDENT_HPP_INCLUDED__
+
+#include <map>
+
+#include "socket_base.hpp"
+#include "session_base.hpp"
+#include "stdint.hpp"
+#include "blob.hpp"
+#include "msg.hpp"
+#include "fq.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class pipe_t;
+ class io_thread_t;
+
+ // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
+ class xrespondent_t :
+ public socket_base_t
+ {
+ public:
+
+ xrespondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~xrespondent_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
+ int xsend (msg_t *msg_, int flags_);
+ int xrecv (msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+ void xread_activated (xs::pipe_t *pipe_);
+ void xwrite_activated (xs::pipe_t *pipe_);
+ void xterminated (xs::pipe_t *pipe_);
+
+ protected:
+
+ // Rollback any message parts that were sent but not yet flushed.
+ int rollback ();
+
+ private:
+
+ // Fair queueing object for inbound pipes.
+ fq_t fq;
+
+ // This value is either 0 (nothing is prefetched), 1 (only message body
+ // is prefetched) or 2 (both identity and message body are prefetched).
+ int prefetched;
+
+ // Holds the prefetched identity.
+ blob_t prefetched_id;
+
+ // Holds the prefetched message.
+ msg_t prefetched_msg;
+
+ // If true, more incoming message parts are expected.
+ bool more_in;
+
+ struct outpipe_t
+ {
+ xs::pipe_t *pipe;
+ bool active;
+ };
+
+ // Outbound pipes indexed by the peer IDs.
+ typedef std::map <uint32_t, outpipe_t> outpipes_t;
+ outpipes_t outpipes;
+
+ // The pipe we are currently writing to.
+ xs::pipe_t *current_out;
+
+ // If true, more outgoing message parts are expected.
+ bool more_out;
+
+ // Peer ID are generated. It's a simple increment and wrap-over
+ // algorithm. This value is the next ID to use (if not used already).
+ uint32_t next_peer_id;
+
+ xrespondent_t (const xrespondent_t&);
+ const xrespondent_t &operator = (const xrespondent_t&);
+ };
+
+ class xrespondent_session_t : public session_base_t
+ {
+ public:
+
+ xrespondent_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xrespondent_session_t ();
+
+ private:
+
+ xrespondent_session_t (const xrespondent_session_t&);
+ const xrespondent_session_t &operator = (const xrespondent_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp
new file mode 100644
index 0000000..4265437
--- /dev/null
+++ b/src/xsurveyor.cpp
@@ -0,0 +1,89 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "xsurveyor.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+
+xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ socket_base_t (parent_, tid_, sid_)
+{
+ options.type = XS_XSURVEYOR;
+}
+
+xs::xsurveyor_t::~xsurveyor_t ()
+{
+}
+
+void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
+{
+ xs_assert (pipe_);
+ fq.attach (pipe_);
+ dist.attach (pipe_);
+}
+
+int xs::xsurveyor_t::xsend (msg_t *msg_, int flags_)
+{
+ return dist.send_to_all (msg_, flags_);
+}
+
+int xs::xsurveyor_t::xrecv (msg_t *msg_, int flags_)
+{
+ return fq.recv (msg_, flags_);
+}
+
+bool xs::xsurveyor_t::xhas_in ()
+{
+ return fq.has_in ();
+}
+
+bool xs::xsurveyor_t::xhas_out ()
+{
+ return dist.has_out ();
+}
+
+void xs::xsurveyor_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
+}
+
+void xs::xsurveyor_t::xwrite_activated (pipe_t *pipe_)
+{
+ dist.activated (pipe_);
+}
+
+void xs::xsurveyor_t::xterminated (pipe_t *pipe_)
+{
+ fq.terminated (pipe_);
+ dist.terminated (pipe_);
+}
+
+xs::xsurveyor_session_t::xsurveyor_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::xsurveyor_session_t::~xsurveyor_session_t ()
+{
+}
+
diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp
new file mode 100644
index 0000000..b0c3f9c
--- /dev/null
+++ b/src/xsurveyor.hpp
@@ -0,0 +1,85 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_XSURVEYOR_HPP_INCLUDED__
+#define __XS_XSURVEYOR_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "session_base.hpp"
+#include "dist.hpp"
+#include "fq.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class pipe_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class xsurveyor_t : public socket_base_t
+ {
+ public:
+
+ xsurveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~xsurveyor_t ();
+
+ protected:
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+ void xread_activated (xs::pipe_t *pipe_);
+ void xwrite_activated (xs::pipe_t *pipe_);
+ void xterminated (xs::pipe_t *pipe_);
+
+ private:
+
+ // Inbound messages are fair-queued from inbound pipes.
+ // Outbound messages are distributed to all outbound pipes.
+ fq_t fq;
+ dist_t dist;
+
+ xsurveyor_t (const xsurveyor_t&);
+ const xsurveyor_t &operator = (const xsurveyor_t&);
+ };
+
+ class xsurveyor_session_t : public session_base_t
+ {
+ public:
+
+ xsurveyor_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xsurveyor_session_t ();
+
+ private:
+
+ xsurveyor_session_t (const xsurveyor_session_t&);
+ const xsurveyor_session_t &operator = (const xsurveyor_session_t&);
+ };
+
+}
+
+#endif