summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-12-13 14:45:23 +0100
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-12-13 14:45:23 +0100
commitfa6bf24d8030b0e54fd25b167064482e4cf08a36 (patch)
tree9f8f833da9607c9ba3376b45aa5cfafa4305e59c
parentc43aded531014895973c283fdd82bb2e5e85c789 (diff)
XREP & XREQ socket types added; zmq_queue device added
-rw-r--r--bindings/c/zmq.h6
-rw-r--r--bindings/cl/zeromq.lisp6
-rw-r--r--bindings/java/org/zmq/Socket.java6
-rw-r--r--bindings/python/pyzmq.cpp6
-rw-r--r--bindings/ruby/rbzmq.cpp2
-rw-r--r--configure.in15
-rw-r--r--devices/Makefile.am8
-rw-r--r--devices/zmq_forwarder/Makefile.am2
-rw-r--r--devices/zmq_forwarder/zmq_forwarder.cpp2
-rw-r--r--devices/zmq_queue/Makefile.am9
-rw-r--r--devices/zmq_queue/zmq_queue.cpp122
-rw-r--r--devices/zmq_streamer/Makefile.am2
-rw-r--r--devices/zmq_streamer/zmq_streamer.cpp2
-rw-r--r--src/Makefile.am4
-rw-r--r--src/app_thread.cpp8
-rw-r--r--src/pgm_sender.cpp4
-rw-r--r--src/xrep.cpp95
-rw-r--r--src/xrep.hpp57
-rw-r--r--src/xreq.cpp95
-rw-r--r--src/xreq.hpp57
20 files changed, 490 insertions, 18 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index f0d59b1..37bad52 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -149,8 +149,10 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_SUB 2
#define ZMQ_REQ 3
#define ZMQ_REP 4
-#define ZMQ_UPSTREAM 5
-#define ZMQ_DOWNSTREAM 6
+#define ZMQ_XREQ 5
+#define ZMQ_XREP 6
+#define ZMQ_UPSTREAM 7
+#define ZMQ_DOWNSTREAM 8
#define ZMQ_HWM 1
#define ZMQ_LWM 2
diff --git a/bindings/cl/zeromq.lisp b/bindings/cl/zeromq.lisp
index 03befd5..a8b9c5c 100644
--- a/bindings/cl/zeromq.lisp
+++ b/bindings/cl/zeromq.lisp
@@ -130,8 +130,10 @@
(defconstant sub 2)
(defconstant req 3)
(defconstant rep 4)
-(defconstant upstream 5)
-(defconstant downstream 6)
+(defconstant xreq 5)
+(defconstant xrep 6)
+(defconstant upstream 7)
+(defconstant downstream 8)
(defcfun* ("zmq_socket" socket) :pointer
(context :pointer)
diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java
index 935fade..075e322 100644
--- a/bindings/java/org/zmq/Socket.java
+++ b/bindings/java/org/zmq/Socket.java
@@ -34,8 +34,10 @@ public class Socket
public static final int SUB = 2;
public static final int REQ = 3;
public static final int REP = 4;
- public static final int UPSTREAM = 4;
- public static final int DOWNSTREAM = 4;
+ public static final int XREQ = 5;
+ public static final int XREP = 6;
+ public static final int UPSTREAM = 7;
+ public static final int DOWNSTREAM = 8;
public static final int HWM = 1;
public static final int LWM = 2;
diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp
index f171eab..75f872d 100644
--- a/bindings/python/pyzmq.cpp
+++ b/bindings/python/pyzmq.cpp
@@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_REP);
PyDict_SetItemString (dict, "REP", t);
Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_XREQ);
+ PyDict_SetItemString (dict, "XREQ", t);
+ Py_DECREF (t);
+ t = PyInt_FromLong (ZMQ_XREP);
+ PyDict_SetItemString (dict, "XREP", t);
+ Py_DECREF (t);
t = PyInt_FromLong (ZMQ_UPSTREAM);
PyDict_SetItemString (dict, "UPSTREAM", t);
Py_DECREF (t);
diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp
index 43baeef..10ff55f 100644
--- a/bindings/ruby/rbzmq.cpp
+++ b/bindings/ruby/rbzmq.cpp
@@ -277,6 +277,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));
rb_define_global_const ("REP", INT2NUM (ZMQ_REP));
+ rb_define_global_const ("XREQ", INT2NUM (ZMQ_XREQ));
+ rb_define_global_const ("XREP", INT2NUM (ZMQ_XREP));
rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM));
rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM));
diff --git a/configure.in b/configure.in
index 9d034ed..00c4720 100644
--- a/configure.in
+++ b/configure.in
@@ -520,6 +520,15 @@ if test "x$with_streamer" != "xno"; then
streamer="yes"
fi
+# Queue device
+queue="no"
+AC_ARG_WITH([queue], [AS_HELP_STRING([--with-queue],
+ [build queue device [default=no]])], [with_queue=yes], [with_queue=no])
+
+if test "x$with_queue" != "xno"; then
+ queue="yes"
+fi
+
# Perf
perf="no"
AC_ARG_WITH([perf], [AS_HELP_STRING([--with-perf],
@@ -555,7 +564,8 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes")
AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes")
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno")
AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
-AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
+AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
+AM_CONDITIONAL(BUILD_QUEUE, test "x$queue" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
AM_CONDITIONAL(BUILD_CHAT, test "x$chat" = "xyes")
AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes")
@@ -581,7 +591,7 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \
bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \
devices/Makefile devices/zmq_forwarder/Makefile \
- devices/zmq_streamer/Makefile bindings/Makefile \
+ devices/zmq_streamer/Makefile devices/zmq_queue/Makefile bindings/Makefile \
examples/Makefile examples/chat/Makefile)
AC_MSG_RESULT([])
@@ -616,6 +626,7 @@ AC_MSG_RESULT([ inproc: yes])
AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ Forwarder: $forwarder])
AC_MSG_RESULT([ Streamer: $streamer])
+AC_MSG_RESULT([ Queue: $queue])
AC_MSG_RESULT([ Performance tests: $perf])
AC_MSG_RESULT([ Examples:])
AC_MSG_RESULT([ Chat: $chat])
diff --git a/devices/Makefile.am b/devices/Makefile.am
index ab18976..4f20456 100644
--- a/devices/Makefile.am
+++ b/devices/Makefile.am
@@ -6,5 +6,9 @@ if BUILD_STREAMER
STREAMER_DIR = zmq_streamer
endif
-SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR)
-DIST_SUBDIRS = zmq_forwarder zmq_streamer
+if BUILD_QUEUE
+QUEUE_DIR = zmq_queue
+endif
+
+SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) $(QUEUE_DIR)
+DIST_SUBDIRS = zmq_forwarder zmq_streamer zmq_queue
diff --git a/devices/zmq_forwarder/Makefile.am b/devices/zmq_forwarder/Makefile.am
index ff51d88..ee48cda 100644
--- a/devices/zmq_forwarder/Makefile.am
+++ b/devices/zmq_forwarder/Makefile.am
@@ -1,4 +1,4 @@
-INCLUDES = -I$(top_builddir)/bindings/c
+INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c
bin_PROGRAMS = zmq_forwarder
diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp
index d29ed62..277af72 100644
--- a/devices/zmq_forwarder/zmq_forwarder.cpp
+++ b/devices/zmq_forwarder/zmq_forwarder.cpp
@@ -29,7 +29,7 @@ int main (int argc, char *argv [])
XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) {
- fprintf (stderr, "configuration file not found\n");
+ fprintf (stderr, "configuration file not found or not an XML file\n");
return 1;
}
diff --git a/devices/zmq_queue/Makefile.am b/devices/zmq_queue/Makefile.am
new file mode 100644
index 0000000..8d3ec36
--- /dev/null
+++ b/devices/zmq_queue/Makefile.am
@@ -0,0 +1,9 @@
+INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c
+
+bin_PROGRAMS = zmq_queue
+
+zmq_queue_LDADD = $(top_builddir)/src/libzmq.la
+zmq_queue_SOURCES = zmq_queue.cpp
+zmq_queue_CXXFLAGS = -Wall -pedantic -Werror
+
+
diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp
new file mode 100644
index 0000000..78ccd2d
--- /dev/null
+++ b/devices/zmq_queue/zmq_queue.cpp
@@ -0,0 +1,122 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../../bindings/cpp/zmq.hpp"
+#include "../../foreign/xmlParser/xmlParser.cpp"
+
+int main (int argc, char *argv [])
+{
+ if (argc != 2) {
+ fprintf (stderr, "usage: zmq_queue <config-file>\n");
+ return 1;
+ }
+
+ XMLNode root = XMLNode::parseFile (argv [1]);
+ if (root.isEmpty ()) {
+ fprintf (stderr, "configuration file not found or not an XML file\n");
+ return 1;
+ }
+
+ if (strcmp (root.getName (), "queue") != 0) {
+ fprintf (stderr, "root element in the configuration file should be "
+ "named 'queue'\n");
+ return 1;
+ }
+
+ XMLNode in_node = root.getChildNode ("in");
+ if (in_node.isEmpty ()) {
+ fprintf (stderr, "'in' node is missing in the configuration file\n");
+ return 1;
+ }
+
+ XMLNode out_node = root.getChildNode ("out");
+ if (out_node.isEmpty ()) {
+ fprintf (stderr, "'out' node is missing in the configuration file\n");
+ return 1;
+ }
+
+ // TODO: make the number of I/O threads configurable.
+ zmq::context_t ctx (1, 1);
+ zmq::socket_t in_socket (ctx, ZMQ_XREP);
+ zmq::socket_t out_socket (ctx, ZMQ_XREQ);
+
+ int n = 0;
+ while (true) {
+ XMLNode bind = in_node.getChildNode ("bind", n);
+ if (bind.isEmpty ())
+ break;
+ const char *addr = bind.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ in_socket.bind (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode connect = in_node.getChildNode ("connect", n);
+ if (connect.isEmpty ())
+ break;
+ const char *addr = connect.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ in_socket.connect (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode bind = out_node.getChildNode ("bind", n);
+ if (bind.isEmpty ())
+ break;
+ const char *addr = bind.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ out_socket.bind (addr);
+ n++;
+ }
+
+ n = 0;
+ while (true) {
+ XMLNode connect = out_node.getChildNode ("connect", n);
+ if (connect.isEmpty ())
+ break;
+ const char *addr = connect.getAttribute ("addr");
+ if (!addr) {
+ fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
+ return 1;
+ }
+ out_socket.connect (addr);
+ n++;
+ }
+
+ zmq::message_t msg;
+ while (true) {
+ in_socket.recv (&msg);
+ out_socket.send (msg);
+ }
+
+ return 0;
+}
diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am
index e3681bf..9e0ac62 100644
--- a/devices/zmq_streamer/Makefile.am
+++ b/devices/zmq_streamer/Makefile.am
@@ -1,4 +1,4 @@
-INCLUDES = -I$(top_builddir)/bindings/c
+INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c
bin_PROGRAMS = zmq_streamer
diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp
index 84e6569..c1b88ec 100644
--- a/devices/zmq_streamer/zmq_streamer.cpp
+++ b/devices/zmq_streamer/zmq_streamer.cpp
@@ -29,7 +29,7 @@ int main (int argc, char *argv [])
XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) {
- fprintf (stderr, "configuration file not found\n");
+ fprintf (stderr, "configuration file not found or not an XML file\n");
return 1;
}
diff --git a/src/Makefile.am b/src/Makefile.am
index bbfa7f5..a733408 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -105,6 +105,8 @@ libzmq_la_SOURCES = app_thread.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
+ xrep.hpp \
+ xreq.hpp \
yarray.hpp \
yarray_item.hpp \
ypipe.hpp \
@@ -150,6 +152,8 @@ libzmq_la_SOURCES = app_thread.hpp \
thread.cpp \
upstream.cpp \
uuid.cpp \
+ xrep.cpp \
+ xreq.cpp \
ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index a671822..308fc36 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -45,6 +45,8 @@
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
+#include "xreq.hpp"
+#include "xrep.hpp"
#include "upstream.hpp"
#include "downstream.hpp"
@@ -175,6 +177,12 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP:
s = new rep_t (this);
break;
+ case ZMQ_XREQ:
+ s = new xreq_t (this);
+ break;
+ case ZMQ_XREP:
+ s = new xrep_t (this);
+ break;
case ZMQ_UPSTREAM:
s = new upstream_t (this);
break;
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 19fc0e2..69cb586 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -46,7 +46,6 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
write_pos (0),
first_message_offset (-1)
{
-
}
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
@@ -56,7 +55,6 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
void zmq::pgm_sender_t::plug (i_inout *inout_)
{
-
// Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0;
int uplink_socket_fd = 0;
@@ -119,7 +117,6 @@ void zmq::pgm_sender_t::in_event ()
void zmq::pgm_sender_t::out_event ()
{
-
// POLLOUT event from send socket. If write buffer is empty,
// try to read new data from the encoder.
if (write_pos == write_size) {
@@ -159,7 +156,6 @@ void zmq::pgm_sender_t::out_event ()
write_pos += nbytes;
}
-
}
size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,
diff --git a/src/xrep.cpp b/src/xrep.cpp
new file mode 100644
index 0000000..7357967
--- /dev/null
+++ b/src/xrep.cpp
@@ -0,0 +1,95 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "xrep.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
+ socket_base_t (parent_)
+{
+ options.requires_in = true;
+ options.requires_out = true;
+}
+
+zmq::xrep_t::~xrep_t ()
+{
+}
+
+void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xrep_t::xkill (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xrep_t::xrevive (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+int zmq::xrep_t::xflush ()
+{
+ zmq_assert (false);
+}
+
+int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+bool zmq::xrep_t::xhas_in ()
+{
+ zmq_assert (false);
+}
+
+bool zmq::xrep_t::xhas_out ()
+{
+ zmq_assert (false);
+}
+
+
diff --git a/src/xrep.hpp b/src/xrep.hpp
new file mode 100644
index 0000000..de42036
--- /dev/null
+++ b/src/xrep.hpp
@@ -0,0 +1,57 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_XREP_HPP_INCLUDED__
+#define __ZMQ_XREP_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class xrep_t : public socket_base_t
+ {
+ public:
+
+ xrep_t (class app_thread_t *parent_);
+ ~xrep_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ xrep_t (const xrep_t&);
+ void operator = (const xrep_t&);
+ };
+
+}
+
+#endif
diff --git a/src/xreq.cpp b/src/xreq.cpp
new file mode 100644
index 0000000..b404779
--- /dev/null
+++ b/src/xreq.cpp
@@ -0,0 +1,95 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "../bindings/c/zmq.h"
+
+#include "xreq.hpp"
+#include "err.hpp"
+#include "pipe.hpp"
+
+zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
+ socket_base_t (parent_)
+{
+ options.requires_in = true;
+ options.requires_out = true;
+}
+
+zmq::xreq_t::~xreq_t ()
+{
+}
+
+void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
+ class writer_t *outpipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xreq_t::xkill (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+void zmq::xreq_t::xrevive (class reader_t *pipe_)
+{
+ zmq_assert (false);
+}
+
+int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
+ size_t optvallen_)
+{
+ errno = EINVAL;
+ return -1;
+}
+
+int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+int zmq::xreq_t::xflush ()
+{
+ zmq_assert (false);
+}
+
+int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
+{
+ zmq_assert (false);
+}
+
+bool zmq::xreq_t::xhas_in ()
+{
+ zmq_assert (false);
+}
+
+bool zmq::xreq_t::xhas_out ()
+{
+ zmq_assert (false);
+}
+
+
diff --git a/src/xreq.hpp b/src/xreq.hpp
new file mode 100644
index 0000000..8d6a3b2
--- /dev/null
+++ b/src/xreq.hpp
@@ -0,0 +1,57 @@
+/*
+ Copyright (c) 2007-2009 FastMQ Inc.
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_XREQ_HPP_INCLUDED__
+#define __ZMQ_XREQ_HPP_INCLUDED__
+
+#include "socket_base.hpp"
+#include "yarray.hpp"
+
+namespace zmq
+{
+
+ class xreq_t : public socket_base_t
+ {
+ public:
+
+ xreq_t (class app_thread_t *parent_);
+ ~xreq_t ();
+
+ // Overloads of functions from socket_base_t.
+ void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
+ void xdetach_inpipe (class reader_t *pipe_);
+ void xdetach_outpipe (class writer_t *pipe_);
+ void xkill (class reader_t *pipe_);
+ void xrevive (class reader_t *pipe_);
+ int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ int xsend (zmq_msg_t *msg_, int flags_);
+ int xflush ();
+ int xrecv (zmq_msg_t *msg_, int flags_);
+ bool xhas_in ();
+ bool xhas_out ();
+
+ private:
+
+ xreq_t (const xreq_t&);
+ void operator = (const xreq_t&);
+ };
+
+}
+
+#endif