diff options
-rw-r--r-- | include/zmq.h | 61 | ||||
-rw-r--r-- | src/Makefile.am | 8 | ||||
-rw-r--r-- | src/app_thread.cpp | 12 | ||||
-rw-r--r-- | src/pull.cpp (renamed from src/upstream.cpp) | 28 | ||||
-rw-r--r-- | src/pull.hpp (renamed from src/upstream.hpp) | 14 | ||||
-rw-r--r-- | src/push.cpp (renamed from src/downstream.cpp) | 28 | ||||
-rw-r--r-- | src/push.hpp (renamed from src/downstream.hpp) | 14 |
7 files changed, 84 insertions, 81 deletions
diff --git a/include/zmq.h b/include/zmq.h index 294b5c4..bce1215 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -152,33 +152,36 @@ ZMQ_EXPORT int zmq_term (void *context); /******************************************************************************/ /* Socket types. */ -#define ZMQ_PAIR 0 -#define ZMQ_PUB 1 -#define ZMQ_SUB 2 -#define ZMQ_REQ 3 -#define ZMQ_REP 4 -#define ZMQ_XREQ 5 -#define ZMQ_XREP 6 -#define ZMQ_UPSTREAM 7 -#define ZMQ_DOWNSTREAM 8 +#define ZMQ_PAIR 0 +#define ZMQ_PUB 1 +#define ZMQ_SUB 2 +#define ZMQ_REQ 3 +#define ZMQ_REP 4 +#define ZMQ_XREQ 5 +#define ZMQ_XREP 6 +#define ZMQ_PULL 7 +#define ZMQ_PUSH 8 +#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */ +#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */ /* Socket options. */ -#define ZMQ_HWM 1 -#define ZMQ_SWAP 3 -#define ZMQ_AFFINITY 4 -#define ZMQ_IDENTITY 5 -#define ZMQ_SUBSCRIBE 6 -#define ZMQ_UNSUBSCRIBE 7 -#define ZMQ_RATE 8 +#define ZMQ_HWM 1 +/* ZMQ_LWM 2 no longer supported */ +#define ZMQ_SWAP 3 +#define ZMQ_AFFINITY 4 +#define ZMQ_IDENTITY 5 +#define ZMQ_SUBSCRIBE 6 +#define ZMQ_UNSUBSCRIBE 7 +#define ZMQ_RATE 8 #define ZMQ_RECOVERY_IVL 9 -#define ZMQ_MCAST_LOOP 10 -#define ZMQ_SNDBUF 11 -#define ZMQ_RCVBUF 12 -#define ZMQ_RCVMORE 13 +#define ZMQ_MCAST_LOOP 10 +#define ZMQ_SNDBUF 11 +#define ZMQ_RCVBUF 12 +#define ZMQ_RCVMORE 13 /* Send/recv options. */ -#define ZMQ_NOBLOCK 1 -#define ZMQ_SNDMORE 2 +#define ZMQ_NOBLOCK 1 +#define ZMQ_SNDMORE 2 ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT int zmq_close (void *s); @@ -195,9 +198,9 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); /* I/O multiplexing. */ /******************************************************************************/ -#define ZMQ_POLLIN 1 -#define ZMQ_POLLOUT 2 -#define ZMQ_POLLERR 4 +#define ZMQ_POLLIN 1 +#define ZMQ_POLLOUT 2 +#define ZMQ_POLLERR 4 typedef struct { @@ -214,12 +217,12 @@ typedef struct ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /******************************************************************************/ -/* Devices - Experimental. */ +/* Devices */ /******************************************************************************/ -#define ZMQ_STREAMER 1 -#define ZMQ_FORWARDER 2 -#define ZMQ_QUEUE 3 +#define ZMQ_QUEUE 1 +#define ZMQ_FORWARDER 2 +#define ZMQ_STREAMER 3 ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket); diff --git a/src/Makefile.am b/src/Makefile.am index 977b655..19a80d0 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ctx.hpp \ decoder.hpp \ devpoll.hpp \ - downstream.hpp \ + push.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -105,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ - upstream.hpp \ + pull.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -125,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \ command.cpp \ ctx.cpp \ devpoll.cpp \ - downstream.cpp \ + push.cpp \ epoll.cpp \ err.cpp \ forwarder.cpp \ @@ -160,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ - upstream.cpp \ + pull.cpp \ uuid.cpp \ xrep.cpp \ xreq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbf034c..ac59464 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,8 +46,8 @@ #include "rep.hpp" #include "xreq.hpp" #include "xrep.hpp" -#include "upstream.hpp" -#include "downstream.hpp" +#include "pull.hpp" +#include "push.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_XREP: s = new (std::nothrow) xrep_t (this); break; - case ZMQ_UPSTREAM: - s = new (std::nothrow) upstream_t (this); + case ZMQ_PULL: + s = new (std::nothrow) pull_t (this); break; - case ZMQ_DOWNSTREAM: - s = new (std::nothrow) downstream_t (this); + case ZMQ_PUSH: + s = new (std::nothrow) push_t (this); break; default: if (sockets.empty ()) diff --git a/src/upstream.cpp b/src/pull.cpp index 1498c31..b2413ee 100644 --- a/src/upstream.cpp +++ b/src/pull.cpp @@ -19,55 +19,55 @@ #include "../include/zmq.h" -#include "upstream.hpp" +#include "pull.hpp" #include "err.hpp" -zmq::upstream_t::upstream_t (class app_thread_t *parent_) : +zmq::pull_t::pull_t (class app_thread_t *parent_) : socket_base_t (parent_) { options.requires_in = true; options.requires_out = false; } -zmq::upstream_t::~upstream_t () +zmq::pull_t::~pull_t () { } -void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, +void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (inpipe_ && !outpipe_); fq.attach (inpipe_); } -void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_) { zmq_assert (pipe_); fq.detach (pipe_); } -void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_) { // There are no outpipes, so this function shouldn't be called at all. zmq_assert (false); } -void zmq::upstream_t::xkill (class reader_t *pipe_) +void zmq::pull_t::xkill (class reader_t *pipe_) { fq.kill (pipe_); } -void zmq::upstream_t::xrevive (class reader_t *pipe_) +void zmq::pull_t::xrevive (class reader_t *pipe_) { fq.revive (pipe_); } -void zmq::upstream_t::xrevive (class writer_t *pipe_) +void zmq::pull_t::xrevive (class writer_t *pipe_) { zmq_assert (false); } -int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, +int zmq::pull_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { // No special options for this socket type. @@ -75,23 +75,23 @@ int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, return -1; } -int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; } -int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); } -bool zmq::upstream_t::xhas_in () +bool zmq::pull_t::xhas_in () { return fq.has_in (); } -bool zmq::upstream_t::xhas_out () +bool zmq::pull_t::xhas_out () { return false; } diff --git a/src/upstream.hpp b/src/pull.hpp index 5fe42ae..7f249e9 100644 --- a/src/upstream.hpp +++ b/src/pull.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ -#define __ZMQ_UPSTREAM_HPP_INCLUDED__ +#ifndef __ZMQ_PULL_HPP_INCLUDED__ +#define __ZMQ_PULL_HPP_INCLUDED__ #include "socket_base.hpp" #include "fq.hpp" @@ -26,12 +26,12 @@ namespace zmq { - class upstream_t : public socket_base_t + class pull_t : public socket_base_t { public: - upstream_t (class app_thread_t *parent_); - ~upstream_t (); + pull_t (class app_thread_t *parent_); + ~pull_t (); // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, @@ -52,8 +52,8 @@ namespace zmq // Fair queueing object for inbound pipes. fq_t fq; - upstream_t (const upstream_t&); - void operator = (const upstream_t&); + pull_t (const pull_t&); + void operator = (const pull_t&); }; diff --git a/src/downstream.cpp b/src/push.cpp index 4074a9e..522101f 100644 --- a/src/downstream.cpp +++ b/src/push.cpp @@ -19,58 +19,58 @@ #include "../include/zmq.h" -#include "downstream.hpp" +#include "push.hpp" #include "err.hpp" #include "pipe.hpp" -zmq::downstream_t::downstream_t (class app_thread_t *parent_) : +zmq::push_t::push_t (class app_thread_t *parent_) : socket_base_t (parent_) { options.requires_in = false; options.requires_out = true; } -zmq::downstream_t::~downstream_t () +zmq::push_t::~push_t () { } -void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, +void zmq::push_t::xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, const blob_t &peer_identity_) { zmq_assert (!inpipe_ && outpipe_); lb.attach (outpipe_); } -void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) +void zmq::push_t::xdetach_inpipe (class reader_t *pipe_) { // There are no inpipes, so this function shouldn't be called at all. zmq_assert (false); } -void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) +void zmq::push_t::xdetach_outpipe (class writer_t *pipe_) { zmq_assert (pipe_); lb.detach (pipe_); } -void zmq::downstream_t::xkill (class reader_t *pipe_) +void zmq::push_t::xkill (class reader_t *pipe_) { // There are no inpipes, so this function shouldn't be called at all. zmq_assert (false); } -void zmq::downstream_t::xrevive (class reader_t *pipe_) +void zmq::push_t::xrevive (class reader_t *pipe_) { // There are no inpipes, so this function shouldn't be called at all. zmq_assert (false); } -void zmq::downstream_t::xrevive (class writer_t *pipe_) +void zmq::push_t::xrevive (class writer_t *pipe_) { lb.revive (pipe_); } -int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, +int zmq::push_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { // No special option for this socket type. @@ -78,23 +78,23 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, return -1; } -int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) +int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) { return lb.send (msg_, flags_); } -int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) +int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_) { errno = ENOTSUP; return -1; } -bool zmq::downstream_t::xhas_in () +bool zmq::push_t::xhas_in () { return false; } -bool zmq::downstream_t::xhas_out () +bool zmq::push_t::xhas_out () { return lb.has_out (); } diff --git a/src/downstream.hpp b/src/push.hpp index 1306743..b3c8d87 100644 --- a/src/downstream.hpp +++ b/src/push.hpp @@ -17,8 +17,8 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ -#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#ifndef __ZMQ_PUSH_HPP_INCLUDED__ +#define __ZMQ_PUSH_HPP_INCLUDED__ #include "socket_base.hpp" #include "lb.hpp" @@ -26,12 +26,12 @@ namespace zmq { - class downstream_t : public socket_base_t + class push_t : public socket_base_t { public: - downstream_t (class app_thread_t *parent_); - ~downstream_t (); + push_t (class app_thread_t *parent_); + ~push_t (); // Overloads of functions from socket_base_t. void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, @@ -52,8 +52,8 @@ namespace zmq // Load balancer managing the outbound pipes. lb_t lb; - downstream_t (const downstream_t&); - void operator = (const downstream_t&); + push_t (const push_t&); + void operator = (const push_t&); }; } |