diff options
| -rw-r--r-- | src/options.cpp | 4 | ||||
| -rw-r--r-- | src/options.hpp | 8 | ||||
| -rw-r--r-- | src/pipe.hpp | 5 | ||||
| -rw-r--r-- | src/session.cpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 4 | ||||
| -rw-r--r-- | src/xrep.cpp | 4 | ||||
| -rw-r--r-- | src/xreq.cpp | 4 | ||||
| -rw-r--r-- | tests/Makefile.am | 7 | ||||
| -rw-r--r-- | tests/test_reqrep_drop.cpp | 69 | 
9 files changed, 97 insertions, 10 deletions
diff --git a/src/options.cpp b/src/options.cpp index 80ab294..aa92f93 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -41,7 +41,9 @@ zmq::options_t::options_t () :      filter (1),      rcvtimeo (-1),      sndtimeo (-1), -    immediate_connect (true) +    immediate_connect (true), +    delay_on_close (true), +    delay_on_disconnect (true)  {  } diff --git a/src/options.hpp b/src/options.hpp index 858ec2e..70144b2 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -87,6 +87,14 @@ namespace zmq          //  is not aware of the peer's identity, however, it is able to send          //  messages straight away.          bool immediate_connect; + +        //  If true, session reads all the pending messages from the pipe and +        //  sends them to the network when socket is closed. +        bool delay_on_close; + +        //  If true, socket reads all the messages from the pipe and delivers +        //  them to the user when the peer terminates. +        bool delay_on_disconnect;      };  } diff --git a/src/pipe.hpp b/src/pipe.hpp index df85bc2..437d84d 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -34,8 +34,9 @@ namespace zmq      //  Create a pipepair for bi-directional transfer of messages.      //  First HWM is for messages passed from first pipe to the second pipe.      //  Second HWM is for messages passed from second pipe to the first pipe. -    //  Delay specifies whether the pipe receives all the pending messages -    //  before terminating or whether it terminates straight away. +    //  Delay specifies how the pipe behaves when the peer terminates. If true +    //  pipe receives all the pending messages before terminating, otherwise it +    //  terminates straight away.      int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],          int hwms_ [2], bool delays_ [2]); diff --git a/src/session.cpp b/src/session.cpp index 334763a..9286054 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -180,7 +180,7 @@ void zmq::session_t::process_attach (i_engine *engine_,          object_t *parents [2] = {this, socket};          pipe_t *pipes [2] = {NULL, NULL};          int hwms [2] = {options.rcvhwm, options.sndhwm}; -        bool delays [2] = {true, true}; +        bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};          int rc = pipepair (parents, pipes, hwms, delays);          errno_assert (rc == 0); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 804ec46..74a345b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -426,7 +426,7 @@ int zmq::socket_base_t::connect (const char *addr_)          object_t *parents [2] = {this, peer.socket};          pipe_t *pipes [2] = {NULL, NULL};          int hwms [2] = {sndhwm, rcvhwm}; -        bool delays [2] = {true, true}; +        bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};          int rc = pipepair (parents, pipes, hwms, delays);          errno_assert (rc == 0); @@ -462,7 +462,7 @@ int zmq::socket_base_t::connect (const char *addr_)          object_t *parents [2] = {this, session};          pipe_t *pipes [2] = {NULL, NULL};          int hwms [2] = {options.sndhwm, options.rcvhwm}; -        bool delays [2] = {true, true}; +        bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};          int rc = pipepair (parents, pipes, hwms, delays);          errno_assert (rc == 0); diff --git a/src/xrep.cpp b/src/xrep.cpp index 4a474cf..fe9ab52 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -33,6 +33,10 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :  {      options.type = ZMQ_XREP; +    //  If peer disconnect there's noone to send reply to anyway. We can drop +    //  all the outstanding requests from that peer. +    options.delay_on_disconnect = false; +      prefetched_msg.init ();      //  Start the peer ID sequence from a random point. diff --git a/src/xreq.cpp b/src/xreq.cpp index 2371a34..85e2238 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -26,6 +26,10 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :      socket_base_t (parent_, tid_)  {      options.type = ZMQ_XREQ; + +    //  If the socket is closing we can drop all the outbound requests. There'll +    //  be noone to receive the replies anyway. +    options.delay_on_close = false;  }  zmq::xreq_t::~xreq_t () diff --git a/tests/Makefile.am b/tests/Makefile.am index 9238850..8ec3b0c 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -6,7 +6,8 @@ noinst_PROGRAMS = test_pair_inproc \                    test_reqrep_inproc \                    test_reqrep_tcp \                    test_hwm \ -                  test_reqrep_device +                  test_reqrep_device \ +                  test_reqrep_drop  if !ON_MINGW  noinst_PROGRAMS += test_shutdown_stress \ @@ -17,13 +18,11 @@ endif  test_pair_inproc_SOURCES = test_pair_inproc.cpp testutil.hpp  test_pair_tcp_SOURCES = test_pair_tcp.cpp testutil.hpp -  test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp  test_reqrep_tcp_SOURCES = test_reqrep_tcp.cpp testutil.hpp -  test_hwm_SOURCES = test_hwm.cpp -  test_reqrep_device_SOURCES = test_reqrep_device.cpp +test_reqrep_drop_SOURCES = test_reqrep_drop.cpp  if !ON_MINGW  test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_reqrep_drop.cpp b/tests/test_reqrep_drop.cpp new file mode 100644 index 0000000..353aee7 --- /dev/null +++ b/tests/test_reqrep_drop.cpp @@ -0,0 +1,69 @@ +/* +    Copyright (c) 2007-2011 iMatix Corporation +    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the GNU Lesser General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    GNU Lesser General Public License for more details. + +    You should have received a copy of the GNU Lesser General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <assert.h> + +#include "../include/zmq.h" +#include "../include/zmq_utils.h" + +int main (int argc, char *argv []) +{ +    void *ctx = zmq_init (1); +    assert (ctx); + +    //  Create a server. +    void *xrep = zmq_socket (ctx, ZMQ_XREP); +    assert (xrep); +    int rc = zmq_bind (xrep, "tcp://127.0.0.1:5560"); +    assert (rc == 0); + +    //  Create a client. +    void *xreq = zmq_socket (ctx, ZMQ_XREQ); +    assert (xreq); +    rc = zmq_connect (xreq, "tcp://127.0.0.1:5560"); +    assert (rc == 0); + +    //  Send requests. +    rc = zmq_send (xreq, "ABC", 3, 0); +    assert (rc == 3); +    rc = zmq_send (xreq, "DEF", 3, 0); +    assert (rc == 3); + +    //  Disconnect client. +    rc = zmq_close (xreq); +    assert (rc == 0); + +    //  Wait a while for disconnect to happen. +    zmq_sleep (1); + +    //  Try to receive a request -- it should have been discarded. +    char buff [3]; +    rc = zmq_recv (xrep, buff, 3, ZMQ_DONTWAIT); +    assert (rc < 0); +    assert (errno == EAGAIN); + +    //  Clean up. +    rc = zmq_close (xrep); +    assert (rc == 0); +    rc = zmq_term (ctx); +    assert (rc == 0); + +    return 0 ; +}  | 
