From fa6bf24d8030b0e54fd25b167064482e4cf08a36 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 13 Dec 2009 14:45:23 +0100 Subject: XREP & XREQ socket types added; zmq_queue device added --- src/Makefile.am | 4 +++ src/app_thread.cpp | 8 +++++ src/pgm_sender.cpp | 4 --- src/xrep.cpp | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/xrep.hpp | 57 ++++++++++++++++++++++++++++++++ src/xreq.cpp | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/xreq.hpp | 57 ++++++++++++++++++++++++++++++++ 7 files changed, 316 insertions(+), 4 deletions(-) create mode 100644 src/xrep.cpp create mode 100644 src/xrep.hpp create mode 100644 src/xreq.cpp create mode 100644 src/xreq.hpp (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index bbfa7f5..a733408 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -105,6 +105,8 @@ libzmq_la_SOURCES = app_thread.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ + xrep.hpp \ + xreq.hpp \ yarray.hpp \ yarray_item.hpp \ ypipe.hpp \ @@ -150,6 +152,8 @@ libzmq_la_SOURCES = app_thread.hpp \ thread.cpp \ upstream.cpp \ uuid.cpp \ + xrep.cpp \ + xreq.cpp \ ypollset.cpp \ zmq.cpp \ zmq_connecter.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index a671822..308fc36 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -45,6 +45,8 @@ #include "sub.hpp" #include "req.hpp" #include "rep.hpp" +#include "xreq.hpp" +#include "xrep.hpp" #include "upstream.hpp" #include "downstream.hpp" @@ -175,6 +177,12 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_REP: s = new rep_t (this); break; + case ZMQ_XREQ: + s = new xreq_t (this); + break; + case ZMQ_XREP: + s = new xrep_t (this); + break; case ZMQ_UPSTREAM: s = new upstream_t (this); break; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 19fc0e2..69cb586 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -46,7 +46,6 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, write_pos (0), first_message_offset (-1) { - } int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) @@ -56,7 +55,6 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) void zmq::pgm_sender_t::plug (i_inout *inout_) { - // Alocate 2 fds for PGM socket. int downlink_socket_fd = 0; int uplink_socket_fd = 0; @@ -119,7 +117,6 @@ void zmq::pgm_sender_t::in_event () void zmq::pgm_sender_t::out_event () { - // POLLOUT event from send socket. If write buffer is empty, // try to read new data from the encoder. if (write_pos == write_size) { @@ -159,7 +156,6 @@ void zmq::pgm_sender_t::out_event () write_pos += nbytes; } - } size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, diff --git a/src/xrep.cpp b/src/xrep.cpp new file mode 100644 index 0000000..7357967 --- /dev/null +++ b/src/xrep.cpp @@ -0,0 +1,95 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "xrep.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::xrep_t::xrep_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = true; + options.requires_out = true; +} + +zmq::xrep_t::~xrep_t () +{ +} + +void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xrep_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::xrep_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::xrep_t::xflush () +{ + zmq_assert (false); +} + +int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +bool zmq::xrep_t::xhas_in () +{ + zmq_assert (false); +} + +bool zmq::xrep_t::xhas_out () +{ + zmq_assert (false); +} + + diff --git a/src/xrep.hpp b/src/xrep.hpp new file mode 100644 index 0000000..de42036 --- /dev/null +++ b/src/xrep.hpp @@ -0,0 +1,57 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_XREP_HPP_INCLUDED__ +#define __ZMQ_XREP_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class xrep_t : public socket_base_t + { + public: + + xrep_t (class app_thread_t *parent_); + ~xrep_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + xrep_t (const xrep_t&); + void operator = (const xrep_t&); + }; + +} + +#endif diff --git a/src/xreq.cpp b/src/xreq.cpp new file mode 100644 index 0000000..b404779 --- /dev/null +++ b/src/xreq.cpp @@ -0,0 +1,95 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "xreq.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::xreq_t::xreq_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = true; + options.requires_out = true; +} + +zmq::xreq_t::~xreq_t () +{ +} + +void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xkill (class reader_t *pipe_) +{ + zmq_assert (false); +} + +void zmq::xreq_t::xrevive (class reader_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::xreq_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + errno = EINVAL; + return -1; +} + +int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +int zmq::xreq_t::xflush () +{ + zmq_assert (false); +} + +int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + zmq_assert (false); +} + +bool zmq::xreq_t::xhas_in () +{ + zmq_assert (false); +} + +bool zmq::xreq_t::xhas_out () +{ + zmq_assert (false); +} + + diff --git a/src/xreq.hpp b/src/xreq.hpp new file mode 100644 index 0000000..8d6a3b2 --- /dev/null +++ b/src/xreq.hpp @@ -0,0 +1,57 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_XREQ_HPP_INCLUDED__ +#define __ZMQ_XREQ_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class xreq_t : public socket_base_t + { + public: + + xreq_t (class app_thread_t *parent_); + ~xreq_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + xreq_t (const xreq_t&); + void operator = (const xreq_t&); + }; + +} + +#endif -- cgit v1.2.3