diff options
Diffstat (limited to 'devices/zmq_queue')
| -rw-r--r-- | devices/zmq_queue/zmq_queue.cpp | 114 | 
1 files changed, 109 insertions, 5 deletions
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp index 3bcce97..5eae750 100644 --- a/devices/zmq_queue/zmq_queue.cpp +++ b/devices/zmq_queue/zmq_queue.cpp @@ -20,6 +20,113 @@  #include "../../include/zmq.hpp"  #include "../../foreign/xmlParser/xmlParser.cpp" +class queue +{ +public: + +    queue (zmq::socket_t& reply, zmq::socket_t& request) : +        xrep (reply), +        xreq (request) +    { +        items [0].socket = reply; +        items [0].fd = 0; +        items [0].events = ZMQ_POLLIN; +        items [0].revents = 0; + +        items [1].socket = request; +        items [1].fd = 0; +        items [1].events = ZMQ_POLLIN; +        items [1].revents = 0; + +        m_next_request_method = &queue::get_request; +        m_next_response_method = &queue::get_response; +    } + +    void run() +    { +        while (true) { +            int rc = zmq::poll (&items [0], 2, -1); +            if (rc < 0) +                break; +            next_request(); +            next_response(); +        } +    } + +private: + +    void next_request() +    { +        (this->*m_next_request_method) (); +    } + +    void next_response() +    { +        (this->*m_next_response_method) (); +    } + +    void get_request() +    { +        if (items [0].revents & ZMQ_POLLIN ) { +            int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK); +            if (!rc) +                return; +            items [0].events &= ~ZMQ_POLLIN; +            items [1].events |= ZMQ_POLLOUT; +            m_next_request_method = &queue::send_request; +        } +    } + +    void send_request() +    { +        if (items [1].revents & ZMQ_POLLOUT) { +        int rc = xreq.send (request_msg, ZMQ_NOBLOCK); +        if (!rc) return; +        items [1].events &= ~ZMQ_POLLOUT; +        items [0].events |= ZMQ_POLLIN; +        m_next_request_method = &queue::get_request; +        } +    } + +    void get_response() +    { +        if ( items [1].revents & ZMQ_POLLIN ) { +            int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK); +            if (!rc) +                return; +            items [1].events &= ~ZMQ_POLLIN; +            items [0].events |= ZMQ_POLLOUT; +            m_next_response_method = &queue::send_response; +        } +    } + +    void send_response() +    { +        if (items [0].revents & ZMQ_POLLOUT) { +            int rc = xrep.send (response_msg, ZMQ_NOBLOCK); +            if (!rc) +                return; +            items [0].events &= ~ZMQ_POLLOUT; +            items [1].events |= ZMQ_POLLIN; +            m_next_response_method = &queue::get_response; +        } +    } + +    zmq::socket_t & xrep; +    zmq::socket_t & xreq; +    zmq_pollitem_t items [2]; +    zmq::message_t request_msg; +    zmq::message_t response_msg; + +    typedef void (queue::*next_method) (); + +    next_method m_next_request_method; +    next_method m_next_response_method; + +    queue (queue const &); +    void operator = (queue const &); +}; +  int main (int argc, char *argv [])  {      if (argc != 2) { @@ -112,11 +219,8 @@ int main (int argc, char *argv [])          n++;      } -    zmq::message_t msg; -    while (true) { -        in_socket.recv (&msg); -        out_socket.send (msg); -    } +    queue q(in_socket, out_socket); +    q.run();      return 0;  }  | 
