summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am8
-rw-r--r--src/respondent.cpp114
-rw-r--r--src/respondent.hpp77
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/socket_base.cpp16
-rw-r--r--src/surveyor.cpp135
-rw-r--r--src/surveyor.hpp77
-rw-r--r--src/xrespondent.cpp278
-rw-r--r--src/xrespondent.hpp123
-rw-r--r--src/xsurveyor.cpp89
-rw-r--r--src/xsurveyor.hpp85
11 files changed, 1022 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 81ece53..3025e13 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -64,6 +64,7 @@ libxs_la_SOURCES = \
reaper.hpp \
rep.hpp \
req.hpp \
+ respondent.hpp \
select.hpp \
session_base.hpp \
signaler.hpp \
@@ -71,6 +72,7 @@ libxs_la_SOURCES = \
stdint.hpp \
stream_engine.hpp \
sub.hpp \
+ surveyor.hpp \
tcp_address.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -81,7 +83,9 @@ libxs_la_SOURCES = \
xpub.hpp \
xrep.hpp \
xreq.hpp \
+ xrespondent.hpp \
xsub.hpp \
+ xsurveyor.hpp \
ypipe.hpp \
yqueue.hpp \
clock.cpp \
@@ -121,12 +125,14 @@ libxs_la_SOURCES = \
random.cpp \
rep.cpp \
req.cpp \
+ respondent.cpp \
select.cpp \
session_base.cpp \
signaler.cpp \
socket_base.cpp \
stream_engine.cpp \
sub.cpp \
+ surveyor.cpp \
tcp_address.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
@@ -135,7 +141,9 @@ libxs_la_SOURCES = \
xpub.cpp \
xrep.cpp \
xreq.cpp \
+ xrespondent.cpp \
xsub.cpp \
+ xsurveyor.cpp \
xs.cpp
if ON_MINGW
diff --git a/src/respondent.cpp b/src/respondent.cpp
new file mode 100644
index 0000000..c2abd84
--- /dev/null
+++ b/src/respondent.cpp
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "respondent.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+
+xs::respondent_t::respondent_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xrespondent_t (parent_, tid_, sid_),
+ sending_reply (false)
+{
+ options.type = XS_RESPONDENT;
+}
+
+xs::respondent_t::~respondent_t ()
+{
+}
+
+int xs::respondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If there's no ongoing survey, we cannot send reply.
+ if (!sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Survey pattern doesn't support multipart messages.
+ if (msg_->flags () & msg_t::more || flags_ & XS_SNDMORE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Push message to the reply pipe.
+ int rc = xrespondent_t::xsend (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Flip the FSM back to request receiving state.
+ sending_reply = false;
+
+ return 0;
+}
+
+int xs::respondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ // If we are in middle of sending a reply, we cannot receive next survey.
+ if (sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // First thing to do when receiving a srvey is to copy all the labels
+ // to the reply pipe.
+ while (true) {
+ int rc = xrespondent_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ rc = xrespondent_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+ }
+
+ // When whole survey is read, flip the FSM to reply-sending state.
+ sending_reply = true;
+
+ return 0;
+}
+
+bool xs::respondent_t::xhas_in ()
+{
+ if (sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_in ();
+}
+
+bool xs::respondent_t::xhas_out ()
+{
+ if (!sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_out ();
+}
+
+xs::respondent_session_t::respondent_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xrespondent_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::respondent_session_t::~respondent_session_t ()
+{
+}
+
diff --git a/src/respondent.hpp b/src/respondent.hpp
new file mode 100644
index 0000000..b315350
--- /dev/null
+++ b/src/respondent.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_RESPONDENT_HPP_INCLUDED__
+#define __XS_RESPONDENT_HPP_INCLUDED__
+
+#include "xrespondent.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class respondent_t : public xrespondent_t
+ {
+ public:
+
+ respondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~respondent_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, we are in process of sending the reply. If false we are
+ // in process of receiving a request.
+ // TODO: Consider whether automatic cancelling of the previous request
+ // when recv is called again is not a better idea than returning EFSM.
+ bool sending_reply;
+
+ respondent_t (const respondent_t&);
+ const respondent_t &operator = (const respondent_t&);
+
+ };
+
+ class respondent_session_t : public xrespondent_session_t
+ {
+ public:
+
+ respondent_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~respondent_session_t ();
+
+ private:
+
+ respondent_session_t (const respondent_session_t&);
+ const respondent_session_t &operator = (const respondent_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 90fa071..35c4a4e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -42,6 +42,10 @@
#include "push.hpp"
#include "pull.hpp"
#include "pair.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
@@ -93,6 +97,22 @@ xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eb9b491..df182f1 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -61,6 +61,10 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
bool xs::socket_base_t::check_tag ()
{
@@ -106,6 +110,18 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
case XS_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_t (parent_, tid_, sid_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_t (parent_, tid_, sid_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/surveyor.cpp b/src/surveyor.cpp
new file mode 100644
index 0000000..c5216bd
--- /dev/null
+++ b/src/surveyor.cpp
@@ -0,0 +1,135 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "surveyor.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+#include "wire.hpp"
+#include "likely.hpp"
+#include "random.hpp"
+
+xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xsurveyor_t (parent_, tid_, sid_),
+ receiving_responses (false),
+ survey_id (generate_random ())
+{
+ options.type = XS_SURVEYOR;
+}
+
+xs::surveyor_t::~surveyor_t ()
+{
+}
+
+int xs::surveyor_t::xsend (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // Survey pattern works only with sigle-part messages.
+ if (flags_ & XS_SNDMORE || msg_->flags () & msg_t::more) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Start the new survey. First, generate new survey ID.
+ ++survey_id;
+ msg_t id;
+ rc = id.init_size (4);
+ errno_assert (rc == 0);
+ put_uint32 ((unsigned char*) id.data (), survey_id);
+ id.set_flags (msg_t::more);
+ rc = xsurveyor_t::xsend (&id, 0);
+ if (rc != 0) {
+ id.close ();
+ return -1;
+ }
+ id.close ();
+
+ // Now send the body of the survey.
+ rc = xsurveyor_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+
+ // Start waiting for responses from the peers.
+ receiving_responses = true;
+
+ return 0;
+}
+
+int xs::surveyor_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // If there's no survey underway, it's an error.
+ if (!receiving_responses) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Get the first part of the response -- the survey ID.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Check whether this is response for the onging survey. If not, we can
+ // drop the response.
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 4 ||
+ get_uint32 ((unsigned char*) msg_->data ()) != survey_id)) {
+ while (true) {
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ }
+ msg_->close ();
+ msg_->init ();
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Get the body of the response.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+bool xs::surveyor_t::xhas_in ()
+{
+ // TODO: We need a prefetched_message here...
+ xs_assert (false);
+ return false;
+}
+
+bool xs::surveyor_t::xhas_out ()
+{
+ return xsurveyor_t::xhas_out ();
+}
+
+xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xsurveyor_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::surveyor_session_t::~surveyor_session_t ()
+{
+}
+
diff --git a/src/surveyor.hpp b/src/surveyor.hpp
new file mode 100644
index 0000000..83fdbb0
--- /dev/null
+++ b/src/surveyor.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_SURVEYOR_HPP_INCLUDED__
+#define __XS_SURVEYOR_HPP_INCLUDED__
+
+#include "xsurveyor.hpp"
+#include "stdint.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class surveyor_t : public xsurveyor_t
+ {
+ public:
+
+ surveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~surveyor_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, survey was already lauched and have no expriered yet.
+ bool receiving_responses;
+
+ // The ID of the ongoing survey.
+ uint32_t survey_id;
+
+ surveyor_t (const surveyor_t&);
+ const surveyor_t &operator = (const surveyor_t&);
+ };
+
+ class surveyor_session_t : public xsurveyor_session_t
+ {
+ public:
+
+ surveyor_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~surveyor_session_t ();
+
+ private:
+
+ surveyor_session_t (const surveyor_session_t&);
+ const surveyor_session_t &operator = (const surveyor_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
new file mode 100644
index 0000000..026fe8d
--- /dev/null
+++ b/src/xrespondent.cpp
@@ -0,0 +1,278 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "xrespondent.hpp"
+#include "pipe.hpp"
+#include "wire.hpp"
+#include "random.hpp"
+#include "likely.hpp"
+#include "err.hpp"
+
+xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
+ int sid_) :
+ socket_base_t (parent_, tid_, sid_),
+ prefetched (0),
+ more_in (false),
+ current_out (NULL),
+ more_out (false),
+ next_peer_id (generate_random ())
+{
+ options.type = XS_XRESPONDENT;
+
+ prefetched_msg.init ();
+}
+
+xs::xrespondent_t::~xrespondent_t ()
+{
+ xs_assert (outpipes.empty ());
+ prefetched_msg.close ();
+}
+
+void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
+{
+ xs_assert (pipe_);
+
+ // Add the pipe to the map out outbound pipes.
+ outpipe_t outpipe = {pipe_, true};
+ bool ok = outpipes.insert (outpipes_t::value_type (
+ next_peer_id, outpipe)).second;
+ xs_assert (ok);
+
+ // Add the pipe to the list of inbound pipes.
+ blob_t identity (4, 0);
+ put_uint32 ((unsigned char*) identity.data (), next_peer_id);
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
+
+ // Generate a new unique peer identity.
+ ++next_peer_id;
+}
+
+void xs::xrespondent_t::xterminated (pipe_t *pipe_)
+{
+ fq.terminated (pipe_);
+
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ outpipes.erase (it);
+ if (pipe_ == current_out)
+ current_out = NULL;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+void xs::xrespondent_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
+}
+
+void xs::xrespondent_t::xwrite_activated (pipe_t *pipe_)
+{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ xs_assert (!it->second.active);
+ it->second.active = true;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+int xs::xrespondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If this is the first part of the message it's the ID of the
+ // peer to send the message to.
+ if (!more_out) {
+ xs_assert (!current_out);
+
+ // If we have malformed message (prefix with no subsequent message)
+ // then just silently ignore it.
+ // TODO: The connections should be killed instead.
+ if (msg_->flags () & msg_t::more && msg_->size () == 4) {
+
+ more_out = true;
+
+ // Find the pipe associated with the identity stored in the prefix.
+ // If there's no such pipe just silently ignore the message.
+ uint32_t identity = get_uint32 ((unsigned char*) msg_->data ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
+ }
+ rc = empty.close ();
+ errno_assert (rc == 0);
+ }
+
+ }
+
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
+ return 0;
+ }
+
+ // Check whether this is the last part of the message.
+ more_out = msg_->flags () & msg_t::more ? true : false;
+
+ // Push the message into the pipe. If there's no out pipe, just drop it.
+ if (current_out) {
+ bool ok = current_out->write (msg_);
+ if (unlikely (!ok))
+ current_out = NULL;
+ else if (!more_out) {
+ current_out->flush ();
+ current_out = NULL;
+ }
+ }
+ else {
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ }
+
+ // Detach the message from the data buffer.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+int xs::xrespondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
+ // If there is a prefetched message, return it.
+ if (prefetched == 1) {
+ rc = msg_->move (prefetched_msg);
+ errno_assert (rc == 0);
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ prefetched = 0;
+ return 0;
+ }
+
+ // Get next message part.
+ pipe_t *pipe = NULL;
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+
+ // If we are in the middle of reading a message, just return the next part.
+ if (more_in) {
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ return 0;
+ }
+
+ // We are at the beginning of a new message. Move the message part we
+ // have to the prefetched and return the ID of the peer instead.
+ rc = prefetched_msg.move (*msg_);
+ errno_assert (rc == 0);
+ prefetched = 1;
+ rc = msg_->close ();
+ errno_assert (rc == 0);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), identity.data (), identity.size ());
+ msg_->set_flags (msg_t::more);
+ return 0;
+}
+
+int xs::xrespondent_t::rollback (void)
+{
+ if (current_out) {
+ current_out->rollback ();
+ current_out = NULL;
+ more_out = false;
+ }
+ return 0;
+}
+
+bool xs::xrespondent_t::xhas_in ()
+{
+ // If we are in the middle of reading the messages, there are
+ // definitely more parts available.
+ if (more_in)
+ return true;
+
+ // We may already have a message pre-fetched.
+ if (prefetched)
+ return true;
+
+ // Try to read the next message to the pre-fetch buffer. If anything,
+ // it will be identity of the peer sending the message.
+ msg_t id;
+ id.init ();
+ int rc = xrespondent_t::xrecv (&id, XS_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN) {
+ id.close ();
+ return false;
+ }
+ xs_assert (rc == 0);
+
+ // We have first part of the message prefetched now. We will store the
+ // prefetched identity as well.
+ prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+ id.close ();
+ prefetched = 2;
+
+ return true;
+}
+
+bool xs::xrespondent_t::xhas_out ()
+{
+ return true;
+}
+
+xs::xrespondent_session_t::xrespondent_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::xrespondent_session_t::~xrespondent_session_t ()
+{
+}
+
diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp
new file mode 100644
index 0000000..eb6ac9a
--- /dev/null
+++ b/src/xrespondent.hpp
@@ -0,0 +1,123 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_XRESPONDENT_HPP_INCLUDED__
+#define __XS_XRESPONDENT_HPP_INCLUDED__
+
+#include <map>
+
+#include "socket_base.hpp"
+#include "session_base.hpp"
+#include "stdint.hpp"
+#include "blob.hpp"
+#include "msg.hpp"
+#include "fq.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class pipe_t;
+ class io_thread_t;
+
+ // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
+ class xrespondent_t :
+ public socket_base_t
+ {
+ public:
+
+ xrespondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~xrespondent_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
+ int xsend (msg_t *msg_, int flags_);
+ int xrecv (msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+ void xread_activated (xs::pipe_t *pipe_);
+ void xwrite_activated (xs::pipe_t *pipe_);
+ void xterminated (xs::pipe_t *pipe_);
+
+ protected:
+
+ // Rollback any message parts that were sent but not yet flushed.
+ int rollback ();
+
+ private:
+
+ // Fair queueing object for inbound pipes.
+ fq_t fq;
+
+ // This value is either 0 (nothing is prefetched), 1 (only message body
+ // is prefetched) or 2 (both identity and message body are prefetched).
+ int prefetched;
+
+ // Holds the prefetched identity.
+ blob_t prefetched_id;
+
+ // Holds the prefetched message.
+ msg_t prefetched_msg;
+
+ // If true, more incoming message parts are expected.
+ bool more_in;
+
+ struct outpipe_t
+ {
+ xs::pipe_t *pipe;
+ bool active;
+ };
+
+ // Outbound pipes indexed by the peer IDs.
+ typedef std::map <uint32_t, outpipe_t> outpipes_t;
+ outpipes_t outpipes;
+
+ // The pipe we are currently writing to.
+ xs::pipe_t *current_out;
+
+ // If true, more outgoing message parts are expected.
+ bool more_out;
+
+ // Peer ID are generated. It's a simple increment and wrap-over
+ // algorithm. This value is the next ID to use (if not used already).
+ uint32_t next_peer_id;
+
+ xrespondent_t (const xrespondent_t&);
+ const xrespondent_t &operator = (const xrespondent_t&);
+ };
+
+ class xrespondent_session_t : public session_base_t
+ {
+ public:
+
+ xrespondent_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xrespondent_session_t ();
+
+ private:
+
+ xrespondent_session_t (const xrespondent_session_t&);
+ const xrespondent_session_t &operator = (const xrespondent_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp
new file mode 100644
index 0000000..4265437
--- /dev/null
+++ b/src/xsurveyor.cpp
@@ -0,0 +1,89 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "xsurveyor.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+
+xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ socket_base_t (parent_, tid_, sid_)
+{
+ options.type = XS_XSURVEYOR;
+}
+
+xs::xsurveyor_t::~xsurveyor_t ()
+{
+}
+
+void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
+{
+ xs_assert (pipe_);
+ fq.attach (pipe_);
+ dist.attach (pipe_);
+}
+
+int xs::xsurveyor_t::xsend (msg_t *msg_, int flags_)
+{
+ return dist.send_to_all (msg_, flags_);
+}
+
+int xs::xsurveyor_t::xrecv (msg_t *msg_, int flags_)
+{
+ return fq.recv (msg_, flags_);
+}
+
+bool xs::xsurveyor_t::xhas_in ()
+{
+ return fq.has_in ();
+}
+
+bool xs::xsurveyor_t::xhas_out ()
+{
+ return dist.has_out ();
+}
+
+void xs::xsurveyor_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
+}
+
+void xs::xsurveyor_t::xwrite_activated (pipe_t *pipe_)
+{
+ dist.activated (pipe_);
+}
+
+void xs::xsurveyor_t::xterminated (pipe_t *pipe_)
+{
+ fq.terminated (pipe_);
+ dist.terminated (pipe_);
+}
+
+xs::xsurveyor_session_t::xsurveyor_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::xsurveyor_session_t::~xsurveyor_session_t ()
+{
+}
+
diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp
new file mode 100644
index 0000000..b0c3f9c
--- /dev/null
+++ b/src/xsurveyor.hpp
@@ -0,0 +1,85 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_XSURVEYOR_HPP_INCLUDED__
+#define __XS_XSURVEYOR_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "session_base.hpp"
+#include "dist.hpp"
+#include "fq.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class pipe_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class xsurveyor_t : public socket_base_t
+ {
+ public:
+
+ xsurveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~xsurveyor_t ();
+
+ protected:
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+ void xread_activated (xs::pipe_t *pipe_);
+ void xwrite_activated (xs::pipe_t *pipe_);
+ void xterminated (xs::pipe_t *pipe_);
+
+ private:
+
+ // Inbound messages are fair-queued from inbound pipes.
+ // Outbound messages are distributed to all outbound pipes.
+ fq_t fq;
+ dist_t dist;
+
+ xsurveyor_t (const xsurveyor_t&);
+ const xsurveyor_t &operator = (const xsurveyor_t&);
+ };
+
+ class xsurveyor_session_t : public session_base_t
+ {
+ public:
+
+ xsurveyor_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xsurveyor_session_t ();
+
+ private:
+
+ xsurveyor_session_t (const xsurveyor_session_t&);
+ const xsurveyor_session_t &operator = (const xsurveyor_session_t&);
+ };
+
+}
+
+#endif