From 7a5db6041f4f11ed502fa7446da900509dedb00f Mon Sep 17 00:00:00 2001 From: malosek Date: Wed, 16 Sep 2009 12:22:36 +0200 Subject: added newly added socket options to all language bindings, P2P model changed to PUB/SUB for throughput tests --- perf/c/local_thr.c | 8 +++++- perf/c/remote_thr.c | 5 +++- perf/cpp/local_thr.cpp | 8 +++++- perf/cpp/remote_thr.cpp | 6 ++++- perf/java/local_thr.java | 68 ++++++++++++++++++++++++++--------------------- perf/java/remote_thr.java | 60 ++++++++++++++++++++++------------------- perf/python/local_thr.py | 8 +++++- perf/python/remote_thr.py | 6 ++++- perf/ruby/local_thr.rb | 7 ++++- perf/ruby/remote_thr.rb | 6 ++++- 10 files changed, 115 insertions(+), 67 deletions(-) (limited to 'perf') diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index 68d9ec6..9b94330 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -48,9 +48,15 @@ int main (int argc, char *argv []) ctx = zmq_init (1, 1); assert (ctx); - s = zmq_socket (ctx, ZMQ_P2P); + s = zmq_socket (ctx, ZMQ_SUB); assert (s); + rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE , "*", 1); + assert (rc == 0); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + rc = zmq_bind (s, bind_to); assert (rc == 0); diff --git a/perf/c/remote_thr.c b/perf/c/remote_thr.c index 3069640..8814e0f 100644 --- a/perf/c/remote_thr.c +++ b/perf/c/remote_thr.c @@ -45,9 +45,12 @@ int main (int argc, char *argv []) ctx = zmq_init (1, 1); assert (ctx); - s = zmq_socket (ctx, ZMQ_P2P); + s = zmq_socket (ctx, ZMQ_PUB); assert (s); + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + rc = zmq_connect (s, connect_to); assert (rc == 0); diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp index 7d40904..cbcc106 100644 --- a/perf/cpp/local_thr.cpp +++ b/perf/cpp/local_thr.cpp @@ -36,7 +36,13 @@ int main (int argc, char *argv []) zmq::context_t ctx (1, 1); - zmq::socket_t s (ctx, ZMQ_P2P); + zmq::socket_t s (ctx, ZMQ_SUB); + + s.setsockopt (ZMQ_SUBSCRIBE , "*", 1); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.bind (bind_to); zmq::message_t msg; diff --git a/perf/cpp/remote_thr.cpp b/perf/cpp/remote_thr.cpp index 15a4ed1..3eee16c 100644 --- a/perf/cpp/remote_thr.cpp +++ b/perf/cpp/remote_thr.cpp @@ -36,7 +36,11 @@ int main (int argc, char *argv []) zmq::context_t ctx (1, 1); - zmq::socket_t s (ctx, ZMQ_P2P); + zmq::socket_t s (ctx, ZMQ_PUB); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.connect (connect_to); for (int i = 0; i != message_count; i++) { diff --git a/perf/java/local_thr.java b/perf/java/local_thr.java index f5d3718..450aed5 100644 --- a/perf/java/local_thr.java +++ b/perf/java/local_thr.java @@ -21,45 +21,51 @@ import org.zmq.*; class local_thr { - public static void main (String [] args) - { - if (args.length != 3) { - System.out.println ("usage: local_thr " + - " "); - return; - } + public static void main (String [] args) + { + if (args.length != 3) { + System.out.println ("usage: local_thr " + + " "); + return; + } - String bindTo = args [0]; - long messageSize = Integer.parseInt (args [1]); - long messageCount = Integer.parseInt (args [2]); + String bindTo = args [0]; + long messageSize = Integer.parseInt (args [1]); + long messageCount = Integer.parseInt (args [2]); - org.zmq.Context ctx = new org.zmq.Context (1, 1); + org.zmq.Context ctx = new org.zmq.Context (1, 1); - org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.P2P); - s.bind (bindTo); + org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.SUB); - byte [] data = s.recv (0); - assert (data.length == messageSize); + s.setsockopt (org.zmq.Socket.SUBSCRIBE , "*"); - long start = System.currentTimeMillis (); + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. - for (int i = 1; i != messageCount; i ++) { - data = s.recv (0); - assert (data.length == messageSize); - } + s.bind (bindTo); - long end = System.currentTimeMillis (); + byte [] data = s.recv (0); + assert (data.length == messageSize); - long elapsed = (end - start) * 1000; - if (elapsed == 0) - elapsed = 1; + long start = System.currentTimeMillis (); - long throughput = messageCount * 1000000 / elapsed; - double megabits = (double) (throughput * messageSize * 8) / 1000000; + for (int i = 1; i != messageCount; i ++) { + data = s.recv (0); + assert (data.length == messageSize); + } - System.out.println ("message size: " + messageSize + " [B]"); - System.out.println ("message count: " + messageCount); - System.out.println ("mean throughput: " + throughput + "[msg/s]"); - System.out.println ("mean throughput: " + megabits + "[Mb/s]"); - } + long end = System.currentTimeMillis (); + + long elapsed = (end - start) * 1000; + if (elapsed == 0) + elapsed = 1; + + long throughput = messageCount * 1000000 / elapsed; + double megabits = (double) (throughput * messageSize * 8) / 1000000; + + System.out.println ("message size: " + messageSize + " [B]"); + System.out.println ("message count: " + messageCount); + System.out.println ("mean throughput: " + throughput + "[msg/s]"); + System.out.println ("mean throughput: " + megabits + "[Mb/s]"); + } } diff --git a/perf/java/remote_thr.java b/perf/java/remote_thr.java index f505600..9182a8f 100644 --- a/perf/java/remote_thr.java +++ b/perf/java/remote_thr.java @@ -21,33 +21,37 @@ import org.zmq.*; class remote_thr { - public static void main (String [] args) - { - if (args.length != 3) { - System.out.println ("usage: remote_thr " + - " "); - return; - } - - // Parse the command line arguments. - String connectTo = args [0]; - int messageSize = Integer.parseInt (args [1]); - int messageCount = Integer.parseInt (args [2]); - - org.zmq.Context ctx = new org.zmq.Context (1, 1); - - org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.P2P); - s.connect (connectTo); - - byte msg [] = new byte [messageSize]; - for (int i = 0; i != messageCount; i++) - s.send (msg, 0); - - try { - Thread.sleep (10000); - } - catch (InterruptedException e) { - e.printStackTrace (); - } + public static void main (String [] args) + { + if (args.length != 3) { + System.out.println ("usage: remote_thr " + + " "); + return; + } + + // Parse the command line arguments. + String connectTo = args [0]; + int messageSize = Integer.parseInt (args [1]); + int messageCount = Integer.parseInt (args [2]); + + org.zmq.Context ctx = new org.zmq.Context (1, 1); + + org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + + s.connect (connectTo); + + byte msg [] = new byte [messageSize]; + for (int i = 0; i != messageCount; i++) + s.send (msg, 0); + + try { + Thread.sleep (10000); + } + catch (InterruptedException e) { + e.printStackTrace (); + } } } diff --git a/perf/python/local_thr.py b/perf/python/local_thr.py index 0d142cf..26896d0 100644 --- a/perf/python/local_thr.py +++ b/perf/python/local_thr.py @@ -35,7 +35,13 @@ def main (): sys.exit (1) ctx = libpyzmq.Context (1, 1); - s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s = libpyzmq.Socket (ctx, libpyzmq.SUB) + + s.setsockopt (libpyzmq.SUBSCRIBE , "*"); + + # Add your socket options here. + # For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.bind (bind_to) msg = s.recv () diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py index bab001d..f7c69cf 100644 --- a/perf/python/remote_thr.py +++ b/perf/python/remote_thr.py @@ -35,7 +35,11 @@ def main (): sys.exit (1) ctx = libpyzmq.Context (1, 1); - s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s = libpyzmq.Socket (ctx, libpyzmq.PUB) + + # Add your socket options here. + # For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.connect (connect_to) msg = ''.join ([' ' for n in range (0, message_size)]) diff --git a/perf/ruby/local_thr.rb b/perf/ruby/local_thr.rb index 117e54b..db14cf2 100644 --- a/perf/ruby/local_thr.rb +++ b/perf/ruby/local_thr.rb @@ -28,7 +28,12 @@ message_size = ARGV[1].to_i message_count = ARGV[2].to_i ctx = Context.new(1, 1) -s = Socket.new(ctx, P2P); +s = Socket.new(ctx, SUB); +s.setsockopt (SUBSCRIBE, "*"); + +# Add your socket options here. +# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.bind(bind_to); msg = s.recv(0) diff --git a/perf/ruby/remote_thr.rb b/perf/ruby/remote_thr.rb index 8d98848..ef93961 100644 --- a/perf/ruby/remote_thr.rb +++ b/perf/ruby/remote_thr.rb @@ -28,7 +28,11 @@ message_size = ARGV[1].to_i message_count = ARGV[2].to_i ctx = Context.new(1, 1) -s = Socket.new(ctx, P2P); +s = Socket.new(ctx, PUB); + +# Add your socket options here. +# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.connect(connect_to); msg = "#{'0'*message_size}" -- cgit v1.2.3