From 11a410b65827a3958fb5f417c29e95c1953a0b42 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Wed, 4 Aug 2010 14:38:56 +0200 Subject: Renamed ZMQ_UPSTREAM to ZMQ_PULL, and ZMQ_DOWNSTREAM to ZMQ_PUSH. Left the old definitions as aliases, to be removed in release 3.0. Also renamed the source files implementing these two socket types. This change does not break existing applications nor bindings, but allows us to fix the documentation and user guide now, rather than keeping the old (confusing) names. --- include/zmq.h | 61 +++++++++++++++++--------------- src/Makefile.am | 8 ++--- src/app_thread.cpp | 12 +++---- src/downstream.cpp | 101 ----------------------------------------------------- src/downstream.hpp | 61 -------------------------------- src/pull.cpp | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/pull.hpp | 62 ++++++++++++++++++++++++++++++++ src/push.cpp | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/push.hpp | 61 ++++++++++++++++++++++++++++++++ src/upstream.cpp | 98 --------------------------------------------------- src/upstream.hpp | 62 -------------------------------- 11 files changed, 364 insertions(+), 361 deletions(-) delete mode 100644 src/downstream.cpp delete mode 100644 src/downstream.hpp create mode 100644 src/pull.cpp create mode 100644 src/pull.hpp create mode 100644 src/push.cpp create mode 100644 src/push.hpp delete mode 100644 src/upstream.cpp delete mode 100644 src/upstream.hpp diff --git a/include/zmq.h b/include/zmq.h index 294b5c4..bce1215 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -152,33 +152,36 @@ ZMQ_EXPORT int zmq_term (void *context); /******************************************************************************/ /* Socket types. */ -#define ZMQ_PAIR 0 -#define ZMQ_PUB 1 -#define ZMQ_SUB 2 -#define ZMQ_REQ 3 -#define ZMQ_REP 4 -#define ZMQ_XREQ 5 -#define ZMQ_XREP 6 -#define ZMQ_UPSTREAM 7 -#define ZMQ_DOWNSTREAM 8 +#define ZMQ_PAIR 0 +#define ZMQ_PUB 1 +#define ZMQ_SUB 2 +#define ZMQ_REQ 3 +#define ZMQ_REP 4 +#define ZMQ_XREQ 5 +#define ZMQ_XREP 6 +#define ZMQ_PULL 7 +#define ZMQ_PUSH 8 +#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */ +#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */ /* Socket options. */ -#define ZMQ_HWM 1 -#define ZMQ_SWAP 3 -#define ZMQ_AFFINITY 4 -#define ZMQ_IDENTITY 5 -#define ZMQ_SUBSCRIBE 6 -#define ZMQ_UNSUBSCRIBE 7 -#define ZMQ_RATE 8 +#define ZMQ_HWM 1 +/* ZMQ_LWM 2 no longer supported */ +#define ZMQ_SWAP 3 +#define ZMQ_AFFINITY 4 +#define ZMQ_IDENTITY 5 +#define ZMQ_SUBSCRIBE 6 +#define ZMQ_UNSUBSCRIBE 7 +#define ZMQ_RATE 8 #define ZMQ_RECOVERY_IVL 9 -#define ZMQ_MCAST_LOOP 10 -#define ZMQ_SNDBUF 11 -#define ZMQ_RCVBUF 12 -#define ZMQ_RCVMORE 13 +#define ZMQ_MCAST_LOOP 10 +#define ZMQ_SNDBUF 11 +#define ZMQ_RCVBUF 12 +#define ZMQ_RCVMORE 13 /* Send/recv options. */ -#define ZMQ_NOBLOCK 1 -#define ZMQ_SNDMORE 2 +#define ZMQ_NOBLOCK 1 +#define ZMQ_SNDMORE 2 ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT int zmq_close (void *s); @@ -195,9 +198,9 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); /* I/O multiplexing. */ /******************************************************************************/ -#define ZMQ_POLLIN 1 -#define ZMQ_POLLOUT 2 -#define ZMQ_POLLERR 4 +#define ZMQ_POLLIN 1 +#define ZMQ_POLLOUT 2 +#define ZMQ_POLLERR 4 typedef struct { @@ -214,12 +217,12 @@ typedef struct ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /******************************************************************************/ -/* Devices - Experimental. */ +/* Devices */ /******************************************************************************/ -#define ZMQ_STREAMER 1 -#define ZMQ_FORWARDER 2 -#define ZMQ_QUEUE 3 +#define ZMQ_QUEUE 1 +#define ZMQ_FORWARDER 2 +#define ZMQ_STREAMER 3 ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); diff --git a/src/Makefile.am b/src/Makefile.am index 977b655..19a80d0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -105,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -125,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -160,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbf034c..ac59464 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,8 +46,8 @@ #include "rep.hpp" #include "xreq.hpp" #include "xrep.hpp" -#include "upstream.hpp" -#include "downstream.hpp" +#include "pull.hpp" +#include "push.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_XREP: s = new (std::nothrow) xrep_t (this); break; - case ZMQ_UPSTREAM: - s = new (std::nothrow) upstream_t (this); + case ZMQ_PULL: + s = new (std::nothrow) pull_t (this); break; - case ZMQ_DOWNSTREAM: - s = new (std::nothrow) downstream_t (this); + case ZMQ_PUSH: + s = new (std::nothrow) push_t (this); break; default: if (sockets.empty ()) diff --git a/src/downstream.cpp b/src/downstream.cpp deleted file mode 100644 index 4074a9e..0000000 --- a/src/downstream.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "downstream.hpp" -#include "err.hpp" -#include "pipe.hpp" - -zmq::downstream_t::downstream_t (class app_thread_t *parent_) : - socket_base_t (parent_) -{ - options.requires_in = false; - options.requires_out = true; -} - -zmq::downstream_t::~downstream_t () -{ -} - -void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) -{ - zmq_assert (!inpipe_ && outpipe_); - lb.attach (outpipe_); -} - -void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) -{ - zmq_assert (pipe_); - lb.detach (pipe_); -} - -void zmq::downstream_t::xkill (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xrevive (class reader_t *pipe_) -{ - // There are no inpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::downstream_t::xrevive (class writer_t *pipe_) -{ - lb.revive (pipe_); -} - -int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - // No special option for this socket type. - errno = EINVAL; - return -1; -} - -int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) -{ - return lb.send (msg_, flags_); -} - -int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - -bool zmq::downstream_t::xhas_in () -{ - return false; -} - -bool zmq::downstream_t::xhas_out () -{ - return lb.has_out (); -} - diff --git a/src/downstream.hpp b/src/downstream.hpp deleted file mode 100644 index 1306743..0000000 --- a/src/downstream.hpp +++ /dev/null @@ -1,61 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ -#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "lb.hpp" - -namespace zmq -{ - - class downstream_t : public socket_base_t - { - public: - - downstream_t (class app_thread_t *parent_); - ~downstream_t (); - - // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - - private: - - // Load balancer managing the outbound pipes. - lb_t lb; - - downstream_t (const downstream_t&); - void operator = (const downstream_t&); - }; - -} - -#endif diff --git a/src/pull.cpp b/src/pull.cpp new file mode 100644 index 0000000..b2413ee --- /dev/null +++ b/src/pull.cpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "pull.hpp" +#include "err.hpp" + +zmq::pull_t::pull_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::pull_t::~pull_t () +{ +} + +void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + zmq_assert (inpipe_ && !outpipe_); + fq.attach (inpipe_); +} + +void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_) +{ + zmq_assert (pipe_); + fq.detach (pipe_); +} + +void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::pull_t::xkill (class reader_t *pipe_) +{ + fq.kill (pipe_); +} + +void zmq::pull_t::xrevive (class reader_t *pipe_) +{ + fq.revive (pipe_); +} + +void zmq::pull_t::xrevive (class writer_t *pipe_) +{ + zmq_assert (false); +} + +int zmq::pull_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + return fq.recv (msg_, flags_); +} + +bool zmq::pull_t::xhas_in () +{ + return fq.has_in (); +} + +bool zmq::pull_t::xhas_out () +{ + return false; +} + diff --git a/src/pull.hpp b/src/pull.hpp new file mode 100644 index 0000000..7f249e9 --- /dev/null +++ b/src/pull.hpp @@ -0,0 +1,62 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PULL_HPP_INCLUDED__ +#define __ZMQ_PULL_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "fq.hpp" + +namespace zmq +{ + + class pull_t : public socket_base_t + { + public: + + pull_t (class app_thread_t *parent_); + ~pull_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Fair queueing object for inbound pipes. + fq_t fq; + + pull_t (const pull_t&); + void operator = (const pull_t&); + + }; + +} + +#endif diff --git a/src/push.cpp b/src/push.cpp new file mode 100644 index 0000000..522101f --- /dev/null +++ b/src/push.cpp @@ -0,0 +1,101 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" + +#include "push.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::push_t::push_t (class app_thread_t *parent_) : + socket_base_t (parent_) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::push_t::~push_t () +{ +} + +void zmq::push_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_, const blob_t &peer_identity_) +{ + zmq_assert (!inpipe_ && outpipe_); + lb.attach (outpipe_); +} + +void zmq::push_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + lb.detach (pipe_); +} + +void zmq::push_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::push_t::xrevive (class writer_t *pipe_) +{ + lb.revive (pipe_); +} + +int zmq::push_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) +{ + return lb.send (msg_, flags_); +} + +int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::push_t::xhas_in () +{ + return false; +} + +bool zmq::push_t::xhas_out () +{ + return lb.has_out (); +} + diff --git a/src/push.hpp b/src/push.hpp new file mode 100644 index 0000000..b3c8d87 --- /dev/null +++ b/src/push.hpp @@ -0,0 +1,61 @@ +/* + Copyright (c) 2007-2010 iMatix Corporation + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PUSH_HPP_INCLUDED__ +#define __ZMQ_PUSH_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "lb.hpp" + +namespace zmq +{ + + class push_t : public socket_base_t + { + public: + + push_t (class app_thread_t *parent_); + ~push_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, + const blob_t &peer_identity_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + void xrevive (class writer_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Load balancer managing the outbound pipes. + lb_t lb; + + push_t (const push_t&); + void operator = (const push_t&); + }; + +} + +#endif diff --git a/src/upstream.cpp b/src/upstream.cpp deleted file mode 100644 index 1498c31..0000000 --- a/src/upstream.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#include "../include/zmq.h" - -#include "upstream.hpp" -#include "err.hpp" - -zmq::upstream_t::upstream_t (class app_thread_t *parent_) : - socket_base_t (parent_) -{ - options.requires_in = true; - options.requires_out = false; -} - -zmq::upstream_t::~upstream_t () -{ -} - -void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_, const blob_t &peer_identity_) -{ - zmq_assert (inpipe_ && !outpipe_); - fq.attach (inpipe_); -} - -void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) -{ - zmq_assert (pipe_); - fq.detach (pipe_); -} - -void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) -{ - // There are no outpipes, so this function shouldn't be called at all. - zmq_assert (false); -} - -void zmq::upstream_t::xkill (class reader_t *pipe_) -{ - fq.kill (pipe_); -} - -void zmq::upstream_t::xrevive (class reader_t *pipe_) -{ - fq.revive (pipe_); -} - -void zmq::upstream_t::xrevive (class writer_t *pipe_) -{ - zmq_assert (false); -} - -int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, - size_t optvallen_) -{ - // No special options for this socket type. - errno = EINVAL; - return -1; -} - -int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) -{ - errno = ENOTSUP; - return -1; -} - -int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) -{ - return fq.recv (msg_, flags_); -} - -bool zmq::upstream_t::xhas_in () -{ - return fq.has_in (); -} - -bool zmq::upstream_t::xhas_out () -{ - return false; -} - diff --git a/src/upstream.hpp b/src/upstream.hpp deleted file mode 100644 index 5fe42ae..0000000 --- a/src/upstream.hpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - Copyright (c) 2007-2010 iMatix Corporation - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the Lesser GNU General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - Lesser GNU General Public License for more details. - - You should have received a copy of the Lesser GNU General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ -#define __ZMQ_UPSTREAM_HPP_INCLUDED__ - -#include "socket_base.hpp" -#include "fq.hpp" - -namespace zmq -{ - - class upstream_t : public socket_base_t - { - public: - - upstream_t (class app_thread_t *parent_); - ~upstream_t (); - - // Overloads of functions from socket_base_t. - void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, - const blob_t &peer_identity_); - void xdetach_inpipe (class reader_t *pipe_); - void xdetach_outpipe (class writer_t *pipe_); - void xkill (class reader_t *pipe_); - void xrevive (class reader_t *pipe_); - void xrevive (class writer_t *pipe_); - int xsetsockopt (int option_, const void *optval_, size_t optvallen_); - int xsend (zmq_msg_t *msg_, int flags_); - int xrecv (zmq_msg_t *msg_, int flags_); - bool xhas_in (); - bool xhas_out (); - - private: - - // Fair queueing object for inbound pipes. - fq_t fq; - - upstream_t (const upstream_t&); - void operator = (const upstream_t&); - - }; - -} - -#endif -- cgit v1.2.3