diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 6 | ||||
-rw-r--r-- | src/forwarder.cpp | 36 | ||||
-rw-r--r-- | src/forwarder.hpp | 31 | ||||
-rw-r--r-- | src/queue.cpp | 98 | ||||
-rw-r--r-- | src/queue.hpp | 31 | ||||
-rw-r--r-- | src/streamer.cpp | 36 | ||||
-rw-r--r-- | src/streamer.hpp | 31 | ||||
-rw-r--r-- | src/zmq.cpp | 20 |
8 files changed, 289 insertions, 0 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 26106d8..4e695f6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -65,6 +65,7 @@ libzmq_la_SOURCES = app_thread.hpp \ err.hpp \ fd.hpp \ fd_signaler.hpp \ + forwarder.hpp \ fq.hpp \ i_inout.hpp \ io_object.hpp \ @@ -92,6 +93,7 @@ libzmq_la_SOURCES = app_thread.hpp \ p2p.hpp \ prefix_tree.hpp \ pub.hpp \ + queue.hpp \ rep.hpp \ req.hpp \ select.hpp \ @@ -99,6 +101,7 @@ libzmq_la_SOURCES = app_thread.hpp \ simple_semaphore.hpp \ socket_base.hpp \ stdint.hpp \ + streamer.hpp \ sub.hpp \ tcp_connecter.hpp \ tcp_listener.hpp \ @@ -129,6 +132,7 @@ libzmq_la_SOURCES = app_thread.hpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ + forwarder.cpp \ fq.cpp \ io_object.cpp \ io_thread.cpp \ @@ -146,11 +150,13 @@ libzmq_la_SOURCES = app_thread.hpp \ pipe.cpp \ poll.cpp \ pub.cpp \ + queue.cpp \ rep.cpp \ req.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ + streamer.cpp \ sub.cpp \ tcp_connecter.cpp \ tcp_listener.cpp \ diff --git a/src/forwarder.cpp b/src/forwarder.cpp new file mode 100644 index 0000000..07288e2 --- /dev/null +++ b/src/forwarder.cpp @@ -0,0 +1,36 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zmq.h" + +#include "forwarder.hpp" +#include "socket_base.hpp" +#include "err.hpp" + +int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) +{ + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + errno_assert (rc == 0); + + while (true) { + insocket_->recv (&msg, 0); + outsocket_->send (&msg, 0); + } +} diff --git a/src/forwarder.hpp b/src/forwarder.hpp new file mode 100644 index 0000000..68827a4 --- /dev/null +++ b/src/forwarder.hpp @@ -0,0 +1,31 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_FORWARDER_HPP_INCLUDED__ +#define __ZMQ_FORWARDER_HPP_INCLUDED__ + +namespace zmq +{ + + int forwarder (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/queue.cpp b/src/queue.cpp new file mode 100644 index 0000000..361df78 --- /dev/null +++ b/src/queue.cpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zmq.h" + +#include "queue.hpp" +#include "socket_base.hpp" +#include "err.hpp" + +int zmq::queue (class socket_base_t *insocket_, + class socket_base_t *outsocket_) +{ + zmq_msg_t request_msg; + int rc = zmq_msg_init (&request_msg); + errno_assert (rc == 0); + bool has_request = false; + + zmq_msg_t response_msg; + rc = zmq_msg_init (&response_msg); + errno_assert (rc == 0); + bool has_response = false; + + zmq_pollitem_t items [2]; + items [0].socket = insocket_; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + items [1].socket = outsocket_; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + while (true) { + rc = zmq_poll (&items [0], 2, -1); + errno_assert (rc > 0); + + // The algorithm below asumes ratio of request and replies processed + // under full load to be 1:1. The alternative would be to process + // replies first, handle request only when there are no more replies. + + // Receive a new request. + if (items [0].revents & ZMQ_POLLIN) { + zmq_assert (!has_request); + rc = insocket_->recv (&request_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events &= ~ZMQ_POLLIN; + items [1].events |= ZMQ_POLLOUT; + has_request = true; + } + + // Send the request further. + if (items [1].revents & ZMQ_POLLOUT) { + zmq_assert (has_request); + rc = outsocket_->send (&request_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events |= ZMQ_POLLIN; + items [1].events &= ~ZMQ_POLLOUT; + has_request = false; + } + + // Get a new reply. + if (items [1].revents & ZMQ_POLLIN) { + zmq_assert (!has_response); + rc = outsocket_->recv (&response_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events |= ZMQ_POLLOUT; + items [1].events &= ~ZMQ_POLLIN; + has_response = true; + } + + // Send the reply further. + if (items [0].revents & ZMQ_POLLOUT) {\ + zmq_assert (has_response); + rc = insocket_->send (&response_msg, ZMQ_NOBLOCK); + errno_assert (rc == 0); + items [0].events &= ~ZMQ_POLLOUT; + items [1].events |= ZMQ_POLLIN; + has_response = false; + } + } +} + diff --git a/src/queue.hpp b/src/queue.hpp new file mode 100644 index 0000000..dc968cb --- /dev/null +++ b/src/queue.hpp @@ -0,0 +1,31 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_QUEUE_HPP_INCLUDED__ +#define __ZMQ_QUEUE_HPP_INCLUDED__ + +namespace zmq +{ + + int queue (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/streamer.cpp b/src/streamer.cpp new file mode 100644 index 0000000..de23b37 --- /dev/null +++ b/src/streamer.cpp @@ -0,0 +1,36 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "../include/zmq.h" + +#include "streamer.hpp" +#include "socket_base.hpp" +#include "err.hpp" + +int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) +{ + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + errno_assert (rc == 0); + + while (true) { + insocket_->recv (&msg, 0); + outsocket_->send (&msg, 0); + } +} diff --git a/src/streamer.hpp b/src/streamer.hpp new file mode 100644 index 0000000..8827cff --- /dev/null +++ b/src/streamer.hpp @@ -0,0 +1,31 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_STREAMER_HPP_INCLUDED__ +#define __ZMQ_STREAMER_HPP_INCLUDED__ + +namespace zmq +{ + + int streamer (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 14898d5..3eb1306 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -24,6 +24,9 @@ #include <stdlib.h> #include <new> +#include "forwarder.hpp" +#include "queue.hpp" +#include "streamer.hpp" #include "socket_base.hpp" #include "app_thread.hpp" #include "dispatcher.hpp" @@ -628,6 +631,23 @@ int zmq_errno () return errno; } +int zmq_device (int device_, void *insocket_, void *outsocket_) +{ + switch (device_) { + case ZMQ_FORWARDER: + return zmq::forwarder ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); + case ZMQ_QUEUE: + return zmq::queue ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); + case ZMQ_STREAMER: + return zmq::streamer ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); + default: + return EINVAL; + } +} + #if defined ZMQ_HAVE_WINDOWS static uint64_t now () |