From 4ed70a930202b103e7e80b8dc925e0aaa4622595 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 29 Jul 2009 12:07:54 +0200 Subject: initial commit --- src/pipe_reader.cpp | 118 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 src/pipe_reader.cpp (limited to 'src/pipe_reader.cpp') diff --git a/src/pipe_reader.cpp b/src/pipe_reader.cpp new file mode 100644 index 0000000..5585b92 --- /dev/null +++ b/src/pipe_reader.cpp @@ -0,0 +1,118 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zs.h" + +#include "pipe_reader.hpp" +#include "pipe.hpp" +#include "err.hpp" +#include "i_mux.hpp" + +zs::pipe_reader_t::pipe_reader_t (object_t *parent_, pipe_t *pipe_, + uint64_t hwm_, uint64_t lwm_) : + object_t (parent_), + pipe (pipe_), + peer (NULL), + mux (NULL), + index (-1), + hwm (hwm_), + lwm (lwm_), + head (0), + tail (0), + last_sent_head (0) +{ +} + +void zs::pipe_reader_t::set_peer (object_t *peer_) +{ + peer = peer_; +} + +zs::pipe_reader_t::~pipe_reader_t () +{ +} + +void zs::pipe_reader_t::set_mux (i_mux *mux_) +{ + mux = mux_; +} + +void zs::pipe_reader_t::set_index (int index_) +{ + index = index_; +} + +int zs::pipe_reader_t::get_index () +{ + return index; +} + +void zs::pipe_reader_t::process_tail (uint64_t bytes_) +{ + tail = bytes_; + mux->reactivate (this); +} + +bool zs::pipe_reader_t::read (struct zs_msg *msg_) +{ + // Read a message. + if (!pipe->read (msg_)) { + mux->deactivate (this); + return false; + } + + // If successfull, adjust the head of the pipe. + head += zs_msg_size (msg_); + + // If pipe writer wasn't notified about the head position for long enough, + // notify it. + if (head - last_sent_head >= hwm - lwm) { + send_head (peer, head); + last_sent_head = head; + } + + if (zs_msg_type (msg_) == ZS_DELIMITER) { + + // Detach the pipe from the mux and send termination request to + // the pipe writer. + mux->detach_pipe (this); + mux = NULL; + send_terminate (peer); + return false; + } + + return true; +} + +void zs::pipe_reader_t::terminate () +{ + // Detach the pipe from the mux and send termination request to + // the pipe writer. + if (mux) { + mux->detach_pipe (this); + mux = NULL; + } + send_terminate (peer); +} + +void zs::pipe_reader_t::process_terminate_ack () +{ + // Ask dispatcher to deallocate the pipe. + destroy_pipe (pipe); +} -- cgit v1.2.3