From 57e057c825156b6effd15c07ee302f4ccdd2561c Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 25 Feb 2010 16:29:17 +0100 Subject: 1st version of Java poll added --- bindings/java/Context.cpp | 140 +++++++++++++++++++++++++++++++------ bindings/java/Socket.cpp | 89 +++++++++-------------- bindings/java/org/zmq/Context.java | 21 +++++- bindings/java/org/zmq/Socket.java | 5 +- 4 files changed, 173 insertions(+), 82 deletions(-) mode change 100644 => 100755 bindings/java/Context.cpp mode change 100644 => 100755 bindings/java/Socket.cpp (limited to 'bindings') diff --git a/bindings/java/Context.cpp b/bindings/java/Context.cpp old mode 100644 new mode 100755 index d2b5e42..53ef0c9 --- a/bindings/java/Context.cpp +++ b/bindings/java/Context.cpp @@ -24,29 +24,29 @@ #include "org_zmq_Context.h" +static void *fetch_socket (JNIEnv *env, jobject socket); + /** Handle to Java's Context::contextHandle. */ static jfieldID ctx_handle_fid = NULL; /** * Make sure we have a valid pointer to Java's Context::contextHandle. */ -static void ensure_context (JNIEnv *env, - jobject obj) +static void ensure_context (JNIEnv *env, jobject obj) { if (ctx_handle_fid == NULL) { jclass cls = env->GetObjectClass (obj); - assert (cls); - ctx_handle_fid = env->GetFieldID (cls, "contextHandle", "J"); - assert (ctx_handle_fid); - env->DeleteLocalRef (cls); + assert (cls); + ctx_handle_fid = env->GetFieldID (cls, "contextHandle", "J"); + assert (ctx_handle_fid); + env->DeleteLocalRef (cls); } } /** * Get the value of Java's Context::contextHandle. */ -static void *get_context (JNIEnv *env, - jobject obj) +static void *get_context (JNIEnv *env, jobject obj) { ensure_context (env, obj); void *s = (void*) env->GetLongField (obj, ctx_handle_fid); @@ -56,9 +56,7 @@ static void *get_context (JNIEnv *env, /** * Set the value of Java's Context::contextHandle. */ -static void put_context (JNIEnv *env, - jobject obj, - void *s) +static void put_context (JNIEnv *env, jobject obj, void *s) { ensure_context (env, obj); env->SetLongField (obj, ctx_handle_fid, (jlong) s); @@ -67,8 +65,7 @@ static void put_context (JNIEnv *env, /** * Raise an exception that includes 0MQ's error message. */ -static void raise_exception (JNIEnv *env, - int err) +static void raise_exception (JNIEnv *env, int err) { // Get exception class. jclass exception_class = env->FindClass ("java/lang/Exception"); @@ -88,18 +85,15 @@ static void raise_exception (JNIEnv *env, * Called to construct a Java Context object. */ JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, - jobject obj, - jint app_threads, - jint io_threads, - jint flags) + jobject obj, jint app_threads, jint io_threads, jint flags) { void *c = get_context (env, obj); - assert (! c); + assert (!c); c = zmq_init (app_threads, io_threads, flags); - put_context(env, obj, c); + put_context (env, obj, c); - if (c == NULL) { + if (!c) { raise_exception (env, errno); return; } @@ -109,7 +103,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, * Called to destroy a Java Context object. */ JNIEXPORT void JNICALL Java_org_zmq_Context_finalize (JNIEnv *env, - jobject obj) + jobject obj) { void *c = get_context (env, obj); assert (c); @@ -118,3 +112,107 @@ JNIEXPORT void JNICALL Java_org_zmq_Context_finalize (JNIEnv *env, put_context (env, obj, NULL); assert (rc == 0); } + +JNIEXPORT jlong JNICALL Java_org_zmq_Context_poll (JNIEnv *env, + jobject obj, + jobjectArray socket_0mq, + jshortArray event_0mq, + jshortArray revent_0mq, + jlong timeout) +{ + jsize ls_0mq = 0; + jsize le_0mq = 0; + jsize lr_0mq = 0; + + if (socket_0mq) + ls_0mq = env->GetArrayLength (socket_0mq); + if (event_0mq) + le_0mq = env->GetArrayLength (event_0mq); + if (revent_0mq) + lr_0mq = env->GetArrayLength (revent_0mq); + + if (ls_0mq != le_0mq || ls_0mq != lr_0mq) + return 0; + + jsize ls = ls_0mq; + if (ls <= 0) + return 0; + + zmq_pollitem_t *pitem = new zmq_pollitem_t [ls]; + short pc = 0; + int rc = 0; + + // Add 0MQ sockets. + if (ls_0mq > 0) { + jshort *e_0mq = env->GetShortArrayElements (event_0mq, 0); + if (e_0mq != NULL) { + for (int i = 0; i < ls_0mq; ++i) { + jobject s_0mq = env->GetObjectArrayElement (socket_0mq, i); + if (!s_0mq) + continue; + void *s = fetch_socket (env, s_0mq); + if (!s) + continue; + pitem [pc].socket = s; + pitem [pc].fd = 0; + pitem [pc].events = e_0mq [i]; + pitem [pc].revents = 0; + ++pc; + } + env->ReleaseShortArrayElements(event_0mq, e_0mq, 0); + } + } + + if (pc == ls) { + pc = 0; + long tout = (long) timeout; + rc = zmq_poll (pitem, ls, tout); + int err = 0; + const char *msg = ""; + if (rc < 0) { + err = errno; + msg = zmq_strerror (err); + } + } + + // Set 0MQ results. + if (ls_0mq > 0) { + jshort *r_0mq = env->GetShortArrayElements (revent_0mq, 0); + if (r_0mq) { + for (int i = 0; i < ls_0mq; ++i) { + r_0mq [i] = pitem [pc].revents; + ++pc; + } + env->ReleaseShortArrayElements(revent_0mq, r_0mq, 0); + } + } + + delete [] pitem; + return rc; +} + +/** + * Get the value of socketHandle for the specified Java Socket. + */ +static void *fetch_socket (JNIEnv *env, jobject socket) +{ + static jmethodID get_socket_handle_mid = NULL; + + if (get_socket_handle_mid == NULL) { + jclass cls = env->GetObjectClass (socket); + assert (cls); + get_socket_handle_mid = env->GetMethodID (cls, + "getSocketHandle", "()J"); + env->DeleteLocalRef (cls); + assert (get_socket_handle_mid); + } + + void *s = (void*) env->CallLongMethod (socket, get_socket_handle_mid); + if (env->ExceptionCheck ()) { + s = NULL; + } + + assert (s); + return s; +} + diff --git a/bindings/java/Socket.cpp b/bindings/java/Socket.cpp old mode 100644 new mode 100755 index adea3fa..51c473d --- a/bindings/java/Socket.cpp +++ b/bindings/java/Socket.cpp @@ -32,23 +32,21 @@ static jfieldID socket_handle_fid = NULL; /** * Make sure we have a valid pointer to Java's Socket::socketHandle. */ -static void ensure_socket (JNIEnv *env, - jobject obj) +static void ensure_socket (JNIEnv *env, jobject obj) { if (socket_handle_fid == NULL) { jclass cls = env->GetObjectClass (obj); - assert (cls); - socket_handle_fid = env->GetFieldID (cls, "socketHandle", "J"); - assert (socket_handle_fid); - env->DeleteLocalRef (cls); + assert (cls); + socket_handle_fid = env->GetFieldID (cls, "socketHandle", "J"); + assert (socket_handle_fid); + env->DeleteLocalRef (cls); } } /** * Get the value of Java's Socket::socketHandle. */ -static void *get_socket (JNIEnv *env, - jobject obj) +static void *get_socket (JNIEnv *env, jobject obj) { ensure_socket (env, obj); void *s = (void*) env->GetLongField (obj, socket_handle_fid); @@ -58,42 +56,35 @@ static void *get_socket (JNIEnv *env, /** * Set the value of Java's Socket::socketHandle. */ -static void put_socket (JNIEnv *env, - jobject obj, - void *s) +static void put_socket (JNIEnv *env, jobject obj, void *s) { ensure_socket (env, obj); env->SetLongField (obj, socket_handle_fid, (jlong) s); } /** - * Get the value of contextHandle for the Java Context associated with - * this Java Socket object. + * Get the value of contextHandle for the specified Java Context. */ -static void *fetch_context (JNIEnv *env, - jobject context) +static void *fetch_context (JNIEnv *env, jobject context) { static jmethodID get_context_handle_mid = NULL; - if (get_context_handle_mid == NULL) { + if (!get_context_handle_mid) { jclass cls = env->GetObjectClass (context); - assert (cls); - - get_context_handle_mid = env->GetMethodID (cls, - "getContextHandle", - "()J"); - env->DeleteLocalRef (cls); - assert (get_context_handle_mid); + assert (cls); + get_context_handle_mid = env->GetMethodID (cls, + "getContextHandle", "()J"); + env->DeleteLocalRef (cls); + assert (get_context_handle_mid); } - void *zmq_ctx = (void*) env->CallLongMethod (context, - get_context_handle_mid); + void *c = (void*) env->CallLongMethod (context, get_context_handle_mid); if (env->ExceptionCheck ()) { - zmq_ctx = NULL; + c = NULL; } - assert (zmq_ctx); - return zmq_ctx; + assert (c); + return c; } /** @@ -119,20 +110,18 @@ static void raise_exception (JNIEnv *env, int err) * Called to construct a Java Socket object. */ JNIEXPORT void JNICALL Java_org_zmq_Socket_construct (JNIEnv *env, - jobject obj, - jobject context, - jint type) + jobject obj, jobject context, jint type) { void *s = get_socket (env, obj); assert (! s); - void *zmq_ctx = fetch_context (env, context); - s = zmq_socket (zmq_ctx, type); + void *c = fetch_context (env, context); + s = zmq_socket (c, type); put_socket(env, obj, s); if (s == NULL) { raise_exception (env, errno); - return; + return; } } @@ -140,7 +129,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_construct (JNIEnv *env, * Called to destroy a Java Socket object. */ JNIEXPORT void JNICALL Java_org_zmq_Socket_finalize (JNIEnv *env, - jobject obj) + jobject obj) { void *s = get_socket (env, obj); assert (s); @@ -184,10 +173,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env, * Called by Java's Socket::setsockopt(int option, String optval). */ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__ILjava_lang_String_2 ( - JNIEnv *env, - jobject obj, - jint option, - jstring optval) + JNIEnv *env, jobject obj, jint option, jstring optval) { switch (option) { case ZMQ_IDENTITY: @@ -219,9 +205,8 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__ILjava_lang_String_2 ( /** * Called by Java's Socket::bind(String addr). */ -JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env, - jobject obj, - jstring addr) +JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env, jobject obj, + jstring addr) { void *s = get_socket (env, obj); assert (s); @@ -248,8 +233,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env, * Called by Java's Socket::connect(String addr). */ JNIEXPORT void JNICALL Java_org_zmq_Socket_connect (JNIEnv *env, - jobject obj, - jstring addr) + jobject obj, jstring addr) { void *s = get_socket (env, obj); assert (s); @@ -276,9 +260,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_connect (JNIEnv *env, * Called by Java's Socket::send(byte [] msg, long flags). */ JNIEXPORT jboolean JNICALL Java_org_zmq_Socket_send (JNIEnv *env, - jobject obj, - jbyteArray msg, - jlong flags) + jobject obj, jbyteArray msg, jlong flags) { void *s = get_socket (env, obj); assert (s); @@ -316,8 +298,7 @@ JNIEXPORT jboolean JNICALL Java_org_zmq_Socket_send (JNIEnv *env, /** * Called by Java's Socket::flush(). */ -JNIEXPORT void JNICALL Java_org_zmq_Socket_flush (JNIEnv *env, - jobject obj) +JNIEXPORT void JNICALL Java_org_zmq_Socket_flush (JNIEnv *env, jobject obj) { void *s = get_socket (env, obj); assert (s); @@ -334,8 +315,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_flush (JNIEnv *env, * Called by Java's Socket::recv(long flags). */ JNIEXPORT jbyteArray JNICALL Java_org_zmq_Socket_recv (JNIEnv *env, - jobject obj, - jlong flags) + jobject obj, jlong flags) { void *s = get_socket (env, obj); assert (s); @@ -357,10 +337,9 @@ JNIEXPORT jbyteArray JNICALL Java_org_zmq_Socket_recv (JNIEnv *env, jbyteArray data = env->NewByteArray (zmq_msg_size (&message)); assert (data); - env->SetByteArrayRegion (data, - 0, - zmq_msg_size (&message), - (jbyte*) zmq_msg_data (&message)); + env->SetByteArrayRegion (data, 0, zmq_msg_size (&message), + (jbyte*) zmq_msg_data (&message)); return data; } + diff --git a/bindings/java/org/zmq/Context.java b/bindings/java/org/zmq/Context.java index 1b5ecb4..7a3552b 100644 --- a/bindings/java/org/zmq/Context.java +++ b/bindings/java/org/zmq/Context.java @@ -26,6 +26,10 @@ public class Context { public static final int POLL = 1; + public static final int POLLIN = 1; + public static final int POLLOUT = 2; + public static final int POLLERR = 4; + /** * Class constructor. * @@ -36,6 +40,20 @@ public class Context { construct (appThreads, ioThreads, flags); } + /** + * Issue a poll call on the specified 0MQ sockets. + * This function is experimental and may change in the future. + * + * @param socket an array of 0MQ Socket objects to poll. + * @param event an array of short values specifying what to poll for. + * @param revent an array of short values with the results. + * @param timeout the maximum timeout in microseconds. + */ + public native long poll (Socket[] socket, + short[] event, + short[] revent, + long timeout); + /** Initialize the JNI interface */ protected native void construct (int appThreads, int ioThreads, int flags); @@ -50,10 +68,9 @@ public class Context { * @return the internal 0MQ context handle. */ private long getContextHandle () { - return contextHandle; + return contextHandle; } - /** Opaque data used by JNI driver. */ private long contextHandle; } diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java index 84e2c52..851b7b8 100644 --- a/bindings/java/org/zmq/Socket.java +++ b/bindings/java/org/zmq/Socket.java @@ -50,7 +50,6 @@ public class Socket { public static final int SNDBUF = 11; public static final int RCVBUF = 12; - /** * Class constructor. * @@ -127,11 +126,9 @@ public class Socket { * @return the internal 0MQ socket handle. */ private long getSocketHandle () { - return socketHandle; + return socketHandle; } - /** Opaque data used by JNI driver. */ private long socketHandle; - } -- cgit v1.2.3