From 99c5d9283622a0b37ee80f83ff4875c059fc5990 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 27 Aug 2009 10:54:28 +0200 Subject: pipes added --- src/pipe.cpp | 112 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 src/pipe.cpp (limited to 'src/pipe.cpp') diff --git a/src/pipe.cpp b/src/pipe.cpp new file mode 100644 index 0000000..5016631 --- /dev/null +++ b/src/pipe.cpp @@ -0,0 +1,112 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include + +#include "pipe.hpp" + +zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_) : + object_t (parent_), + pipe (pipe_), + peer (&pipe_->writer), + hwm (hwm_), + lwm (lwm_), + index (-1), + endpoint (NULL) +{ +} + +zmq::reader_t::~reader_t () +{ +} + +bool zmq::reader_t::read (zmq_msg_t *msg_) +{ + return pipe->read (msg_); + + // TODO: Adjust the size of the pipe. +} + +void zmq::reader_t::set_endpoint (i_endpoint *endpoint_) +{ + endpoint = endpoint_; +} + +void zmq::reader_t::set_index (int index_) +{ + index = index_; +} + +int zmq::reader_t::get_index () +{ + return index; +} + +void zmq::reader_t::process_revive () +{ + endpoint->revive (this); +} + +zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_) : + object_t (parent_), + pipe (pipe_), + peer (&pipe_->reader), + hwm (hwm_), + lwm (lwm_) +{ +} + +zmq::writer_t::~writer_t () +{ +} + +bool zmq::writer_t::check_write (uint64_t size_) +{ + // TODO: Check whether hwm is exceeded. + + return true; +} + +bool zmq::writer_t::write (struct zmq_msg_t *msg_) +{ + pipe->write (*msg_); + return true; + + // TODO: Adjust size of the pipe. +} + +void zmq::writer_t::flush () +{ + if (!pipe->flush ()) + send_revive (peer); +} + +zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, + uint64_t hwm_, uint64_t lwm_) : + reader (reader_parent_, this, hwm_, lwm_), + writer (writer_parent_, this, hwm_, lwm_) +{ +} + +zmq::pipe_t::~pipe_t () +{ +} + -- cgit v1.2.3