From 059beca59d39d90a8ee0e1b07f840994962ea89e Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Wed, 12 Aug 2009 09:40:16 +0200 Subject: listener/connecter/init/session added --- src/zmq_engine.cpp | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 3 deletions(-) (limited to 'src/zmq_engine.cpp') diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 3708c9a..3620d30 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -19,17 +19,118 @@ #include "zmq_engine.hpp" #include "io_thread.hpp" +#include "i_inout.hpp" +#include "config.hpp" +#include "err.hpp" -zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, object_t *owner_) : - io_object_t (parent_, owner_) +zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) : + io_object_t (parent_), + insize (0), + inpos (0), + outsize (0), + outpos (0), + inout (NULL) { + // Allocate read & write buffer. + inbuf = (unsigned char*) malloc (in_batch_size); + zmq_assert (inbuf); + outbuf = (unsigned char*) malloc (out_batch_size); + zmq_assert (outbuf); + + // Initialise the underlying socket. + int rc = tcp_socket.open (fd_); + zmq_assert (rc == 0); } zmq::zmq_engine_t::~zmq_engine_t () { + free (outbuf); + free (inbuf); +} + +void zmq::zmq_engine_t::plug (i_inout *inout_) +{ + encoder.set_inout (inout_); + decoder.set_inout (inout_); + + handle = add_fd (tcp_socket.get_fd ()); + set_pollin (handle); + set_pollout (handle); + + inout = inout_; +} + +void zmq::zmq_engine_t::unplug () +{ + rm_fd (handle); + inout = NULL; +} + +void zmq::zmq_engine_t::in_event () +{ + // If there's no data to process in the buffer, read new data. + if (inpos == insize) { + + // Read as much data as possible to the read buffer. + insize = tcp_socket.read (inbuf, in_batch_size); +printf ("%d bytes read\n", (int) insize); + inpos = 0; + + // Check whether the peer has closed the connection. + if (insize == -1) { + insize = 0; + error (); + return; + } + } + + // Following code should be executed even if there's not a single byte in + // the buffer. There still can be a decoded messages stored in the decoder. + + // Push the data to the decoder. + int nbytes = decoder.write (inbuf + inpos, insize - inpos); + + // Adjust read position. Stop polling for input if we got stuck. + inpos += nbytes; + if (inpos < insize) + reset_pollin (handle); + + // If at least one byte was processed, flush all messages the decoder + // may have produced. + if (nbytes > 0) + inout->flush (); + } -void zmq::zmq_engine_t::process_plug () +void zmq::zmq_engine_t::out_event () { + // If write buffer is empty, try to read new data from the encoder. + if (outpos == outsize) { + + outsize = encoder.read (outbuf, out_batch_size); + outpos = 0; + + // If there is no data to send, stop polling for output. + if (outsize == 0) + reset_pollout (handle); + } + + // If there are any data to write in write buffer, write as much as + // possible to the socket. + if (outpos < outsize) { + int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos); + + // Handle problems with the connection. + if (nbytes == -1) { + error (); + return; + } + + outpos += nbytes; + } } +void zmq::zmq_engine_t::error () +{ + zmq_assert (false); +} -- cgit v1.2.3