summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-11-24 11:23:10 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-11-24 11:23:10 +0100
commitc98fd6bc3f2a49d7cb0b820a07354168c98f60b7 (patch)
tree894f3dc0e6221284c6608a8819488f4ffede1085
parent5cd98bc575517ea72c435770a5313711484f7d34 (diff)
ZMQII-25: Implement streamed request/reply
-rw-r--r--bindings/c/zmq.h6
-rw-r--r--bindings/java/org/zmq/Socket.java2
-rw-r--r--bindings/python/pyzmq.cpp6
-rw-r--r--bindings/ruby/rbzmq.cpp2
-rw-r--r--configure.in15
-rw-r--r--devices/Makefile.am8
-rw-r--r--devices/zmq_forwarder/zmq_forwarder.cpp7
-rw-r--r--devices/zmq_streamer/Makefile.am9
-rw-r--r--devices/zmq_streamer/zmq_streamer.cpp122
-rw-r--r--man/man3/zmq_socket.39
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp14
-rw-r--r--src/downstream.cpp131
-rw-r--r--src/downstream.hpp64
-rw-r--r--src/p2p.hpp4
-rw-r--r--src/pub.hpp4
-rw-r--r--src/rep.cpp2
-rw-r--r--src/rep.hpp4
-rw-r--r--src/req.hpp4
-rw-r--r--src/sub.hpp4
-rw-r--r--src/upstream.cpp143
-rw-r--r--src/upstream.hpp69
22 files changed, 612 insertions, 21 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index 9b11a1d..a65926e 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -188,6 +188,12 @@ ZMQ_EXPORT int zmq_term (void *context);
// the peer that issued the last received request.
#define ZMQ_REP 4
+// Socket to receive messages from up the stream.
+#define ZMQ_UPSTREAM 5
+
+// Socket to send messages downstream.
+#define ZMQ_DOWNSTREAM 6
+
// Open a socket. 'type' is one of the socket types defined above.
//
// Errors: EINVAL - invalid socket type.
diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java
index 501bc16..396a6a0 100644
--- a/bindings/java/org/zmq/Socket.java
+++ b/bindings/java/org/zmq/Socket.java
@@ -34,6 +34,8 @@ public class Socket
public static final int SUB = 2;
public static final int REQ = 3;
public static final int REP = 4;
+ public static final int UPSTREAM = 4;
+ public static final int DOWNSTREAM = 4;
public static final int HWM = 1;
public static final int LWM = 2;
diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp
index b180bcd..26ca7ac 100644
--- a/bindings/python/pyzmq.cpp
+++ b/bindings/python/pyzmq.cpp
@@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_REP);
PyDict_SetItemString (dict, "REP", t);
Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_UPSTREAM);
+ PyDict_SetItemString (dict, "UPSTREAM", t);
+ Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_DOWNSTREAM);
+ PyDict_SetItemString (dict, "DOWNSTREAM", t);
+ Py_DECREF (t);
t = PyInt_FromLong (ZMQ_HWM);
PyDict_SetItemString (dict, "HWM", t);
Py_DECREF (t);
diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp
index 6112972..2a26ce1 100644
--- a/bindings/ruby/rbzmq.cpp
+++ b/bindings/ruby/rbzmq.cpp
@@ -275,6 +275,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));
rb_define_global_const ("REP", INT2NUM (ZMQ_REP));
+ rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM));
+ rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM));
rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL));
}
diff --git a/configure.in b/configure.in
index c2cf678..bd3a0f4 100644
--- a/configure.in
+++ b/configure.in
@@ -590,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then
forwarder="yes"
fi
+# streamer device
+streamer="no"
+AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer],
+ [build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no])
+
+if test "x$with_streamer" != "xno"; then
+ streamer="yes"
+fi
# Perf
perf="no"
@@ -618,7 +626,8 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes")
AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes")
AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes")
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno")
-AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
+AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
+AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes")
AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes")
@@ -641,7 +650,8 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \
bindings/python/setup.py bindings/ruby/Makefile \
bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \
- devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile)
+ devices/Makefile devices/zmq_forwarder/Makefile \
+ devices/zmq_streamer/Makefile bindings/Makefile)
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
@@ -676,6 +686,7 @@ AC_MSG_RESULT([ PGM: no])
fi
AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ forwarder: $forwarder])
+AC_MSG_RESULT([ streamer: $streamer])
AC_MSG_RESULT([ Performance tests: $perf])
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
diff --git a/devices/Makefile.am b/devices/Makefile.am
index 4cbad14..ab18976 100644
--- a/devices/Makefile.am
+++ b/devices/Makefile.am
@@ -2,5 +2,9 @@ if BUILD_FORWARDER
FORWARDER_DIR = zmq_forwarder
endif
-SUBDIRS = $(FORWARDER_DIR)
-DIST_SUBDIRS = zmq_forwarder
+if BUILD_STREAMER
+STREAMER_DIR = zmq_streamer
+endif
+
+SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR)
+DIST_SUBDIRS = zmq_forwarder zmq_streamer
diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp
index 32af5dd..d29ed62 100644
--- a/devices/zmq_forwarder/zmq_forwarder.cpp
+++ b/devices/zmq_forwarder/zmq_forwarder.cpp
@@ -23,7 +23,7 @@
int main (int argc, char *argv [])
{
if (argc != 2) {
- fprintf (stderr, "usage: forwarder <config-file>\n");
+ fprintf (stderr, "usage: zmq_forwarder <config-file>\n");
return 1;
}
@@ -53,8 +53,9 @@ int main (int argc, char *argv [])
// TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1);
- zmq::socket_t in_socket (ctx, ZMQ_P2P);
- zmq::socket_t out_socket (ctx, ZMQ_P2P);
+ zmq::socket_t in_socket (ctx, ZMQ_SUB);
+ in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1);
+ zmq::socket_t out_socket (ctx, ZMQ_PUB);
int n = 0;
while (true) {
diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am
new file mode 100644
index 0000000..e3681bf
--- /dev/null
+++ b/devices/zmq_streamer/Makefile.am
@@ -0,0 +1,9 @@
+INCLUDES = -I$(top_builddir)/bindings/c
+
+bin_PROGRAMS = zmq_streamer
+
+zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la
+zmq_streamer_SOURCES = zmq_streamer.cpp
+zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror
+
+
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
new file mode 100644
index 0000000..84e6569
--- /dev/null
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -0,0 +1,122 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../../bindings/cpp/zmq.hpp"
+#include "../../foreign/xmlParser/xmlParser.cpp"
+
+int main (int argc, char *argv [])
+{
+ if (argc != 2) {
+ fprintf (stderr, "usage: zmq_streamer <config-file>\n");
+ return 1;
+ }
+
+ XMLNode root = XMLNode::parseFile (argv [1]);
+ if (root.isEmpty ()) {
+ fprintf (stderr, "configuration file not found\n");
+ return 1;
+ }
+
+ if (strcmp (root.getName (), "streamer") != 0) {
+ fprintf (stderr, "root element in the configuration file should be "
+ "named 'streamer'\n");
+ return 1;
+ }
+
+ XMLNode in_node = root.getChildNode ("in");
+ if (in_node.isEmpty ()) {
+ fprintf (stderr, "'in' node is missing in the configuration file\n");
+ return 1;
+ }
+
+ XMLNode out_node = root.getChildNode ("out");
+ if (out_node.isEmpty ()) {
+ fprintf (stderr, "'out' node is missing in the configuration file\n");
+ return 1;
+ }
+
+ // TODO: make the number of I/O threads configurable.
+ zmq::context_t ctx (1, 1);
+ zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM);
+ zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM);
+
+ int n = 0;
+ while (true) {
+ XMLNode bind = in_node.getChildNode ("bind", n);
+ if (bind.isEmpty ())
+ break;
+ const char *addr = bind.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ in_socket.bind (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode connect = in_node.getChildNode ("connect", n);
+ if (connect.isEmpty ())
+ break;
+ const char *addr = connect.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ in_socket.connect (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode bind = out_node.getChildNode ("bind", n);
+ if (bind.isEmpty ())
+ break;
+ const char *addr = bind.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ out_socket.bind (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode connect = out_node.getChildNode ("connect", n);
+ if (connect.isEmpty ())
+ break;
+ const char *addr = connect.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ out_socket.connect (addr);
+ n++;
+ }
+
+ zmq::message_t msg;
+ while (true) {
+ in_socket.recv (&msg);
+ out_socket.send (msg);
+ }
+
+ return 0;
+}
diff --git a/man/man3/zmq_socket.3 b/man/man3/zmq_socket.3
index 8b819b5..a73bba5 100644
--- a/man/man3/zmq_socket.3
+++ b/man/man3/zmq_socket.3
@@ -37,6 +37,15 @@ Socket to receive requests and send replies. This socket type allows
only an alternated sequence of recv's and send's. Each send is routed to
the peer that issued the last received request.
+.IP "\fBZMQ_UPSTREAM\fP"
+Socket to receive messages from up the stream. Messages are fair-queued
+from among all the connected peers. Send function is not implemented for
+this socket type.
+
+.IP "\fBZMQ_DOWNSTREAM\fP"
+Socket to send messages down stream. Messages are load-balanced among all the
+connected peers. Send function is not implemented for this socket type.
+
.SH RETURN VALUE
Function returns socket handle is successful. Otherwise it returns NULL and
sets errno to one of the values below.
diff --git a/src/Makefile.am b/src/Makefile.am
index 91fb555..3d038b7 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
+ downstream.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
+ upstream.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
+ downstream.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
@@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
+ upstream.cpp \
uuid.cpp \
ypollset.cpp \
zmq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbda335..a671822 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -40,11 +40,13 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
+#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
-#include "p2p.hpp"
+#include "upstream.hpp"
+#include "downstream.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
+ case ZMQ_P2P:
+ s = new p2p_t (this);
+ break;
case ZMQ_PUB:
s = new pub_t (this);
break;
@@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP:
s = new rep_t (this);
break;
- case ZMQ_P2P:
- s = new p2p_t (this);
+ case ZMQ_UPSTREAM:
+ s = new upstream_t (this);
+ break;
+ case ZMQ_DOWNSTREAM:
+ s = new downstream_t (this);
break;
default:
// TODO: This should be EINVAL.
diff --git a/src/downstream.cpp b/src/downstream.cpp
new file mode 100644
index 0000000..4f994e6
--- /dev/null
+++ b/src/downstream.cpp
@@ -0,0 +1,131 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "downstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ current (0)
+{
+ options.requires_in = false;
+ options.requires_out = true;
+}
+
+zmq::downstream_t::~downstream_t ()
+{
+}
+
+void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (!inpipe_ && outpipe_);
+ pipes.push_back (outpipe_);
+}
+
+void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (pipe_);
+ pipes.erase (pipes.index (pipe_));
+}
+
+void zmq::downstream_t::xkill (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::downstream_t::xrevive (class reader_t *pipe_)
+{
+ // There are no inpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special option for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ // If there are no pipes we cannot send the message.
+ if (pipes.empty ()) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Move to the next pipe (load-balancing).
+ current++;
+ if (current >= pipes.size ())
+ current = 0;
+
+ // TODO: Implement this once queue limits are in-place.
+ zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
+
+ // Push message to the selected pipe.
+ pipes [current]->write (msg_);
+ pipes [current]->flush ();
+
+ // Detach the message from the data buffer.
+ int rc = zmq_msg_init (msg_);
+ zmq_assert (rc == 0);
+
+ return 0;
+}
+
+int zmq::downstream_t::xflush ()
+{
+ // TODO: Maybe there's a point in flushing messages downstream.
+ // It may be useful in the case where number of messages in a single
+ // transaction is much greater than the number of attached pipes.
+ errno = ENOTSUP;
+ return -1;
+
+}
+
+int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+bool zmq::downstream_t::xhas_in ()
+{
+ return false;
+}
+
+bool zmq::downstream_t::xhas_out ()
+{
+ // TODO: Modify this code once pipe limits are in place.
+ return true;
+}
+
+
diff --git a/src/downstream.hpp b/src/downstream.hpp
new file mode 100644
index 0000000..c6a7ed8
--- /dev/null
+++ b/src/downstream.hpp
@@ -0,0 +1,64 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class downstream_t : public socket_base_t
+ {
+ public:
+
+ downstream_t (class app_thread_t *parent_);
+ ~downstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // List of outbound pipes.
+ typedef yarray_t <class writer_t> pipes_t;
+ pipes_t pipes;
+
+ // Points to the last pipe that the most recent message was sent to.
+ pipes_t::size_type current;
+
+ downstream_t (const downstream_t&);
+ void operator = (const downstream_t&);
+ };
+
+}
+
+#endif
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 1fd7e34..32d7755 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_P2P_INCLUDED__
-#define __ZMQ_P2P_INCLUDED__
+#ifndef __ZMQ_P2P_HPP_INCLUDED__
+#define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp"
diff --git a/src/pub.hpp b/src/pub.hpp
index b3e868d..9dbcb4a 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_PUB_INCLUDED__
-#define __ZMQ_PUB_INCLUDED__
+#ifndef __ZMQ_PUB_HPP_INCLUDED__
+#define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/rep.cpp b/src/rep.cpp
index 7599cb5..f06f4ab 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
- if (in_pipes.index (pipe_) < active)
+ if (index < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);
diff --git a/src/rep.hpp b/src/rep.hpp
index 3e87dc1..0b327aa 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REP_INCLUDED__
-#define __ZMQ_REP_INCLUDED__
+#ifndef __ZMQ_REP_HPP_INCLUDED__
+#define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/req.hpp b/src/req.hpp
index 86554b5..756cc42 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_REQ_INCLUDED__
-#define __ZMQ_REQ_INCLUDED__
+#ifndef __ZMQ_REQ_HPP_INCLUDED__
+#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
diff --git a/src/sub.hpp b/src/sub.hpp
index fb881dc..8ad8a18 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_SUB_INCLUDED__
-#define __ZMQ_SUB_INCLUDED__
+#ifndef __ZMQ_SUB_HPP_INCLUDED__
+#define __ZMQ_SUB_HPP_INCLUDED__
#include <set>
#include <string>
diff --git a/src/upstream.cpp b/src/upstream.cpp
new file mode 100644
index 0000000..da202f8
--- /dev/null
+++ b/src/upstream.cpp
@@ -0,0 +1,143 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "upstream.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
+ socket_base_t (parent_),
+ active (0),
+ current (0)
+{
+ options.requires_in = true;
+ options.requires_out = false;
+}
+
+zmq::upstream_t::~upstream_t ()
+{
+}
+
+void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (inpipe_ && !outpipe_);
+
+ pipes.push_back (inpipe_);
+ pipes.swap (active, pipes.size () - 1);
+ active++;
+}
+
+void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ // Remove the pipe from the list; adjust number of active pipes
+ // accordingly.
+ zmq_assert (pipe_);
+ pipes_t::size_type index = pipes.index (pipe_);
+ if (index < active)
+ active--;
+ pipes.erase (index);
+}
+
+void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ // There are no outpipes, so this function shouldn't be called at all.
+ zmq_assert (false);
+}
+
+void zmq::upstream_t::xkill (class reader_t *pipe_)
+{
+ // Move the pipe to the list of inactive pipes.
+ active--;
+ pipes.swap (pipes.index (pipe_), active);
+}
+
+void zmq::upstream_t::xrevive (class reader_t *pipe_)
+{
+ // Move the pipe to the list of active pipes.
+ pipes.swap (pipes.index (pipe_), active);
+ active++;
+}
+
+int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ // No special options for this socket type.
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xflush ()
+{
+ errno = ENOTSUP;
+ return -1;
+}
+
+int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ // Deallocate old content of the message.
+ zmq_msg_close (msg_);
+
+ // Round-robin over the pipes to get next message.
+ for (int count = active; count != 0; count--) {
+ bool fetched = pipes [current]->read (msg_);
+ current++;
+ if (current >= active)
+ current = 0;
+ if (fetched)
+ return 0;
+ }
+
+ // No message is available. Initialise the output parameter
+ // to be a 0-byte message.
+ zmq_msg_init (msg_);
+ errno = EAGAIN;
+ return -1;
+}
+
+bool zmq::upstream_t::xhas_in ()
+{
+ // Note that messing with current doesn't break the fairness of fair
+ // queueing algorithm. If there are no messages available current will
+ // get back to its original value. Otherwise it'll point to the first
+ // pipe holding messages, skipping only pipes with no messages available.
+ for (int count = active; count != 0; count--) {
+ if (pipes [current]->check_read ())
+ return true;
+ current++;
+ if (current >= active)
+ current = 0;
+ }
+
+ return false;
+}
+
+bool zmq::upstream_t::xhas_out ()
+{
+ return false;
+}
+
diff --git a/src/upstream.hpp b/src/upstream.hpp
new file mode 100644
index 0000000..0e2f5ad
--- /dev/null
+++ b/src/upstream.hpp
@@ -0,0 +1,69 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
+#define __ZMQ_UPSTREAM_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class upstream_t : public socket_base_t
+ {
+ public:
+
+ upstream_t (class app_thread_t *parent_);
+ ~upstream_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ // Inbound pipes.
+ typedef yarray_t <class reader_t> pipes_t;
+ pipes_t pipes;
+
+ // Number of active pipes. All the active pipes are located at the
+ // beginning of the pipes array.
+ pipes_t::size_type active;
+
+ // Index of the next bound pipe to read a message from.
+ pipes_t::size_type current;
+
+ upstream_t (const upstream_t&);
+ void operator = (const upstream_t&);
+
+ };
+
+}
+
+#endif