From edfd05df8ef58afc498795cb74906c07ee396f76 Mon Sep 17 00:00:00 2001 From: Jon Dyte Date: Wed, 7 Apr 2010 08:20:01 +0200 Subject: devices can be created via API --- devices/zmq_forwarder/zmq_forwarder.cpp | 6 +- devices/zmq_queue/zmq_queue.cpp | 110 +------------------------------- devices/zmq_streamer/zmq_streamer.cpp | 6 +- 3 files changed, 3 insertions(+), 119 deletions(-) (limited to 'devices') diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp index 092bc47..496c3f6 100644 --- a/devices/zmq_forwarder/zmq_forwarder.cpp +++ b/devices/zmq_forwarder/zmq_forwarder.cpp @@ -113,11 +113,7 @@ int main (int argc, char *argv []) n++; } - zmq::message_t msg; - while (true) { - in_socket.recv (&msg); - out_socket.send (msg); - } + zmq::device (ZMQ_FORWARDER, in_socket, out_socket); return 0; } diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp index 5eae750..b488029 100644 --- a/devices/zmq_queue/zmq_queue.cpp +++ b/devices/zmq_queue/zmq_queue.cpp @@ -20,113 +20,6 @@ #include "../../include/zmq.hpp" #include "../../foreign/xmlParser/xmlParser.cpp" -class queue -{ -public: - - queue (zmq::socket_t& reply, zmq::socket_t& request) : - xrep (reply), - xreq (request) - { - items [0].socket = reply; - items [0].fd = 0; - items [0].events = ZMQ_POLLIN; - items [0].revents = 0; - - items [1].socket = request; - items [1].fd = 0; - items [1].events = ZMQ_POLLIN; - items [1].revents = 0; - - m_next_request_method = &queue::get_request; - m_next_response_method = &queue::get_response; - } - - void run() - { - while (true) { - int rc = zmq::poll (&items [0], 2, -1); - if (rc < 0) - break; - next_request(); - next_response(); - } - } - -private: - - void next_request() - { - (this->*m_next_request_method) (); - } - - void next_response() - { - (this->*m_next_response_method) (); - } - - void get_request() - { - if (items [0].revents & ZMQ_POLLIN ) { - int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [0].events &= ~ZMQ_POLLIN; - items [1].events |= ZMQ_POLLOUT; - m_next_request_method = &queue::send_request; - } - } - - void send_request() - { - if (items [1].revents & ZMQ_POLLOUT) { - int rc = xreq.send (request_msg, ZMQ_NOBLOCK); - if (!rc) return; - items [1].events &= ~ZMQ_POLLOUT; - items [0].events |= ZMQ_POLLIN; - m_next_request_method = &queue::get_request; - } - } - - void get_response() - { - if ( items [1].revents & ZMQ_POLLIN ) { - int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [1].events &= ~ZMQ_POLLIN; - items [0].events |= ZMQ_POLLOUT; - m_next_response_method = &queue::send_response; - } - } - - void send_response() - { - if (items [0].revents & ZMQ_POLLOUT) { - int rc = xrep.send (response_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [0].events &= ~ZMQ_POLLOUT; - items [1].events |= ZMQ_POLLIN; - m_next_response_method = &queue::get_response; - } - } - - zmq::socket_t & xrep; - zmq::socket_t & xreq; - zmq_pollitem_t items [2]; - zmq::message_t request_msg; - zmq::message_t response_msg; - - typedef void (queue::*next_method) (); - - next_method m_next_request_method; - next_method m_next_response_method; - - queue (queue const &); - void operator = (queue const &); -}; - int main (int argc, char *argv []) { if (argc != 2) { @@ -219,8 +112,7 @@ int main (int argc, char *argv []) n++; } - queue q(in_socket, out_socket); - q.run(); + zmq::device (ZMQ_QUEUE, in_socket, out_socket); return 0; } diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp index 6eccedf..7f4e8a5 100644 --- a/devices/zmq_streamer/zmq_streamer.cpp +++ b/devices/zmq_streamer/zmq_streamer.cpp @@ -112,11 +112,7 @@ int main (int argc, char *argv []) n++; } - zmq::message_t msg; - while (true) { - in_socket.recv (&msg); - out_socket.send (msg); - } + zmq::device (ZMQ_FORWARDER, in_socket, out_socket); return 0; } -- cgit v1.2.3