summaryrefslogtreecommitdiff
path: root/src/zmq_engine.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-12 09:40:16 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-12 09:40:16 +0200
commit059beca59d39d90a8ee0e1b07f840994962ea89e (patch)
tree007a5d86450c543bb9a362a844ee271115b68c54 /src/zmq_engine.cpp
parentbda766ab401b6c565fe9c2d0bc80c11bbbe84488 (diff)
listener/connecter/init/session added
Diffstat (limited to 'src/zmq_engine.cpp')
-rw-r--r--src/zmq_engine.cpp107
1 files changed, 104 insertions, 3 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 3708c9a..3620d30 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -19,17 +19,118 @@
#include "zmq_engine.hpp"
#include "io_thread.hpp"
+#include "i_inout.hpp"
+#include "config.hpp"
+#include "err.hpp"
-zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, object_t *owner_) :
- io_object_t (parent_, owner_)
+zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_) :
+ io_object_t (parent_),
+ insize (0),
+ inpos (0),
+ outsize (0),
+ outpos (0),
+ inout (NULL)
{
+ // Allocate read & write buffer.
+ inbuf = (unsigned char*) malloc (in_batch_size);
+ zmq_assert (inbuf);
+ outbuf = (unsigned char*) malloc (out_batch_size);
+ zmq_assert (outbuf);
+
+ // Initialise the underlying socket.
+ int rc = tcp_socket.open (fd_);
+ zmq_assert (rc == 0);
}
zmq::zmq_engine_t::~zmq_engine_t ()
{
+ free (outbuf);
+ free (inbuf);
+}
+
+void zmq::zmq_engine_t::plug (i_inout *inout_)
+{
+ encoder.set_inout (inout_);
+ decoder.set_inout (inout_);
+
+ handle = add_fd (tcp_socket.get_fd ());
+ set_pollin (handle);
+ set_pollout (handle);
+
+ inout = inout_;
+}
+
+void zmq::zmq_engine_t::unplug ()
+{
+ rm_fd (handle);
+ inout = NULL;
+}
+
+void zmq::zmq_engine_t::in_event ()
+{
+ // If there's no data to process in the buffer, read new data.
+ if (inpos == insize) {
+
+ // Read as much data as possible to the read buffer.
+ insize = tcp_socket.read (inbuf, in_batch_size);
+printf ("%d bytes read\n", (int) insize);
+ inpos = 0;
+
+ // Check whether the peer has closed the connection.
+ if (insize == -1) {
+ insize = 0;
+ error ();
+ return;
+ }
+ }
+
+ // Following code should be executed even if there's not a single byte in
+ // the buffer. There still can be a decoded messages stored in the decoder.
+
+ // Push the data to the decoder.
+ int nbytes = decoder.write (inbuf + inpos, insize - inpos);
+
+ // Adjust read position. Stop polling for input if we got stuck.
+ inpos += nbytes;
+ if (inpos < insize)
+ reset_pollin (handle);
+
+ // If at least one byte was processed, flush all messages the decoder
+ // may have produced.
+ if (nbytes > 0)
+ inout->flush ();
+
}
-void zmq::zmq_engine_t::process_plug ()
+void zmq::zmq_engine_t::out_event ()
{
+ // If write buffer is empty, try to read new data from the encoder.
+ if (outpos == outsize) {
+
+ outsize = encoder.read (outbuf, out_batch_size);
+ outpos = 0;
+
+ // If there is no data to send, stop polling for output.
+ if (outsize == 0)
+ reset_pollout (handle);
+ }
+
+ // If there are any data to write in write buffer, write as much as
+ // possible to the socket.
+ if (outpos < outsize) {
+ int nbytes = tcp_socket.write (outbuf + outpos, outsize - outpos);
+
+ // Handle problems with the connection.
+ if (nbytes == -1) {
+ error ();
+ return;
+ }
+
+ outpos += nbytes;
+ }
}
+void zmq::zmq_engine_t::error ()
+{
+ zmq_assert (false);
+}