summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2012-04-15 13:10:41 +0200
committerMartin Sustrik <sustrik@250bpm.com>2012-04-17 07:30:52 +0200
commit443d06f894751062da6d69238ce09f6fbfc27577 (patch)
tree44d21141df628f1cba965b7466267faa51f9d34c
parent692688206d5061de2a2b6a3d3040318dc537f221 (diff)
"Survey" pattern implemented
Survey pattern is "multicast with reply". There are two roles: surveyor and respondent. Surveyor publishes a survey which gets delivered to all connected respondents. Each repondent can send a response to the survey. All the responses are delivered to the original surveyor. Once the surveyor decides that the survey is over (e.g. deadline was reached) it can send initiate survey. Late responses from old surveys are automatically discarded by the surveyor socket. Socket types: SURVEYOR, XSURVEYOR, RESPONDENT, XRESPONDENT Patch also includes a test program with surveoyr, two respondents and an intermediary device. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rwxr-xr-x.gitignore1
-rw-r--r--builds/msvc/libxs/libxs.vcxproj10
-rw-r--r--builds/msvc/libxs/libxs.vcxproj.filters26
-rw-r--r--builds/msvc/tests/tests.vcxproj6
-rw-r--r--builds/msvc/tests/tests.vcxproj.filters5
-rw-r--r--include/xs.h4
-rw-r--r--src/Makefile.am8
-rw-r--r--src/respondent.cpp114
-rw-r--r--src/respondent.hpp77
-rw-r--r--src/session_base.cpp20
-rw-r--r--src/socket_base.cpp16
-rw-r--r--src/surveyor.cpp135
-rw-r--r--src/surveyor.hpp77
-rw-r--r--src/xrespondent.cpp278
-rw-r--r--src/xrespondent.hpp123
-rw-r--r--src/xsurveyor.cpp89
-rw-r--r--src/xsurveyor.hpp85
-rw-r--r--tests/Makefile.am4
-rw-r--r--tests/survey.cpp135
-rw-r--r--tests/tests.cpp7
20 files changed, 1214 insertions, 6 deletions
diff --git a/.gitignore b/.gitignore
index 2de95c2..2db7190 100755
--- a/.gitignore
+++ b/.gitignore
@@ -49,6 +49,7 @@ tests/polltimeo
tests/wireformat
tests/libzmq21
tests/resubscribe
+tests/survey
src/platform.hpp*
src/stamp-h1
perf/*.exe
diff --git a/builds/msvc/libxs/libxs.vcxproj b/builds/msvc/libxs/libxs.vcxproj
index bbab5fd..d4f40ca 100644
--- a/builds/msvc/libxs/libxs.vcxproj
+++ b/builds/msvc/libxs/libxs.vcxproj
@@ -143,12 +143,14 @@
<ClCompile Include="..\..\..\src\reaper.cpp" />
<ClCompile Include="..\..\..\src\rep.cpp" />
<ClCompile Include="..\..\..\src\req.cpp" />
+ <ClCompile Include="..\..\..\src\respondent.cpp" />
<ClCompile Include="..\..\..\src\select.cpp" />
<ClCompile Include="..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\src\signaler.cpp" />
<ClCompile Include="..\..\..\src\socket_base.cpp" />
<ClCompile Include="..\..\..\src\stream_engine.cpp" />
<ClCompile Include="..\..\..\src\sub.cpp" />
+ <ClCompile Include="..\..\..\src\surveyor.cpp" />
<ClCompile Include="..\..\..\src\tcp_address.cpp" />
<ClCompile Include="..\..\..\src\tcp_connecter.cpp" />
<ClCompile Include="..\..\..\src\tcp_listener.cpp" />
@@ -157,8 +159,10 @@
<ClCompile Include="..\..\..\src\xpub.cpp" />
<ClCompile Include="..\..\..\src\xrep.cpp" />
<ClCompile Include="..\..\..\src\xreq.cpp" />
+ <ClCompile Include="..\..\..\src\xrespondent.cpp" />
<ClCompile Include="..\..\..\src\xsub.cpp" />
<ClCompile Include="..\..\..\src\xs.cpp" />
+ <ClCompile Include="..\..\..\src\xsurveyor.cpp" />
<ClCompile Include="..\..\..\src\xszmq.cpp" />
</ItemGroup>
<ItemGroup>
@@ -211,6 +215,7 @@
<ClInclude Include="..\..\..\src\reaper.hpp" />
<ClInclude Include="..\..\..\src\rep.hpp" />
<ClInclude Include="..\..\..\src\req.hpp" />
+ <ClInclude Include="..\..\..\src\respondent.hpp" />
<ClInclude Include="..\..\..\src\select.hpp" />
<ClInclude Include="..\..\..\src\session_base.hpp" />
<ClInclude Include="..\..\..\src\signaler.hpp" />
@@ -218,6 +223,7 @@
<ClInclude Include="..\..\..\src\stdint.hpp" />
<ClInclude Include="..\..\..\src\stream_engine.hpp" />
<ClInclude Include="..\..\..\src\sub.hpp" />
+ <ClInclude Include="..\..\..\src\surveyor.hpp" />
<ClInclude Include="..\..\..\src\tcp_address.hpp" />
<ClInclude Include="..\..\..\src\tcp_connecter.hpp" />
<ClInclude Include="..\..\..\src\tcp_listener.hpp" />
@@ -228,7 +234,9 @@
<ClInclude Include="..\..\..\src\xpub.hpp" />
<ClInclude Include="..\..\..\src\xrep.hpp" />
<ClInclude Include="..\..\..\src\xreq.hpp" />
+ <ClInclude Include="..\..\..\src\xrespondent.hpp" />
<ClInclude Include="..\..\..\src\xsub.hpp" />
+ <ClInclude Include="..\..\..\src\xsurveyor.hpp" />
<ClInclude Include="..\..\..\src\ypipe.hpp" />
<ClInclude Include="..\..\..\src\yqueue.hpp" />
<ClInclude Include="..\platform.hpp" />
@@ -236,4 +244,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/builds/msvc/libxs/libxs.vcxproj.filters b/builds/msvc/libxs/libxs.vcxproj.filters
index eb0f505..1fefb60 100644
--- a/builds/msvc/libxs/libxs.vcxproj.filters
+++ b/builds/msvc/libxs/libxs.vcxproj.filters
@@ -176,6 +176,18 @@
<ClCompile Include="..\..\..\src\prefix_filter.cpp">
<Filter>Source Files</Filter>
</ClCompile>
+ <ClCompile Include="..\..\..\src\respondent.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\..\src\xrespondent.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\..\src\surveyor.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="..\..\..\src\xsurveyor.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\include\xs.h">
@@ -388,5 +400,17 @@
<ClInclude Include="..\..\..\src\prefix_filter.hpp">
<Filter>Header Files</Filter>
</ClInclude>
+ <ClInclude Include="..\..\..\src\respondent.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\src\surveyor.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\src\xrespondent.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
+ <ClInclude Include="..\..\..\src\xsurveyor.hpp">
+ <Filter>Header Files</Filter>
+ </ClInclude>
</ItemGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj
index bb707c4..d1258e4 100644
--- a/builds/msvc/tests/tests.vcxproj
+++ b/builds/msvc/tests/tests.vcxproj
@@ -151,6 +151,10 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\survey.cpp">
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
+ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
+ </ClCompile>
<ClCompile Include="..\..\..\tests\tests.cpp" />
<ClCompile Include="..\..\..\tests\timeo.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
@@ -184,4 +188,4 @@
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters
index b1eade2..0fb41a8 100644
--- a/builds/msvc/tests/tests.vcxproj.filters
+++ b/builds/msvc/tests/tests.vcxproj.filters
@@ -65,6 +65,9 @@
<ClCompile Include="..\..\..\tests\resubscribe.cpp">
<Filter>Header Files</Filter>
</ClCompile>
+ <ClCompile Include="..\..\..\tests\survey.cpp">
+ <Filter>Header Files</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<Filter Include="Header Files">
@@ -76,4 +79,4 @@
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
-</Project>
+</Project> \ No newline at end of file
diff --git a/include/xs.h b/include/xs.h
index 17a557b..34d6c60 100644
--- a/include/xs.h
+++ b/include/xs.h
@@ -172,6 +172,10 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,
#define XS_PUSH 8
#define XS_XPUB 9
#define XS_XSUB 10
+#define XS_SURVEYOR 11
+#define XS_RESPONDENT 12
+#define XS_XSURVEYOR 13
+#define XS_XRESPONDENT 14
/* Legacy socket type aliases. */
#define XS_ROUTER XS_XREP
diff --git a/src/Makefile.am b/src/Makefile.am
index 81ece53..3025e13 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -64,6 +64,7 @@ libxs_la_SOURCES = \
reaper.hpp \
rep.hpp \
req.hpp \
+ respondent.hpp \
select.hpp \
session_base.hpp \
signaler.hpp \
@@ -71,6 +72,7 @@ libxs_la_SOURCES = \
stdint.hpp \
stream_engine.hpp \
sub.hpp \
+ surveyor.hpp \
tcp_address.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -81,7 +83,9 @@ libxs_la_SOURCES = \
xpub.hpp \
xrep.hpp \
xreq.hpp \
+ xrespondent.hpp \
xsub.hpp \
+ xsurveyor.hpp \
ypipe.hpp \
yqueue.hpp \
clock.cpp \
@@ -121,12 +125,14 @@ libxs_la_SOURCES = \
random.cpp \
rep.cpp \
req.cpp \
+ respondent.cpp \
select.cpp \
session_base.cpp \
signaler.cpp \
socket_base.cpp \
stream_engine.cpp \
sub.cpp \
+ surveyor.cpp \
tcp_address.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
@@ -135,7 +141,9 @@ libxs_la_SOURCES = \
xpub.cpp \
xrep.cpp \
xreq.cpp \
+ xrespondent.cpp \
xsub.cpp \
+ xsurveyor.cpp \
xs.cpp
if ON_MINGW
diff --git a/src/respondent.cpp b/src/respondent.cpp
new file mode 100644
index 0000000..c2abd84
--- /dev/null
+++ b/src/respondent.cpp
@@ -0,0 +1,114 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "respondent.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+
+xs::respondent_t::respondent_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xrespondent_t (parent_, tid_, sid_),
+ sending_reply (false)
+{
+ options.type = XS_RESPONDENT;
+}
+
+xs::respondent_t::~respondent_t ()
+{
+}
+
+int xs::respondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If there's no ongoing survey, we cannot send reply.
+ if (!sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Survey pattern doesn't support multipart messages.
+ if (msg_->flags () & msg_t::more || flags_ & XS_SNDMORE) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Push message to the reply pipe.
+ int rc = xrespondent_t::xsend (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Flip the FSM back to request receiving state.
+ sending_reply = false;
+
+ return 0;
+}
+
+int xs::respondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ // If we are in middle of sending a reply, we cannot receive next survey.
+ if (sending_reply) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // First thing to do when receiving a srvey is to copy all the labels
+ // to the reply pipe.
+ while (true) {
+ int rc = xrespondent_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ rc = xrespondent_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+ }
+
+ // When whole survey is read, flip the FSM to reply-sending state.
+ sending_reply = true;
+
+ return 0;
+}
+
+bool xs::respondent_t::xhas_in ()
+{
+ if (sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_in ();
+}
+
+bool xs::respondent_t::xhas_out ()
+{
+ if (!sending_reply)
+ return false;
+
+ return xrespondent_t::xhas_out ();
+}
+
+xs::respondent_session_t::respondent_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xrespondent_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::respondent_session_t::~respondent_session_t ()
+{
+}
+
diff --git a/src/respondent.hpp b/src/respondent.hpp
new file mode 100644
index 0000000..b315350
--- /dev/null
+++ b/src/respondent.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_RESPONDENT_HPP_INCLUDED__
+#define __XS_RESPONDENT_HPP_INCLUDED__
+
+#include "xrespondent.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class respondent_t : public xrespondent_t
+ {
+ public:
+
+ respondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~respondent_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, we are in process of sending the reply. If false we are
+ // in process of receiving a request.
+ // TODO: Consider whether automatic cancelling of the previous request
+ // when recv is called again is not a better idea than returning EFSM.
+ bool sending_reply;
+
+ respondent_t (const respondent_t&);
+ const respondent_t &operator = (const respondent_t&);
+
+ };
+
+ class respondent_session_t : public xrespondent_session_t
+ {
+ public:
+
+ respondent_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~respondent_session_t ();
+
+ private:
+
+ respondent_session_t (const respondent_session_t&);
+ const respondent_session_t &operator = (const respondent_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 90fa071..35c4a4e 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -42,6 +42,10 @@
#include "push.hpp"
#include "pull.hpp"
#include "pair.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
@@ -93,6 +97,22 @@ xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_session_t (io_thread_, connect_,
+ socket_, options_, protocol_, address_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index eb9b491..df182f1 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -61,6 +61,10 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "surveyor.hpp"
+#include "xsurveyor.hpp"
+#include "respondent.hpp"
+#include "xrespondent.hpp"
bool xs::socket_base_t::check_tag ()
{
@@ -106,6 +110,18 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,
case XS_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
break;
+ case XS_SURVEYOR:
+ s = new (std::nothrow) surveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_XSURVEYOR:
+ s = new (std::nothrow) xsurveyor_t (parent_, tid_, sid_);
+ break;
+ case XS_RESPONDENT:
+ s = new (std::nothrow) respondent_t (parent_, tid_, sid_);
+ break;
+ case XS_XRESPONDENT:
+ s = new (std::nothrow) xrespondent_t (parent_, tid_, sid_);
+ break;
default:
errno = EINVAL;
return NULL;
diff --git a/src/surveyor.cpp b/src/surveyor.cpp
new file mode 100644
index 0000000..c5216bd
--- /dev/null
+++ b/src/surveyor.cpp
@@ -0,0 +1,135 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "surveyor.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+#include "wire.hpp"
+#include "likely.hpp"
+#include "random.hpp"
+
+xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ xsurveyor_t (parent_, tid_, sid_),
+ receiving_responses (false),
+ survey_id (generate_random ())
+{
+ options.type = XS_SURVEYOR;
+}
+
+xs::surveyor_t::~surveyor_t ()
+{
+}
+
+int xs::surveyor_t::xsend (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // Survey pattern works only with sigle-part messages.
+ if (flags_ & XS_SNDMORE || msg_->flags () & msg_t::more) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Start the new survey. First, generate new survey ID.
+ ++survey_id;
+ msg_t id;
+ rc = id.init_size (4);
+ errno_assert (rc == 0);
+ put_uint32 ((unsigned char*) id.data (), survey_id);
+ id.set_flags (msg_t::more);
+ rc = xsurveyor_t::xsend (&id, 0);
+ if (rc != 0) {
+ id.close ();
+ return -1;
+ }
+ id.close ();
+
+ // Now send the body of the survey.
+ rc = xsurveyor_t::xsend (msg_, flags_);
+ errno_assert (rc == 0);
+
+ // Start waiting for responses from the peers.
+ receiving_responses = true;
+
+ return 0;
+}
+
+int xs::surveyor_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // If there's no survey underway, it's an error.
+ if (!receiving_responses) {
+ errno = EFSM;
+ return -1;
+ }
+
+ // Get the first part of the response -- the survey ID.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ if (rc != 0)
+ return rc;
+
+ // Check whether this is response for the onging survey. If not, we can
+ // drop the response.
+ if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 4 ||
+ get_uint32 ((unsigned char*) msg_->data ()) != survey_id)) {
+ while (true) {
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+ if (!(msg_->flags () & msg_t::more))
+ break;
+ }
+ msg_->close ();
+ msg_->init ();
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Get the body of the response.
+ rc = xsurveyor_t::xrecv (msg_, flags_);
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+bool xs::surveyor_t::xhas_in ()
+{
+ // TODO: We need a prefetched_message here...
+ xs_assert (false);
+ return false;
+}
+
+bool xs::surveyor_t::xhas_out ()
+{
+ return xsurveyor_t::xhas_out ();
+}
+
+xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ xsurveyor_session_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::surveyor_session_t::~surveyor_session_t ()
+{
+}
+
diff --git a/src/surveyor.hpp b/src/surveyor.hpp
new file mode 100644
index 0000000..83fdbb0
--- /dev/null
+++ b/src/surveyor.hpp
@@ -0,0 +1,77 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_SURVEYOR_HPP_INCLUDED__
+#define __XS_SURVEYOR_HPP_INCLUDED__
+
+#include "xsurveyor.hpp"
+#include "stdint.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class surveyor_t : public xsurveyor_t
+ {
+ public:
+
+ surveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~surveyor_t ();
+
+ // Overloads of functions from socket_base_t.
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // If true, survey was already lauched and have no expriered yet.
+ bool receiving_responses;
+
+ // The ID of the ongoing survey.
+ uint32_t survey_id;
+
+ surveyor_t (const surveyor_t&);
+ const surveyor_t &operator = (const surveyor_t&);
+ };
+
+ class surveyor_session_t : public xsurveyor_session_t
+ {
+ public:
+
+ surveyor_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~surveyor_session_t ();
+
+ private:
+
+ surveyor_session_t (const surveyor_session_t&);
+ const surveyor_session_t &operator = (const surveyor_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp
new file mode 100644
index 0000000..026fe8d
--- /dev/null
+++ b/src/xrespondent.cpp
@@ -0,0 +1,278 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "xrespondent.hpp"
+#include "pipe.hpp"
+#include "wire.hpp"
+#include "random.hpp"
+#include "likely.hpp"
+#include "err.hpp"
+
+xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_,
+ int sid_) :
+ socket_base_t (parent_, tid_, sid_),
+ prefetched (0),
+ more_in (false),
+ current_out (NULL),
+ more_out (false),
+ next_peer_id (generate_random ())
+{
+ options.type = XS_XRESPONDENT;
+
+ prefetched_msg.init ();
+}
+
+xs::xrespondent_t::~xrespondent_t ()
+{
+ xs_assert (outpipes.empty ());
+ prefetched_msg.close ();
+}
+
+void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
+{
+ xs_assert (pipe_);
+
+ // Add the pipe to the map out outbound pipes.
+ outpipe_t outpipe = {pipe_, true};
+ bool ok = outpipes.insert (outpipes_t::value_type (
+ next_peer_id, outpipe)).second;
+ xs_assert (ok);
+
+ // Add the pipe to the list of inbound pipes.
+ blob_t identity (4, 0);
+ put_uint32 ((unsigned char*) identity.data (), next_peer_id);
+ pipe_->set_identity (identity);
+ fq.attach (pipe_);
+
+ // Generate a new unique peer identity.
+ ++next_peer_id;
+}
+
+void xs::xrespondent_t::xterminated (pipe_t *pipe_)
+{
+ fq.terminated (pipe_);
+
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ outpipes.erase (it);
+ if (pipe_ == current_out)
+ current_out = NULL;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+void xs::xrespondent_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
+}
+
+void xs::xrespondent_t::xwrite_activated (pipe_t *pipe_)
+{
+ for (outpipes_t::iterator it = outpipes.begin ();
+ it != outpipes.end (); ++it) {
+ if (it->second.pipe == pipe_) {
+ xs_assert (!it->second.active);
+ it->second.active = true;
+ return;
+ }
+ }
+ xs_assert (false);
+}
+
+int xs::xrespondent_t::xsend (msg_t *msg_, int flags_)
+{
+ // If this is the first part of the message it's the ID of the
+ // peer to send the message to.
+ if (!more_out) {
+ xs_assert (!current_out);
+
+ // If we have malformed message (prefix with no subsequent message)
+ // then just silently ignore it.
+ // TODO: The connections should be killed instead.
+ if (msg_->flags () & msg_t::more && msg_->size () == 4) {
+
+ more_out = true;
+
+ // Find the pipe associated with the identity stored in the prefix.
+ // If there's no such pipe just silently ignore the message.
+ uint32_t identity = get_uint32 ((unsigned char*) msg_->data ());
+ outpipes_t::iterator it = outpipes.find (identity);
+
+ if (it != outpipes.end ()) {
+ current_out = it->second.pipe;
+ msg_t empty;
+ int rc = empty.init ();
+ errno_assert (rc == 0);
+ if (!current_out->check_write (&empty)) {
+ it->second.active = false;
+ more_out = false;
+ current_out = NULL;
+ }
+ rc = empty.close ();
+ errno_assert (rc == 0);
+ }
+
+ }
+
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ rc = msg_->init ();
+ errno_assert (rc == 0);
+ return 0;
+ }
+
+ // Check whether this is the last part of the message.
+ more_out = msg_->flags () & msg_t::more ? true : false;
+
+ // Push the message into the pipe. If there's no out pipe, just drop it.
+ if (current_out) {
+ bool ok = current_out->write (msg_);
+ if (unlikely (!ok))
+ current_out = NULL;
+ else if (!more_out) {
+ current_out->flush ();
+ current_out = NULL;
+ }
+ }
+ else {
+ int rc = msg_->close ();
+ errno_assert (rc == 0);
+ }
+
+ // Detach the message from the data buffer.
+ int rc = msg_->init ();
+ errno_assert (rc == 0);
+
+ return 0;
+}
+
+int xs::xrespondent_t::xrecv (msg_t *msg_, int flags_)
+{
+ int rc;
+
+ // if there is a prefetched identity, return it.
+ if (prefetched == 2)
+ {
+ rc = msg_->init_size (prefetched_id.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+ msg_->set_flags (msg_t::more);
+ prefetched = 1;
+ return 0;
+ }
+
+ // If there is a prefetched message, return it.
+ if (prefetched == 1) {
+ rc = msg_->move (prefetched_msg);
+ errno_assert (rc == 0);
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ prefetched = 0;
+ return 0;
+ }
+
+ // Get next message part.
+ pipe_t *pipe = NULL;
+ rc = fq.recvpipe (msg_, flags_, &pipe);
+ if (rc != 0)
+ return -1;
+
+ // If we are in the middle of reading a message, just return the next part.
+ if (more_in) {
+ more_in = msg_->flags () & msg_t::more ? true : false;
+ return 0;
+ }
+
+ // We are at the beginning of a new message. Move the message part we
+ // have to the prefetched and return the ID of the peer instead.
+ rc = prefetched_msg.move (*msg_);
+ errno_assert (rc == 0);
+ prefetched = 1;
+ rc = msg_->close ();
+ errno_assert (rc == 0);
+
+ blob_t identity = pipe->get_identity ();
+ rc = msg_->init_size (identity.size ());
+ errno_assert (rc == 0);
+ memcpy (msg_->data (), identity.data (), identity.size ());
+ msg_->set_flags (msg_t::more);
+ return 0;
+}
+
+int xs::xrespondent_t::rollback (void)
+{
+ if (current_out) {
+ current_out->rollback ();
+ current_out = NULL;
+ more_out = false;
+ }
+ return 0;
+}
+
+bool xs::xrespondent_t::xhas_in ()
+{
+ // If we are in the middle of reading the messages, there are
+ // definitely more parts available.
+ if (more_in)
+ return true;
+
+ // We may already have a message pre-fetched.
+ if (prefetched)
+ return true;
+
+ // Try to read the next message to the pre-fetch buffer. If anything,
+ // it will be identity of the peer sending the message.
+ msg_t id;
+ id.init ();
+ int rc = xrespondent_t::xrecv (&id, XS_DONTWAIT);
+ if (rc != 0 && errno == EAGAIN) {
+ id.close ();
+ return false;
+ }
+ xs_assert (rc == 0);
+
+ // We have first part of the message prefetched now. We will store the
+ // prefetched identity as well.
+ prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+ id.close ();
+ prefetched = 2;
+
+ return true;
+}
+
+bool xs::xrespondent_t::xhas_out ()
+{
+ return true;
+}
+
+xs::xrespondent_session_t::xrespondent_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::xrespondent_session_t::~xrespondent_session_t ()
+{
+}
+
diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp
new file mode 100644
index 0000000..eb6ac9a
--- /dev/null
+++ b/src/xrespondent.hpp
@@ -0,0 +1,123 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_XRESPONDENT_HPP_INCLUDED__
+#define __XS_XRESPONDENT_HPP_INCLUDED__
+
+#include <map>
+
+#include "socket_base.hpp"
+#include "session_base.hpp"
+#include "stdint.hpp"
+#include "blob.hpp"
+#include "msg.hpp"
+#include "fq.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class pipe_t;
+ class io_thread_t;
+
+ // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
+ class xrespondent_t :
+ public socket_base_t
+ {
+ public:
+
+ xrespondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~xrespondent_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
+ int xsend (msg_t *msg_, int flags_);
+ int xrecv (msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+ void xread_activated (xs::pipe_t *pipe_);
+ void xwrite_activated (xs::pipe_t *pipe_);
+ void xterminated (xs::pipe_t *pipe_);
+
+ protected:
+
+ // Rollback any message parts that were sent but not yet flushed.
+ int rollback ();
+
+ private:
+
+ // Fair queueing object for inbound pipes.
+ fq_t fq;
+
+ // This value is either 0 (nothing is prefetched), 1 (only message body
+ // is prefetched) or 2 (both identity and message body are prefetched).
+ int prefetched;
+
+ // Holds the prefetched identity.
+ blob_t prefetched_id;
+
+ // Holds the prefetched message.
+ msg_t prefetched_msg;
+
+ // If true, more incoming message parts are expected.
+ bool more_in;
+
+ struct outpipe_t
+ {
+ xs::pipe_t *pipe;
+ bool active;
+ };
+
+ // Outbound pipes indexed by the peer IDs.
+ typedef std::map <uint32_t, outpipe_t> outpipes_t;
+ outpipes_t outpipes;
+
+ // The pipe we are currently writing to.
+ xs::pipe_t *current_out;
+
+ // If true, more outgoing message parts are expected.
+ bool more_out;
+
+ // Peer ID are generated. It's a simple increment and wrap-over
+ // algorithm. This value is the next ID to use (if not used already).
+ uint32_t next_peer_id;
+
+ xrespondent_t (const xrespondent_t&);
+ const xrespondent_t &operator = (const xrespondent_t&);
+ };
+
+ class xrespondent_session_t : public session_base_t
+ {
+ public:
+
+ xrespondent_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xrespondent_session_t ();
+
+ private:
+
+ xrespondent_session_t (const xrespondent_session_t&);
+ const xrespondent_session_t &operator = (const xrespondent_session_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp
new file mode 100644
index 0000000..4265437
--- /dev/null
+++ b/src/xsurveyor.cpp
@@ -0,0 +1,89 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "xsurveyor.hpp"
+#include "err.hpp"
+#include "msg.hpp"
+
+xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
+ socket_base_t (parent_, tid_, sid_)
+{
+ options.type = XS_XSURVEYOR;
+}
+
+xs::xsurveyor_t::~xsurveyor_t ()
+{
+}
+
+void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
+{
+ xs_assert (pipe_);
+ fq.attach (pipe_);
+ dist.attach (pipe_);
+}
+
+int xs::xsurveyor_t::xsend (msg_t *msg_, int flags_)
+{
+ return dist.send_to_all (msg_, flags_);
+}
+
+int xs::xsurveyor_t::xrecv (msg_t *msg_, int flags_)
+{
+ return fq.recv (msg_, flags_);
+}
+
+bool xs::xsurveyor_t::xhas_in ()
+{
+ return fq.has_in ();
+}
+
+bool xs::xsurveyor_t::xhas_out ()
+{
+ return dist.has_out ();
+}
+
+void xs::xsurveyor_t::xread_activated (pipe_t *pipe_)
+{
+ fq.activated (pipe_);
+}
+
+void xs::xsurveyor_t::xwrite_activated (pipe_t *pipe_)
+{
+ dist.activated (pipe_);
+}
+
+void xs::xsurveyor_t::xterminated (pipe_t *pipe_)
+{
+ fq.terminated (pipe_);
+ dist.terminated (pipe_);
+}
+
+xs::xsurveyor_session_t::xsurveyor_session_t (io_thread_t *io_thread_,
+ bool connect_, socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_) :
+ session_base_t (io_thread_, connect_, socket_, options_, protocol_,
+ address_)
+{
+}
+
+xs::xsurveyor_session_t::~xsurveyor_session_t ()
+{
+}
+
diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp
new file mode 100644
index 0000000..b0c3f9c
--- /dev/null
+++ b/src/xsurveyor.hpp
@@ -0,0 +1,85 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __XS_XSURVEYOR_HPP_INCLUDED__
+#define __XS_XSURVEYOR_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "session_base.hpp"
+#include "dist.hpp"
+#include "fq.hpp"
+
+namespace xs
+{
+
+ class ctx_t;
+ class msg_t;
+ class pipe_t;
+ class io_thread_t;
+ class socket_base_t;
+
+ class xsurveyor_t : public socket_base_t
+ {
+ public:
+
+ xsurveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_);
+ ~xsurveyor_t ();
+
+ protected:
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_);
+ int xsend (xs::msg_t *msg_, int flags_);
+ int xrecv (xs::msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+ void xread_activated (xs::pipe_t *pipe_);
+ void xwrite_activated (xs::pipe_t *pipe_);
+ void xterminated (xs::pipe_t *pipe_);
+
+ private:
+
+ // Inbound messages are fair-queued from inbound pipes.
+ // Outbound messages are distributed to all outbound pipes.
+ fq_t fq;
+ dist_t dist;
+
+ xsurveyor_t (const xsurveyor_t&);
+ const xsurveyor_t &operator = (const xsurveyor_t&);
+ };
+
+ class xsurveyor_session_t : public session_base_t
+ {
+ public:
+
+ xsurveyor_session_t (xs::io_thread_t *io_thread_, bool connect_,
+ xs::socket_base_t *socket_, const options_t &options_,
+ const char *protocol_, const char *address_);
+ ~xsurveyor_session_t ();
+
+ private:
+
+ xsurveyor_session_t (const xsurveyor_session_t&);
+ const xsurveyor_session_t &operator = (const xsurveyor_session_t&);
+ };
+
+}
+
+#endif
diff --git a/tests/Makefile.am b/tests/Makefile.am
index af826af..ba76260 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -23,7 +23,8 @@ noinst_PROGRAMS = pair_inproc \
polltimeo \
wireformat \
libzmq21 \
- resubscribe
+ resubscribe \
+ survey
pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp
pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp
@@ -46,5 +47,6 @@ polltimeo_SOURCES = polltimeo.cpp testutil.hpp
wireformat_SOURCES = wireformat.cpp
libzmq21_SOURCES = libzmq21.cpp
resubscribe_SOURCES = resubscribe.cpp
+survey_SOURCES = survey.cpp
TESTS = $(noinst_PROGRAMS)
diff --git a/tests/survey.cpp b/tests/survey.cpp
new file mode 100644
index 0000000..41f39aa
--- /dev/null
+++ b/tests/survey.cpp
@@ -0,0 +1,135 @@
+/*
+ Copyright (c) 2012 250bpm s.r.o.
+ Copyright (c) 2012 Other contributors as noted in the AUTHORS file
+
+ This file is part of Crossroads I/O project.
+
+ Crossroads I/O is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ Crossroads is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "testutil.hpp"
+
+int XS_TEST_MAIN ()
+{
+ int rc;
+ char buf [32];
+
+ fprintf (stderr, "survey test running...\n");
+
+ // Create the basic infrastructure.
+ void *ctx = xs_init ();
+ assert (ctx);
+ void *xsurveyor = xs_socket (ctx, XS_XSURVEYOR);
+ assert (xsurveyor);
+ rc = xs_bind (xsurveyor, "inproc://a");
+ assert (rc == 0);
+ void *xrespondent = xs_socket (ctx, XS_XRESPONDENT);
+ assert (xrespondent);
+ rc = xs_bind (xrespondent, "inproc://b");
+ assert (rc == 0);
+ void *surveyor = xs_socket (ctx, XS_SURVEYOR);
+ assert (surveyor);
+ rc = xs_connect (surveyor, "inproc://b");
+ assert (rc == 0);
+ void *respondent1 = xs_socket (ctx, XS_RESPONDENT);
+ assert (respondent1);
+ rc = xs_connect (respondent1, "inproc://a");
+ assert (rc == 0);
+ void *respondent2 = xs_socket (ctx, XS_RESPONDENT);
+ assert (respondent2);
+ rc = xs_connect (respondent2, "inproc://a");
+ assert (rc == 0);
+
+ // Send the survey.
+ rc = xs_send (surveyor, "ABC", 3, 0);
+ assert (rc == 3);
+
+ // Forward the survey through the intermediate device.
+ // Survey consist of identity (4 bytes), survey ID (4 bytes) and the body.
+ rc = xs_recv (xrespondent, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ rc = xs_send (xsurveyor, buf, 4, XS_SNDMORE);
+ assert (rc == 4);
+ rc = xs_recv (xrespondent, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ rc = xs_send (xsurveyor, buf, 4, XS_SNDMORE);
+ assert (rc == 4);
+ rc = xs_recv (xrespondent, buf, sizeof (buf), 0);
+ assert (rc == 3);
+ rc = xs_send (xsurveyor, buf, 3, 0);
+ assert (rc == 3);
+
+ // Respondent 1 responds to the survey.
+ rc = xs_recv (respondent1, buf, sizeof (buf), 0);
+ assert (rc == 3);
+ rc = xs_send (respondent1, "DE", 2, 0);
+ assert (rc == 2);
+
+ // Forward the response through the intermediate device.
+ rc = xs_recv (xsurveyor, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ rc = xs_send (xrespondent, buf, 4, XS_SNDMORE);
+ assert (rc == 4);
+ rc = xs_recv (xsurveyor, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ rc = xs_send (xrespondent, buf, 4, XS_SNDMORE);
+ assert (rc == 4);
+ rc = xs_recv (xsurveyor, buf, sizeof (buf), 0);
+ assert (rc == 2);
+ rc = xs_send (xrespondent, buf, 2, 0);
+ assert (rc == 2);
+
+ // Surveyor gets the response.
+ rc = xs_recv (surveyor, buf, sizeof (buf), 0);
+ assert (rc == 2);
+
+ // Respondent 2 responds to the survey.
+ rc = xs_recv (respondent2, buf, sizeof (buf), 0);
+ assert (rc == 3);
+ rc = xs_send (respondent2, "FGHI", 4, 0);
+ assert (rc == 4);
+
+ // Forward the response through the intermediate device.
+ rc = xs_recv (xsurveyor, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ rc = xs_send (xrespondent, buf, 4, XS_SNDMORE);
+ assert (rc == 4);
+ rc = xs_recv (xsurveyor, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ rc = xs_send (xrespondent, buf, 4, XS_SNDMORE);
+ assert (rc == 4);
+ rc = xs_recv (xsurveyor, buf, sizeof (buf), 0);
+ assert (rc == 4);
+ rc = xs_send (xrespondent, buf, 4, 0);
+ assert (rc == 4);
+
+ // Surveyor gets the response.
+ rc = xs_recv (surveyor, buf, sizeof (buf), 0);
+ assert (rc == 4);
+
+ rc = xs_close (respondent2);
+ assert (rc == 0);
+ rc = xs_close (respondent1);
+ assert (rc == 0);
+ rc = xs_close (surveyor);
+ assert (rc == 0);
+ rc = xs_close (xrespondent);
+ assert (rc == 0);
+ rc = xs_close (xsurveyor);
+ assert (rc == 0);
+ rc = xs_term (ctx);
+ assert (rc == 0);
+
+ return 0 ;
+}
diff --git a/tests/tests.cpp b/tests/tests.cpp
index 21d10e3..3d7951c 100644
--- a/tests/tests.cpp
+++ b/tests/tests.cpp
@@ -111,6 +111,10 @@
#include "resubscribe.cpp"
#undef XS_TEST_MAIN
+#define XS_TEST_MAIN survey
+#include "survey.cpp"
+#undef XS_TEST_MAIN
+
int main ()
{
int rc;
@@ -155,7 +159,8 @@ int main ()
assert (rc == 0);
rc = resubscribe ();
assert (rc == 0);
-
+ rc = survey ();
+ assert (rc == 0);
fprintf (stderr, "SUCCESS\n");
sleep (1);