summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/downstream.cpp10
-rw-r--r--src/downstream.hpp1
-rw-r--r--src/lb.cpp3
-rw-r--r--src/p2p.cpp10
-rw-r--r--src/p2p.hpp1
-rw-r--r--src/pub.cpp17
-rw-r--r--src/pub.hpp1
-rw-r--r--src/rep.cpp6
-rw-r--r--src/rep.hpp1
-rw-r--r--src/req.cpp6
-rw-r--r--src/req.hpp1
-rw-r--r--src/socket_base.cpp5
-rw-r--r--src/socket_base.hpp2
-rw-r--r--src/sub.cpp6
-rw-r--r--src/sub.hpp1
-rw-r--r--src/upstream.cpp6
-rw-r--r--src/upstream.hpp1
-rw-r--r--src/xrep.cpp6
-rw-r--r--src/xrep.hpp1
-rw-r--r--src/xreq.cpp7
-rw-r--r--src/xreq.hpp1
-rw-r--r--src/zmq.cpp3
22 files changed, 7 insertions, 89 deletions
diff --git a/src/downstream.cpp b/src/downstream.cpp
index ca614d8..4074a9e 100644
--- a/src/downstream.cpp
+++ b/src/downstream.cpp
@@ -83,16 +83,6 @@ int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
return lb.send (msg_, flags_);
}
-int zmq::downstream_t::xflush ()
-{
- // TODO: Maybe there's a point in flushing messages downstream.
- // It may be useful in the case where number of messages in a single
- // transaction is much greater than the number of attached pipes.
- errno = ENOTSUP;
- return -1;
-
-}
-
int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
diff --git a/src/downstream.hpp b/src/downstream.hpp
index 998ab73..1306743 100644
--- a/src/downstream.hpp
+++ b/src/downstream.hpp
@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/lb.cpp b/src/lb.cpp
index d7193f1..e282d24 100644
--- a/src/lb.cpp
+++ b/src/lb.cpp
@@ -80,8 +80,7 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
return -1;
}
- if (!(flags_ & ZMQ_NOFLUSH))
- pipes [current]->flush ();
+ pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
diff --git a/src/p2p.cpp b/src/p2p.cpp
index f81c6c4..3f63d81 100644
--- a/src/p2p.cpp
+++ b/src/p2p.cpp
@@ -100,8 +100,7 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
- if (!(flags_ & ZMQ_NOFLUSH))
- outpipe->flush ();
+ outpipe->flush ();
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
@@ -110,13 +109,6 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::p2p_t::xflush ()
-{
- if (outpipe)
- outpipe->flush ();
- return 0;
-}
-
int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
diff --git a/src/p2p.hpp b/src/p2p.hpp
index 97531cf..57320d9 100644
--- a/src/p2p.hpp
+++ b/src/p2p.hpp
@@ -42,7 +42,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/pub.cpp b/src/pub.cpp
index 2971e5c..ad78834 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -107,8 +107,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
bool written = out_pipes [i]->write (msg_);
zmq_assert (written);
- if (!(flags_ & ZMQ_NOFLUSH))
- out_pipes [i]->flush ();
+ out_pipes [i]->flush ();
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
@@ -121,8 +120,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
if (pipes_count == 1) {
bool written = out_pipes [0]->write (msg_);
zmq_assert (written);
- if (!(flags_ & ZMQ_NOFLUSH))
- out_pipes [0]->flush ();
+ out_pipes [0]->flush ();
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
@@ -142,8 +140,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
bool written = out_pipes [i]->write (msg_);
zmq_assert (written);
- if (!(flags_ & ZMQ_NOFLUSH))
- out_pipes [i]->flush ();
+ out_pipes [i]->flush ();
}
// Detach the original message from the data buffer.
@@ -153,14 +150,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::pub_t::xflush ()
-{
- out_pipes_t::size_type pipe_count = out_pipes.size ();
- for (out_pipes_t::size_type i = 0; i != pipe_count; i++)
- out_pipes [i]->flush ();
- return 0;
-}
-
int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
diff --git a/src/pub.hpp b/src/pub.hpp
index 3a4fe09..89c1cd1 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/rep.cpp b/src/rep.cpp
index 08fc31b..eaeff41 100644
--- a/src/rep.cpp
+++ b/src/rep.cpp
@@ -188,12 +188,6 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::rep_t::xflush ()
-{
- errno = ENOTSUP;
- return -1;
-}
-
int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
diff --git a/src/rep.hpp b/src/rep.hpp
index 9d2357d..51a49a9 100644
--- a/src/rep.hpp
+++ b/src/rep.hpp
@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/req.cpp b/src/req.cpp
index 548cf52..0dfe14e 100644
--- a/src/req.cpp
+++ b/src/req.cpp
@@ -207,12 +207,6 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::req_t::xflush ()
-{
- errno = ENOTSUP;
- return -1;
-}
-
int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
diff --git a/src/req.hpp b/src/req.hpp
index 531c06f..d3e12b5 100644
--- a/src/req.hpp
+++ b/src/req.hpp
@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index fdb2d12..f038dc9 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -335,11 +335,6 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::socket_base_t::flush ()
-{
- return xflush ();
-}
-
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
// Get the message.
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index bb40ae6..4464256 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -52,7 +52,6 @@ namespace zmq
int bind (const char *addr_);
int connect (const char *addr_);
int send (zmq_msg_t *msg_, int flags_);
- int flush ();
int recv (zmq_msg_t *msg_, int flags_);
int close ();
@@ -113,7 +112,6 @@ namespace zmq
virtual int xsetsockopt (int option_, const void *optval_,
size_t optvallen_) = 0;
virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
- virtual int xflush () = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
virtual bool xhas_in () = 0;
virtual bool xhas_out () = 0;
diff --git a/src/sub.cpp b/src/sub.cpp
index 4169ea5..e32e198 100644
--- a/src/sub.cpp
+++ b/src/sub.cpp
@@ -98,12 +98,6 @@ int zmq::sub_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
-int zmq::sub_t::xflush ()
-{
- errno = ENOTSUP;
- return -1;
-}
-
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
diff --git a/src/sub.hpp b/src/sub.hpp
index c319565..84fab5e 100644
--- a/src/sub.hpp
+++ b/src/sub.hpp
@@ -48,7 +48,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/upstream.cpp b/src/upstream.cpp
index 8163c18..1498c31 100644
--- a/src/upstream.cpp
+++ b/src/upstream.cpp
@@ -81,12 +81,6 @@ int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
-int zmq::upstream_t::xflush ()
-{
- errno = ENOTSUP;
- return -1;
-}
-
int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
diff --git a/src/upstream.hpp b/src/upstream.hpp
index d9fb385..5fe42ae 100644
--- a/src/upstream.hpp
+++ b/src/upstream.hpp
@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/xrep.cpp b/src/xrep.cpp
index 48546d9..33b89bd 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -129,12 +129,6 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
-int zmq::xrep_t::xflush ()
-{
- zmq_assert (false);
- return -1;
-}
-
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
diff --git a/src/xrep.hpp b/src/xrep.hpp
index f2cdb2b..c56a8f9 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -46,7 +46,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/xreq.cpp b/src/xreq.cpp
index 484ee97..66e5cc3 100644
--- a/src/xreq.cpp
+++ b/src/xreq.cpp
@@ -80,13 +80,6 @@ int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
return lb.send (msg_, flags_);
}
-int zmq::xreq_t::xflush ()
-{
- // TODO: Implement flushing.
- zmq_assert (false);
- return -1;
-}
-
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
{
return fq.recv (msg_, flags_);
diff --git a/src/xreq.hpp b/src/xreq.hpp
index 3d3f573..8ee0bb9 100644
--- a/src/xreq.hpp
+++ b/src/xreq.hpp
@@ -44,7 +44,6 @@ namespace zmq
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
- int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
diff --git a/src/zmq.cpp b/src/zmq.cpp
index a9430d4..80720ae 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -315,7 +315,8 @@ int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_flush (void *s_)
{
- return (((zmq::socket_base_t*) s_)->flush ());
+ errno = ENOTSUP;
+ return -1;
}
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)