summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h61
-rw-r--r--src/Makefile.am8
-rw-r--r--src/app_thread.cpp12
-rw-r--r--src/pull.cpp (renamed from src/upstream.cpp)28
-rw-r--r--src/pull.hpp (renamed from src/upstream.hpp)14
-rw-r--r--src/push.cpp (renamed from src/downstream.cpp)28
-rw-r--r--src/push.hpp (renamed from src/downstream.hpp)14
7 files changed, 84 insertions, 81 deletions
diff --git a/include/zmq.h b/include/zmq.h
index 294b5c4..bce1215 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -152,33 +152,36 @@ ZMQ_EXPORT int zmq_term (void *context);
/******************************************************************************/
/* Socket types. */
-#define ZMQ_PAIR 0
-#define ZMQ_PUB 1
-#define ZMQ_SUB 2
-#define ZMQ_REQ 3
-#define ZMQ_REP 4
-#define ZMQ_XREQ 5
-#define ZMQ_XREP 6
-#define ZMQ_UPSTREAM 7
-#define ZMQ_DOWNSTREAM 8
+#define ZMQ_PAIR 0
+#define ZMQ_PUB 1
+#define ZMQ_SUB 2
+#define ZMQ_REQ 3
+#define ZMQ_REP 4
+#define ZMQ_XREQ 5
+#define ZMQ_XREP 6
+#define ZMQ_PULL 7
+#define ZMQ_PUSH 8
+#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */
+#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */
/* Socket options. */
-#define ZMQ_HWM 1
-#define ZMQ_SWAP 3
-#define ZMQ_AFFINITY 4
-#define ZMQ_IDENTITY 5
-#define ZMQ_SUBSCRIBE 6
-#define ZMQ_UNSUBSCRIBE 7
-#define ZMQ_RATE 8
+#define ZMQ_HWM 1
+/* ZMQ_LWM 2 no longer supported */
+#define ZMQ_SWAP 3
+#define ZMQ_AFFINITY 4
+#define ZMQ_IDENTITY 5
+#define ZMQ_SUBSCRIBE 6
+#define ZMQ_UNSUBSCRIBE 7
+#define ZMQ_RATE 8
#define ZMQ_RECOVERY_IVL 9
-#define ZMQ_MCAST_LOOP 10
-#define ZMQ_SNDBUF 11
-#define ZMQ_RCVBUF 12
-#define ZMQ_RCVMORE 13
+#define ZMQ_MCAST_LOOP 10
+#define ZMQ_SNDBUF 11
+#define ZMQ_RCVBUF 12
+#define ZMQ_RCVMORE 13
/* Send/recv options. */
-#define ZMQ_NOBLOCK 1
-#define ZMQ_SNDMORE 2
+#define ZMQ_NOBLOCK 1
+#define ZMQ_SNDMORE 2
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
@@ -195,9 +198,9 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
/* I/O multiplexing. */
/******************************************************************************/
-#define ZMQ_POLLIN 1
-#define ZMQ_POLLOUT 2
-#define ZMQ_POLLERR 4
+#define ZMQ_POLLIN 1
+#define ZMQ_POLLOUT 2
+#define ZMQ_POLLERR 4
typedef struct
{
@@ -214,12 +217,12 @@ typedef struct
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
-/* Devices - Experimental. */
+/* Devices */
/******************************************************************************/
-#define ZMQ_STREAMER 1
-#define ZMQ_FORWARDER 2
-#define ZMQ_QUEUE 3
+#define ZMQ_QUEUE 1
+#define ZMQ_FORWARDER 2
+#define ZMQ_STREAMER 3
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket);
diff --git a/src/Makefile.am b/src/Makefile.am
index 977b655..19a80d0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -58,7 +58,7 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- downstream.hpp \
+ push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -105,7 +105,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- upstream.hpp \
+ pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -125,7 +125,7 @@ libzmq_la_SOURCES = app_thread.hpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
- downstream.cpp \
+ push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -160,7 +160,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- upstream.cpp \
+ pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index fbf034c..ac59464 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -46,8 +46,8 @@
#include "rep.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
-#include "upstream.hpp"
-#include "downstream.hpp"
+#include "pull.hpp"
+#include "push.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -157,11 +157,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_XREP:
s = new (std::nothrow) xrep_t (this);
break;
- case ZMQ_UPSTREAM:
- s = new (std::nothrow) upstream_t (this);
+ case ZMQ_PULL:
+ s = new (std::nothrow) pull_t (this);
break;
- case ZMQ_DOWNSTREAM:
- s = new (std::nothrow) downstream_t (this);
+ case ZMQ_PUSH:
+ s = new (std::nothrow) push_t (this);
break;
default:
if (sockets.empty ())
diff --git a/src/upstream.cpp b/src/pull.cpp
index 1498c31..b2413ee 100644
--- a/src/upstream.cpp
+++ b/src/pull.cpp
@@ -19,55 +19,55 @@
#include "../include/zmq.h"
-#include "upstream.hpp"
+#include "pull.hpp"
#include "err.hpp"
-zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
+zmq::pull_t::pull_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
options.requires_in = true;
options.requires_out = false;
}
-zmq::upstream_t::~upstream_t ()
+zmq::pull_t::~pull_t ()
{
}
-void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
+void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && !outpipe_);
fq.attach (inpipe_);
}
-void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (pipe_);
fq.detach (pipe_);
}
-void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_)
{
// There are no outpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::upstream_t::xkill (class reader_t *pipe_)
+void zmq::pull_t::xkill (class reader_t *pipe_)
{
fq.kill (pipe_);
}
-void zmq::upstream_t::xrevive (class reader_t *pipe_)
+void zmq::pull_t::xrevive (class reader_t *pipe_)
{
fq.revive (pipe_);
}
-void zmq::upstream_t::xrevive (class writer_t *pipe_)
+void zmq::pull_t::xrevive (class writer_t *pipe_)
{
zmq_assert (false);
}
-int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
+int zmq::pull_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special options for this socket type.
@@ -75,23 +75,23 @@ int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
-int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
-int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
}
-bool zmq::upstream_t::xhas_in ()
+bool zmq::pull_t::xhas_in ()
{
return fq.has_in ();
}
-bool zmq::upstream_t::xhas_out ()
+bool zmq::pull_t::xhas_out ()
{
return false;
}
diff --git a/src/upstream.hpp b/src/pull.hpp
index 5fe42ae..7f249e9 100644
--- a/src/upstream.hpp
+++ b/src/pull.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
-#define __ZMQ_UPSTREAM_HPP_INCLUDED__
+#ifndef __ZMQ_PULL_HPP_INCLUDED__
+#define __ZMQ_PULL_HPP_INCLUDED__
#include "socket_base.hpp"
#include "fq.hpp"
@@ -26,12 +26,12 @@
namespace zmq
{
- class upstream_t : public socket_base_t
+ class pull_t : public socket_base_t
{
public:
- upstream_t (class app_thread_t *parent_);
- ~upstream_t ();
+ pull_t (class app_thread_t *parent_);
+ ~pull_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
@@ -52,8 +52,8 @@ namespace zmq
// Fair queueing object for inbound pipes.
fq_t fq;
- upstream_t (const upstream_t&);
- void operator = (const upstream_t&);
+ pull_t (const pull_t&);
+ void operator = (const pull_t&);
};
diff --git a/src/downstream.cpp b/src/push.cpp
index 4074a9e..522101f 100644
--- a/src/downstream.cpp
+++ b/src/push.cpp
@@ -19,58 +19,58 @@
#include "../include/zmq.h"
-#include "downstream.hpp"
+#include "push.hpp"
#include "err.hpp"
#include "pipe.hpp"
-zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
+zmq::push_t::push_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
options.requires_in = false;
options.requires_out = true;
}
-zmq::downstream_t::~downstream_t ()
+zmq::push_t::~push_t ()
{
}
-void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
+void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_ && outpipe_);
lb.attach (outpipe_);
}
-void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
+void zmq::push_t::xdetach_inpipe (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
+void zmq::push_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
lb.detach (pipe_);
}
-void zmq::downstream_t::xkill (class reader_t *pipe_)
+void zmq::push_t::xkill (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::downstream_t::xrevive (class reader_t *pipe_)
+void zmq::push_t::xrevive (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
-void zmq::downstream_t::xrevive (class writer_t *pipe_)
+void zmq::push_t::xrevive (class writer_t *pipe_)
{
lb.revive (pipe_);
}
-int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
+int zmq::push_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special option for this socket type.
@@ -78,23 +78,23 @@ int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
-int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
+int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
}
-int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
+int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
-bool zmq::downstream_t::xhas_in ()
+bool zmq::push_t::xhas_in ()
{
return false;
}
-bool zmq::downstream_t::xhas_out ()
+bool zmq::push_t::xhas_out ()
{
return lb.has_out ();
}
diff --git a/src/downstream.hpp b/src/push.hpp
index 1306743..b3c8d87 100644
--- a/src/downstream.hpp
+++ b/src/push.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
-#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
+#ifndef __ZMQ_PUSH_HPP_INCLUDED__
+#define __ZMQ_PUSH_HPP_INCLUDED__
#include "socket_base.hpp"
#include "lb.hpp"
@@ -26,12 +26,12 @@
namespace zmq
{
- class downstream_t : public socket_base_t
+ class push_t : public socket_base_t
{
public:
- downstream_t (class app_thread_t *parent_);
- ~downstream_t ();
+ push_t (class app_thread_t *parent_);
+ ~push_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
@@ -52,8 +52,8 @@ namespace zmq
// Load balancer managing the outbound pipes.
lb_t lb;
- downstream_t (const downstream_t&);
- void operator = (const downstream_t&);
+ push_t (const push_t&);
+ void operator = (const push_t&);
};
}