diff options
| -rwxr-xr-x | .gitignore | 1 | ||||
| -rw-r--r-- | builds/msvc/libxs/libxs.vcxproj | 10 | ||||
| -rw-r--r-- | builds/msvc/libxs/libxs.vcxproj.filters | 26 | ||||
| -rw-r--r-- | builds/msvc/tests/tests.vcxproj | 6 | ||||
| -rw-r--r-- | builds/msvc/tests/tests.vcxproj.filters | 5 | ||||
| -rw-r--r-- | include/xs.h | 4 | ||||
| -rw-r--r-- | src/Makefile.am | 8 | ||||
| -rw-r--r-- | src/respondent.cpp | 114 | ||||
| -rw-r--r-- | src/respondent.hpp | 77 | ||||
| -rw-r--r-- | src/session_base.cpp | 20 | ||||
| -rw-r--r-- | src/socket_base.cpp | 16 | ||||
| -rw-r--r-- | src/surveyor.cpp | 135 | ||||
| -rw-r--r-- | src/surveyor.hpp | 77 | ||||
| -rw-r--r-- | src/xrespondent.cpp | 278 | ||||
| -rw-r--r-- | src/xrespondent.hpp | 123 | ||||
| -rw-r--r-- | src/xsurveyor.cpp | 89 | ||||
| -rw-r--r-- | src/xsurveyor.hpp | 85 | ||||
| -rw-r--r-- | tests/Makefile.am | 4 | ||||
| -rw-r--r-- | tests/survey.cpp | 135 | ||||
| -rw-r--r-- | tests/tests.cpp | 7 | 
20 files changed, 1214 insertions, 6 deletions
| @@ -49,6 +49,7 @@ tests/polltimeo  tests/wireformat  tests/libzmq21  tests/resubscribe +tests/survey  src/platform.hpp*  src/stamp-h1  perf/*.exe diff --git a/builds/msvc/libxs/libxs.vcxproj b/builds/msvc/libxs/libxs.vcxproj index bbab5fd..d4f40ca 100644 --- a/builds/msvc/libxs/libxs.vcxproj +++ b/builds/msvc/libxs/libxs.vcxproj @@ -143,12 +143,14 @@      <ClCompile Include="..\..\..\src\reaper.cpp" />      <ClCompile Include="..\..\..\src\rep.cpp" />      <ClCompile Include="..\..\..\src\req.cpp" /> +    <ClCompile Include="..\..\..\src\respondent.cpp" />      <ClCompile Include="..\..\..\src\select.cpp" />      <ClCompile Include="..\..\..\src\session_base.cpp" />      <ClCompile Include="..\..\..\src\signaler.cpp" />      <ClCompile Include="..\..\..\src\socket_base.cpp" />      <ClCompile Include="..\..\..\src\stream_engine.cpp" />      <ClCompile Include="..\..\..\src\sub.cpp" /> +    <ClCompile Include="..\..\..\src\surveyor.cpp" />      <ClCompile Include="..\..\..\src\tcp_address.cpp" />      <ClCompile Include="..\..\..\src\tcp_connecter.cpp" />      <ClCompile Include="..\..\..\src\tcp_listener.cpp" /> @@ -157,8 +159,10 @@      <ClCompile Include="..\..\..\src\xpub.cpp" />      <ClCompile Include="..\..\..\src\xrep.cpp" />      <ClCompile Include="..\..\..\src\xreq.cpp" /> +    <ClCompile Include="..\..\..\src\xrespondent.cpp" />      <ClCompile Include="..\..\..\src\xsub.cpp" />      <ClCompile Include="..\..\..\src\xs.cpp" /> +    <ClCompile Include="..\..\..\src\xsurveyor.cpp" />      <ClCompile Include="..\..\..\src\xszmq.cpp" />    </ItemGroup>    <ItemGroup> @@ -211,6 +215,7 @@      <ClInclude Include="..\..\..\src\reaper.hpp" />      <ClInclude Include="..\..\..\src\rep.hpp" />      <ClInclude Include="..\..\..\src\req.hpp" /> +    <ClInclude Include="..\..\..\src\respondent.hpp" />      <ClInclude Include="..\..\..\src\select.hpp" />      <ClInclude Include="..\..\..\src\session_base.hpp" />      <ClInclude Include="..\..\..\src\signaler.hpp" /> @@ -218,6 +223,7 @@      <ClInclude Include="..\..\..\src\stdint.hpp" />      <ClInclude Include="..\..\..\src\stream_engine.hpp" />      <ClInclude Include="..\..\..\src\sub.hpp" /> +    <ClInclude Include="..\..\..\src\surveyor.hpp" />      <ClInclude Include="..\..\..\src\tcp_address.hpp" />      <ClInclude Include="..\..\..\src\tcp_connecter.hpp" />      <ClInclude Include="..\..\..\src\tcp_listener.hpp" /> @@ -228,7 +234,9 @@      <ClInclude Include="..\..\..\src\xpub.hpp" />      <ClInclude Include="..\..\..\src\xrep.hpp" />      <ClInclude Include="..\..\..\src\xreq.hpp" /> +    <ClInclude Include="..\..\..\src\xrespondent.hpp" />      <ClInclude Include="..\..\..\src\xsub.hpp" /> +    <ClInclude Include="..\..\..\src\xsurveyor.hpp" />      <ClInclude Include="..\..\..\src\ypipe.hpp" />      <ClInclude Include="..\..\..\src\yqueue.hpp" />      <ClInclude Include="..\platform.hpp" /> @@ -236,4 +244,4 @@    <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />    <ImportGroup Label="ExtensionTargets">    </ImportGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/builds/msvc/libxs/libxs.vcxproj.filters b/builds/msvc/libxs/libxs.vcxproj.filters index eb0f505..1fefb60 100644 --- a/builds/msvc/libxs/libxs.vcxproj.filters +++ b/builds/msvc/libxs/libxs.vcxproj.filters @@ -176,6 +176,18 @@      <ClCompile Include="..\..\..\src\prefix_filter.cpp">        <Filter>Source Files</Filter>      </ClCompile> +    <ClCompile Include="..\..\..\src\respondent.cpp"> +      <Filter>Source Files</Filter> +    </ClCompile> +    <ClCompile Include="..\..\..\src\xrespondent.cpp"> +      <Filter>Source Files</Filter> +    </ClCompile> +    <ClCompile Include="..\..\..\src\surveyor.cpp"> +      <Filter>Source Files</Filter> +    </ClCompile> +    <ClCompile Include="..\..\..\src\xsurveyor.cpp"> +      <Filter>Source Files</Filter> +    </ClCompile>    </ItemGroup>    <ItemGroup>      <ClInclude Include="..\..\..\include\xs.h"> @@ -388,5 +400,17 @@      <ClInclude Include="..\..\..\src\prefix_filter.hpp">        <Filter>Header Files</Filter>      </ClInclude> +    <ClInclude Include="..\..\..\src\respondent.hpp"> +      <Filter>Header Files</Filter> +    </ClInclude> +    <ClInclude Include="..\..\..\src\surveyor.hpp"> +      <Filter>Header Files</Filter> +    </ClInclude> +    <ClInclude Include="..\..\..\src\xrespondent.hpp"> +      <Filter>Header Files</Filter> +    </ClInclude> +    <ClInclude Include="..\..\..\src\xsurveyor.hpp"> +      <Filter>Header Files</Filter> +    </ClInclude>    </ItemGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/builds/msvc/tests/tests.vcxproj b/builds/msvc/tests/tests.vcxproj index bb707c4..d1258e4 100644 --- a/builds/msvc/tests/tests.vcxproj +++ b/builds/msvc/tests/tests.vcxproj @@ -151,6 +151,10 @@        <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>        <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>      </ClCompile> +    <ClCompile Include="..\..\..\tests\survey.cpp"> +      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> +      <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild> +    </ClCompile>      <ClCompile Include="..\..\..\tests\tests.cpp" />      <ClCompile Include="..\..\..\tests\timeo.cpp">        <ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild> @@ -184,4 +188,4 @@    <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />    <ImportGroup Label="ExtensionTargets">    </ImportGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/builds/msvc/tests/tests.vcxproj.filters b/builds/msvc/tests/tests.vcxproj.filters index b1eade2..0fb41a8 100644 --- a/builds/msvc/tests/tests.vcxproj.filters +++ b/builds/msvc/tests/tests.vcxproj.filters @@ -65,6 +65,9 @@      <ClCompile Include="..\..\..\tests\resubscribe.cpp">        <Filter>Header Files</Filter>      </ClCompile> +    <ClCompile Include="..\..\..\tests\survey.cpp"> +      <Filter>Header Files</Filter> +    </ClCompile>    </ItemGroup>    <ItemGroup>      <Filter Include="Header Files"> @@ -76,4 +79,4 @@        <Filter>Header Files</Filter>      </ClInclude>    </ItemGroup> -</Project> +</Project>
\ No newline at end of file diff --git a/include/xs.h b/include/xs.h index 17a557b..34d6c60 100644 --- a/include/xs.h +++ b/include/xs.h @@ -172,6 +172,10 @@ XS_EXPORT int xs_setctxopt (void *context, int option, const void *optval,  #define XS_PUSH 8  #define XS_XPUB 9  #define XS_XSUB 10 +#define XS_SURVEYOR 11 +#define XS_RESPONDENT 12 +#define XS_XSURVEYOR 13 +#define XS_XRESPONDENT 14  /*  Legacy socket type aliases.                                               */  #define XS_ROUTER XS_XREP diff --git a/src/Makefile.am b/src/Makefile.am index 81ece53..3025e13 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -64,6 +64,7 @@ libxs_la_SOURCES = \      reaper.hpp \      rep.hpp \      req.hpp \ +    respondent.hpp \      select.hpp \      session_base.hpp \      signaler.hpp \ @@ -71,6 +72,7 @@ libxs_la_SOURCES = \      stdint.hpp \      stream_engine.hpp \      sub.hpp \ +    surveyor.hpp \      tcp_address.hpp \      tcp_connecter.hpp \      tcp_listener.hpp \ @@ -81,7 +83,9 @@ libxs_la_SOURCES = \      xpub.hpp \      xrep.hpp \      xreq.hpp \ +    xrespondent.hpp \      xsub.hpp \ +    xsurveyor.hpp \      ypipe.hpp \      yqueue.hpp \      clock.cpp \ @@ -121,12 +125,14 @@ libxs_la_SOURCES = \      random.cpp \      rep.cpp \      req.cpp \ +    respondent.cpp \      select.cpp \      session_base.cpp \      signaler.cpp \      socket_base.cpp \      stream_engine.cpp \      sub.cpp \ +    surveyor.cpp \      tcp_address.cpp \      tcp_connecter.cpp \      tcp_listener.cpp \ @@ -135,7 +141,9 @@ libxs_la_SOURCES = \      xpub.cpp \      xrep.cpp \      xreq.cpp \ +    xrespondent.cpp \      xsub.cpp \ +    xsurveyor.cpp \      xs.cpp  if ON_MINGW diff --git a/src/respondent.cpp b/src/respondent.cpp new file mode 100644 index 0000000..c2abd84 --- /dev/null +++ b/src/respondent.cpp @@ -0,0 +1,114 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of Crossroads I/O project. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    Crossroads is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "respondent.hpp" +#include "err.hpp" +#include "msg.hpp" + +xs::respondent_t::respondent_t (class ctx_t *parent_, uint32_t tid_, int sid_) : +    xrespondent_t (parent_, tid_, sid_), +    sending_reply (false) +{ +    options.type = XS_RESPONDENT; +} + +xs::respondent_t::~respondent_t () +{ +} + +int xs::respondent_t::xsend (msg_t *msg_, int flags_) +{ +    //  If there's no ongoing survey, we cannot send reply. +    if (!sending_reply) { +        errno = EFSM; +        return -1; +    } + +    //  Survey pattern doesn't support multipart messages. +    if (msg_->flags () & msg_t::more || flags_ & XS_SNDMORE) { +        errno = EINVAL; +        return -1; +    } + +    //  Push message to the reply pipe. +    int rc = xrespondent_t::xsend (msg_, flags_); +    if (rc != 0) +        return rc; + +    //  Flip the FSM back to request receiving state. +    sending_reply = false; + +    return 0; +} + +int xs::respondent_t::xrecv (msg_t *msg_, int flags_) +{ +    //  If we are in middle of sending a reply, we cannot receive next survey. +    if (sending_reply) { +        errno = EFSM; +        return -1; +    } + +    //  First thing to do when receiving a srvey is to copy all the labels +    //  to the reply pipe. +    while (true) { +        int rc = xrespondent_t::xrecv (msg_, flags_); +        if (rc != 0) +            return rc; +        if (!(msg_->flags () & msg_t::more)) +            break; +        rc = xrespondent_t::xsend (msg_, flags_); +        errno_assert (rc == 0); +    } + +    //  When whole survey is read, flip the FSM to reply-sending state. +    sending_reply = true; + +    return 0; +} + +bool xs::respondent_t::xhas_in () +{ +    if (sending_reply) +        return false; + +    return xrespondent_t::xhas_in (); +} + +bool xs::respondent_t::xhas_out () +{ +    if (!sending_reply) +        return false; + +    return xrespondent_t::xhas_out (); +} + +xs::respondent_session_t::respondent_session_t (io_thread_t *io_thread_, +      bool connect_, socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    xrespondent_session_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} + +xs::respondent_session_t::~respondent_session_t () +{ +} + diff --git a/src/respondent.hpp b/src/respondent.hpp new file mode 100644 index 0000000..b315350 --- /dev/null +++ b/src/respondent.hpp @@ -0,0 +1,77 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of Crossroads I/O project. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    Crossroads is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_RESPONDENT_HPP_INCLUDED__ +#define __XS_RESPONDENT_HPP_INCLUDED__ + +#include "xrespondent.hpp" + +namespace xs +{ + +    class ctx_t; +    class msg_t; +    class io_thread_t; +    class socket_base_t; + +    class respondent_t : public xrespondent_t +    { +    public: + +        respondent_t (xs::ctx_t *parent_, uint32_t tid_, int sid_); +        ~respondent_t (); + +        //  Overloads of functions from socket_base_t. +        int xsend (xs::msg_t *msg_, int flags_); +        int xrecv (xs::msg_t *msg_, int flags_); +        bool xhas_in (); +        bool xhas_out (); + +    private: + +        //  If true, we are in process of sending the reply. If false we are +        //  in process of receiving a request. +        //  TODO: Consider whether automatic cancelling of the previous request +        //  when recv is called again is not a better idea than returning EFSM. +        bool sending_reply; + +        respondent_t (const respondent_t&); +        const respondent_t &operator = (const respondent_t&); + +    }; + +    class respondent_session_t : public xrespondent_session_t +    { +    public: + +        respondent_session_t (xs::io_thread_t *io_thread_, bool connect_, +            xs::socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~respondent_session_t (); + +    private: + +        respondent_session_t (const respondent_session_t&); +        const respondent_session_t &operator = (const respondent_session_t&); +    }; + +} + +#endif diff --git a/src/session_base.cpp b/src/session_base.cpp index 90fa071..35c4a4e 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -42,6 +42,10 @@  #include "push.hpp"  #include "pull.hpp"  #include "pair.hpp" +#include "surveyor.hpp" +#include "xsurveyor.hpp" +#include "respondent.hpp" +#include "xrespondent.hpp"  xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,      bool connect_, class socket_base_t *socket_, const options_t &options_, @@ -93,6 +97,22 @@ xs::session_base_t *xs::session_base_t::create (class io_thread_t *io_thread_,          s = new (std::nothrow) pair_session_t (io_thread_, connect_,              socket_, options_, protocol_, address_);          break; +    case XS_SURVEYOR: +        s = new (std::nothrow) surveyor_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    case XS_XSURVEYOR: +        s = new (std::nothrow) xsurveyor_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    case XS_RESPONDENT: +        s = new (std::nothrow) respondent_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break; +    case XS_XRESPONDENT: +        s = new (std::nothrow) xrespondent_session_t (io_thread_, connect_, +            socket_, options_, protocol_, address_); +        break;      default:          errno = EINVAL;          return NULL; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eb9b491..df182f1 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -61,6 +61,10 @@  #include "xrep.hpp"  #include "xpub.hpp"  #include "xsub.hpp" +#include "surveyor.hpp" +#include "xsurveyor.hpp" +#include "respondent.hpp" +#include "xrespondent.hpp"  bool xs::socket_base_t::check_tag ()  { @@ -106,6 +110,18 @@ xs::socket_base_t *xs::socket_base_t::create (int type_, class ctx_t *parent_,      case XS_XSUB:          s = new (std::nothrow) xsub_t (parent_, tid_, sid_);          break; +    case XS_SURVEYOR: +        s = new (std::nothrow) surveyor_t (parent_, tid_, sid_); +        break; +    case XS_XSURVEYOR: +        s = new (std::nothrow) xsurveyor_t (parent_, tid_, sid_); +        break; +    case XS_RESPONDENT: +        s = new (std::nothrow) respondent_t (parent_, tid_, sid_); +        break; +    case XS_XRESPONDENT: +        s = new (std::nothrow) xrespondent_t (parent_, tid_, sid_); +        break;      default:          errno = EINVAL;          return NULL; diff --git a/src/surveyor.cpp b/src/surveyor.cpp new file mode 100644 index 0000000..c5216bd --- /dev/null +++ b/src/surveyor.cpp @@ -0,0 +1,135 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of Crossroads I/O project. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    Crossroads is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "surveyor.hpp" +#include "err.hpp" +#include "msg.hpp" +#include "wire.hpp" +#include "likely.hpp" +#include "random.hpp" + +xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) : +    xsurveyor_t (parent_, tid_, sid_), +    receiving_responses (false), +    survey_id (generate_random ()) +{ +    options.type = XS_SURVEYOR; +} + +xs::surveyor_t::~surveyor_t () +{ +} + +int xs::surveyor_t::xsend (msg_t *msg_, int flags_) +{ +    int rc; + +    //  Survey pattern works only with sigle-part messages. +    if (flags_ & XS_SNDMORE || msg_->flags () & msg_t::more) { +        errno = EINVAL; +        return -1; +    } + +    //  Start the new survey. First, generate new survey ID. +    ++survey_id; +    msg_t id; +    rc = id.init_size (4); +    errno_assert (rc == 0); +    put_uint32 ((unsigned char*) id.data (), survey_id); +    id.set_flags (msg_t::more); +    rc = xsurveyor_t::xsend (&id, 0); +    if (rc != 0) { +        id.close (); +        return -1; +    } +    id.close (); + +    //  Now send the body of the survey. +    rc = xsurveyor_t::xsend (msg_, flags_); +    errno_assert (rc == 0); + +    //  Start waiting for responses from the peers. +    receiving_responses = true; + +    return 0; +} + +int xs::surveyor_t::xrecv (msg_t *msg_, int flags_) +{ +    int rc; + +    //  If there's no survey underway, it's an error. +    if (!receiving_responses) { +        errno = EFSM; +        return -1; +    } + +    //  Get the first part of the response -- the survey ID. +    rc = xsurveyor_t::xrecv (msg_, flags_); +    if (rc != 0) +        return rc; + +    //  Check whether this is response for the onging survey. If not, we can +    //  drop the response. +    if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 4 || +          get_uint32 ((unsigned char*) msg_->data ()) != survey_id)) { +        while (true) { +            rc = xsurveyor_t::xrecv (msg_, flags_); +            errno_assert (rc == 0); +            if (!(msg_->flags () & msg_t::more)) +                break; +        } +        msg_->close (); +        msg_->init (); +        errno = EAGAIN; +        return -1; +    } + +    //  Get the body of the response. +    rc = xsurveyor_t::xrecv (msg_, flags_); +    errno_assert (rc == 0); + +    return 0; +} + +bool xs::surveyor_t::xhas_in () +{ +    //  TODO: We need a prefetched_message here... +    xs_assert (false); +    return false; +} + +bool xs::surveyor_t::xhas_out () +{ +    return xsurveyor_t::xhas_out (); +} + +xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_, +      bool connect_, socket_base_t *socket_, const options_t &options_, +      const char *protocol_, const char *address_) : +    xsurveyor_session_t (io_thread_, connect_, socket_, options_, protocol_, +        address_) +{ +} + +xs::surveyor_session_t::~surveyor_session_t () +{ +} + diff --git a/src/surveyor.hpp b/src/surveyor.hpp new file mode 100644 index 0000000..83fdbb0 --- /dev/null +++ b/src/surveyor.hpp @@ -0,0 +1,77 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of Crossroads I/O project. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    Crossroads is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __XS_SURVEYOR_HPP_INCLUDED__ +#define __XS_SURVEYOR_HPP_INCLUDED__ + +#include "xsurveyor.hpp" +#include "stdint.hpp" + +namespace xs +{ + +    class ctx_t; +    class msg_t; +    class io_thread_t; +    class socket_base_t; + +    class surveyor_t : public xsurveyor_t +    { +    public: + +        surveyor_t (xs::ctx_t *parent_, uint32_t tid_, int sid_); +        ~surveyor_t (); + +        //  Overloads of functions from socket_base_t. +        int xsend (xs::msg_t *msg_, int flags_); +        int xrecv (xs::msg_t *msg_, int flags_); +        bool xhas_in (); +        bool xhas_out (); + +    private: + +        //  If true, survey was already lauched and have no expriered yet. +        bool receiving_responses; + +        //  The ID of the ongoing survey. +        uint32_t survey_id; + +        surveyor_t (const surveyor_t&); +        const surveyor_t &operator = (const surveyor_t&); +    }; + +    class surveyor_session_t : public xsurveyor_session_t +    { +    public: + +        surveyor_session_t (xs::io_thread_t *io_thread_, bool connect_, +            xs::socket_base_t *socket_, const options_t &options_, +            const char *protocol_, const char *address_); +        ~surveyor_session_t (); + +    private: + +        surveyor_session_t (const surveyor_session_t&); +        const surveyor_session_t &operator = (const surveyor_session_t&); +    }; + +} + +#endif diff --git a/src/xrespondent.cpp b/src/xrespondent.cpp new file mode 100644 index 0000000..026fe8d --- /dev/null +++ b/src/xrespondent.cpp @@ -0,0 +1,278 @@ +/* +    Copyright (c) 2012 250bpm s.r.o. +    Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +    This file is part of Crossroads I/O project. + +    Crossroads I/O is free software; you can redistribute it and/or modify it +    under the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    Crossroads is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "xrespondent.hpp" +#include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp" +#include "likely.hpp" +#include "err.hpp" + +xs::xrespondent_t::xrespondent_t (class ctx_t *parent_, uint32_t tid_, +        int sid_) : +    socket_base_t (parent_, tid_, sid_), +    prefetched (0), +    more_in (false), +    current_out (NULL), +    more_out (false), +    next_peer_id (generate_random ()) +{ +    options.type = XS_XRESPONDENT; + +    prefetched_msg.init (); +} + +xs::xrespondent_t::~xrespondent_t () +{ +    xs_assert (outpipes.empty ()); +    prefetched_msg.close (); +} + +void xs::xrespondent_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) +{ +    xs_assert (pipe_); + +    //  Add the pipe to the map out outbound pipes. +    outpipe_t outpipe = {pipe_, true}; +    bool ok = outpipes.insert (outpipes_t::value_type ( +        next_peer_id, outpipe)).second; +    xs_assert (ok); + +    //  Add the pipe to the list of inbound pipes. +    blob_t identity (4, 0); +    put_uint32 ((unsigned char*) identity.data (), next_peer_id); +    pipe_->set_identity (identity); +    fq.attach (pipe_); + +    //  Generate a new unique peer identity. +    ++next_peer_id;     +} + +void xs::xrespondent_t::xterminated (pipe_t *pipe_) +{ +    fq.terminated (pipe_); + +    for (outpipes_t::iterator it = outpipes.begin (); +          it != outpipes.end (); ++it) { +        if (it->second.pipe == pipe_) { +            outpipes.erase (it); +            if (pipe_ == current_out) +                current_out = NULL; +            return; +        } +    } +    xs_assert (false); +} + +void xs::xrespondent_t::xread_activated (pipe_t *pipe_) +{ +    fq.activated (pipe_); +} + +void xs::xrespondent_t::xwrite_activated (pipe_t *pipe_) +{ +    for (outpipes_t::iterator it = outpipes.begin (); +          it != outpipes.end (); ++it) { +        if (it->second.pipe == pipe_) { +            xs_assert (!it->second.active); +            it->second.active = true; +            return; +        } +    } +    xs_assert (false); +} + +int xs::xrespondent_t::xsend (msg_t *msg_, int flags_) +{ +    //  If this is the first part of the message it's the ID of the +    //  peer to send the message to. +    if (!more_out) { +        xs_assert (!current_out); + +        //  If we have malformed message (prefix with no subsequent message) +        //  then just silently ignore it. +        //  TODO: The connections should be killed instead. +        if (msg_->flags () & msg_t::more && msg_->size () == 4) { + +            more_out = true; + +            //  Find the pipe associated with the identity stored in the prefix. +            //  If there's no such pipe just silently ignore the message. +            uint32_t identity = get_uint32 ((unsigned char*) msg_->data ()); +            outpipes_t::iterator it = outpipes.find (identity); + +            if (it != outpipes.end ()) { +                current_out = it->second.pipe; +                msg_t empty; +                int rc = empty.init (); +                errno_assert (rc == 0); +                if (!current_out->check_write (&empty)) { +                    it->second.active = false; +                    more_out = false; +                    current_out = NULL; +                } +                rc = empty.close (); +                errno_assert (rc == 0); +            } + +        } + +        int rc = msg_->close (); +        errno_assert (rc == 0); +        rc = msg_->init (); +        errno_assert (rc == 0); +        return 0; +    } + +    //  Check whether this is the last part of the message. +    more_out = msg_->flags () & msg_t::more ? true : false; + +    //  Push the message into the pipe. If there's no out pipe, just drop it. +    if (current_out) { +        bool ok = current_out->write (msg_); +        if (unlikely (!ok)) +            current_out = NULL; +        else if (!more_out) { +            current_out->flush (); +            current_out = NULL; +        } +    } +    else { +        int rc = msg_->close (); +        errno_assert (rc == 0); +    } + +    //  Detach the message from the data buffer. +    int rc = msg_->init (); +    errno_assert (rc == 0); + +    return 0; +} + +int xs::xrespondent_t::xrecv (msg_t *msg_, int flags_) +{ +    int rc; + +    //  if there is a prefetched identity, return it. +    if (prefetched == 2) +    { +        rc = msg_->init_size (prefetched_id.size ()); +        errno_assert (rc == 0); +        memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ()); +        msg_->set_flags (msg_t::more); +        prefetched = 1; +        return 0; +    } + +    //  If there is a prefetched message, return it. +    if (prefetched == 1) { +        rc = msg_->move (prefetched_msg); | 
