From dcb983699e52bf2e075baaeef250bcd3c82e4846 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 13 Mar 2010 08:59:46 +0100 Subject: zmq_queue implementation added --- devices/zmq_queue/zmq_queue.cpp | 114 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 109 insertions(+), 5 deletions(-) diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp index 3bcce97..5eae750 100644 --- a/devices/zmq_queue/zmq_queue.cpp +++ b/devices/zmq_queue/zmq_queue.cpp @@ -20,6 +20,113 @@ #include "../../include/zmq.hpp" #include "../../foreign/xmlParser/xmlParser.cpp" +class queue +{ +public: + + queue (zmq::socket_t& reply, zmq::socket_t& request) : + xrep (reply), + xreq (request) + { + items [0].socket = reply; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + + items [1].socket = request; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + m_next_request_method = &queue::get_request; + m_next_response_method = &queue::get_response; + } + + void run() + { + while (true) { + int rc = zmq::poll (&items [0], 2, -1); + if (rc < 0) + break; + next_request(); + next_response(); + } + } + +private: + + void next_request() + { + (this->*m_next_request_method) (); + } + + void next_response() + { + (this->*m_next_response_method) (); + } + + void get_request() + { + if (items [0].revents & ZMQ_POLLIN ) { + int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK); + if (!rc) + return; + items [0].events &= ~ZMQ_POLLIN; + items [1].events |= ZMQ_POLLOUT; + m_next_request_method = &queue::send_request; + } + } + + void send_request() + { + if (items [1].revents & ZMQ_POLLOUT) { + int rc = xreq.send (request_msg, ZMQ_NOBLOCK); + if (!rc) return; + items [1].events &= ~ZMQ_POLLOUT; + items [0].events |= ZMQ_POLLIN; + m_next_request_method = &queue::get_request; + } + } + + void get_response() + { + if ( items [1].revents & ZMQ_POLLIN ) { + int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK); + if (!rc) + return; + items [1].events &= ~ZMQ_POLLIN; + items [0].events |= ZMQ_POLLOUT; + m_next_response_method = &queue::send_response; + } + } + + void send_response() + { + if (items [0].revents & ZMQ_POLLOUT) { + int rc = xrep.send (response_msg, ZMQ_NOBLOCK); + if (!rc) + return; + items [0].events &= ~ZMQ_POLLOUT; + items [1].events |= ZMQ_POLLIN; + m_next_response_method = &queue::get_response; + } + } + + zmq::socket_t & xrep; + zmq::socket_t & xreq; + zmq_pollitem_t items [2]; + zmq::message_t request_msg; + zmq::message_t response_msg; + + typedef void (queue::*next_method) (); + + next_method m_next_request_method; + next_method m_next_response_method; + + queue (queue const &); + void operator = (queue const &); +}; + int main (int argc, char *argv []) { if (argc != 2) { @@ -112,11 +219,8 @@ int main (int argc, char *argv []) n++; } - zmq::message_t msg; - while (true) { - in_socket.recv (&msg); - out_socket.send (msg); - } + queue q(in_socket, out_socket); + q.run(); return 0; } -- cgit v1.2.3