diff options
Diffstat (limited to 'devices/zmq_queue')
-rw-r--r-- | devices/zmq_queue/zmq_queue.cpp | 110 |
1 files changed, 1 insertions, 109 deletions
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp index 5eae750..b488029 100644 --- a/devices/zmq_queue/zmq_queue.cpp +++ b/devices/zmq_queue/zmq_queue.cpp @@ -20,113 +20,6 @@ #include "../../include/zmq.hpp" #include "../../foreign/xmlParser/xmlParser.cpp" -class queue -{ -public: - - queue (zmq::socket_t& reply, zmq::socket_t& request) : - xrep (reply), - xreq (request) - { - items [0].socket = reply; - items [0].fd = 0; - items [0].events = ZMQ_POLLIN; - items [0].revents = 0; - - items [1].socket = request; - items [1].fd = 0; - items [1].events = ZMQ_POLLIN; - items [1].revents = 0; - - m_next_request_method = &queue::get_request; - m_next_response_method = &queue::get_response; - } - - void run() - { - while (true) { - int rc = zmq::poll (&items [0], 2, -1); - if (rc < 0) - break; - next_request(); - next_response(); - } - } - -private: - - void next_request() - { - (this->*m_next_request_method) (); - } - - void next_response() - { - (this->*m_next_response_method) (); - } - - void get_request() - { - if (items [0].revents & ZMQ_POLLIN ) { - int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [0].events &= ~ZMQ_POLLIN; - items [1].events |= ZMQ_POLLOUT; - m_next_request_method = &queue::send_request; - } - } - - void send_request() - { - if (items [1].revents & ZMQ_POLLOUT) { - int rc = xreq.send (request_msg, ZMQ_NOBLOCK); - if (!rc) return; - items [1].events &= ~ZMQ_POLLOUT; - items [0].events |= ZMQ_POLLIN; - m_next_request_method = &queue::get_request; - } - } - - void get_response() - { - if ( items [1].revents & ZMQ_POLLIN ) { - int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [1].events &= ~ZMQ_POLLIN; - items [0].events |= ZMQ_POLLOUT; - m_next_response_method = &queue::send_response; - } - } - - void send_response() - { - if (items [0].revents & ZMQ_POLLOUT) { - int rc = xrep.send (response_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [0].events &= ~ZMQ_POLLOUT; - items [1].events |= ZMQ_POLLIN; - m_next_response_method = &queue::get_response; - } - } - - zmq::socket_t & xrep; - zmq::socket_t & xreq; - zmq_pollitem_t items [2]; - zmq::message_t request_msg; - zmq::message_t response_msg; - - typedef void (queue::*next_method) (); - - next_method m_next_request_method; - next_method m_next_response_method; - - queue (queue const &); - void operator = (queue const &); -}; - int main (int argc, char *argv []) { if (argc != 2) { @@ -219,8 +112,7 @@ int main (int argc, char *argv []) n++; } - queue q(in_socket, out_socket); - q.run(); + zmq::device (ZMQ_QUEUE, in_socket, out_socket); return 0; } |