diff options
-rwxr-xr-x | .gitignore | 1 | ||||
-rw-r--r-- | builds/msvc/libxs/libxs.vcxproj | 10 | ||||
-rw-r--r-- | builds/msvc/libxs/libxs.vcxproj.filters | 26 | ||||
-rw-r--r-- | builds/msvc/tests/tests.vcxproj | 6 | ||||
-rw-r--r-- | builds/msvc/tests/tests.vcxproj.filters | 5 | ||||
-rw-r--r-- | include/xs.h | 4 | ||||
-rw-r--r-- | src/Makefile.am | 8 | ||||
-rw-r--r-- | src/respondent.cpp | 114 | ||||
-rw-r--r-- | src/respondent.hpp | 77 | ||||
-rw-r--r-- | src/session_base.cpp | 20 | ||||
-rw-r--r-- | src/socket_base.cpp | 16 | ||||
-rw-r--r-- | src/surveyor.cpp | 135 | ||||
-rw-r--r-- | src/surveyor.hpp | 77 | ||||
-rw-r--r-- | src/xrespondent.cpp | 278 | ||||
-rw-r--r-- | src/xrespondent.hpp | 123 | ||||
-rw-r--r-- | src/xsurveyor.cpp | 89 | ||||
-rw-r--r-- | src/xsurveyor.hpp | 85 | ||||
-rw-r--r-- | tests/Makefile.am | 4 | ||||
-rw-r--r-- | tests/survey.cpp | 135 | ||||
-rw-r--r-- | tests/tests.cpp | 7 |
20 files changed, 1214 insertions, 6 deletions
@@ -49,6 +49,7 @@ tests/polltimeo tests/wireformat tests/libzmq21 tests/resubscribe +tests/survey src/platform.hpp* src/stamp-h1 perf/*.exe diff --git a/builds/msvc/libxs/libxs.vcxproj b/builds/msvc/libxs/libxs.vcxproj index bbab5fd..d4f40ca 100644 --- a/builds/msvc/libxs/libxs.vcxproj +++ b/builds/msvc/libxs/libxs.vcxproj @@ -143,12 +143,14 @@ <ClCompile Include="..\..\..\src\reaper.cpp" /> <ClCompile Include="..\..\..\src\rep.cpp" /> <ClCompile Include="..\..\..\src\req.cpp" /> + <ClCompile Include="..\..\..\src\respondent.cpp" /> <ClCompile Include="..\..\..\src\select.cpp" /> <ClCompile Include="..\..\..\src\session_base.cpp" /> <ClCompile Include="..\..\..\src\signaler.cpp" /> <ClCompile Include="..\..\..\src\socket_base.cpp" /> <ClCompile Include="..\..\..\src\stream_engine.cpp" /> <ClCompile Include="..\..\..\src\sub.cpp" /> + <ClCompile Include="..\..\..\src\surveyor.cpp" /> <ClCompile Include="..\..\..\src\tcp_address.cpp" /> <ClCompile Include="..\..\..\src\tcp_connecter.cpp" /> <ClCompile Include="..\..\..\src\tcp_listener.cpp" /> @@ -157,8 +159,10 @@ <ClCompile Include="..\..\..\src\xpub.cpp" /> <ClCompile Include="..\..\..\src\xrep.cpp" /> <ClCompile Include="..\..\..\src\xreq.cpp" /> + <ClCompile Include="..\..\..\src\xrespondent.cpp" /> <ClCompile Include="..\..\..\src\xsub.cpp" /> <ClCompile Include="..\..\..\src\xs.cpp" /> + <ClCompile Include="..\..\..\src\xsurveyor.cpp" /> <ClCompile Include="..\..\..\src\xszmq.cpp" /> </ItemGroup> <ItemGroup> @@ -211,6 +215,7 @@ <ClInclude Include="..\..\..\src\reaper.hpp" /> <ClInclude Include="..\..\..\src\rep.hpp" /> <ClInclude Include="..\..\..\src\req.hpp" /> + <ClInclude Include="..\..\..\src\respondent.hpp" /> <ClInclude Include="..\..\..\src\select.hpp" /> <ClInclude Include="..\..\..\src\session_base.hpp" /> <ClInclude Include="..\..\..\src\signaler.hpp" /> @@ -218,6 +223,7 @@ <ClInclude Include="..\..\..\src\stdint.hpp" /> <ClInclude Include="..\..\..\src\stream_engine.hpp" /> <ClInclude Include="..\..\..\src\sub.hpp" /> + <ClInclude Include="..\..\..\src\surveyor.hpp" /> <ClInclude Include="..\..\..\src\tcp_address.hpp" /> <ClInclude Include="..\..\..\src\tcp_connecter.hpp" /> <ClInclude Include="..\..\..\src\tcp_listener.hpp" /> @@ -228,7 +234,9 @@ <ClInclude Include="..\..\..\src\xpub.hpp" /> <ClInclude Include="..\..\..\src\xrep.hpp" /> <ClInclude Include="..\..\..\src\xreq.hpp" /> + <ClInclude Include="..\..\..\src\xrespondent.hpp" /> <ClInclude Include="..\..\..\src\xsub.hpp" /> + <ClInclude Include="..\..\..\src\xsurveyor.hpp" /> <ClInclude Include="..\..\..\src\ypipe.hpp" /> <ClInclude Include="..\..\..\src\yqueue.hpp" /> <ClInclude Include="..\platform.hpp" /> @@ -236,4 +244,4 @@ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> </ImportGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/builds/msvc/libxs/libxs.vcxproj.filters b/builds/msvc/libxs/libxs.vcxproj.filters index eb0f505..1fefb60 100644 --- a/builds/msvc/libxs/libxs.vcxproj.filters +++ b/builds/msvc/libxs/libxs.vcxproj.filters @@ -176,6 +176,18 @@ <ClCompile Include="..\..\..\src\prefix_filter.cpp"> <Filter>Source Files</Filter> </ClCompile> + <ClCompile Include="..\..\..\src\respondent.cpp"> + <Filter>Source Files</Filter> + </ClCompile> + <ClCompile Include="..\..\..\src\xrespondent.cpp"> + <Filter>Source Files</Filter> + </ClCompile> + <ClCompile Include="..\..\..\src\surveyor.cpp"> + <Filter>Source Files</Filter> + </ClCompile> + <ClCompile Include="..\..\..\src\xsurveyor.cpp"> + <Filter>Source Files</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="..\..\..\include\xs.h"> @@ -388,5 +400,17 @@ <ClInclude Include="..\..\..\src\prefix_filter.hpp"> <Filter>Header Files</Filter> </ClInclude> + <ClInclude Include="..\..\..\src\respondent.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="..\..\..\src\surveyor.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="..\..\..\src\xrespondent.hpp"> + <Filter>Header Files</Filter> + </ClInclude> + <ClInclude Include="..\..\..\src\xsurveyor.hpp"> + <Filter>Header Files</Filter> + </ClInclude> </ItemGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj index bb707c4..d1258e4 100644 --- a/builds/msvc/tests/tests.vcxproj +++ b/builds/msvc/tests/tests.vcxproj @@ -151,6 +151,10 @@ <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> </ClCompile> + <ClCompile Include="..\..\..\tests\survey.cpp"> + <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> + <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> + </ClCompile> <ClCompile Include="..\..\..\tests\tests.cpp" /> <ClCompile Include="..\..\..\tests\timeo.cpp"> <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> @@ -184,4 +188,4 @@ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> </ImportGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters index b1eade2..0fb41a8 100644 --- a/builds/msvc/tests/tests.vcxproj.filters +++ b/builds/msvc/tests/tests.vcxproj.filters @@ -65,6 +65,9 @@ <ClCompile Include="..\..\..\tests\resubscribe.cpp"> <Filter>Header Files</Filter> </ClCompile> + <ClCompile Include="..\..\..\tests\survey.cpp"> + <Filter>Header Files</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <Filter Include="Header Files"> @@ -76,4 +79,4 @@ <Filter>Header Files</Filter> </ClInclude> </ItemGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/include/xs.h b/include/xs.h index 17a557b..34d6c60 100644 --- a/include/xs.h +++ b/include/xs.h @@ -172,6 +172,10 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval, #define XS_PUSH 8 #define XS_XPUB 9 #define XS_XSUB 10 +#define XS_SURVEYOR 11 +#define XS_RESPONDENT 12 +#define XS_XSURVEYOR 13 +#define XS_XRESPONDENT 14 /* Legacy socket type aliases. */ #define XS_ROUTER XS_XREP diff --git a/src/Makefile.am b/src/Makefile.am index 81ece53..3025e13 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -64,6 +64,7 @@ libxs_la_SOURCES = \ reaper.hpp \ rep.hpp \ req.hpp \ + respondent.hpp \ select.hpp \ session_base.hpp \ signaler.hpp \ @@ -71,6 +72,7 @@ libxs_la_SOURCES = \ stdint.hpp \ stream_engine.hpp \ sub.hpp \ + surveyor.hpp \ tcp_address.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -81,7 +83,9 @@ libxs_la_SOURCES = \ xpub.hpp \ xrep.hpp \ xreq.hpp \ + xrespondent.hpp \ xsub.hpp \ + xsurveyor.hpp \ ypipe.hpp \ yqueue.hpp \ clock.cpp \ @@ -121,12 +125,14 @@ libxs_la_SOURCES = \ random.cpp \ rep.cpp \ req.cpp \ + respondent.cpp \ select.cpp \ session_base.cpp \ signaler.cpp \ socket_base.cpp \ stream_engine.cpp \ sub.cpp \ + surveyor.cpp \ tcp_address.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ @@ -135,7 +141,9 @@ libxs_la_SOURCES = \ xpub.cpp \ xrep.cpp \ xreq.cpp \ + xrespondent.cpp \ xsub.cpp \ + xsurveyor.cpp \ xs.cpp if ON_MINGW diff --git a/src/respondent.cpp b/src/respondent.cpp new file mode 100644 index 0000000..c2abd84 --- /dev/null +++ b/src/respondent.cpp @@ -0,0 +1,114 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "respondent.hpp" +#include "err.hpp" +#include "msg.hpp" + +xs::respondent_t::respondent_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + xrespondent_t (parent_, tid_, sid_), + sending_reply (false) +{ + options.type = XS_RESPONDENT; +} + +xs::respondent_t::~respondent_t () +{ +} + +int xs::respondent_t::xsend (msg_t *msg_, int flags_) +{ + // If there's no ongoing survey, we cannot send reply. + if (!sending_reply) { + errno = EFSM; + return -1; + } + + // Survey pattern doesn't support multipart messages. + if (msg_->flags () & msg_t::more || flags_ & XS_SNDMORE) { + errno = EINVAL; + return -1; + } + + // Push message to the reply pipe. + int rc = xrespondent_t::xsend (msg_, flags_); + if (rc != 0) + return rc; + + // Flip the FSM back to request receiving state. + sending_reply = false; + + return 0; +} + +int xs::respondent_t::xrecv (msg_t *msg_, int flags_) +{ + // If we are in middle of sending a reply, we cannot receive next survey. + if (sending_reply) { + errno = EFSM; + return -1; + } + + // First thing to do when receiving a srvey is to copy all the labels + // to the reply pipe. + while (true) { + int rc = xrespondent_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; + if (!(msg_->flags () & msg_t::more)) + break; + rc = xrespondent_t::xsend (msg_, flags_); + errno_assert (rc == 0); + } + + // When whole survey is read, flip the FSM to reply-sending state. + sending_reply = true; + + return 0; +} + +bool xs::respondent_t::xhas_in () +{ + if (sending_reply) + return false; + + return xrespondent_t::xhas_in (); +} + +bool xs::respondent_t::xhas_out () +{ + if (!sending_reply) + return false; + + return xrespondent_t::xhas_out (); +} + +xs::respondent_session_t::respondent_session_t (io_thread_t *io_thread_, + bool connect_, socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + xrespondent_session_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +xs::respondent_session_t::~respondent_session_t () +{ +} + diff --git a/src/respondent.hpp b/src/respondent.hpp new file mode 100644 index 0000000..b315350 --- /dev/null +++ b/src/respondent.hpp @@ -0,0 +1,77 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_RESPONDENT_HPP_INCLUDED__ +#define __XS_RESPONDENT_HPP_INCLUDED__ + +#include "xrespondent.hpp" + +namespace xs +{ + + class ctx_t; + class msg_t; + class io_thread_t; + class socket_base_t; + + class respondent_t : public xrespondent_t + { + public: + + respondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_); + ~respondent_t (); + + // Overloads of functions from socket_base_t. + int xsend (xs::msg_t *msg_, int flags_); + int xrecv (xs::msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // If true, we are in process of sending the reply. If false we are + // in process of receiving a request. + // TODO: Consider whether automatic cancelling of the previous request + // when recv is called again is not a better idea than returning EFSM. + bool sending_reply; + + respondent_t (const respondent_t&); + const respondent_t &operator = (const respondent_t&); + + }; + + class respondent_session_t : public xrespondent_session_t + { + public: + + respondent_session_t (xs::io_thread_t *io_thread_, bool connect_, + xs::socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~respondent_session_t (); + + private: + + respondent_session_t (const respondent_session_t&); + const respondent_session_t &operator = (const respondent_session_t&); + }; + +} + +#endif diff --git a/src/session_base.cpp b/src/session_base.cpp index 90fa071..35c4a4e 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -42,6 +42,10 @@ #include "push.hpp" #include "pull.hpp" #include "pair.hpp" +#include "surveyor.hpp" +#include "xsurveyor.hpp" +#include "respondent.hpp" +#include "xrespondent.hpp" xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_, bool connect_, class socket_base_t *socket_, const options_t &options_, @@ -93,6 +97,22 @@ xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_, s = new (std::nothrow) pair_session_t (io_thread_, connect_, socket_, options_, protocol_, address_); break; + case XS_SURVEYOR: + s = new (std::nothrow) surveyor_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case XS_XSURVEYOR: + s = new (std::nothrow) xsurveyor_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case XS_RESPONDENT: + s = new (std::nothrow) respondent_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; + case XS_XRESPONDENT: + s = new (std::nothrow) xrespondent_session_t (io_thread_, connect_, + socket_, options_, protocol_, address_); + break; default: errno = EINVAL; return NULL; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eb9b491..df182f1 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -61,6 +61,10 @@ #include "xrep.hpp" #include "xpub.hpp" #include "xsub.hpp" +#include "surveyor.hpp" +#include "xsurveyor.hpp" +#include "respondent.hpp" +#include "xrespondent.hpp" bool xs::socket_base_t::check_tag () { @@ -106,6 +110,18 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_, case XS_XSUB: s = new (std::nothrow) xsub_t (parent_, tid_, sid_); break; + case XS_SURVEYOR: + s = new (std::nothrow) surveyor_t (parent_, tid_, sid_); + break; + case XS_XSURVEYOR: + s = new (std::nothrow) xsurveyor_t (parent_, tid_, sid_); + break; + case XS_RESPONDENT: + s = new (std::nothrow) respondent_t (parent_, tid_, sid_); + break; + case XS_XRESPONDENT: + s = new (std::nothrow) xrespondent_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; diff --git a/src/surveyor.cpp b/src/surveyor.cpp new file mode 100644 index 0000000..c5216bd --- /dev/null +++ b/src/surveyor.cpp @@ -0,0 +1,135 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "surveyor.hpp" +#include "err.hpp" +#include "msg.hpp" +#include "wire.hpp" +#include "likely.hpp" +#include "random.hpp" + +xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + xsurveyor_t (parent_, tid_, sid_), + receiving_responses (false), + survey_id (generate_random ()) +{ + options.type = XS_SURVEYOR; +} + +xs::surveyor_t::~surveyor_t () +{ +} + +int xs::surveyor_t::xsend (msg_t *msg_, int flags_) +{ + int rc; + + // Survey pattern works only with sigle-part messages. + if (flags_ & XS_SNDMORE || msg_->flags () & msg_t::more) { + errno = EINVAL; + return -1; + } + + // Start the new survey. First, generate new survey ID. + ++survey_id; + msg_t id; + rc = id.init_size (4); + errno_assert (rc == 0); + put_uint32 ((unsigned char*) id.data (), survey_id); + id.set_flags (msg_t::more); + rc = xsurveyor_t::xsend (&id, 0); + if (rc != 0) { + id.close (); + return -1; + } + id.close (); + + // Now send the body of the survey. + rc = xsurveyor_t::xsend (msg_, flags_); + errno_assert (rc == 0); + + // Start waiting for responses from the peers. + receiving_responses = true; + + return 0; +} + +int xs::surveyor_t::xrecv (msg_t *msg_, int flags_) +{ + int rc; + + // If there's no survey underway, it's an error. + if (!receiving_responses) { + errno = EFSM; + return -1; + } + + // Get the first part of the response -- the survey ID. + rc = xsurveyor_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; + + // Check whether this is response for the onging survey. If not, we can + // drop the response. + if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 4 || + get_uint32 ((unsigned char*) msg_->data ()) != survey_id)) { + while (true) { + rc = xsurveyor_t::xrecv (msg_, flags_); + errno_assert (rc == 0); + if (!(msg_->flags () & msg_t::more)) + break; + } + msg_->close (); + msg_->init (); + errno = EAGAIN; + return -1; + } + + // Get the body of the response. + rc = xsurveyor_t::xrecv (msg_, flags_); + errno_assert (rc == 0); + + return 0; +} + +bool xs::surveyor_t::xhas_in () +{ + // TODO: We need a prefetched_message here... + xs_assert (false); + return false; +} + +bool xs::surveyor_t::xhas_out () +{ + return xsurveyor_t::xhas_out (); +} + +xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_, + bool connect_, socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + xsurveyor_session_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +xs::surveyor_session_t::~surveyor_session_t () +{ +} + diff --git a/src/surveyor.hpp b/src/surveyor.hpp new file mode 100644 index 0000000..83fdbb0 --- /dev/null +++ b/src/surveyor.hpp @@ -0,0 +1,77 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_SURVEYOR_HPP_INCLUDED__ +#define __XS_SURVEYOR_HPP_INCLUDED__ + +#include "xsurveyor.hpp" +#include "stdint.hpp" + +namespace xs +{ + + class ctx_t; + class msg_t; + class io_thread_t; + class socket_base_t; + + class surveyor_t : public xsurveyor_t + { + public: + + surveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_); + ~surveyor_t (); + + // Overloads of functions from socket_base_t. + int xsend (xs::msg_t *msg_, int flags_); + int xrecv (xs::msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // If true, survey was already lauched and have no expriered yet. + bool receiving_responses; + + // The ID of the ongoing survey. + uint32_t survey_id; + + surveyor_t (const surveyor_t&); + const surveyor_t &operator = (const surveyor_t&); + }; + + class surveyor_session_t : public xsurveyor_session_t + { + public: + + surveyor_session_t (xs::io_thread_t *io_thread_, bool connect_, + xs::socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~surveyor_session_t (); + + private: + + surveyor_session_t (const surveyor_session_t&); + const surveyor_session_t &operator = (const surveyor_session_t&); + }; + +} + +#endif diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp new file mode 100644 index 0000000..026fe8d --- /dev/null +++ b/src/xrespondent.cpp @@ -0,0 +1,278 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "xrespondent.hpp" +#include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp" +#include "likely.hpp" +#include "err.hpp" + +xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_, + int sid_) : + socket_base_t (parent_, tid_, sid_), + prefetched (0), + more_in (false), + current_out (NULL), + more_out (false), + next_peer_id (generate_random ()) +{ + options.type = XS_XRESPONDENT; + + prefetched_msg.init (); +} + +xs::xrespondent_t::~xrespondent_t () +{ + xs_assert (outpipes.empty ()); + prefetched_msg.close (); +} + +void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) +{ + xs_assert (pipe_); + + // Add the pipe to the map out outbound pipes. + outpipe_t outpipe = {pipe_, true}; + bool ok = outpipes.insert (outpipes_t::value_type ( + next_peer_id, outpipe)).second; + xs_assert (ok); + + // Add the pipe to the list of inbound pipes. + blob_t identity (4, 0); + put_uint32 ((unsigned char*) identity.data (), next_peer_id); + pipe_->set_identity (identity); + fq.attach (pipe_); + + // Generate a new unique peer identity. + ++next_peer_id; +} + +void xs::xrespondent_t::xterminated (pipe_t *pipe_) +{ + fq.terminated (pipe_); + + for (outpipes_t::iterator it = outpipes.begin (); + it != outpipes.end (); ++it) { + if (it->second.pipe == pipe_) { + outpipes.erase (it); + if (pipe_ == current_out) + current_out = NULL; + return; + } + } + xs_assert (false); +} + +void xs::xrespondent_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void xs::xrespondent_t::xwrite_activated (pipe_t *pipe_) +{ + for (outpipes_t::iterator it = outpipes.begin (); + it != outpipes.end (); ++it) { + if (it->second.pipe == pipe_) { + xs_assert (!it->second.active); + it->second.active = true; + return; + } + } + xs_assert (false); +} + +int xs::xrespondent_t::xsend (msg_t *msg_, int flags_) +{ + // If this is the first part of the message it's the ID of the + // peer to send the message to. + if (!more_out) { + xs_assert (!current_out); + + // If we have malformed message (prefix with no subsequent message) + // then just silently ignore it. + // TODO: The connections should be killed instead. + if (msg_->flags () & msg_t::more && msg_->size () == 4) { + + more_out = true; + + // Find the pipe associated with the identity stored in the prefix. + // If there's no such pipe just silently ignore the message. + uint32_t identity = get_uint32 ((unsigned char*) msg_->data ()); + outpipes_t::iterator it = outpipes.find (identity); + + if (it != outpipes.end ()) { + current_out = it->second.pipe; + msg_t empty; + int rc = empty.init (); + errno_assert (rc == 0); + if (!current_out->check_write (&empty)) { + it->second.active = false; + more_out = false; + current_out = NULL; + } + rc = empty.close (); + errno_assert (rc == 0); + } + + } + + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + return 0; + } + + // Check whether this is the last part of the message. + more_out = msg_->flags () & msg_t::more ? true : false; + + // Push the message into the pipe. If there's no out pipe, just drop it. + if (current_out) { + bool ok = current_out->write (msg_); + if (unlikely (!ok)) + current_out = NULL; + else if (!more_out) { + current_out->flush (); + current_out = NULL; + } + } + else { + int rc = msg_->close (); + errno_assert (rc == 0); + } + + // Detach the message from the data buffer. + int rc = msg_->init (); + errno_assert (rc == 0); + + return 0; +} + +int xs::xrespondent_t::xrecv (msg_t *msg_, int flags_) +{ + int rc; + + // if there is a prefetched identity, return it. + if (prefetched == 2) + { + rc = msg_->init_size (prefetched_id.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ()); + msg_->set_flags (msg_t::more); + prefetched = 1; + return 0; + } + + // If there is a prefetched message, return it. + if (prefetched == 1) { + rc = msg_->move (prefetched_msg); + errno_assert (rc == 0); + more_in = msg_->flags () & msg_t::more ? true : false; + prefetched = 0; + return 0; + } + + // Get next message part. + pipe_t *pipe = NULL; + rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + + // If we are in the middle of reading a message, just return the next part. + if (more_in) { + more_in = msg_->flags () & msg_t::more ? true : false; + return 0; + } + + // We are at the beginning of a new message. Move the message part we + // have to the prefetched and return the ID of the peer instead. + rc = prefetched_msg.move (*msg_); + errno_assert (rc == 0); + prefetched = 1; + rc = msg_->close (); + errno_assert (rc == 0); + + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), identity.data (), identity.size ()); + msg_->set_flags (msg_t::more); + return 0; +} + +int xs::xrespondent_t::rollback (void) +{ + if (current_out) { + current_out->rollback (); + current_out = NULL; + more_out = false; + } + return 0; +} + +bool xs::xrespondent_t::xhas_in () +{ + // If we are in the middle of reading the messages, there are + // definitely more parts available. + if (more_in) + return true; + + // We may already have a message pre-fetched. + if (prefetched) + return true; + + // Try to read the next message to the pre-fetch buffer. If anything, + // it will be identity of the peer sending the message. + msg_t id; + id.init (); + int rc = xrespondent_t::xrecv (&id, XS_DONTWAIT); + if (rc != 0 && errno == EAGAIN) { + id.close (); + return false; + } + xs_assert (rc == 0); + + // We have first part of the message prefetched now. We will store the + // prefetched identity as well. + prefetched_id.assign ((unsigned char*) id.data (), id.size ()); + id.close (); + prefetched = 2; + + return true; +} + +bool xs::xrespondent_t::xhas_out () +{ + return true; +} + +xs::xrespondent_session_t::xrespondent_session_t (io_thread_t *io_thread_, + bool connect_, socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +xs::xrespondent_session_t::~xrespondent_session_t () +{ +} + diff --git a/src/xrespondent.hpp b/src/xrespondent.hpp new file mode 100644 index 0000000..eb6ac9a --- /dev/null +++ b/src/xrespondent.hpp @@ -0,0 +1,123 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_XRESPONDENT_HPP_INCLUDED__ +#define __XS_XRESPONDENT_HPP_INCLUDED__ + +#include <map> + +#include "socket_base.hpp" +#include "session_base.hpp" +#include "stdint.hpp" +#include "blob.hpp" +#include "msg.hpp" +#include "fq.hpp" + +namespace xs +{ + + class ctx_t; + class pipe_t; + class io_thread_t; + + // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. + class xrespondent_t : + public socket_base_t + { + public: + + xrespondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_); + ~xrespondent_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); + int xsend (msg_t *msg_, int flags_); + int xrecv (msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + void xread_activated (xs::pipe_t *pipe_); + void xwrite_activated (xs::pipe_t *pipe_); + void xterminated (xs::pipe_t *pipe_); + + protected: + + // Rollback any message parts that were sent but not yet flushed. + int rollback (); + + private: + + // Fair queueing object for inbound pipes. + fq_t fq; + + // This value is either 0 (nothing is prefetched), 1 (only message body + // is prefetched) or 2 (both identity and message body are prefetched). + int prefetched; + + // Holds the prefetched identity. + blob_t prefetched_id; + + // Holds the prefetched message. + msg_t prefetched_msg; + + // If true, more incoming message parts are expected. + bool more_in; + + struct outpipe_t + { + xs::pipe_t *pipe; + bool active; + }; + + // Outbound pipes indexed by the peer IDs. + typedef std::map <uint32_t, outpipe_t> outpipes_t; + outpipes_t outpipes; + + // The pipe we are currently writing to. + xs::pipe_t *current_out; + + // If true, more outgoing message parts are expected. + bool more_out; + + // Peer ID are generated. It's a simple increment and wrap-over + // algorithm. This value is the next ID to use (if not used already). + uint32_t next_peer_id; + + xrespondent_t (const xrespondent_t&); + const xrespondent_t &operator = (const xrespondent_t&); + }; + + class xrespondent_session_t : public session_base_t + { + public: + + xrespondent_session_t (xs::io_thread_t *io_thread_, bool connect_, + socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~xrespondent_session_t (); + + private: + + xrespondent_session_t (const xrespondent_session_t&); + const xrespondent_session_t &operator = (const xrespondent_session_t&); + }; + +} + +#endif diff --git a/src/xsurveyor.cpp b/src/xsurveyor.cpp new file mode 100644 index 0000000..4265437 --- /dev/null +++ b/src/xsurveyor.cpp @@ -0,0 +1,89 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "xsurveyor.hpp" +#include "err.hpp" +#include "msg.hpp" + +xs::xsurveyor_t::xsurveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_) +{ + options.type = XS_XSURVEYOR; +} + +xs::xsurveyor_t::~xsurveyor_t () +{ +} + +void xs::xsurveyor_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) +{ + xs_assert (pipe_); + fq.attach (pipe_); + dist.attach (pipe_); +} + +int xs::xsurveyor_t::xsend (msg_t *msg_, int flags_) +{ + return dist.send_to_all (msg_, flags_); +} + +int xs::xsurveyor_t::xrecv (msg_t *msg_, int flags_) +{ + return fq.recv (msg_, flags_); +} + +bool xs::xsurveyor_t::xhas_in () +{ + return fq.has_in (); +} + +bool xs::xsurveyor_t::xhas_out () +{ + return dist.has_out (); +} + +void xs::xsurveyor_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void xs::xsurveyor_t::xwrite_activated (pipe_t *pipe_) +{ + dist.activated (pipe_); +} + +void xs::xsurveyor_t::xterminated (pipe_t *pipe_) +{ + fq.terminated (pipe_); + dist.terminated (pipe_); +} + +xs::xsurveyor_session_t::xsurveyor_session_t (io_thread_t *io_thread_, + bool connect_, socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + session_base_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +xs::xsurveyor_session_t::~xsurveyor_session_t () +{ +} + diff --git a/src/xsurveyor.hpp b/src/xsurveyor.hpp new file mode 100644 index 0000000..b0c3f9c --- /dev/null +++ b/src/xsurveyor.hpp @@ -0,0 +1,85 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_XSURVEYOR_HPP_INCLUDED__ +#define __XS_XSURVEYOR_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "session_base.hpp" +#include "dist.hpp" +#include "fq.hpp" + +namespace xs +{ + + class ctx_t; + class msg_t; + class pipe_t; + class io_thread_t; + class socket_base_t; + + class xsurveyor_t : public socket_base_t + { + public: + + xsurveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_); + ~xsurveyor_t (); + + protected: + + // Overloads of functions from socket_base_t. + void xattach_pipe (xs::pipe_t *pipe_, bool icanhasall_); + int xsend (xs::msg_t *msg_, int flags_); + int xrecv (xs::msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + void xread_activated (xs::pipe_t *pipe_); + void xwrite_activated (xs::pipe_t *pipe_); + void xterminated (xs::pipe_t *pipe_); + + private: + + // Inbound messages are fair-queued from inbound pipes. + // Outbound messages are distributed to all outbound pipes. + fq_t fq; + dist_t dist; + + xsurveyor_t (const xsurveyor_t&); + const xsurveyor_t &operator = (const xsurveyor_t&); + }; + + class xsurveyor_session_t : public session_base_t + { + public: + + xsurveyor_session_t (xs::io_thread_t *io_thread_, bool connect_, + xs::socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_); + ~xsurveyor_session_t (); + + private: + + xsurveyor_session_t (const xsurveyor_session_t&); + const xsurveyor_session_t &operator = (const xsurveyor_session_t&); + }; + +} + +#endif diff --git a/tests/Makefile.am b/tests/Makefile.am index af826af..ba76260 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -23,7 +23,8 @@ noinst_PROGRAMS = pair_inproc \ polltimeo \ wireformat \ libzmq21 \ - resubscribe + resubscribe \ + survey pair_inproc_SOURCES = pair_inproc.cpp testutil.hpp pair_tcp_SOURCES = pair_tcp.cpp testutil.hpp @@ -46,5 +47,6 @@ polltimeo_SOURCES = polltimeo.cpp testutil.hpp wireformat_SOURCES = wireformat.cpp libzmq21_SOURCES = libzmq21.cpp resubscribe_SOURCES = resubscribe.cpp +survey_SOURCES = survey.cpp TESTS = $(noinst_PROGRAMS) diff --git a/tests/survey.cpp b/tests/survey.cpp new file mode 100644 index 0000000..41f39aa --- /dev/null +++ b/tests/survey.cpp @@ -0,0 +1,135 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" + +int XS_TEST_MAIN () +{ + int rc; + char buf [32]; + + fprintf (stderr, "survey test running...\n"); + + // Create the basic infrastructure. + void *ctx = xs_init (); + assert (ctx); + void *xsurveyor = xs_socket (ctx, XS_XSURVEYOR); + assert (xsurveyor); + rc = xs_bind (xsurveyor, "inproc://a"); + assert (rc == 0); + void *xrespondent = xs_socket (ctx, XS_XRESPONDENT); + assert (xrespondent); + rc = xs_bind (xrespondent, "inproc://b"); + assert (rc == 0); + void *surveyor = xs_socket (ctx, XS_SURVEYOR); + assert (surveyor); + rc = xs_connect (surveyor, "inproc://b"); + assert (rc == 0); + void *respondent1 = xs_socket (ctx, XS_RESPONDENT); + assert (respondent1); + rc = xs_connect (respondent1, "inproc://a"); + assert (rc == 0); + void *respondent2 = xs_socket (ctx, XS_RESPONDENT); + assert (respondent2); + rc = xs_connect (respondent2, "inproc://a"); + assert (rc == 0); + + // Send the survey. + rc = xs_send (surveyor, "ABC", 3, 0); + assert (rc == 3); + + // Forward the survey through the intermediate device. + // Survey consist of identity (4 bytes), survey ID (4 bytes) and the body. + rc = xs_recv (xrespondent, buf, sizeof (buf), 0); + assert (rc == 4); + rc = xs_send (xsurveyor, buf, 4, XS_SNDMORE); + assert (rc == 4); + rc = xs_recv (xrespondent, buf, sizeof (buf), 0); + assert (rc == 4); + rc = xs_send (xsurveyor, buf, 4, XS_SNDMORE); + assert (rc == 4); + rc = xs_recv (xrespondent, buf, sizeof (buf), 0); + assert (rc == 3); + rc = xs_send (xsurveyor, buf, 3, 0); + assert (rc == 3); + + // Respondent 1 responds to the survey. + rc = xs_recv (respondent1, buf, sizeof (buf), 0); + assert (rc == 3); + rc = xs_send (respondent1, "DE", 2, 0); + assert (rc == 2); + + // Forward the response through the intermediate device. + rc = xs_recv (xsurveyor, buf, sizeof (buf), 0); + assert (rc == 4); + rc = xs_send (xrespondent, buf, 4, XS_SNDMORE); + assert (rc == 4); + rc = xs_recv (xsurveyor, buf, sizeof (buf), 0); + assert (rc == 4); + rc = xs_send (xrespondent, buf, 4, XS_SNDMORE); + assert (rc == 4); + rc = xs_recv (xsurveyor, buf, sizeof (buf), 0); + assert (rc == 2); + rc = xs_send (xrespondent, buf, 2, 0); + assert (rc == 2); + + // Surveyor gets the response. + rc = xs_recv (surveyor, buf, sizeof (buf), 0); + assert (rc == 2); + + // Respondent 2 responds to the survey. + rc = xs_recv (respondent2, buf, sizeof (buf), 0); + assert (rc == 3); + rc = xs_send (respondent2, "FGHI", 4, 0); + assert (rc == 4); + + // Forward the response through the intermediate device. + rc = xs_recv (xsurveyor, buf, sizeof (buf), 0); + assert (rc == 4); + rc = xs_send (xrespondent, buf, 4, XS_SNDMORE); + assert (rc == 4); + rc = xs_recv (xsurveyor, buf, sizeof (buf), 0); + assert (rc == 4); + rc = xs_send (xrespondent, buf, 4, XS_SNDMORE); + assert (rc == 4); + rc = xs_recv (xsurveyor, buf, sizeof (buf), 0); + assert (rc == 4); + rc = xs_send (xrespondent, buf, 4, 0); + assert (rc == 4); + + // Surveyor gets the response. + rc = xs_recv (surveyor, buf, sizeof (buf), 0); + assert (rc == 4); + + rc = xs_close (respondent2); + assert (rc == 0); + rc = xs_close (respondent1); + assert (rc == 0); + rc = xs_close (surveyor); + assert (rc == 0); + rc = xs_close (xrespondent); + assert (rc == 0); + rc = xs_close (xsurveyor); + assert (rc == 0); + rc = xs_term (ctx); + assert (rc == 0); + + return 0 ; +} diff --git a/tests/tests.cpp b/tests/tests.cpp index 21d10e3..3d7951c 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -111,6 +111,10 @@ #include "resubscribe.cpp" #undef XS_TEST_MAIN +#define XS_TEST_MAIN survey +#include "survey.cpp" +#undef XS_TEST_MAIN + int main () { int rc; @@ -155,7 +159,8 @@ int main () assert (rc == 0); rc = resubscribe (); assert (rc == 0); - + rc = survey (); + assert (rc == 0); fprintf (stderr, "SUCCESS\n"); sleep (1); |