diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-07-29 12:07:54 +0200 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-07-29 12:07:54 +0200 |
commit | 4ed70a930202b103e7e80b8dc925e0aaa4622595 (patch) | |
tree | aeed881ce17629f81b7c90f7d675aac8ecf69d4f /src/zmq_tcp_engine.cpp |
initial commit
Diffstat (limited to 'src/zmq_tcp_engine.cpp')
-rw-r--r-- | src/zmq_tcp_engine.cpp | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/src/zmq_tcp_engine.cpp b/src/zmq_tcp_engine.cpp new file mode 100644 index 0000000..0f55808 --- /dev/null +++ b/src/zmq_tcp_engine.cpp @@ -0,0 +1,185 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "zmq_tcp_engine.hpp" +#include "config.hpp" +#include "i_session.hpp" +#include "err.hpp" + +zs::zmq_tcp_engine_t::zmq_tcp_engine_t (fd_t fd_) : + poller (NULL), + session (NULL), + terminating (false), + insize (0), + inpos (0), + outsize (0), + outpos (0) +{ + // Allocate read & write buffer. + inbuf = new unsigned char [in_batch_size]; + zs_assert (inbuf); + outbuf = new unsigned char [out_batch_size]; + zs_assert (outbuf); + + // Attach the socket. + int rc = socket.open (fd_); + zs_assert (rc == 0); +} + +void zs::zmq_tcp_engine_t::attach (i_poller *poller_, i_session *session_) +{ + zs_assert (!poller); + poller = poller_; + zs_assert (!session); + session = session_; + encoder.set_session (session); + decoder.set_session (session); + + // Let session know we are here. + session->set_engine (this); + + // Register the engine with the polling thread. + handle = poller->add_fd (socket.get_fd (), this); + poller->set_pollin (handle); + poller->set_pollout (handle); + + // Flush any pending inbound messages. + in_event (); +} + +void zs::zmq_tcp_engine_t::detach () +{ + zs_assert (poller); + poller->rm_fd (handle); + poller = NULL; + zs_assert (session); + session->set_engine (NULL); + session = NULL; + encoder.set_session (NULL); + decoder.set_session (NULL); +} + +void zs::zmq_tcp_engine_t::revive () +{ + zs_assert (poller); + poller->set_pollout (handle); +} + +void zs::zmq_tcp_engine_t::schedule_terminate () +{ + terminating = true; +} + +void zs::zmq_tcp_engine_t::terminate () +{ + delete this; +} + +void zs::zmq_tcp_engine_t::shutdown () +{ + delete this; +} + +zs::zmq_tcp_engine_t::~zmq_tcp_engine_t () +{ + detach (); + delete [] outbuf; + delete [] inbuf; +} + +void zs::zmq_tcp_engine_t::in_event () +{ + // If there's no data to process in the buffer, read new data. + if (inpos == insize) { + + // Read as much data as possible to the read buffer. + insize = socket.read (inbuf, in_batch_size); + inpos = 0; + + // Check whether the peer has closed the connection. + if (insize == -1) { + insize = 0; + error (); + return; + } + } + + // Following code should be executed even if there's not a single byte in + // the buffer. There still can be a decoded messages stored in the decoder. + + // Push the data to the decoder. + int nbytes = decoder.write (inbuf + inpos, insize - inpos); + + // Adjust read position. Stop polling for input if we got stuck. + inpos += nbytes; + if (inpos < insize) + poller->reset_pollin (handle); + + // If at least one byte was processed, flush all messages the decoder + // may have produced. If engine is disconnected from session, no need + // to flush the messages. It's important that flush is called at the + // very end of in_event as it may invoke in_event itself. + if (nbytes > 0 && session) + session->flush (); +} + +void zs::zmq_tcp_engine_t::out_event () +{ + // If write buffer is empty, try to read new data from the encoder. + if (outpos == outsize) { + + outsize = encoder.read (outbuf, out_batch_size); + outpos = 0; + + // If there are no more pipes, engine can be deallocated. + if (terminating) { + terminate (); + return; + } + + // If there is no data to send, stop polling for output. + if (outsize == 0) + poller->reset_pollout (handle); + } + + // If there are any data to write in write buffer, write as much as + // possible to the socket. + if (outpos < outsize) { + int nbytes = socket.write (outbuf + outpos, outsize - outpos); + + // Handle problems with the connection. + if (nbytes == -1) { + error (); + return; + } + + outpos += nbytes; + } +} + +void zs::zmq_tcp_engine_t::timer_event () +{ + zs_assert (false); +} + +void zs::zmq_tcp_engine_t::error () +{ + zs_assert (false); +} + |