summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-11-24 11:23:10 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-11-24 11:23:10 +0100
commitc98fd6bc3f2a49d7cb0b820a07354168c98f60b7 (patch)
tree894f3dc0e6221284c6608a8819488f4ffede1085 /src
parent5cd98bc575517ea72c435770a5313711484f7d34 (diff)
ZMQII-25: Implement streamed request/reply
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp14
-rw-r--r--src/downstream.cpp131
-rw-r--r--src/downstream.hpp64
-rw-r--r--src/p2p.hpp4
-rw-r--r--src/pub.hpp4
-rw-r--r--src/rep.cpp2
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.hpp4
-rw-r--r--src/sub.hpp4
-rw-r--r--src/upstream.cpp143
-rw-r--r--src/upstream.hpp69
12 files changed, 433 insertions, 14 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 91fb555..3d038b7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
+ downstream.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
+ upstream.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
+ downstream.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
@@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
+ upstream.cpp \
uuid.cpp \
ypollset.cpp \
zmq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbda335..a671822 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -40,11 +40,13 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
+#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
-#include "p2p.hpp"
+#include "upstream.hpp"
+#include "downstream.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
+ case ZMQ_P2P:
+ s = new p2p_t (this);
+ break;
case ZMQ_PUB:
s = new pub_t (this);
break;
@@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP:
s = new rep_t (this);
break;
- case ZMQ_P2P:
- s = new p2p_t (this);
+ case ZMQ_UPSTREAM:
+ s = new upstream_t (this);
+ break;
+ case ZMQ_DOWNSTREAM:
+ s = new downstream_t (this);
break;
default:
// TODO: This should be EINVAL.
diff --git a/src/downstream.cpp b/src/downstream.cpp
new file mode 100644
index 0000000..4f994e6
--- /dev/null
+++ b/src/downstream.cpp
@@ -0,0 +1,131 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "downstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ current (0)
+{
+ options.requires_in = false;
+ options.requires_out = true;
+}
+
+zmq::downstream_t::~downstream_t ()
+{
+}
+
+void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!inpipe_ && outpipe_);
+ pipes.push_back (outpipe_);
+}
+
+void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ pipes.erase (pipes.index (pipe_));
+}
+
+void zmq::downstream_t::xkill (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xrevive (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special option for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no pipes we cannot send the message.
+ if (pipes.empty ()) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Move to the next pipe (load-balancing).
+ current++;
+ if (current >= pipes.size ())
+ current = 0;
+
+ // TODO: Implement this once queue limits are in-place.
+ zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ pipes [current]->write (msg_);
+ pipes [current]->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::downstream_t::xflush ()
+{
+ // TODO: Maybe there's a point in flushing messages downstream.
+ // It may be useful in the case where number of messages in a single
+ // transaction is much greater than the number of attached pipes.
+ errno = ENOTSUP;
+ return -1;
+
+}
+
+int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::downstream_t::xhas_in ()
+{
+ return false;
+}
+
+bool zmq::downstream_t::xhas_out ()
+{
+ // TODO: Modify this code once pipe limits are in place.
+ return true;
+}
+
+
diff --git a/src/downstream.hpp b/src/downstream.hpp
new file mode 100644
index 0000000..c6a7ed8
--- /dev/null
+++ b/src/downstream.hpp
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class downstream_t : public socket_base_t
+ {
+ public:
+
+ downstream_t (class app_thread_t *parent_);
+ ~downstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // List of outbound pipes.
+ typedef yarray_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Points to the last pipe that the most recent message was sent to.
+ pipes_t::size_type current;
+
+ downstream_t (const downstream_t&);
+ void operator = (const downstream_t&);
+ };
+
+}
+
+#endif
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 1fd7e34..32d7755 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_P2P_INCLUDED__
-#define __ZMQ_P2P_INCLUDED__
+#ifndef __ZMQ_P2P_HPP_INCLUDED__
+#define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp"
diff --git a/src/pub.hpp b/src/pub.hpp
index b3e868d..9dbcb4a 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_PUB_INCLUDED__
-#define __ZMQ_PUB_INCLUDED__
+#ifndef __ZMQ_PUB_HPP_INCLUDED__
+#define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/rep.cpp b/src/rep.cpp
index 7599cb5..f06f4ab 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
- if (in_pipes.index (pipe_) < active)
+ if (index < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);
diff --git a/src/rep.hpp b/src/rep.hpp
index 3e87dc1..0b327aa 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REP_INCLUDED__
-#define __ZMQ_REP_INCLUDED__
+#ifndef __ZMQ_REP_HPP_INCLUDED__
+#define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/req.hpp b/src/req.hpp
index 86554b5..756cc42 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REQ_INCLUDED__
-#define __ZMQ_REQ_INCLUDED__
+#ifndef __ZMQ_REQ_HPP_INCLUDED__
+#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/sub.hpp b/src/sub.hpp
index fb881dc..8ad8a18 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_SUB_INCLUDED__
-#define __ZMQ_SUB_INCLUDED__
+#ifndef __ZMQ_SUB_HPP_INCLUDED__
+#define __ZMQ_SUB_HPP_INCLUDED__
#include <set>
#include <string>
diff --git a/src/upstream.cpp b/src/upstream.cpp
new file mode 100644
index 0000000..da202f8
--- /dev/null
+++ b/src/upstream.cpp
@@ -0,0 +1,143 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "upstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ active (0),
+ current (0)
+{
+ options.requires_in = true;
+ options.requires_out = false;
+}
+
+zmq::upstream_t::~upstream_t ()
+{
+}
+
+void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (inpipe_ && !outpipe_);
+
+ pipes.push_back (inpipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+}
+
+void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ zmq_assert (pipe_);
+ pipes_t::size_type index = pipes.index (pipe_);
+ if (index < active)
+ active--;
+ pipes.erase (index);
+}
+
+void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ // There are no outpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::upstream_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+}
+
+void zmq::upstream_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special options for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xflush ()
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
+
+bool zmq::upstream_t::xhas_in ()
+{
+ // Note that messing with current doesn't break the fairness of fair
+ // queueing algorithm. If there are no messages available current will
+ // get back to its original value. Otherwise it'll point to the first
+ // pipe holding messages, skipping only pipes with no messages available.
+ for (int count = active; count != 0; count--) {
+ if (pipes [current]->check_read ())
+ return true;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
+bool zmq::upstream_t::xhas_out ()
+{
+ return false;
+}
+
diff --git a/src/upstream.hpp b/src/upstream.hpp
new file mode 100644
index 0000000..0e2f5ad
--- /dev/null
+++ b/src/upstream.hpp
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
+#define __ZMQ_UPSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class upstream_t : public socket_base_t
+ {
+ public:
+
+ upstream_t (class app_thread_t *parent_);
+ ~upstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // Inbound pipes.
+ typedef yarray_t <class reader_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // Index of the next bound pipe to read a message from.
+ pipes_t::size_type current;
+
+ upstream_t (const upstream_t&);
+ void operator = (const upstream_t&);
+
+ };
+
+}
+
+#endif