From 8d6cafe06696e17afff03adf4b33bd504b55e277 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Mon, 6 Dec 2010 22:57:29 +0100 Subject: All devices conflated into a single implementation. Signed-off-by: Martin Sustrik --- src/Makefile.am | 8 +--- src/device.cpp | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/device.hpp | 31 +++++++++++++ src/forwarder.cpp | 60 ------------------------- src/forwarder.hpp | 31 ------------- src/queue.cpp | 130 ------------------------------------------------------ src/queue.hpp | 31 ------------- src/streamer.cpp | 60 ------------------------- src/streamer.hpp | 31 ------------- src/zmq.cpp | 24 ++++------ 10 files changed, 172 insertions(+), 364 deletions(-) create mode 100644 src/device.cpp create mode 100644 src/device.hpp delete mode 100644 src/forwarder.cpp delete mode 100644 src/forwarder.hpp delete mode 100644 src/queue.cpp delete mode 100644 src/queue.hpp delete mode 100644 src/streamer.cpp delete mode 100644 src/streamer.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index eea7d35..0b00f1c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -71,12 +71,12 @@ libzmq_la_SOURCES = \ connect_session.hpp \ ctx.hpp \ decoder.hpp \ + device.hpp \ devpoll.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ fd.hpp \ - forwarder.hpp \ fq.hpp \ i_inout.hpp \ io_object.hpp \ @@ -106,7 +106,6 @@ libzmq_la_SOURCES = \ pub.hpp \ pull.hpp \ push.hpp \ - queue.hpp \ rep.hpp \ req.hpp \ select.hpp \ @@ -114,7 +113,6 @@ libzmq_la_SOURCES = \ session.hpp \ socket_base.hpp \ stdint.hpp \ - streamer.hpp \ sub.hpp \ swap.hpp \ tcp_connecter.hpp \ @@ -141,11 +139,11 @@ libzmq_la_SOURCES = \ ctx.cpp \ connect_session.cpp \ decoder.cpp \ + device.cpp \ devpoll.cpp \ encoder.cpp \ epoll.cpp \ err.cpp \ - forwarder.cpp \ fq.cpp \ io_object.cpp \ io_thread.cpp \ @@ -167,13 +165,11 @@ libzmq_la_SOURCES = \ pull.cpp \ push.cpp \ pub.cpp \ - queue.cpp \ rep.cpp \ req.cpp \ select.cpp \ session.cpp \ socket_base.cpp \ - streamer.cpp \ sub.cpp \ swap.cpp \ tcp_connecter.cpp \ diff --git a/src/device.cpp b/src/device.cpp new file mode 100644 index 0000000..e7c6090 --- /dev/null +++ b/src/device.cpp @@ -0,0 +1,130 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "../include/zmq.h" + +#include "device.hpp" +#include "socket_base.hpp" +#include "likely.hpp" +#include "err.hpp" + +int zmq::device (class socket_base_t *insocket_, + class socket_base_t *outsocket_) +{ + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + zmq_assert (rc == 0); + + int64_t more; + size_t moresz; + + zmq_pollitem_t items [2]; + items [0].socket = insocket_; + items [0].fd = 0; + items [0].events = ZMQ_POLLIN; + items [0].revents = 0; + items [1].socket = outsocket_; + items [1].fd = 0; + items [1].events = ZMQ_POLLIN; + items [1].revents = 0; + + while (true) { + + // Wait while there are either requests or replies to process. + rc = zmq_poll (&items [0], 2, -1); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + // The algorithm below asumes ratio of request and replies processed + // under full load to be 1:1. Although processing requests replies + // first is tempting it is suspectible to DoS attacks (overloading + // the system with unsolicited replies). + + // Process a request. + if (items [0].revents & ZMQ_POLLIN) { + while (true) { + + rc = insocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + moresz = sizeof (more); + rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + if (!more) + break; + } + } + + // Process a reply. + if (items [1].revents & ZMQ_POLLIN) { + while (true) { + + rc = outsocket_->recv (&msg, 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + moresz = sizeof (more); + rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) { + if (errno == ETERM) + return -1; + errno_assert (false); + } + + if (!more) + break; + } + } + + } + + return 0; +} + diff --git a/src/device.hpp b/src/device.hpp new file mode 100644 index 0000000..eab92c6 --- /dev/null +++ b/src/device.hpp @@ -0,0 +1,31 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DEVICE_HPP_INCLUDED__ +#define __ZMQ_DEVICE_HPP_INCLUDED__ + +namespace zmq +{ + + int device (class socket_base_t *insocket_, + class socket_base_t *outsocket_); + +} + +#endif diff --git a/src/forwarder.cpp b/src/forwarder.cpp deleted file mode 100644 index b00af1d..0000000 --- a/src/forwarder.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "forwarder.hpp" -#include "socket_base.hpp" -#include "likely.hpp" -#include "err.hpp" - -int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_) -{ - zmq_msg_t msg; - int rc = zmq_msg_init (&msg); - errno_assert (rc == 0); - - int64_t more; - size_t more_sz = sizeof (more); - - while (true) { - rc = insocket_->recv (&msg, 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - } - - return 0; -} diff --git a/src/forwarder.hpp b/src/forwarder.hpp deleted file mode 100644 index 3ac69c2..0000000 --- a/src/forwarder.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_FORWARDER_HPP_INCLUDED__ -#define __ZMQ_FORWARDER_HPP_INCLUDED__ - -namespace zmq -{ - - int forwarder (class socket_base_t *insocket_, - class socket_base_t *outsocket_); - -} - -#endif diff --git a/src/queue.cpp b/src/queue.cpp deleted file mode 100644 index 7c731d8..0000000 --- a/src/queue.cpp +++ /dev/null @@ -1,130 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include "../include/zmq.h" - -#include "queue.hpp" -#include "socket_base.hpp" -#include "likely.hpp" -#include "err.hpp" - -int zmq::queue (class socket_base_t *insocket_, - class socket_base_t *outsocket_) -{ - zmq_msg_t msg; - int rc = zmq_msg_init (&msg); - zmq_assert (rc == 0); - - int64_t more; - size_t moresz; - - zmq_pollitem_t items [2]; - items [0].socket = insocket_; - items [0].fd = 0; - items [0].events = ZMQ_POLLIN; - items [0].revents = 0; - items [1].socket = outsocket_; - items [1].fd = 0; - items [1].events = ZMQ_POLLIN; - items [1].revents = 0; - - while (true) { - - // Wait while there are either requests or replies to process. - rc = zmq_poll (&items [0], 2, -1); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - // The algorithm below asumes ratio of request and replies processed - // under full load to be 1:1. Although processing requests replies - // first is tempting it is suspectible to DoS attacks (overloading - // the system with unsolicited replies). - - // Process a request. - if (items [0].revents & ZMQ_POLLIN) { - while (true) { - - rc = insocket_->recv (&msg, 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - moresz = sizeof (more); - rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - if (!more) - break; - } - } - - // Process a reply. - if (items [1].revents & ZMQ_POLLIN) { - while (true) { - - rc = outsocket_->recv (&msg, 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - moresz = sizeof (more); - rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - if (!more) - break; - } - } - - } - - return 0; -} - diff --git a/src/queue.hpp b/src/queue.hpp deleted file mode 100644 index 998b2e1..0000000 --- a/src/queue.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_QUEUE_HPP_INCLUDED__ -#define __ZMQ_QUEUE_HPP_INCLUDED__ - -namespace zmq -{ - - int queue (class socket_base_t *insocket_, - class socket_base_t *outsocket_); - -} - -#endif diff --git a/src/streamer.cpp b/src/streamer.cpp deleted file mode 100644 index 264fc6b..0000000 --- a/src/streamer.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "streamer.hpp" -#include "socket_base.hpp" -#include "likely.hpp" -#include "err.hpp" - -int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_) -{ - zmq_msg_t msg; - int rc = zmq_msg_init (&msg); - errno_assert (rc == 0); - - int64_t more; - size_t more_sz = sizeof (more); - - while (true) { - rc = insocket_->recv (&msg, 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - - rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0); - if (unlikely (rc < 0)) { - if (errno == ETERM) - return -1; - errno_assert (false); - } - } - - return 0; -} diff --git a/src/streamer.hpp b/src/streamer.hpp deleted file mode 100644 index 681f01f..0000000 --- a/src/streamer.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_STREAMER_HPP_INCLUDED__ -#define __ZMQ_STREAMER_HPP_INCLUDED__ - -namespace zmq -{ - - int streamer (class socket_base_t *insocket_, - class socket_base_t *outsocket_); - -} - -#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 6a1d396..c9fae28 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -39,9 +39,7 @@ #include #include -#include "forwarder.hpp" -#include "queue.hpp" -#include "streamer.hpp" +#include "device.hpp" #include "socket_base.hpp" #include "msg_content.hpp" #include "stdint.hpp" @@ -685,19 +683,15 @@ int zmq_device (int device_, void *insocket_, void *outsocket_) errno = EFAULT; return -1; } - switch (device_) { - case ZMQ_FORWARDER: - return zmq::forwarder ((zmq::socket_base_t*) insocket_, - (zmq::socket_base_t*) outsocket_); - case ZMQ_QUEUE: - return zmq::queue ((zmq::socket_base_t*) insocket_, - (zmq::socket_base_t*) outsocket_); - case ZMQ_STREAMER: - return zmq::streamer ((zmq::socket_base_t*) insocket_, - (zmq::socket_base_t*) outsocket_); - default: - return EINVAL; + + if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE && + device_ != ZMQ_STREAMER) { + errno = EINVAL; + return -1; } + + return zmq::device ((zmq::socket_base_t*) insocket_, + (zmq::socket_base_t*) outsocket_); } //////////////////////////////////////////////////////////////////////////////// -- cgit v1.2.3