From fa6bf24d8030b0e54fd25b167064482e4cf08a36 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 13 Dec 2009 14:45:23 +0100 Subject: XREP & XREQ socket types added; zmq_queue device added --- bindings/c/zmq.h | 6 +- bindings/cl/zeromq.lisp | 6 +- bindings/java/org/zmq/Socket.java | 6 +- bindings/python/pyzmq.cpp | 6 ++ bindings/ruby/rbzmq.cpp | 2 + configure.in | 15 +++- devices/Makefile.am | 8 ++- devices/zmq_forwarder/Makefile.am | 2 +- devices/zmq_forwarder/zmq_forwarder.cpp | 2 +- devices/zmq_queue/Makefile.am | 9 +++ devices/zmq_queue/zmq_queue.cpp | 122 ++++++++++++++++++++++++++++++++ devices/zmq_streamer/Makefile.am | 2 +- devices/zmq_streamer/zmq_streamer.cpp | 2 +- src/Makefile.am | 4 ++ src/app_thread.cpp | 8 +++ src/pgm_sender.cpp | 4 -- src/xrep.cpp | 95 +++++++++++++++++++++++++ src/xrep.hpp | 57 +++++++++++++++ src/xreq.cpp | 95 +++++++++++++++++++++++++ src/xreq.hpp | 57 +++++++++++++++ 20 files changed, 490 insertions(+), 18 deletions(-) create mode 100644 devices/zmq_queue/Makefile.am create mode 100644 devices/zmq_queue/zmq_queue.cpp create mode 100644 src/xrep.cpp create mode 100644 src/xrep.hpp create mode 100644 src/xreq.cpp create mode 100644 src/xreq.hpp diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index f0d59b1..37bad52 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -149,8 +149,10 @@ ZMQ_EXPORT int zmq_term (void *context); #define ZMQ_SUB 2 #define ZMQ_REQ 3 #define ZMQ_REP 4 -#define ZMQ_UPSTREAM 5 -#define ZMQ_DOWNSTREAM 6 +#define ZMQ_XREQ 5 +#define ZMQ_XREP 6 +#define ZMQ_UPSTREAM 7 +#define ZMQ_DOWNSTREAM 8 #define ZMQ_HWM 1 #define ZMQ_LWM 2 diff --git a/bindings/cl/zeromq.lisp b/bindings/cl/zeromq.lisp index 03befd5..a8b9c5c 100644 --- a/bindings/cl/zeromq.lisp +++ b/bindings/cl/zeromq.lisp @@ -130,8 +130,10 @@ (defconstant sub 2) (defconstant req 3) (defconstant rep 4) -(defconstant upstream 5) -(defconstant downstream 6) +(defconstant xreq 5) +(defconstant xrep 6) +(defconstant upstream 7) +(defconstant downstream 8) (defcfun* ("zmq_socket" socket) :pointer (context :pointer) diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java index 935fade..075e322 100644 --- a/bindings/java/org/zmq/Socket.java +++ b/bindings/java/org/zmq/Socket.java @@ -34,8 +34,10 @@ public class Socket public static final int SUB = 2; public static final int REQ = 3; public static final int REP = 4; - public static final int UPSTREAM = 4; - public static final int DOWNSTREAM = 4; + public static final int XREQ = 5; + public static final int XREP = 6; + public static final int UPSTREAM = 7; + public static final int DOWNSTREAM = 8; public static final int HWM = 1; public static final int LWM = 2; diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp index f171eab..75f872d 100644 --- a/bindings/python/pyzmq.cpp +++ b/bindings/python/pyzmq.cpp @@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq () t = PyInt_FromLong (ZMQ_REP); PyDict_SetItemString (dict, "REP", t); Py_DECREF (t); + t = PyInt_FromLong (ZMQ_XREQ); + PyDict_SetItemString (dict, "XREQ", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_XREP); + PyDict_SetItemString (dict, "XREP", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_UPSTREAM); PyDict_SetItemString (dict, "UPSTREAM", t); Py_DECREF (t); diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp index 43baeef..10ff55f 100644 --- a/bindings/ruby/rbzmq.cpp +++ b/bindings/ruby/rbzmq.cpp @@ -277,6 +277,8 @@ extern "C" void Init_librbzmq () rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB)); rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ)); rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); + rb_define_global_const ("XREQ", INT2NUM (ZMQ_XREQ)); + rb_define_global_const ("XREP", INT2NUM (ZMQ_XREP)); rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM)); rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM)); diff --git a/configure.in b/configure.in index 9d034ed..00c4720 100644 --- a/configure.in +++ b/configure.in @@ -520,6 +520,15 @@ if test "x$with_streamer" != "xno"; then streamer="yes" fi +# Queue device +queue="no" +AC_ARG_WITH([queue], [AS_HELP_STRING([--with-queue], + [build queue device [default=no]])], [with_queue=yes], [with_queue=no]) + +if test "x$with_queue" != "xno"; then + queue="yes" +fi + # Perf perf="no" AC_ARG_WITH([perf], [AS_HELP_STRING([--with-perf], @@ -555,7 +564,8 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno") AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") -AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") +AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") +AM_CONDITIONAL(BUILD_QUEUE, test "x$queue" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(BUILD_CHAT, test "x$chat" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") @@ -581,7 +591,7 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ devices/Makefile devices/zmq_forwarder/Makefile \ - devices/zmq_streamer/Makefile bindings/Makefile \ + devices/zmq_streamer/Makefile devices/zmq_queue/Makefile bindings/Makefile \ examples/Makefile examples/chat/Makefile) AC_MSG_RESULT([]) @@ -616,6 +626,7 @@ AC_MSG_RESULT([ inproc: yes]) AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ Forwarder: $forwarder]) AC_MSG_RESULT([ Streamer: $streamer]) +AC_MSG_RESULT([ Queue: $queue]) AC_MSG_RESULT([ Performance tests: $perf]) AC_MSG_RESULT([ Examples:]) AC_MSG_RESULT([ Chat: $chat]) diff --git a/devices/Makefile.am b/devices/Makefile.am index ab18976..4f20456 100644 --- a/devices/Makefile.am +++ b/devices/Makefile.am @@ -6,5 +6,9 @@ if BUILD_STREAMER STREAMER_DIR = zmq_streamer endif -SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) -DIST_SUBDIRS = zmq_forwarder zmq_streamer +if BUILD_QUEUE +QUEUE_DIR = zmq_queue +endif + +SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) $(QUEUE_DIR) +DIST_SUBDIRS = zmq_forwarder zmq_streamer zmq_queue diff --git a/devices/zmq_forwarder/Makefile.am b/devices/zmq_forwarder/Makefile.am index ff51d88..ee48cda 100644 --- a/devices/zmq_forwarder/Makefile.am +++ b/devices/zmq_forwarder/Makefile.am @@ -1,4 +1,4 @@ -INCLUDES = -I$(top_builddir)/bindings/c +INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c bin_PROGRAMS = zmq_forwarder diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp index d29ed62..277af72 100644 --- a/devices/zmq_forwarder/zmq_forwarder.cpp +++ b/devices/zmq_forwarder/zmq_forwarder.cpp @@ -29,7 +29,7 @@ int main (int argc, char *argv []) XMLNode root = XMLNode::parseFile (argv [1]); if (root.isEmpty ()) { - fprintf (stderr, "configuration file not found\n"); + fprintf (stderr, "configuration file not found or not an XML file\n"); return 1; } diff --git a/devices/zmq_queue/Makefile.am b/devices/zmq_queue/Makefile.am new file mode 100644 index 0000000..8d3ec36 --- /dev/null +++ b/devices/zmq_queue/Makefile.am @@ -0,0 +1,9 @@ +INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c + +bin_PROGRAMS = zmq_queue + +zmq_queue_LDADD = $(top_builddir)/src/libzmq.la +zmq_queue_SOURCES = zmq_queue.cpp +zmq_queue_CXXFLAGS = -Wall -pedantic -Werror + + diff --git a/devices/zmq_queue/zmq_queue.cpp b/devices/zmq_queue/zmq_queue.cpp new file mode 100644 index 0000000..78ccd2d --- /dev/null +++ b/devices/zmq_queue/zmq_queue.cpp @@ -0,0 +1,122 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../../bindings/cpp/zmq.hpp" +#include "../../foreign/xmlParser/xmlParser.cpp" + +int main (int argc, char *argv []) +{ + if (argc != 2) { + fprintf (stderr, "usage: zmq_queue \n"); + return 1; + } + + XMLNode root = XMLNode::parseFile (argv [1]); + if (root.isEmpty ()) { + fprintf (stderr, "configuration file not found or not an XML file\n"); + return 1; + } + + if (strcmp (root.getName (), "queue") != 0) { + fprintf (stderr, "root element in the configuration file should be " + "named 'queue'\n"); + return 1; + } + + XMLNode in_node = root.getChildNode ("in"); + if (in_node.isEmpty ()) { + fprintf (stderr, "'in' node is missing in the configuration file\n"); + return 1; + } + + XMLNode out_node = root.getChildNode ("out"); + if (out_node.isEmpty ()) { + fprintf (stderr, "'out' node is missing in the configuration file\n"); + return 1; + } + + // TODO: make the number of I/O threads configurable. + zmq::context_t ctx (1, 1); + zmq::socket_t in_socket (ctx, ZMQ_XREP); + zmq::socket_t out_socket (ctx, ZMQ_XREQ); + + int n = 0; + while (true) { + XMLNode bind = in_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = in_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.connect (addr); + n++; + } + + n = 0; + while (true) { + XMLNode bind = out_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = out_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.connect (addr); + n++; + } + + zmq::message_t msg; + while (true) { + in_socket.recv (&msg); + out_socket.send (msg); + } + + return 0; +} diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am index e3681bf..9e0ac62 100644 --- a/devices/zmq_streamer/Makefile.am +++ b/devices/zmq_streamer/Makefile.am @@ -1,4 +1,4 @@ -INCLUDES = -I$(top_builddir)/bindings/c +INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c bin_PROGRAMS = zmq_streamer diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp index 84e6569..c1b88ec 100644 --- a/devices/zmq_streamer/zmq_streamer.cpp +++ b/devices/zmq_streamer/zmq_streamer.cpp @@ -29,7 +29,7 @@ int main (int argc, char *argv []) XMLNode root = XMLNode::parseFile (argv [1]); if (root.isEmpty ()) { - fprintf (stderr, "configuration file not found\n"); + fprintf (stderr, "configuration file not found or not an XML file\n"); return 1; } diff --git a/src/Makefile.am b/src/Makefile.am index bbfa7f5..a733408 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -105,6 +105,8 @@ libzmq_la_SOURCES = app_thread.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ + xrep.hpp \ + xreq.hpp \ yarray.hpp \ yarray_item.hpp \ ypipe.hpp \ @@ -150,6 +152,8 @@ libzmq_la_SOURCES = app_thread.hpp \ thread.cpp \ upstream.cpp \ uuid.cpp \ + xrep.cpp \ + xreq.cpp \ ypollset.cpp \ zmq.cpp \ zmq_connecter.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index a671822..308fc36 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -45,6 +45,8 @@ #include "sub.hpp" #include "req.hpp" #include "rep.hpp" +#include "xreq.hpp" +#include "xrep.hpp" #include "upstream.hpp" #include "downstream.hpp" @@ -175,6 +177,12 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_REP: s = new rep_t (this); break; + case ZMQ_XREQ: + s = new xreq_t (this); + break; + case ZMQ_XREP: + s = new xrep_t (this); + break; case ZMQ_UPSTREAM: s = new upstream_t (this); break; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 19fc0e2..69cb586 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -46,7 +46,6 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, write_pos (0), first_message_offset (-1) { - } int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) @@ -56,7 +55,6 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) void zmq::pgm_sender_t::plug (i_inout *inout_) { - // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; int uplink_socket_fd = 0; @@ -119,7 +117,6 @@ void zmq::pgm_sender_t::in_event () void zmq::pgm_sender_t::out_event () { - // POLLOUT event from send socket. If write buffer is empty, // try to read new data from the encoder. if (write_pos == write_size) { @@ -159,7 +156,6 @@ void zmq::pgm_sender_t::out_event () write_pos += nbytes; } - } size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, diff --git a/src/xrep.cpp b/src/xrep.cpp new file mode 100644 index 0000000..7357967 --- /dev/null +++ b/src/xrep.cpp @@ -0,0 +1,95 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "xrep.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::xrep_t::xrep_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = true; + options.requires_out = true; +} + +zmq::xrep_t::~xrep_t () +{ +} + +void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::xrep_t::xflush () +{ + zmq_assert (false); +} + +int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +bool zmq::xrep_t::xhas_in () +{ + zmq_assert (false); +} + +bool zmq::xrep_t::xhas_out () +{ + zmq_assert (false); +} + + diff --git a/src/xrep.hpp b/src/xrep.hpp new file mode 100644 index 0000000..de42036 --- /dev/null +++ b/src/xrep.hpp @@ -0,0 +1,57 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_XREP_HPP_INCLUDED__ +#define __ZMQ_XREP_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class xrep_t : public socket_base_t + { + public: + + xrep_t (class app_thread_t *parent_); + ~xrep_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + xrep_t (const xrep_t&); + void operator = (const xrep_t&); + }; + +} + +#endif diff --git a/src/xreq.cpp b/src/xreq.cpp new file mode 100644 index 0000000..b404779 --- /dev/null +++ b/src/xreq.cpp @@ -0,0 +1,95 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "xreq.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::xreq_t::xreq_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = true; + options.requires_out = true; +} + +zmq::xreq_t::~xreq_t () +{ +} + +void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::xreq_t::xflush () +{ + zmq_assert (false); +} + +int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +bool zmq::xreq_t::xhas_in () +{ + zmq_assert (false); +} + +bool zmq::xreq_t::xhas_out () +{ + zmq_assert (false); +} + + diff --git a/src/xreq.hpp b/src/xreq.hpp new file mode 100644 index 0000000..8d6a3b2 --- /dev/null +++ b/src/xreq.hpp @@ -0,0 +1,57 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_XREQ_HPP_INCLUDED__ +#define __ZMQ_XREQ_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class xreq_t : public socket_base_t + { + public: + + xreq_t (class app_thread_t *parent_); + ~xreq_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + xreq_t (const xreq_t&); + void operator = (const xreq_t&); + }; + +} + +#endif -- cgit v1.2.3