diff options
Diffstat (limited to 'src/surveyor.cpp')
-rw-r--r-- | src/surveyor.cpp | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/src/surveyor.cpp b/src/surveyor.cpp new file mode 100644 index 0000000..c5216bd --- /dev/null +++ b/src/surveyor.cpp @@ -0,0 +1,135 @@ +/* + Copyright (c) 2012 250bpm s.r.o. + Copyright (c) 2012 Other contributors as noted in the AUTHORS file + + This file is part of Crossroads I/O project. + + Crossroads I/O is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + Crossroads is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "surveyor.hpp" +#include "err.hpp" +#include "msg.hpp" +#include "wire.hpp" +#include "likely.hpp" +#include "random.hpp" + +xs::surveyor_t::surveyor_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + xsurveyor_t (parent_, tid_, sid_), + receiving_responses (false), + survey_id (generate_random ()) +{ + options.type = XS_SURVEYOR; +} + +xs::surveyor_t::~surveyor_t () +{ +} + +int xs::surveyor_t::xsend (msg_t *msg_, int flags_) +{ + int rc; + + // Survey pattern works only with sigle-part messages. + if (flags_ & XS_SNDMORE || msg_->flags () & msg_t::more) { + errno = EINVAL; + return -1; + } + + // Start the new survey. First, generate new survey ID. + ++survey_id; + msg_t id; + rc = id.init_size (4); + errno_assert (rc == 0); + put_uint32 ((unsigned char*) id.data (), survey_id); + id.set_flags (msg_t::more); + rc = xsurveyor_t::xsend (&id, 0); + if (rc != 0) { + id.close (); + return -1; + } + id.close (); + + // Now send the body of the survey. + rc = xsurveyor_t::xsend (msg_, flags_); + errno_assert (rc == 0); + + // Start waiting for responses from the peers. + receiving_responses = true; + + return 0; +} + +int xs::surveyor_t::xrecv (msg_t *msg_, int flags_) +{ + int rc; + + // If there's no survey underway, it's an error. + if (!receiving_responses) { + errno = EFSM; + return -1; + } + + // Get the first part of the response -- the survey ID. + rc = xsurveyor_t::xrecv (msg_, flags_); + if (rc != 0) + return rc; + + // Check whether this is response for the onging survey. If not, we can + // drop the response. + if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 4 || + get_uint32 ((unsigned char*) msg_->data ()) != survey_id)) { + while (true) { + rc = xsurveyor_t::xrecv (msg_, flags_); + errno_assert (rc == 0); + if (!(msg_->flags () & msg_t::more)) + break; + } + msg_->close (); + msg_->init (); + errno = EAGAIN; + return -1; + } + + // Get the body of the response. + rc = xsurveyor_t::xrecv (msg_, flags_); + errno_assert (rc == 0); + + return 0; +} + +bool xs::surveyor_t::xhas_in () +{ + // TODO: We need a prefetched_message here... + xs_assert (false); + return false; +} + +bool xs::surveyor_t::xhas_out () +{ + return xsurveyor_t::xhas_out (); +} + +xs::surveyor_session_t::surveyor_session_t (io_thread_t *io_thread_, + bool connect_, socket_base_t *socket_, const options_t &options_, + const char *protocol_, const char *address_) : + xsurveyor_session_t (io_thread_, connect_, socket_, options_, protocol_, + address_) +{ +} + +xs::surveyor_session_t::~surveyor_session_t () +{ +} + |