summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--perf/c/remote_lat.c10
-rw-r--r--perf/cpp/remote_lat.cpp5
-rw-r--r--src/session.cpp9
-rw-r--r--src/session.hpp5
-rw-r--r--src/socket_base.cpp4
-rw-r--r--src/zmq_connecter.cpp22
-rw-r--r--src/zmq_connecter.hpp5
-rw-r--r--src/zmq_connecter_init.cpp22
-rw-r--r--src/zmq_engine.cpp1
-rw-r--r--src/zmq_engine.hpp1
-rw-r--r--src/zmq_listener_init.cpp2
11 files changed, 66 insertions, 20 deletions
diff --git a/perf/c/remote_lat.c b/perf/c/remote_lat.c
index 23695b4..15dfc46 100644
--- a/perf/c/remote_lat.c
+++ b/perf/c/remote_lat.c
@@ -20,6 +20,7 @@
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
+#include <string.h>
#include <assert.h>
int main (int argc, char *argv [])
@@ -54,10 +55,11 @@ int main (int argc, char *argv [])
rc = zmq_connect (s, connect_to);
assert (rc == 0);
- watch = zmq_stopwatch_start ();
-
rc = zmq_msg_init_size (&msg, message_size);
assert (rc == 0);
+ memset (zmq_msg_data (&msg), 0, message_size);
+
+ watch = zmq_stopwatch_start ();
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_send (s, &msg, 0);
@@ -67,11 +69,11 @@ int main (int argc, char *argv [])
assert (zmq_msg_size (&msg) == message_size);
}
+ elapsed = zmq_stopwatch_stop (watch);
+
rc = zmq_msg_close (&msg);
assert (rc == 0);
- elapsed = zmq_stopwatch_stop (watch);
-
latency = (double) elapsed / (roundtrip_count * 2);
printf ("message size: %d [B]\n", (int) message_size);
diff --git a/perf/cpp/remote_lat.cpp b/perf/cpp/remote_lat.cpp
index f1d2a17..f6ccbb4 100644
--- a/perf/cpp/remote_lat.cpp
+++ b/perf/cpp/remote_lat.cpp
@@ -22,6 +22,7 @@
#include <stdlib.h>
#include <assert.h>
#include <stddef.h>
+#include <string.h>
int main (int argc, char *argv [])
{
@@ -39,10 +40,12 @@ int main (int argc, char *argv [])
zmq::socket_t s (ctx, ZMQ_REQ);
s.connect (connect_to);
+ zmq::message_t msg (message_size);
+ memset (msg.data (), 0, message_size);
+
void *watch = zmq_stopwatch_start ();
for (int i = 0; i != roundtrip_count; i++) {
- zmq::message_t msg (message_size);
s.send (msg);
s.recv (&msg);
assert (msg.size () == message_size);
diff --git a/src/session.cpp b/src/session.cpp
index 31c6354..9593827 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -23,14 +23,15 @@
#include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
- const char *name_, const options_t &options_) :
+ const char *name_, const options_t &options_, bool reconnect_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
name (name_),
- options (options_)
+ options (options_),
+ reconnect (reconnect_)
{
}
@@ -69,7 +70,9 @@ void zmq::session_t::flush ()
void zmq::session_t::detach ()
{
- // Engine is terminating itself.
+ // TODO: Start reconnection process here.
+
+ // Engine is terminating itself. No need to deallocate it from here.
engine = NULL;
// In the case od anonymous connection, terminate the session.
diff --git a/src/session.hpp b/src/session.hpp
index 195bdca..55aa8ea 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -35,7 +35,7 @@ namespace zmq
public:
session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
- const options_t &options_);
+ const options_t &options_, bool reconnect_);
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
@@ -77,6 +77,9 @@ namespace zmq
// Inherited socket options.
options_t options;
+ // If true, reconnection is required after connection breaks.
+ bool reconnect;
+
session_t (const session_t&);
void operator = (const session_t&);
};
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index c195e91..10f1404 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -236,7 +236,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (),
- options);
+ options, true);
zmq_assert (session);
// Create inbound pipe.
@@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// it can bind the new connection to the session once it is established.
zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
- session_name.c_str ());
+ session_name.c_str (), false);
int rc = connecter->set_address (addr_args.c_str ());
if (rc != 0) {
delete connecter;
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index be88bff..fb147c0 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -24,10 +24,11 @@
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
socket_base_t *owner_, const options_t &options_,
- const char *session_name_) :
+ const char *session_name_, bool wait_) :
owned_t (parent_, owner_),
io_object_t (parent_),
handle_valid (false),
+ wait (wait_),
options (options_),
session_name (session_name_)
{
@@ -44,12 +45,17 @@ int zmq::zmq_connecter_t::set_address (const char *addr_)
void zmq::zmq_connecter_t::process_plug ()
{
- start_connecting ();
+ if (wait)
+ add_timer ();
+ else
+ start_connecting ();
owned_t::process_plug ();
}
void zmq::zmq_connecter_t::process_unplug ()
{
+ if (wait)
+ cancel_timer ();
if (handle_valid)
rm_fd (handle);
}
@@ -68,8 +74,13 @@ void zmq::zmq_connecter_t::out_event ()
rm_fd (handle);
handle_valid = false;
- // TODO: Handle the error condition by eventual reconnect.
- zmq_assert (fd != retired_fd);
+ // Handle the error condition by attempt to reconnect.
+ if (fd == retired_fd) {
+ tcp_connecter.close ();
+ wait = true;
+ add_timer ();
+ return;
+ }
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
@@ -85,7 +96,8 @@ void zmq::zmq_connecter_t::out_event ()
void zmq::zmq_connecter_t::timer_event ()
{
- zmq_assert (false);
+ wait = false;
+ start_connecting ();
}
void zmq::zmq_connecter_t::start_connecting ()
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index e308502..c443ce8 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -36,7 +36,7 @@ namespace zmq
public:
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
- const options_t &options_, const char *session_name_);
+ const options_t &options_, const char *session_name_, bool wait_);
~zmq_connecter_t ();
// Set IP address to connect to.
@@ -66,6 +66,9 @@ namespace zmq
// contains valid value.
bool handle_valid;
+ // If true, connecter is waiting a while before trying to connect.
+ bool wait;
+
// Associated socket options.
options_t options;
diff --git a/src/zmq_connecter_init.cpp b/src/zmq_connecter_init.cpp
index 730077a..ffd4a64 100644
--- a/src/zmq_connecter_init.cpp
+++ b/src/zmq_connecter_init.cpp
@@ -18,6 +18,7 @@
*/
#include "zmq_connecter_init.hpp"
+#include "zmq_connecter.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
@@ -83,8 +84,25 @@ void zmq::zmq_connecter_init_t::flush ()
void zmq::zmq_connecter_init_t::detach ()
{
- // TODO: Engine is closing down. Init object is to be closed as well.
- zmq_assert (false);
+ // TODO: Start reconnection process here.
+/*
+ // Create a connecter object to attempt reconnect. Ask it to wait for a
+ // while before reconnecting.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_connecter_t *connecter = new zmq_connecter_t (io_thread, owner,
+ options, session_name.c_str (), true);
+ connecter->set_address (...);
+ zmq_assert (connecter);
+ send_plug (connecter);
+ send_own (owner, connecter);
+*/
+
+ // This function is called by engine when disconnection occurs.
+ // The engine will destroy itself, so we just drop the pointer here and
+ // start termination of the init object.
+ engine = NULL;
+ term ();
+
}
void zmq::zmq_connecter_init_t::process_plug ()
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 2c5eb1c..b82af0a 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -18,6 +18,7 @@
*/
#include "zmq_engine.hpp"
+#include "zmq_connecter.hpp"
#include "io_thread.hpp"
#include "i_inout.hpp"
#include "config.hpp"
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index 8299ebf..0d1b10a 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -25,6 +25,7 @@
#include "tcp_socket.hpp"
#include "zmq_encoder.hpp"
#include "zmq_decoder.hpp"
+#include "options.hpp"
namespace zmq
{
diff --git a/src/zmq_listener_init.cpp b/src/zmq_listener_init.cpp
index 756e9d8..eec41c7 100644
--- a/src/zmq_listener_init.cpp
+++ b/src/zmq_listener_init.cpp
@@ -76,7 +76,7 @@ void zmq::zmq_listener_init_t::flush ()
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
session = new session_t (io_thread, owner, peer_identity.c_str (),
- options);
+ options, false);
zmq_assert (session);
send_plug (session);
send_own (owner, session);