From 0b5cc026fbe7ccc6de66907be29471562a2d344d Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 6 Aug 2009 12:51:32 +0200 Subject: clean up - session/socket/engine stuff removed --- src/zmq_tcp_engine.cpp | 185 ------------------------------------------------- 1 file changed, 185 deletions(-) delete mode 100644 src/zmq_tcp_engine.cpp (limited to 'src/zmq_tcp_engine.cpp') diff --git a/src/zmq_tcp_engine.cpp b/src/zmq_tcp_engine.cpp deleted file mode 100644 index 6091d80..0000000 --- a/src/zmq_tcp_engine.cpp +++ /dev/null @@ -1,185 +0,0 @@ -/* - Copyright (c) 2007-2009 FastMQ Inc. - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "zmq_tcp_engine.hpp" -#include "config.hpp" -#include "i_session.hpp" -#include "err.hpp" - -zmq::zmq_tcp_engine_t::zmq_tcp_engine_t (fd_t fd_) : - poller (NULL), - session (NULL), - terminating (false), - insize (0), - inpos (0), - outsize (0), - outpos (0) -{ - // Allocate read & write buffer. - inbuf = new unsigned char [in_batch_size]; - zmq_assert (inbuf); - outbuf = new unsigned char [out_batch_size]; - zmq_assert (outbuf); - - // Attach the socket. - int rc = socket.open (fd_); - zmq_assert (rc == 0); -} - -void zmq::zmq_tcp_engine_t::attach (i_poller *poller_, i_session *session_) -{ - zmq_assert (!poller); - poller = poller_; - zmq_assert (!session); - session = session_; - encoder.set_session (session); - decoder.set_session (session); - - // Let session know we are here. - session->set_engine (this); - - // Register the engine with the polling thread. - handle = poller->add_fd (socket.get_fd (), this); - poller->set_pollin (handle); - poller->set_pollout (handle); - - // Flush any pending inbound messages. - in_event (); -} - -void zmq::zmq_tcp_engine_t::detach () -{ - zmq_assert (poller); - poller->rm_fd (handle); - poller = NULL; - zmq_assert (session); - session->set_engine (NULL); - session = NULL; - encoder.set_session (NULL); - decoder.set_session (NULL); -} - -void zmq::zmq_tcp_engine_t::revive () -{ - zmq_assert (poller); - poller->set_pollout (handle); -} - -void zmq::zmq_tcp_engine_t::schedule_terminate () -{ - terminating = true; -} - -void zmq::zmq_tcp_engine_t::terminate () -{ - delete this; -} - -void zmq::zmq_tcp_engine_t::shutdown () -{ - delete this; -} - -zmq::zmq_tcp_engine_t::~zmq_tcp_engine_t () -{ - detach (); - delete [] outbuf; - delete [] inbuf; -} - -void zmq::zmq_tcp_engine_t::in_event () -{ - // If there's no data to process in the buffer, read new data. - if (inpos == insize) { - - // Read as much data as possible to the read buffer. - insize = socket.read (inbuf, in_batch_size); - inpos = 0; - - // Check whether the peer has closed the connection. - if (insize == -1) { - insize = 0; - error (); - return; - } - } - - // Following code should be executed even if there's not a single byte in - // the buffer. There still can be a decoded messages stored in the decoder. - - // Push the data to the decoder. - int nbytes = decoder.write (inbuf + inpos, insize - inpos); - - // Adjust read position. Stop polling for input if we got stuck. - inpos += nbytes; - if (inpos < insize) - poller->reset_pollin (handle); - - // If at least one byte was processed, flush all messages the decoder - // may have produced. If engine is disconnected from session, no need - // to flush the messages. It's important that flush is called at the - // very end of in_event as it may invoke in_event itself. - if (nbytes > 0 && session) - session->flush (); -} - -void zmq::zmq_tcp_engine_t::out_event () -{ - // If write buffer is empty, try to read new data from the encoder. - if (outpos == outsize) { - - outsize = encoder.read (outbuf, out_batch_size); - outpos = 0; - - // If there are no more pipes, engine can be deallocated. - if (terminating) { - terminate (); - return; - } - - // If there is no data to send, stop polling for output. - if (outsize == 0) - poller->reset_pollout (handle); - } - - // If there are any data to write in write buffer, write as much as - // possible to the socket. - if (outpos < outsize) { - int nbytes = socket.write (outbuf + outpos, outsize - outpos); - - // Handle problems with the connection. - if (nbytes == -1) { - error (); - return; - } - - outpos += nbytes; - } -} - -void zmq::zmq_tcp_engine_t::timer_event () -{ - zmq_assert (false); -} - -void zmq::zmq_tcp_engine_t::error () -{ - zmq_assert (false); -} - -- cgit v1.2.3