summaryrefslogtreecommitdiff
path: root/src/zmq_engine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zmq_engine.cpp')
-rw-r--r--src/zmq_engine.cpp23
1 files changed, 20 insertions, 3 deletions
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 18fc616..6b439f5 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -25,7 +25,7 @@
#include "err.hpp"
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
- const options_t &options_) :
+ const options_t &options_, bool reconnect_, const char *address_) :
io_object_t (parent_),
inpos (NULL),
insize (0),
@@ -34,8 +34,12 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
outsize (0),
encoder (out_batch_size, false),
inout (NULL),
- options (options_)
+ options (options_),
+ reconnect (reconnect_)
{
+ if (reconnect)
+ address = address_;
+
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
zmq_assert (rc == 0);
@@ -145,7 +149,20 @@ void zmq::zmq_engine_t::revive ()
void zmq::zmq_engine_t::error ()
{
zmq_assert (inout);
- inout->detach ();
+
+ zmq_connecter_t *reconnecter = NULL;
+ if (reconnect) {
+
+ // Create a connecter object to attempt reconnect.
+ // Ask it to wait for a while before reconnecting.
+ reconnecter = new zmq_connecter_t (
+ inout->get_io_thread (), inout->get_owner (),
+ options, inout->get_session_name (), true);
+ zmq_assert (reconnecter);
+ reconnecter->set_address (address.c_str ());
+ }
+
+ inout->detach (reconnecter);
unplug ();
delete this;
}