From 7a5db6041f4f11ed502fa7446da900509dedb00f Mon Sep 17 00:00:00 2001 From: malosek Date: Wed, 16 Sep 2009 12:22:36 +0200 Subject: added newly added socket options to all language bindings, P2P model changed to PUB/SUB for throughput tests --- c/zmq.h | 2 +- java/Socket.cpp | 1 + java/org/zmq/Socket.java | 1 + perf/c/local_thr.c | 8 +++++- perf/c/remote_thr.c | 5 +++- perf/cpp/local_thr.cpp | 8 +++++- perf/cpp/remote_thr.cpp | 6 ++++- perf/java/local_thr.java | 68 ++++++++++++++++++++++++++--------------------- perf/java/remote_thr.java | 60 ++++++++++++++++++++++------------------- perf/python/local_thr.py | 8 +++++- perf/python/remote_thr.py | 6 ++++- perf/ruby/local_thr.rb | 7 ++++- perf/ruby/remote_thr.rb | 6 ++++- python/pyzmq.cpp | 18 ++++++++++++- ruby/rbzmq.cpp | 5 ++++ src/socket_base.cpp | 11 ++++++-- 16 files changed, 149 insertions(+), 71 deletions(-) diff --git a/c/zmq.h b/c/zmq.h index 58c5551..732ecb9 100644 --- a/c/zmq.h +++ b/c/zmq.h @@ -53,7 +53,7 @@ extern "C" { #define ZMQ_UNSUBSCRIBE 7 // string #define ZMQ_RATE 8 // int64_t #define ZMQ_RECOVERY_IVL 9 // int64_t -#define ZMQ_MCAST_LOOP 10 // boolean +#define ZMQ_MCAST_LOOP 10 // int64_t // The operation should be performed in non-blocking mode. I.e. if it cannot // be processed immediately, error should be returned with errno set to EAGAIN. diff --git a/java/Socket.cpp b/java/Socket.cpp index 2a2f420..2274535 100644 --- a/java/Socket.cpp +++ b/java/Socket.cpp @@ -98,6 +98,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env, case ZMQ_AFFINITY: case ZMQ_RATE: case ZMQ_RECOVERY_IVL: + case ZMQ_MCAST_LOOP: { void *s = (void*) env->GetLongField (obj, socket_handle_fid); assert (s); diff --git a/java/org/zmq/Socket.java b/java/org/zmq/Socket.java index 0d96c71..501bc16 100644 --- a/java/org/zmq/Socket.java +++ b/java/org/zmq/Socket.java @@ -44,6 +44,7 @@ public class Socket public static final int UNSUBSCRIBE = 7; public static final int RATE = 8; public static final int RECOVERY_IVL = 9; + public static final int MCAST_LOOP = 10; /** * Class constructor. diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index 68d9ec6..9b94330 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -48,9 +48,15 @@ int main (int argc, char *argv []) ctx = zmq_init (1, 1); assert (ctx); - s = zmq_socket (ctx, ZMQ_P2P); + s = zmq_socket (ctx, ZMQ_SUB); assert (s); + rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE , "*", 1); + assert (rc == 0); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + rc = zmq_bind (s, bind_to); assert (rc == 0); diff --git a/perf/c/remote_thr.c b/perf/c/remote_thr.c index 3069640..8814e0f 100644 --- a/perf/c/remote_thr.c +++ b/perf/c/remote_thr.c @@ -45,9 +45,12 @@ int main (int argc, char *argv []) ctx = zmq_init (1, 1); assert (ctx); - s = zmq_socket (ctx, ZMQ_P2P); + s = zmq_socket (ctx, ZMQ_PUB); assert (s); + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + rc = zmq_connect (s, connect_to); assert (rc == 0); diff --git a/perf/cpp/local_thr.cpp b/perf/cpp/local_thr.cpp index 7d40904..cbcc106 100644 --- a/perf/cpp/local_thr.cpp +++ b/perf/cpp/local_thr.cpp @@ -36,7 +36,13 @@ int main (int argc, char *argv []) zmq::context_t ctx (1, 1); - zmq::socket_t s (ctx, ZMQ_P2P); + zmq::socket_t s (ctx, ZMQ_SUB); + + s.setsockopt (ZMQ_SUBSCRIBE , "*", 1); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.bind (bind_to); zmq::message_t msg; diff --git a/perf/cpp/remote_thr.cpp b/perf/cpp/remote_thr.cpp index 15a4ed1..3eee16c 100644 --- a/perf/cpp/remote_thr.cpp +++ b/perf/cpp/remote_thr.cpp @@ -36,7 +36,11 @@ int main (int argc, char *argv []) zmq::context_t ctx (1, 1); - zmq::socket_t s (ctx, ZMQ_P2P); + zmq::socket_t s (ctx, ZMQ_PUB); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.connect (connect_to); for (int i = 0; i != message_count; i++) { diff --git a/perf/java/local_thr.java b/perf/java/local_thr.java index f5d3718..450aed5 100644 --- a/perf/java/local_thr.java +++ b/perf/java/local_thr.java @@ -21,45 +21,51 @@ import org.zmq.*; class local_thr { - public static void main (String [] args) - { - if (args.length != 3) { - System.out.println ("usage: local_thr " + - " "); - return; - } + public static void main (String [] args) + { + if (args.length != 3) { + System.out.println ("usage: local_thr " + + " "); + return; + } - String bindTo = args [0]; - long messageSize = Integer.parseInt (args [1]); - long messageCount = Integer.parseInt (args [2]); + String bindTo = args [0]; + long messageSize = Integer.parseInt (args [1]); + long messageCount = Integer.parseInt (args [2]); - org.zmq.Context ctx = new org.zmq.Context (1, 1); + org.zmq.Context ctx = new org.zmq.Context (1, 1); - org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.P2P); - s.bind (bindTo); + org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.SUB); - byte [] data = s.recv (0); - assert (data.length == messageSize); + s.setsockopt (org.zmq.Socket.SUBSCRIBE , "*"); - long start = System.currentTimeMillis (); + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. - for (int i = 1; i != messageCount; i ++) { - data = s.recv (0); - assert (data.length == messageSize); - } + s.bind (bindTo); - long end = System.currentTimeMillis (); + byte [] data = s.recv (0); + assert (data.length == messageSize); - long elapsed = (end - start) * 1000; - if (elapsed == 0) - elapsed = 1; + long start = System.currentTimeMillis (); - long throughput = messageCount * 1000000 / elapsed; - double megabits = (double) (throughput * messageSize * 8) / 1000000; + for (int i = 1; i != messageCount; i ++) { + data = s.recv (0); + assert (data.length == messageSize); + } - System.out.println ("message size: " + messageSize + " [B]"); - System.out.println ("message count: " + messageCount); - System.out.println ("mean throughput: " + throughput + "[msg/s]"); - System.out.println ("mean throughput: " + megabits + "[Mb/s]"); - } + long end = System.currentTimeMillis (); + + long elapsed = (end - start) * 1000; + if (elapsed == 0) + elapsed = 1; + + long throughput = messageCount * 1000000 / elapsed; + double megabits = (double) (throughput * messageSize * 8) / 1000000; + + System.out.println ("message size: " + messageSize + " [B]"); + System.out.println ("message count: " + messageCount); + System.out.println ("mean throughput: " + throughput + "[msg/s]"); + System.out.println ("mean throughput: " + megabits + "[Mb/s]"); + } } diff --git a/perf/java/remote_thr.java b/perf/java/remote_thr.java index f505600..9182a8f 100644 --- a/perf/java/remote_thr.java +++ b/perf/java/remote_thr.java @@ -21,33 +21,37 @@ import org.zmq.*; class remote_thr { - public static void main (String [] args) - { - if (args.length != 3) { - System.out.println ("usage: remote_thr " + - " "); - return; - } - - // Parse the command line arguments. - String connectTo = args [0]; - int messageSize = Integer.parseInt (args [1]); - int messageCount = Integer.parseInt (args [2]); - - org.zmq.Context ctx = new org.zmq.Context (1, 1); - - org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.P2P); - s.connect (connectTo); - - byte msg [] = new byte [messageSize]; - for (int i = 0; i != messageCount; i++) - s.send (msg, 0); - - try { - Thread.sleep (10000); - } - catch (InterruptedException e) { - e.printStackTrace (); - } + public static void main (String [] args) + { + if (args.length != 3) { + System.out.println ("usage: remote_thr " + + " "); + return; + } + + // Parse the command line arguments. + String connectTo = args [0]; + int messageSize = Integer.parseInt (args [1]); + int messageCount = Integer.parseInt (args [2]); + + org.zmq.Context ctx = new org.zmq.Context (1, 1); + + org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB); + + // Add your socket options here. + // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + + s.connect (connectTo); + + byte msg [] = new byte [messageSize]; + for (int i = 0; i != messageCount; i++) + s.send (msg, 0); + + try { + Thread.sleep (10000); + } + catch (InterruptedException e) { + e.printStackTrace (); + } } } diff --git a/perf/python/local_thr.py b/perf/python/local_thr.py index 0d142cf..26896d0 100644 --- a/perf/python/local_thr.py +++ b/perf/python/local_thr.py @@ -35,7 +35,13 @@ def main (): sys.exit (1) ctx = libpyzmq.Context (1, 1); - s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s = libpyzmq.Socket (ctx, libpyzmq.SUB) + + s.setsockopt (libpyzmq.SUBSCRIBE , "*"); + + # Add your socket options here. + # For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.bind (bind_to) msg = s.recv () diff --git a/perf/python/remote_thr.py b/perf/python/remote_thr.py index bab001d..f7c69cf 100644 --- a/perf/python/remote_thr.py +++ b/perf/python/remote_thr.py @@ -35,7 +35,11 @@ def main (): sys.exit (1) ctx = libpyzmq.Context (1, 1); - s = libpyzmq.Socket (ctx, libpyzmq.P2P) + s = libpyzmq.Socket (ctx, libpyzmq.PUB) + + # Add your socket options here. + # For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.connect (connect_to) msg = ''.join ([' ' for n in range (0, message_size)]) diff --git a/perf/ruby/local_thr.rb b/perf/ruby/local_thr.rb index 117e54b..db14cf2 100644 --- a/perf/ruby/local_thr.rb +++ b/perf/ruby/local_thr.rb @@ -28,7 +28,12 @@ message_size = ARGV[1].to_i message_count = ARGV[2].to_i ctx = Context.new(1, 1) -s = Socket.new(ctx, P2P); +s = Socket.new(ctx, SUB); +s.setsockopt (SUBSCRIBE, "*"); + +# Add your socket options here. +# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.bind(bind_to); msg = s.recv(0) diff --git a/perf/ruby/remote_thr.rb b/perf/ruby/remote_thr.rb index 8d98848..ef93961 100644 --- a/perf/ruby/remote_thr.rb +++ b/perf/ruby/remote_thr.rb @@ -28,7 +28,11 @@ message_size = ARGV[1].to_i message_count = ARGV[2].to_i ctx = Context.new(1, 1) -s = Socket.new(ctx, P2P); +s = Socket.new(ctx, PUB); + +# Add your socket options here. +# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + s.connect(connect_to); msg = "#{'0'*message_size}" diff --git a/python/pyzmq.cpp b/python/pyzmq.cpp index 93b8b8f..c550eb5 100644 --- a/python/pyzmq.cpp +++ b/python/pyzmq.cpp @@ -453,7 +453,7 @@ PyMODINIT_FUNC initlibpyzmq () PyObject *dict = PyModule_GetDict (module); assert (dict); - PyObject *t; + PyObject *t; t = PyInt_FromLong (ZMQ_NOBLOCK); PyDict_SetItemString (dict, "NOBLOCK", t); Py_DECREF (t); @@ -489,7 +489,23 @@ PyMODINIT_FUNC initlibpyzmq () Py_DECREF (t); t = PyInt_FromLong (ZMQ_IDENTITY); PyDict_SetItemString (dict, "IDENTITY", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_SUBSCRIBE); + PyDict_SetItemString (dict, "SUBSCRIBE", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_UNSUBSCRIBE); + PyDict_SetItemString (dict, "UNSUBSCRIBE", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_RATE); + PyDict_SetItemString (dict, "RATE", t); Py_DECREF (t); + t = PyInt_FromLong (ZMQ_RECOVERY_IVL); + PyDict_SetItemString (dict, "RECOVERY_IVL", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_MCAST_LOOP); + PyDict_SetItemString (dict, "MCAST_LOOP", t); + Py_DECREF (t); + } #if defined _MSC_VER diff --git a/ruby/rbzmq.cpp b/ruby/rbzmq.cpp index 5d679b9..1751c14 100644 --- a/ruby/rbzmq.cpp +++ b/ruby/rbzmq.cpp @@ -282,6 +282,11 @@ extern "C" void Init_librbzmq () rb_define_global_const ("SWAP", INT2NUM (ZMQ_SWAP)); rb_define_global_const ("AFFINITY", INT2NUM (ZMQ_AFFINITY)); rb_define_global_const ("IDENTITY", INT2NUM (ZMQ_IDENTITY)); + rb_define_global_const ("SUBSCRIBE", INT2NUM (ZMQ_SUBSCRIBE)); + rb_define_global_const ("UNSUBSCRIBE", INT2NUM (ZMQ_UNSUBSCRIBE)); + rb_define_global_const ("RATE", INT2NUM (ZMQ_RATE)); + rb_define_global_const ("RECOVERY_IVL", INT2NUM (ZMQ_RECOVERY_IVL)); + rb_define_global_const ("MCAST_LOOP", INT2NUM (ZMQ_MCAST_LOOP)); rb_define_global_const ("NOBLOCK", INT2NUM (ZMQ_NOBLOCK)); rb_define_global_const ("NOFLUSH", INT2NUM (ZMQ_NOFLUSH)); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 10f1404..88ba43f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -158,11 +158,18 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, return 0; case ZMQ_MCAST_LOOP: - if (optvallen_ != sizeof (bool)) { + if (optvallen_ != sizeof (int64_t)) { + errno = EINVAL; + return -1; + } + + if ((int64_t) *((int64_t*) optval_) == 0 || + (int64_t) *((int64_t*) optval_) == 1) { + options.use_multicast_loop = (bool) *((int64_t*) optval_); + } else { errno = EINVAL; return -1; } - options.use_multicast_loop = optval_; return 0; default: -- cgit v1.2.3