From 61ad236e9543a569fe066872a5fda4fa40ea7591 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 13 Mar 2010 14:40:10 +0100 Subject: ZMQ_NOFLUSH and zmq_flush obsoleted --- doc/Makefile.am | 2 +- doc/zmq.txt | 1 - doc/zmq_flush.txt | 55 ----------------------------------------------------- doc/zmq_send.txt | 8 -------- doc/zmq_socket.txt | 1 - include/zmq.hpp | 7 ------- src/downstream.cpp | 10 ---------- src/downstream.hpp | 1 - src/lb.cpp | 3 +-- src/p2p.cpp | 10 +--------- src/p2p.hpp | 1 - src/pub.cpp | 17 +++-------------- src/pub.hpp | 1 - src/rep.cpp | 6 ------ src/rep.hpp | 1 - src/req.cpp | 6 ------ src/req.hpp | 1 - src/socket_base.cpp | 5 ----- src/socket_base.hpp | 2 -- src/sub.cpp | 6 ------ src/sub.hpp | 1 - src/upstream.cpp | 6 ------ src/upstream.hpp | 1 - src/xrep.cpp | 6 ------ src/xrep.hpp | 1 - src/xreq.cpp | 7 ------- src/xreq.hpp | 1 - src/zmq.cpp | 3 ++- 28 files changed, 8 insertions(+), 162 deletions(-) delete mode 100644 doc/zmq_flush.txt diff --git a/doc/Makefile.am b/doc/Makefile.am index 0e40813..fce124d 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,5 +1,5 @@ MAN1 = zmq_forwarder.1 zmq_streamer.1 zmq_queue.1 -MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_flush.3 zmq_init.3 \ +MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ diff --git a/doc/zmq.txt b/doc/zmq.txt index 8f8d40d..003570a 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -115,7 +115,6 @@ Establishing a message flow:: Sending and receiving messages:: linkzmq:zmq_send[3] - linkzmq:zmq_flush[3] linkzmq:zmq_recv[3] diff --git a/doc/zmq_flush.txt b/doc/zmq_flush.txt deleted file mode 100644 index 83ba32c..0000000 --- a/doc/zmq_flush.txt +++ /dev/null @@ -1,55 +0,0 @@ -zmq_flush(3) -============ - - -NAME ----- -zmq_flush - flush messages queued on a socket - - -SYNOPSIS --------- -*int zmq_flush (void '*socket');* - - -DESCRIPTION ------------ -The _zmq_flush()_ function shall flush messages previously queued on the socket -referenced by the 'socket' argument. The _zmq_flush()_ function only affects -messages that have been queued on the _message queue_ associated with 'socket' -using the 'ZMQ_NOFLUSH' flag to the _zmq_send()_ function. If no such messages -exist, the function has no effect. - -CAUTION: A successful invocation of _zmq_flush()_ does not indicate that the -flushed messages have been transmitted to the network, or even that such a -transmission has been initiated by 0MQ. This function exists merely as a way -for the application programmer to supply a hint to the 0MQ infrastructure that -the queued messages *may* be flushed as a single batch. - - -RETURN VALUE ------------- -The _zmq_flush()_ function shall return zero if successful. Otherwise it shall -return `-1` and set 'errno' to one of the values defined below. - - -ERRORS ------- -*ENOTSUP*:: -The _zmq_flush()_ operation is not supported by this socket type. -*EFSM*:: -The _zmq_flush()_ operation cannot be performed on this socket at the moment -due to the socket not being in the appropriate state. - - -SEE ALSO --------- -linkzmq:zmq_send[3] -linkzmq:zmq_socket[3] -linkzmq:zmq[7] - - -AUTHOR ------- -The 0MQ documentation was written by Martin Sustrik and -Martin Lucina . diff --git a/doc/zmq_send.txt b/doc/zmq_send.txt index b98748f..e0b19e7 100644 --- a/doc/zmq_send.txt +++ b/doc/zmq_send.txt @@ -23,13 +23,6 @@ Specifies that the operation should be performed in non-blocking mode. If the message cannot be queued on the underlying _message queue_ associated with 'socket', the _zmq_send()_ function shall fail with 'errno' set to EAGAIN. -*ZMQ_NOFLUSH*:: -Specifies that the _zmq_send()_ function should not flush the underlying -_message queue_ associated with 'socket' to the network automatically. -Instead, it should batch all messages queued with the 'ZMQ_NOFLUSH' flag and -only flush the _message queue_ once either a message without the 'ZMQ_NOFLUSH' -flag is queued, or manually on invocation of the _zmq_flush()_ function. - NOTE: A successful invocation of _zmq_send()_ does not indicate that the message has been transmitted to the network, only that it has been queued on the _message queue_ associated with the socket and 0MQ has assumed @@ -73,7 +66,6 @@ assert (rc == 0); SEE ALSO -------- -linkzmq:zmq_flush[3] linkzmq:zmq_recv[3] linkzmq:zmq_socket[7] linkzmq:zmq[7] diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index 9d2907d..8f10cc7 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -125,7 +125,6 @@ linkzmq:zmq_setsockopt[3] linkzmq:zmq_bind[3] linkzmq:zmq_connect[3] linkzmq:zmq_send[3] -linkzmq:zmq_flush[3] linkzmq:zmq_recv[3] diff --git a/include/zmq.hpp b/include/zmq.hpp index 3ddc043..6228133 100644 --- a/include/zmq.hpp +++ b/include/zmq.hpp @@ -236,13 +236,6 @@ namespace zmq throw error_t (); } - inline void flush () - { - int rc = zmq_flush (ptr); - if (rc != 0) - throw error_t (); - } - inline bool recv (message_t *msg_, int flags_ = 0) { int rc = zmq_recv (ptr, msg_, flags_); diff --git a/src/downstream.cpp b/src/downstream.cpp index ca614d8..4074a9e 100644 --- a/src/downstream.cpp +++ b/src/downstream.cpp @@ -83,16 +83,6 @@ int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) return lb.send (msg_, flags_); } -int zmq::downstream_t::xflush () -{ - // TODO: Maybe there's a point in flushing messages downstream. - // It may be useful in the case where number of messages in a single - // transaction is much greater than the number of attached pipes. - errno = ENOTSUP; - return -1; - -} - int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) { errno = ENOTSUP; diff --git a/src/downstream.hpp b/src/downstream.hpp index 998ab73..1306743 100644 --- a/src/downstream.hpp +++ b/src/downstream.hpp @@ -43,7 +43,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/lb.cpp b/src/lb.cpp index d7193f1..e282d24 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -80,8 +80,7 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) return -1; } - if (!(flags_ & ZMQ_NOFLUSH)) - pipes [current]->flush (); + pipes [current]->flush (); // Detach the message from the data buffer. int rc = zmq_msg_init (msg_); diff --git a/src/p2p.cpp b/src/p2p.cpp index f81c6c4..3f63d81 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -100,8 +100,7 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } - if (!(flags_ & ZMQ_NOFLUSH)) - outpipe->flush (); + outpipe->flush (); // Detach the original message from the data buffer. int rc = zmq_msg_init (msg_); @@ -110,13 +109,6 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::p2p_t::xflush () -{ - if (outpipe) - outpipe->flush (); - return 0; -} - int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_) { // Deallocate old content of the message. diff --git a/src/p2p.hpp b/src/p2p.hpp index 97531cf..57320d9 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -42,7 +42,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/pub.cpp b/src/pub.cpp index 2971e5c..ad78834 100644 --- a/src/pub.cpp +++ b/src/pub.cpp @@ -107,8 +107,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { bool written = out_pipes [i]->write (msg_); zmq_assert (written); - if (!(flags_ & ZMQ_NOFLUSH)) - out_pipes [i]->flush (); + out_pipes [i]->flush (); } int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); @@ -121,8 +120,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) if (pipes_count == 1) { bool written = out_pipes [0]->write (msg_); zmq_assert (written); - if (!(flags_ & ZMQ_NOFLUSH)) - out_pipes [0]->flush (); + out_pipes [0]->flush (); int rc = zmq_msg_init (msg_); zmq_assert (rc == 0); return 0; @@ -142,8 +140,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { bool written = out_pipes [i]->write (msg_); zmq_assert (written); - if (!(flags_ & ZMQ_NOFLUSH)) - out_pipes [i]->flush (); + out_pipes [i]->flush (); } // Detach the original message from the data buffer. @@ -153,14 +150,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::pub_t::xflush () -{ - out_pipes_t::size_type pipe_count = out_pipes.size (); - for (out_pipes_t::size_type i = 0; i != pipe_count; i++) - out_pipes [i]->flush (); - return 0; -} - int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_) { errno = ENOTSUP; diff --git a/src/pub.hpp b/src/pub.hpp index 3a4fe09..89c1cd1 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -43,7 +43,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/rep.cpp b/src/rep.cpp index 08fc31b..eaeff41 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -188,12 +188,6 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::rep_t::xflush () -{ - errno = ENOTSUP; - return -1; -} - int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) { // Deallocate old content of the message. diff --git a/src/rep.hpp b/src/rep.hpp index 9d2357d..51a49a9 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -43,7 +43,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/req.cpp b/src/req.cpp index 548cf52..0dfe14e 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -207,12 +207,6 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::req_t::xflush () -{ - errno = ENOTSUP; - return -1; -} - int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) { // Deallocate old content of the message. diff --git a/src/req.hpp b/src/req.hpp index 531c06f..d3e12b5 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -43,7 +43,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fdb2d12..f038dc9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -335,11 +335,6 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) return 0; } -int zmq::socket_base_t::flush () -{ - return xflush (); -} - int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) { // Get the message. diff --git a/src/socket_base.hpp b/src/socket_base.hpp index bb40ae6..4464256 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -52,7 +52,6 @@ namespace zmq int bind (const char *addr_); int connect (const char *addr_); int send (zmq_msg_t *msg_, int flags_); - int flush (); int recv (zmq_msg_t *msg_, int flags_); int close (); @@ -113,7 +112,6 @@ namespace zmq virtual int xsetsockopt (int option_, const void *optval_, size_t optvallen_) = 0; virtual int xsend (zmq_msg_t *msg_, int options_) = 0; - virtual int xflush () = 0; virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; virtual bool xhas_in () = 0; virtual bool xhas_out () = 0; diff --git a/src/sub.cpp b/src/sub.cpp index 4169ea5..e32e198 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -98,12 +98,6 @@ int zmq::sub_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } -int zmq::sub_t::xflush () -{ - errno = ENOTSUP; - return -1; -} - int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) { // If there's already a message prepared by a previous call to zmq_poll, diff --git a/src/sub.hpp b/src/sub.hpp index c319565..84fab5e 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -48,7 +48,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/upstream.cpp b/src/upstream.cpp index 8163c18..1498c31 100644 --- a/src/upstream.cpp +++ b/src/upstream.cpp @@ -81,12 +81,6 @@ int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) return -1; } -int zmq::upstream_t::xflush () -{ - errno = ENOTSUP; - return -1; -} - int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); diff --git a/src/upstream.hpp b/src/upstream.hpp index d9fb385..5fe42ae 100644 --- a/src/upstream.hpp +++ b/src/upstream.hpp @@ -43,7 +43,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/xrep.cpp b/src/xrep.cpp index 48546d9..33b89bd 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -129,12 +129,6 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) return 0; } -int zmq::xrep_t::xflush () -{ - zmq_assert (false); - return -1; -} - int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); diff --git a/src/xrep.hpp b/src/xrep.hpp index f2cdb2b..c56a8f9 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -46,7 +46,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/xreq.cpp b/src/xreq.cpp index 484ee97..66e5cc3 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -80,13 +80,6 @@ int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) return lb.send (msg_, flags_); } -int zmq::xreq_t::xflush () -{ - // TODO: Implement flushing. - zmq_assert (false); - return -1; -} - int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) { return fq.recv (msg_, flags_); diff --git a/src/xreq.hpp b/src/xreq.hpp index 3d3f573..8ee0bb9 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -44,7 +44,6 @@ namespace zmq void xrevive (class writer_t *pipe_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq_msg_t *msg_, int flags_); - int xflush (); int xrecv (zmq_msg_t *msg_, int flags_); bool xhas_in (); bool xhas_out (); diff --git a/src/zmq.cpp b/src/zmq.cpp index a9430d4..80720ae 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -315,7 +315,8 @@ int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) int zmq_flush (void *s_) { - return (((zmq::socket_base_t*) s_)->flush ()); + errno = ENOTSUP; + return -1; } int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) -- cgit v1.2.3