diff options
-rw-r--r-- | devices/zmq_forwarder/zmq_forwarder.cpp | 6 | ||||
-rw-r--r-- | devices/zmq_queue/zmq_queue.cpp | 110 | ||||
-rw-r--r-- | devices/zmq_streamer/zmq_streamer.cpp | 6 | ||||
-rw-r--r-- | include/zmq.h | 10 | ||||
-rw-r--r-- | include/zmq.hpp | 7 | ||||
-rw-r--r-- | src/Makefile.am | 6 | ||||
-rw-r--r-- | src/forwarder.cpp | 36 | ||||
-rw-r--r-- | src/forwarder.hpp | 31 | ||||
-rw-r--r-- | src/queue.cpp | 98 | ||||
-rw-r--r-- | src/queue.hpp | 31 | ||||
-rw-r--r-- | src/streamer.cpp | 36 | ||||
-rw-r--r-- | src/streamer.hpp | 31 | ||||
-rw-r--r-- | src/zmq.cpp | 20 |
13 files changed, 309 insertions, 119 deletions
diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp index 092bc47..496c3f6 100644 --- a/devices/zmq_forwarder/zmq_forwarder.cpp +++ b/devices/zmq_forwarder/zmq_forwarder.cpp @@ -113,11 +113,7 @@ int main (int argc, char *argv []) n++; } - zmq::message_t msg; - while (true) { - in_socket.recv (&msg); - out_socket.send (msg); - } + zmq::device (ZMQ_FORWARDER, in_socket, out_socket); return 0; } diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp index 5eae750..b488029 100644 --- a/devices/zmq_queue/zmq_queue.cpp +++ b/devices/zmq_queue/zmq_queue.cpp @@ -20,113 +20,6 @@ #include "../../include/zmq.hpp" #include "../../foreign/xmlParser/xmlParser.cpp" -class queue -{ -public: - - queue (zmq::socket_t& reply, zmq::socket_t& request) : - xrep (reply), - xreq (request) - { - items [0].socket = reply; - items [0].fd = 0; - items [0].events = ZMQ_POLLIN; - items [0].revents = 0; - - items [1].socket = request; - items [1].fd = 0; - items [1].events = ZMQ_POLLIN; - items [1].revents = 0; - - m_next_request_method = &queue::get_request; - m_next_response_method = &queue::get_response; - } - - void run() - { - while (true) { - int rc = zmq::poll (&items [0], 2, -1); - if (rc < 0) - break; - next_request(); - next_response(); - } - } - -private: - - void next_request() - { - (this->*m_next_request_method) (); - } - - void next_response() - { - (this->*m_next_response_method) (); - } - - void get_request() - { - if (items [0].revents & ZMQ_POLLIN ) { - int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [0].events &= ~ZMQ_POLLIN; - items [1].events |= ZMQ_POLLOUT; - m_next_request_method = &queue::send_request; - } - } - - void send_request() - { - if (items [1].revents & ZMQ_POLLOUT) { - int rc = xreq.send (request_msg, ZMQ_NOBLOCK); - if (!rc) return; - items [1].events &= ~ZMQ_POLLOUT; - items [0].events |= ZMQ_POLLIN; - m_next_request_method = &queue::get_request; - } - } - - void get_response() - { - if ( items [1].revents & ZMQ_POLLIN ) { - int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [1].events &= ~ZMQ_POLLIN; - items [0].events |= ZMQ_POLLOUT; - m_next_response_method = &queue::send_response; - } - } - - void send_response() - { - if (items [0].revents & ZMQ_POLLOUT) { - int rc = xrep.send (response_msg, ZMQ_NOBLOCK); - if (!rc) - return; - items [0].events &= ~ZMQ_POLLOUT; - items [1].events |= ZMQ_POLLIN; - m_next_response_method = &queue::get_response; - } - } - - zmq::socket_t & xrep; - zmq::socket_t & xreq; - zmq_pollitem_t items [2]; - zmq::message_t request_msg; - zmq::message_t response_msg; - - typedef void (queue::*next_method) (); - - next_method m_next_request_method; - next_method m_next_response_method; - - queue (queue const &); - void operator = (queue const &); -}; - int main (int argc, char *argv []) { if (argc != 2) { @@ -219,8 +112,7 @@ int main (int argc, char *argv []) n++; } - queue q(in_socket, out_socket); - q.run(); + zmq::device (ZMQ_QUEUE, in_socket, out_socket); return 0; } diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp index 6eccedf..7f4e8a5 100644 --- a/devices/zmq_streamer/zmq_streamer.cpp +++ b/devices/zmq_streamer/zmq_streamer.cpp @@ -112,11 +112,7 @@ int main (int argc, char *argv []) n++; } - zmq::message_t msg; - while (true) { - in_socket.recv (&msg); - out_socket.send (msg); - } + zmq::device (ZMQ_FORWARDER, in_socket, out_socket); return 0; } diff --git a/include/zmq.h b/include/zmq.h index a1fcf31..3d641ef 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -225,6 +225,16 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); ZMQ_EXPORT int zmq_errno (); //////////////////////////////////////////////////////////////////////////////// +// Devices - Experimental +//////////////////////////////////////////////////////////////////////////////// + +#define ZMQ_STREAMER 1 +#define ZMQ_FORWARDER 2 +#define ZMQ_QUEUE 3 + +ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); + +//////////////////////////////////////////////////////////////////////////////// // Helper functions. //////////////////////////////////////////////////////////////////////////////// diff --git a/include/zmq.hpp b/include/zmq.hpp index 6228133..63d2835 100644 --- a/include/zmq.hpp +++ b/include/zmq.hpp @@ -56,6 +56,13 @@ namespace zmq return rc; } + inline void device (int device_, void * insocket_, void* outsocket_) + { + int rc = zmq_device (device_, insocket_, outsocket_); + if (rc != 0) + throw error_t (); + } + class message_t : private zmq_msg_t { friend class socket_t; diff --git a/src/Makefile.am b/src/Makefile.am index 26106d8..4e695f6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -65,6 +65,7 @@ libzmq_la_SOURCES = app_thread.hpp \ err.hpp \ fd.hpp \ fd_signaler.hpp \ + forwarder.hpp \ fq.hpp \ i_inout.hpp \ io_object.hpp \ @@ -92,6 +93,7 @@ libzmq_la_SOURCES = app_thread.hpp \ p2p.hpp \ prefix_tree.hpp \ pub.hpp \ + queue.hpp \ rep.hpp \ req.hpp \ select.hpp \ @@ -99,6 +101,7 @@ libzmq_la_SOURCES = app_thread.hpp \ simple_semaphore.hpp \ socket_base.hpp \ stdint.hpp \ + streamer.hpp \ sub.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -129,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ + forwarder.cpp \ fq.cpp \ io_object.cpp \ io_thread.cpp \ @@ -146,11 +150,13 @@ libzmq_la_SOURCES = app_thread.hpp \ pipe.cpp \ poll.cpp \ pub.cpp \ + queue.cpp \ rep.cpp \ req.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ + streamer.cpp \ sub.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ diff --git a/src/forwarder.cpp b/src/forwarder.cpp new file mode 100644 index 0000000..07288e2 --- /dev/null +++ b/src/forwarder.cpp @@ -0,0 +1,36 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zmq.h" + +#include "forwarder.hpp" +#include "socket_base.hpp" +#include "err.hpp" + +int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) +{ + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + errno_assert (rc == 0); + + while (true) { + insocket_->recv (&msg, 0); + outsocket_->send (&msg, 0); + } +} diff --git a/src/forwarder.hpp b/src/forwarder.hpp new file mode 100644 index 0000000..68827a4 --- /dev/null +++ b/src/forwarder.hpp @@ -0,0 +1,31 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_FORWARDER_HPP_INCLUDED__ +#define __ZMQ_FORWARDER_HPP_INCLUDED__ + +namespace zmq +{ + + int forwarder (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/queue.cpp b/src/queue.cpp new file mode 100644 index 0000000..361df78 --- /dev/null +++ b/src/queue.cpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zmq.h" + +#include "queue.hpp" +#include "socket_base.hpp" +#include "err.hpp" + +int zmq::queue (class socket_base_t *insocket_, + class socket_base_t *outsocket_) +{ + zmq_msg_t request_msg; + int rc = zmq_msg_init (&request_msg); + errno_assert (rc == 0); + bool has_request = false; + + zmq_msg_t response_msg; + rc = zmq_msg_init (&response_msg); + errno_assert (rc == 0); + bool has_response = false; + + zmq_pollitem_t items [2]; + items [0].socket = insocket_; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + items [1].socket = outsocket_; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + while (true) { + rc = zmq_poll (&items [0], 2, -1); + errno_assert (rc > 0); + + // The algorithm below asumes ratio of request and replies processed + // under full load to be 1:1. The alternative would be to process + // replies first, handle request only when there are no more replies. + + // Receive a new request. + if (items [0].revents & ZMQ_POLLIN) { + zmq_assert (!has_request); + rc = insocket_->recv (&request_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events &= ~ZMQ_POLLIN; + items [1].events |= ZMQ_POLLOUT; + has_request = true; + } + + // Send the request further. + if (items [1].revents & ZMQ_POLLOUT) { + zmq_assert (has_request); + rc = outsocket_->send (&request_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events |= ZMQ_POLLIN; + items [1].events &= ~ZMQ_POLLOUT; + has_request = false; + } + + // Get a new reply. + if (items [1].revents & ZMQ_POLLIN) { + zmq_assert (!has_response); + rc = outsocket_->recv (&response_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events |= ZMQ_POLLOUT; + items [1].events &= ~ZMQ_POLLIN; + has_response = true; + } + + // Send the reply further. + if (items [0].revents & ZMQ_POLLOUT) {\ + zmq_assert (has_response); + rc = insocket_->send (&response_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events &= ~ZMQ_POLLOUT; + items [1].events |= ZMQ_POLLIN; + has_response = false; + } + } +} + diff --git a/src/queue.hpp b/src/queue.hpp new file mode 100644 index 0000000..dc968cb --- /dev/null +++ b/src/queue.hpp @@ -0,0 +1,31 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_QUEUE_HPP_INCLUDED__ +#define __ZMQ_QUEUE_HPP_INCLUDED__ + +namespace zmq +{ + + int queue (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/streamer.cpp b/src/streamer.cpp new file mode 100644 index 0000000..de23b37 --- /dev/null +++ b/src/streamer.cpp @@ -0,0 +1,36 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zmq.h" + +#include "streamer.hpp" +#include "socket_base.hpp" +#include "err.hpp" + +int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) +{ + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + errno_assert (rc == 0); + + while (true) { + insocket_->recv (&msg, 0); + outsocket_->send (&msg, 0); + } +} diff --git a/src/streamer.hpp b/src/streamer.hpp new file mode 100644 index 0000000..8827cff --- /dev/null +++ b/src/streamer.hpp @@ -0,0 +1,31 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_STREAMER_HPP_INCLUDED__ +#define __ZMQ_STREAMER_HPP_INCLUDED__ + +namespace zmq +{ + + int streamer (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 14898d5..3eb1306 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -24,6 +24,9 @@ #include <stdlib.h> #include <new> +#include "forwarder.hpp" +#include "queue.hpp" +#include "streamer.hpp" #include "socket_base.hpp" #include "app_thread.hpp" #include "dispatcher.hpp" @@ -628,6 +631,23 @@ int zmq_errno () return errno; } +int zmq_device (int device_, void *insocket_, void *outsocket_) +{ + switch (device_) { + case ZMQ_FORWARDER: + return zmq::forwarder ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); + case ZMQ_QUEUE: + return zmq::queue ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); + case ZMQ_STREAMER: + return zmq::streamer ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); + default: + return EINVAL; + } +} + #if defined ZMQ_HAVE_WINDOWS static uint64_t now () |