summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-02-26 15:55:36 +0100
committerMartin Sustrik <sustrik@250bpm.com>2010-02-26 15:55:36 +0100
commit4440b13c359dab2c1ba55e31c604ac093172d68e (patch)
treef6d606e7b14a4d412c052f40ba4a9fbe2be565ce
parent1c4daf79ce12da75acb8010c99b3c1d509a7a950 (diff)
Poller object implemented in Java binding
-rwxr-xr-xbindings/java/Poller.cpp126
-rwxr-xr-xbindings/java/org/zmq/Poller.java135
2 files changed, 261 insertions, 0 deletions
diff --git a/bindings/java/Poller.cpp b/bindings/java/Poller.cpp
new file mode 100755
index 0000000..bf7a043
--- /dev/null
+++ b/bindings/java/Poller.cpp
@@ -0,0 +1,126 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <errno.h>
+
+#include "../c/zmq.h"
+
+#include "org_zmq_Poller.h"
+
+static void *fetch_socket (JNIEnv *env, jobject socket);
+
+JNIEXPORT jlong JNICALL Java_org_zmq_Poller_run_1poll (JNIEnv *env,
+ jobject obj,
+ jint count,
+ jobjectArray socket_0mq,
+ jshortArray event_0mq,
+ jshortArray revent_0mq,
+ jlong timeout)
+{
+ int ls = (int) count;
+ if (ls <= 0)
+ return 0;
+
+ int ls_0mq = 0;
+ int le_0mq = 0;
+ int lr_0mq = 0;
+
+ if (socket_0mq)
+ ls_0mq = env->GetArrayLength (socket_0mq);
+ if (event_0mq)
+ le_0mq = env->GetArrayLength (event_0mq);
+ if (revent_0mq)
+ lr_0mq = env->GetArrayLength (revent_0mq);
+
+ if (ls > ls_0mq || ls > le_0mq || ls > ls_0mq)
+ return 0;
+
+ zmq_pollitem_t *pitem = new zmq_pollitem_t [ls];
+ short pc = 0;
+ int rc = 0;
+
+ // Add 0MQ sockets.
+ if (ls_0mq > 0) {
+ jshort *e_0mq = env->GetShortArrayElements (event_0mq, 0);
+ if (e_0mq != NULL) {
+ for (int i = 0; i < ls_0mq; ++i) {
+ jobject s_0mq = env->GetObjectArrayElement (socket_0mq, i);
+ if (!s_0mq)
+ continue;
+ void *s = fetch_socket (env, s_0mq);
+ if (!s)
+ continue;
+ pitem [pc].socket = s;
+ pitem [pc].fd = 0;
+ pitem [pc].events = e_0mq [i];
+ pitem [pc].revents = 0;
+ ++pc;
+ }
+ env->ReleaseShortArrayElements(event_0mq, e_0mq, 0);
+ }
+ }
+
+ if (pc == ls) {
+ pc = 0;
+ long tout = (long) timeout;
+ rc = zmq_poll (pitem, ls, tout);
+ }
+
+ // Set 0MQ results.
+ if (ls_0mq > 0) {
+ jshort *r_0mq = env->GetShortArrayElements (revent_0mq, 0);
+ if (r_0mq) {
+ for (int i = 0; i < ls_0mq; ++i) {
+ r_0mq [i] = pitem [pc].revents;
+ ++pc;
+ }
+ env->ReleaseShortArrayElements(revent_0mq, r_0mq, 0);
+ }
+ }
+
+ delete [] pitem;
+ return rc;
+}
+
+/**
+ * Get the value of socketHandle for the specified Java Socket.
+ * TODO: move this to a single util.h file.
+ */
+static void *fetch_socket (JNIEnv *env, jobject socket)
+{
+ static jmethodID get_socket_handle_mid = NULL;
+
+ if (get_socket_handle_mid == NULL) {
+ jclass cls = env->GetObjectClass (socket);
+ assert (cls);
+ get_socket_handle_mid = env->GetMethodID (cls,
+ "getSocketHandle", "()J");
+ env->DeleteLocalRef (cls);
+ assert (get_socket_handle_mid);
+ }
+
+ void *s = (void*) env->CallLongMethod (socket, get_socket_handle_mid);
+ if (env->ExceptionCheck ()) {
+ s = NULL;
+ }
+
+ assert (s);
+ return s;
+}
diff --git a/bindings/java/org/zmq/Poller.java b/bindings/java/org/zmq/Poller.java
new file mode 100755
index 0000000..dd168b2
--- /dev/null
+++ b/bindings/java/org/zmq/Poller.java
@@ -0,0 +1,135 @@
+/*
+ Copyright (c) 2007-2010 iMatix Corporation
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the Lesser GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ Lesser GNU General Public License for more details.
+
+ You should have received a copy of the Lesser GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package org.zmq;
+
+public class Poller {
+ static {
+ System.loadLibrary("jzmq");
+ }
+
+ public static final int POLLIN = 1;
+ public static final int POLLOUT = 2;
+ public static final int POLLERR = 4;
+
+ /**
+ * Class constructor.
+ *
+ * @param context a 0MQ context previously created.
+ */
+ public Poller (Context context, int size) {
+ this.context = context;
+ this.size = size;
+ this.next = 0;
+
+ this.socket = new Socket[size];
+ this.event = new short[size];
+ this.revent = new short[size];
+
+ for (int i = 0; i < size; ++i) {
+ this.event[i] = (POLLIN | POLLOUT | POLLERR);
+ }
+ }
+
+ public int register (Socket socket) {
+ if (next >= size)
+ return -1;
+ this.socket[next] = socket;
+ return next++;
+ }
+
+ public long getTimeout () {
+ return this.timeout;
+ }
+
+ public void setTimeout (long timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getSize () {
+ return this.size;
+ }
+
+ public int getNext () {
+ return this.next;
+ }
+
+ /**
+ * Issue a poll call.
+ * @return how many objects where signalled by poll().
+ */
+ public long poll () {
+ if (size <= 0 || next <= 0)
+ return 0;
+
+ for (int i = 0; i < next; ++i) {
+ revent[i] = 0;
+ }
+
+ return run_poll(next, socket, event, revent, timeout);
+ }
+
+ public boolean pollin(int index) {
+ return poll_mask(index, POLLIN);
+ }
+
+ public boolean pollout(int index) {
+ return poll_mask(index, POLLOUT);
+ }
+
+ public boolean pollerr(int index) {
+ return poll_mask(index, POLLERR);
+ }
+
+ /**
+ * Issue a poll call on the specified 0MQ sockets.
+ *
+ * @param socket an array of 0MQ Socket objects to poll.
+ * @param event an array of short values specifying what to poll for.
+ * @param revent an array of short values with the results.
+ * @param timeout the maximum timeout in microseconds.
+ * @return how many objects where signalled by poll().
+ */
+ private native long run_poll(int count,
+ Socket[] socket,
+ short[] event,
+ short[] revent,
+ long timeout);
+
+ /**
+ * Check whether a specific mask was signalled by latest poll call.
+ *
+ * @param index the index indicating the socket.
+ * @param mask a combination of POLLIN, POLLOUT and POLLERR.
+ * @return true if specific socket was signalled as specified.
+ */
+ private boolean poll_mask(int index, int mask) {
+ if (mask <= 0 || index < 0 || index >= next)
+ return false;
+ return (revent[index] & mask) > 0;
+ }
+
+ private Context context = null;
+ private long timeout = 0;
+ private int size = 0;
+ private int next = 0;
+ private Socket[] socket = null;
+ private short[] event = null;
+ private short[] revent = null;
+}