From c98fd6bc3f2a49d7cb0b820a07354168c98f60b7 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Tue, 24 Nov 2009 11:23:10 +0100 Subject: ZMQII-25: Implement streamed request/reply --- src/Makefile.am | 4 ++ src/app_thread.cpp | 14 ++++-- src/downstream.cpp | 131 ++++++++++++++++++++++++++++++++++++++++++++++++ src/downstream.hpp | 64 ++++++++++++++++++++++++ src/p2p.hpp | 4 +- src/pub.hpp | 4 +- src/rep.cpp | 2 +- src/rep.hpp | 4 +- src/req.hpp | 4 +- src/sub.hpp | 4 +- src/upstream.cpp | 143 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/upstream.hpp | 69 ++++++++++++++++++++++++++ 12 files changed, 433 insertions(+), 14 deletions(-) create mode 100644 src/downstream.cpp create mode 100644 src/downstream.hpp create mode 100644 src/upstream.cpp create mode 100644 src/upstream.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 91fb555..3d038b7 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \ decoder.hpp \ devpoll.hpp \ dispatcher.hpp \ + downstream.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ + upstream.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ + downstream.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ @@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ + upstream.cpp \ uuid.cpp \ ypollset.cpp \ zmq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbda335..a671822 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -40,11 +40,13 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "p2p.hpp" #include "pub.hpp" #include "sub.hpp" #include "req.hpp" #include "rep.hpp" -#include "p2p.hpp" +#include "upstream.hpp" +#include "downstream.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_P2P: + s = new p2p_t (this); + break; case ZMQ_PUB: s = new pub_t (this); break; @@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_REP: s = new rep_t (this); break; - case ZMQ_P2P: - s = new p2p_t (this); + case ZMQ_UPSTREAM: + s = new upstream_t (this); + break; + case ZMQ_DOWNSTREAM: + s = new downstream_t (this); break; default: // TODO: This should be EINVAL. diff --git a/src/downstream.cpp b/src/downstream.cpp new file mode 100644 index 0000000..4f994e6 --- /dev/null +++ b/src/downstream.cpp @@ -0,0 +1,131 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "downstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::downstream_t::downstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + current (0) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::downstream_t::~downstream_t () +{ +} + +void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!inpipe_ && outpipe_); + pipes.push_back (outpipe_); +} + +void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + pipes.erase (pipes.index (pipe_)); +} + +void zmq::downstream_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + // If there are no pipes we cannot send the message. + if (pipes.empty ()) { + errno = EAGAIN; + return -1; + } + + // Move to the next pipe (load-balancing). + current++; + if (current >= pipes.size ()) + current = 0; + + // TODO: Implement this once queue limits are in-place. + zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + pipes [current]->write (msg_); + pipes [current]->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::downstream_t::xflush () +{ + // TODO: Maybe there's a point in flushing messages downstream. + // It may be useful in the case where number of messages in a single + // transaction is much greater than the number of attached pipes. + errno = ENOTSUP; + return -1; + +} + +int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::downstream_t::xhas_in () +{ + return false; +} + +bool zmq::downstream_t::xhas_out () +{ + // TODO: Modify this code once pipe limits are in place. + return true; +} + + diff --git a/src/downstream.hpp b/src/downstream.hpp new file mode 100644 index 0000000..c6a7ed8 --- /dev/null +++ b/src/downstream.hpp @@ -0,0 +1,64 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class downstream_t : public socket_base_t + { + public: + + downstream_t (class app_thread_t *parent_); + ~downstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // List of outbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Points to the last pipe that the most recent message was sent to. + pipes_t::size_type current; + + downstream_t (const downstream_t&); + void operator = (const downstream_t&); + }; + +} + +#endif diff --git a/src/p2p.hpp b/src/p2p.hpp index 1fd7e34..32d7755 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_P2P_INCLUDED__ -#define __ZMQ_P2P_INCLUDED__ +#ifndef __ZMQ_P2P_HPP_INCLUDED__ +#define __ZMQ_P2P_HPP_INCLUDED__ #include "socket_base.hpp" diff --git a/src/pub.hpp b/src/pub.hpp index b3e868d..9dbcb4a 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_PUB_INCLUDED__ -#define __ZMQ_PUB_INCLUDED__ +#ifndef __ZMQ_PUB_HPP_INCLUDED__ +#define __ZMQ_PUB_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/rep.cpp b/src/rep.cpp index 7599cb5..f06f4ab 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) } // Now both inpipe and outpipe are detached. Remove them from the lists. - if (in_pipes.index (pipe_) < active) + if (index < active) active--; in_pipes.erase (index); out_pipes.erase (index); diff --git a/src/rep.hpp b/src/rep.hpp index 3e87dc1..0b327aa 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_REP_INCLUDED__ -#define __ZMQ_REP_INCLUDED__ +#ifndef __ZMQ_REP_HPP_INCLUDED__ +#define __ZMQ_REP_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/req.hpp b/src/req.hpp index 86554b5..756cc42 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_REQ_INCLUDED__ -#define __ZMQ_REQ_INCLUDED__ +#ifndef __ZMQ_REQ_HPP_INCLUDED__ +#define __ZMQ_REQ_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/sub.hpp b/src/sub.hpp index fb881dc..8ad8a18 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_SUB_INCLUDED__ -#define __ZMQ_SUB_INCLUDED__ +#ifndef __ZMQ_SUB_HPP_INCLUDED__ +#define __ZMQ_SUB_HPP_INCLUDED__ #include #include diff --git a/src/upstream.cpp b/src/upstream.cpp new file mode 100644 index 0000000..da202f8 --- /dev/null +++ b/src/upstream.cpp @@ -0,0 +1,143 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "upstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::upstream_t::upstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + active (0), + current (0) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::upstream_t::~upstream_t () +{ +} + +void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && !outpipe_); + + pipes.push_back (inpipe_); + pipes.swap (active, pipes.size () - 1); + active++; +} + +void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + zmq_assert (pipe_); + pipes_t::size_type index = pipes.index (pipe_); + if (index < active) + active--; + pipes.erase (index); +} + +void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::upstream_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + active--; + pipes.swap (pipes.index (pipe_), active); +} + +void zmq::upstream_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + pipes.swap (pipes.index (pipe_), active); + active++; +} + +int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xflush () +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + +bool zmq::upstream_t::xhas_in () +{ + // Note that messing with current doesn't break the fairness of fair + // queueing algorithm. If there are no messages available current will + // get back to its original value. Otherwise it'll point to the first + // pipe holding messages, skipping only pipes with no messages available. + for (int count = active; count != 0; count--) { + if (pipes [current]->check_read ()) + return true; + current++; + if (current >= active) + current = 0; + } + + return false; +} + +bool zmq::upstream_t::xhas_out () +{ + return false; +} + diff --git a/src/upstream.hpp b/src/upstream.hpp new file mode 100644 index 0000000..0e2f5ad --- /dev/null +++ b/src/upstream.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ +#define __ZMQ_UPSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class upstream_t : public socket_base_t + { + public: + + upstream_t (class app_thread_t *parent_); + ~upstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Inbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Number of active pipes. All the active pipes are located at the + // beginning of the pipes array. + pipes_t::size_type active; + + // Index of the next bound pipe to read a message from. + pipes_t::size_type current; + + upstream_t (const upstream_t&); + void operator = (const upstream_t&); + + }; + +} + +#endif -- cgit v1.2.3