summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-x.gitignore1
-rw-r--r--builds/msvc/libxs/libxs.vcxproj10
-rw-r--r--builds/msvc/libxs/libxs.vcxproj.filters26
-rw-r--r--builds/msvc/tests/tests.vcxproj6
-rw-r--r--builds/msvc/tests/tests.vcxproj.filters5
-rw-r--r--include/xs.h4
-rw-r--r--src/Makefile.am8
-rw-r--r--src/respondent.cpp114
-rw-r--r--src/respondent.hpp77
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/socket_base.cpp16
-rw-r--r--src/surveyor.cpp135
-rw-r--r--src/surveyor.hpp77
-rw-r--r--src/xrespondent.cpp278
-rw-r--r--src/xrespondent.hpp123
-rw-r--r--src/xsurveyor.cpp89
-rw-r--r--src/xsurveyor.hpp85
-rw-r--r--tests/Makefile.am4
-rw-r--r--tests/survey.cpp135
-rw-r--r--tests/tests.cpp7
20 files changed, 1214 insertions, 6 deletions
diff --git a/.gitignore b/.gitignore
index 2de95c2..2db7190 100755
--- a/.gitignore
+++ b/.gitignore
@@ -49,6 +49,7 @@ tests/polltimeo
tests/wireformat
tests/libzmq21
tests/resubscribe
+tests/survey
src/platform.hpp*
src/stamp-h1
perf/*.exe
diff --git a/builds/msvc/libxs/libxs.vcxproj b/builds/msvc/libxs/libxs.vcxproj
index bbab5fd..d4f40ca 100644
--- a/builds/msvc/libxs/libxs.vcxproj
+++ b/builds/msvc/libxs/libxs.vcxproj
@@ -143,12 +143,14 @@
<ClCompile Include="..\..\..\src\reaper.cpp" />
<ClCompile Include="..\..\..\src\rep.cpp" />
<ClCompile Include="..\..\..\src\req.cpp" />
+ <ClCompile Include="..\..\..\src\respondent.cpp" />
<ClCompile Include="..\..\..\src\select.cpp" />
<ClCompile Include="..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\src\signaler.cpp" />
<ClCompile Include="..\..\..\src\socket_base.cpp" />
<ClCompile Include="..\..\..\src\stream_engine.cpp" />
<ClCompile Include="..\..\..\src\sub.cpp" />
+ <ClCompile Include="..\..\..\src\surveyor.cpp" />
<ClCompile Include="..\..\..\src\tcp_address.cpp" />
<ClCompile Include="..\..\..\src\tcp_connecter.cpp" />
<ClCompile Include="..\..\..\src\tcp_listener.cpp" />
@@ -157,8 +159,10 @@
<ClCompile Include="..\..\..\src\xpub.cpp" />
<ClCompile Include="..\..\..\src\xrep.cpp" />
<ClCompile Include="..\..\..\src\xreq.cpp" />
+ <ClCompile Include="..\..\..\src\xrespondent.cpp" />
<ClCompile Include="..\..\..\src\xsub.cpp" />
<ClCompile Include="..\..\..\src\xs.cpp" />
+ <ClCompile Include="..\..\..\src\xsurveyor.cpp" />
<ClCompile Include="..\..\..\src\xszmq.cpp" />
</ItemGroup>
<ItemGroup>
@@ -211,6 +215,7 @@
<ClInclude Include="..\..\..\src\reaper.hpp" />
<ClInclude Include="..\..\..\src\rep.hpp" />
<ClInclude Include="..\..\..\src\req.hpp" />
+ <ClInclude Include="..\..\..\src\respondent.hpp" />
<ClInclude Include="..\..\..\src\select.hpp" />
<ClInclude Include="..\..\..\src\session_base.hpp" />
<ClInclude Include="..\..\..\src\signaler.hpp" />
@@ -218,6 +223,7 @@
<ClInclude Include="..\..\..\src\stdint.hpp" />
<ClInclude Include="..\..\..\src\stream_engine.hpp" />
<ClInclude Include="..\..\..\src\sub.hpp" />
+ <ClInclude Include="..\..\..\src\surveyor.hpp" />
<ClInclude Include="..\..\..\src\tcp_address.hpp" />
<ClInclude Include="..\..\..\src\tcp_connecter.hpp" />
<ClInclude Include="..\..\..\src\tcp_listener.hpp" />
@@ -228,7 +234,9 @@
<ClInclude Include="..\..\..\src\xpub.hpp" />
<ClInclude Include="..\..\..\src\xrep.hpp" />
<ClInclude Include="..\..\..\src\xreq.hpp" />
+ <ClInclude Include="..\..\..\src\xrespondent.hpp" />
<ClInclude Include="..\..\..\src\xsub.hpp" />
+ <ClInclude Include="..\..\..\src\xsurveyor.hpp" />
<ClInclude Include="..\..\..\src\ypipe.hpp" />
<ClInclude Include="..\..\..\src\yqueue.hpp" />
<ClInclude Include="..\platform.hpp" />
@@ -236,4 +244,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/builds/msvc/libxs/libxs.vcxproj.filters b/builds/msvc/libxs/libxs.vcxproj.filters
index eb0f505..1fefb60 100644
--- a/builds/msvc/libxs/libxs.vcxproj.filters
+++ b/builds/msvc/libxs/libxs.vcxproj.filters
@@ -176,6 +176,18 @@
<ClCompile Include="..\..\..\src\prefix_filter.cpp">
<Filter>Source Files</Filter>
</ClCompile>
+ <ClCompile Include="..\..\..\src\respondent.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\..\src\xrespondent.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\..\src\surveyor.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\..\src\xsurveyor.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\include\xs.h">
@@ -388,5 +400,17 @@
<ClInclude Include="..\..\..\src\prefix_filter.hpp">
<Filter>Header Files</Filter>
</ClInclude>
+ <ClInclude Include="..\..\..\src\respondent.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\src\surveyor.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\src\xrespondent.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\src\xsurveyor.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
</ItemGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj
index bb707c4..d1258e4 100644
--- a/builds/msvc/tests/tests.vcxproj
+++ b/builds/msvc/tests/tests.vcxproj
@@ -151,6 +151,10 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\survey.cpp">
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
+ </ClCompile>
<ClCompile Include="..\..\..\tests\tests.cpp" />
<ClCompile Include="..\..\..\tests\timeo.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
@@ -184,4 +188,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters
index b1eade2..0fb41a8 100644
--- a/builds/msvc/tests/tests.vcxproj.filters
+++ b/builds/msvc/tests/tests.vcxproj.filters
@@ -65,6 +65,9 @@
<ClCompile Include="..\..\..\tests\resubscribe.cpp">
<Filter>Header Files</Filter>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\survey.cpp">
+ <Filter>Header Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="Header Files">
@@ -76,4 +79,4 @@
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/include/xs.h b/include/xs.h
index 17a557b..34d6c60 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -172,6 +172,10 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
#define XS_PUSH 8
#define XS_XPUB 9
#define XS_XSUB 10
+#define XS_SURVEYOR 11
+#define XS_RESPONDENT 12
+#define XS_XSURVEYOR 13
+#define XS_XRESPONDENT 14
/* Legacy socket type aliases. */
#define XS_ROUTER XS_XREP
diff --git a/src/Makefile.am b/src/Makefile.am
index 81ece53..3025e13 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -64,6 +64,7 @@ libxs_la_SOURCES = \
reaper.hpp \
rep.hpp \
req.hpp \
+ respondent.hpp \
select.hpp \
session_base.hpp \
signaler.hpp \
@@ -71,6 +72,7 @@ libxs_la_SOURCES = \
stdint.hpp \
stream_engine.hpp \
sub.hpp \
+ surveyor.hpp \
tcp_address.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -81,7 +83,9 @@ libxs_la_SOURCES = \
xpub.hpp \
xrep.hpp \
xreq.hpp \
+ xrespondent.hpp \
xsub.hpp \
+ xsurveyor.hpp \
ypipe.hpp \
yqueue.hpp \
clock.cpp \
@@ -121,12 +125,14 @@ libxs_la_SOURCES = \
random.cpp \
rep.cpp \
req.cpp \
+ respondent.cpp \
select.cpp \
session_base.cpp \
signaler.cpp \
socket_base.cpp \
stream_engine.cpp \
sub.cpp \
+ surveyor.cpp \
tcp_address.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
@@ -135,7 +141,9 @@ libxs_la_SOURCES = \
xpub.cpp \
xrep.cpp \
xreq.cpp \
+ xrespondent.cpp \
xsub.cpp \
+ xsurveyor.cpp \
xs.cpp
if ON_MINGW
diff --git a/src/respondent.cpp b/src/respondent.cpp
new file mode 100644
index 0000000..c2abd84
--- /dev/null
+++ b/src/respondent.cpp
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "respondent.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+
+xs::respondent_t::respondent_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xrespondent_t (parent_, tid_, sid_),
+ sending_reply (false)
+{
+ options.type = XS_RESPONDENT;
+}
+
+xs::respondent_t::~respondent_t ()
+{
+}
+
+int xs::respondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If there's no ongoing survey, we cannot send reply.
+ if (!sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Survey pattern doesn't support multipart messages.
+ if (msg_->flags () & msg_t::more || flags_ & XS_SNDMORE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Push message to the reply pipe.
+ int rc = xrespondent_t::xsend (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Flip the FSM back to request receiving state.
+ sending_reply = false;
+
+ return 0;
+}
+
+int xs::respondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ // If we are in middle of sending a reply, we cannot receive next survey.
+ if (sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // First thing to do when receiving a srvey is to copy all the labels
+ // to the reply pipe.
+ while (true) {
+ int rc = xrespondent_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ rc = xrespondent_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+ }
+
+ // When whole survey is read, flip the FSM to reply-sending state.
+ sending_reply = true;
+
+ return 0;
+}
+
+bool xs::respondent_t::xhas_in ()
+{
+ if (sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_in ();
+}
+
+bool xs::respondent_t::xhas_out ()
+{
+ if (!sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_out ();
+}
+
+xs::respondent_session_t::respondent_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xrespondent_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::respondent_session_t::~respondent_session_t ()
+{
+}
+
diff --git a/src/respondent.hpp b/src/respondent.hpp
new file mode 100644
index 0000000..b315350
--- /dev/null
+++ b/src/respondent.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_RESPONDENT_HPP_INCLUDED__
+#define __XS_RESPONDENT_HPP_INCLUDED__
+
+#include "xrespondent.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class respondent_t : public xrespondent_t
+ {
+ public:
+
+ respondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~respondent_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, we are in process of sending the reply. If false we are
+ // in process of receiving a request.
+ // TODO: Consider whether automatic cancelling of the previous request
+ // when recv is called again is not a better idea than returning EFSM.
+ bool sending_reply;
+
+ respondent_t (const respondent_t&);
+ const respondent_t &operator = (const respondent_t&);
+
+ };
+
+ class respondent_session_t : public xrespondent_session_t
+ {
+ public:
+
+ respondent_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~respondent_session_t ();
+
+ private:
+
+ respondent_session_t (const respondent_session_t&);
+ const respondent_session_t &operator = (const respondent_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 90fa071..35c4a4e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -42,6 +42,10 @@
#include "push.hpp"
#include "pull.hpp"
#include "pair.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
@@ -93,6 +97,22 @@ xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eb9b491..df182f1 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -61,6 +61,10 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
bool xs::socket_base_t::check_tag ()
{
@@ -106,6 +110,18 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
case XS_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_t (parent_, tid_, sid_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_t (parent_, tid_, sid_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/surveyor.cpp b/src/surveyor.cpp
new file mode 100644
index 0000000..c5216bd
--- /dev/null
+++ b/src/surveyor.cpp
@@ -0,0 +1,135 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "surveyor.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+#include "wire.hpp"
+#include "likely.hpp"
+#include "random.hpp"
+
+xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xsurveyor_t (parent_, tid_, sid_),
+ receiving_responses (false),
+ survey_id (generate_random ())
+{
+ options.type = XS_SURVEYOR;
+}
+
+xs::surveyor_t::~surveyor_t ()
+{
+}
+
+int xs::surveyor_t::xsend (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // Survey pattern works only with sigle-part messages.
+ if (flags_ & XS_SNDMORE || msg_->flags () & msg_t::more) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Start the new survey. First, generate new survey ID.
+ ++survey_id;
+ msg_t id;
+ rc = id.init_size (4);
+ errno_assert (rc == 0);
+ put_uint32 ((unsigned char*) id.data (), survey_id);
+ id.set_flags (msg_t::more);
+ rc = xsurveyor_t::xsend (&id, 0);
+ if (rc != 0) {
+ id.close ();
+ return -1;
+ }
+ id.close ();
+
+ // Now send the body of the survey.
+ rc = xsurveyor_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+
+ // Start waiting for responses from the peers.
+ receiving_responses = true;
+
+ return 0;
+}
+
+int xs::surveyor_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // If there's no survey underway, it's an error.
+ if (!receiving_responses) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Get the first part of the response -- the survey ID.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Check whether this is response for the onging survey. If not, we can
+ // drop the response.
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 4 ||
+ get_uint32 ((unsigned char*) msg_->data ()) != survey_id)) {
+ while (true) {
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ }
+ msg_->close ();
+ msg_->init ();
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Get the body of the response.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+bool xs::surveyor_t::xhas_in ()
+{
+ // TODO: We need a prefetched_message here...
+ xs_assert (false);
+ return false;
+}
+
+bool xs::surveyor_t::xhas_out ()
+{
+ return xsurveyor_t::xhas_out ();
+}
+
+xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xsurveyor_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::surveyor_session_t::~surveyor_session_t ()
+{
+}
+
diff --git a/src/surveyor.hpp b/src/surveyor.hpp
new file mode 100644
index 0000000..83fdbb0
--- /dev/null
+++ b/src/surveyor.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_SURVEYOR_HPP_INCLUDED__
+#define __XS_SURVEYOR_HPP_INCLUDED__
+
+#include "xsurveyor.hpp"
+#include "stdint.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class surveyor_t : public xsurveyor_t
+ {
+ public:
+
+ surveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~surveyor_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, survey was already lauched and have no expriered yet.
+ bool receiving_responses;
+
+ // The ID of the ongoing survey.
+ uint32_t survey_id;
+
+ surveyor_t (const surveyor_t&);
+ const surveyor_t &operator = (const surveyor_t&);
+ };
+
+ class surveyor_session_t : public xsurveyor_session_t
+ {
+ public:
+
+ surveyor_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~surveyor_session_t ();
+
+ private:
+
+ surveyor_session_t (const surveyor_session_t&);
+ const surveyor_session_t &operator = (const surveyor_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
new file mode 100644
index 0000000..026fe8d
--- /dev/null
+++ b/src/xrespondent.cpp
@@ -0,0 +1,278 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "xrespondent.hpp"
+#include "pipe.hpp"
+#include "wire.hpp"
+#include "random.hpp"
+#include "likely.hpp"
+#include "err.hpp"
+
+xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
+ int sid_) :
+ socket_base_t (parent_, tid_, sid_),
+ prefetched (0),
+ more_in (false),
+ current_out (NULL),
+ more_out (false),
+ next_peer_id (generate_random ())
+{
+ options.type = XS_XRESPONDENT;
+
+ prefetched_msg.init ();
+}
+
+xs::xrespondent_t::~xrespondent_t ()
+{
+ xs_assert (outpipes.empty ());
+ prefetched_msg.close ();
+}
+
+void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
+{
+ xs_assert (pipe_);
+
+ // Add the pipe to the map out outbound pipes.
+ outpipe_t outpipe = {pipe_, true};
+ bool ok = outpipes.insert (outpipes_t::value_type (
+ next_peer_id, outpipe)).second;
+ xs_assert (ok);
+
+ // Add the pipe to the list of inbound pipes.
+ blob_t identity (4, 0);
+ put_uint32 ((unsigned char*) identity.data (), next_peer_id);
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
+
+ // Generate a new unique peer identity.
+ ++next_peer_id;
+}
+
+void xs::xrespondent_t::xterminated (pipe_t *pipe_)
+{
+ fq.terminated (pipe_);
+
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ outpipes.erase (it);
+ if (pipe_ == current_out)
+ current_out = NULL;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+void xs::xrespondent_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
+}
+
+void xs::xrespondent_t::xwrite_activated (pipe_t *pipe_)
+{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ xs_assert (!it->second.active);
+ it->second.active = true;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+int xs::xrespondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If this is the first part of the message it's the ID of the
+ // peer to send the message to.
+ if (!more_out) {
+ xs_assert (!current_out);
+
+ // If we have malformed message (prefix with no subsequent message)
+ // then just silently ignore it.
+ // TODO: The connections should be killed instead.
+ if (msg_->flags () & msg_t::more && msg_->size () == 4) {
+
+ more_out = true;
+
+ // Find the pipe associated with the identity stored in the prefix.
+ // If there's no such pipe just silently ignore the message.
+ uint32_t identity = get_uint32 ((unsigned char*) msg_->data ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
+ }
+ rc = empty.close ();
+ errno_assert (rc == 0);
+ }
+
+ }
+
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
+ return 0;
+ }
+
+ // Check whether this is the last part of the message.
+ more_out = msg_->flags () & msg_t::more ? true : false;
+
+ // Push the message into the pipe. If there's no out pipe, just drop it.
+ if (current_out) {
+ bool ok = current_out->write (msg_);
+ if (unlikely (!ok))
+ current_out = NULL;
+ else if (!more_out) {
+ current_out->flush ();
+ current_out = NULL;
+ }
+ }
+ else {
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ }
+
+ // Detach the message from the data buffer.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+int xs::xrespondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
+ // If there is a prefetched message, return it.
+ if (prefetched == 1) {
+ rc = msg_->move (prefetched_msg);
+ errno_assert (rc == 0);
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ prefetched = 0;
+ return 0;
+ }
+
+ // Get next message part.
+ pipe_t *pipe = NULL;
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+
+ // If we are in the middle of reading a message, just return the next part.
+ if (more_in) {
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ return 0;
+ }
+
+ // We are at the beginning of a new message. Move the message part we
+ // have to the prefetched and return the ID of the peer instead.
+ rc = prefetched_msg.move (*msg_);
+ errno_assert (rc == 0);
+ prefetched = 1;
+ rc = msg_->close ();
+ errno_assert (rc == 0);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
+ errno_assert (rc == 0);