diff options
| author | Martin Sustrik <sustrik@250bpm.com> | 2010-02-26 15:55:36 +0100 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@250bpm.com> | 2010-02-26 15:55:36 +0100 | 
| commit | 4440b13c359dab2c1ba55e31c604ac093172d68e (patch) | |
| tree | f6d606e7b14a4d412c052f40ba4a9fbe2be565ce /bindings | |
| parent | 1c4daf79ce12da75acb8010c99b3c1d509a7a950 (diff) | |
Poller object implemented in Java binding
Diffstat (limited to 'bindings')
| -rwxr-xr-x | bindings/java/Poller.cpp | 126 | ||||
| -rwxr-xr-x | bindings/java/org/zmq/Poller.java | 135 | 
2 files changed, 261 insertions, 0 deletions
| diff --git a/bindings/java/Poller.cpp b/bindings/java/Poller.cpp new file mode 100755 index 0000000..bf7a043 --- /dev/null +++ b/bindings/java/Poller.cpp @@ -0,0 +1,126 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <assert.h> +#include <errno.h> + +#include "../c/zmq.h" + +#include "org_zmq_Poller.h" + +static void *fetch_socket (JNIEnv *env, jobject socket); + +JNIEXPORT jlong JNICALL Java_org_zmq_Poller_run_1poll (JNIEnv *env, +                                                       jobject obj, +                                                       jint count, +                                                       jobjectArray socket_0mq, +                                                       jshortArray event_0mq, +                                                       jshortArray revent_0mq, +                                                       jlong timeout) +{ +    int ls = (int) count; +    if (ls <= 0) +        return 0; +     +    int ls_0mq = 0; +    int le_0mq = 0; +    int lr_0mq = 0; + +    if (socket_0mq) +        ls_0mq = env->GetArrayLength (socket_0mq); +    if (event_0mq) +        le_0mq = env->GetArrayLength (event_0mq); +    if (revent_0mq) +        lr_0mq = env->GetArrayLength (revent_0mq); + +    if (ls > ls_0mq || ls > le_0mq || ls > ls_0mq) +        return 0; + +    zmq_pollitem_t *pitem = new zmq_pollitem_t [ls]; +    short pc = 0; +    int rc = 0; + +    //  Add 0MQ sockets. +    if (ls_0mq > 0) { +        jshort *e_0mq = env->GetShortArrayElements (event_0mq, 0); +        if (e_0mq != NULL) { +            for (int i = 0; i < ls_0mq; ++i) { +                jobject s_0mq = env->GetObjectArrayElement (socket_0mq, i); +                if (!s_0mq) +                    continue; +                void *s = fetch_socket (env, s_0mq); +                if (!s) +                    continue; +                pitem [pc].socket = s; +                pitem [pc].fd = 0; +                pitem [pc].events = e_0mq [i]; +                pitem [pc].revents = 0; +                ++pc; +            } +            env->ReleaseShortArrayElements(event_0mq, e_0mq, 0); +        } +    } + +    if (pc == ls) { +        pc = 0; +        long tout = (long) timeout; +        rc = zmq_poll (pitem, ls, tout); +    } + +    //  Set 0MQ results. +    if (ls_0mq > 0) { +        jshort *r_0mq = env->GetShortArrayElements (revent_0mq, 0); +        if (r_0mq) { +            for (int i = 0; i < ls_0mq; ++i) { +                r_0mq [i] = pitem [pc].revents; +                ++pc; +            } +            env->ReleaseShortArrayElements(revent_0mq, r_0mq, 0); +        } +    } + +    delete [] pitem; +    return rc; +} +   +/** + * Get the value of socketHandle for the specified Java Socket. + * TODO: move this to a single util.h file. + */ +static void *fetch_socket (JNIEnv *env, jobject socket) +{ +    static jmethodID get_socket_handle_mid = NULL; + +    if (get_socket_handle_mid == NULL) { +        jclass cls = env->GetObjectClass (socket); +        assert (cls); +        get_socket_handle_mid = env->GetMethodID (cls, +            "getSocketHandle", "()J"); +        env->DeleteLocalRef (cls); +        assert (get_socket_handle_mid); +    } +   +    void *s = (void*) env->CallLongMethod (socket, get_socket_handle_mid); +    if (env->ExceptionCheck ()) { +        s = NULL; +    } +   +    assert (s); +    return s; +} diff --git a/bindings/java/org/zmq/Poller.java b/bindings/java/org/zmq/Poller.java new file mode 100755 index 0000000..dd168b2 --- /dev/null +++ b/bindings/java/org/zmq/Poller.java @@ -0,0 +1,135 @@ +/* +    Copyright (c) 2007-2010 iMatix Corporation + +    This file is part of 0MQ. + +    0MQ is free software; you can redistribute it and/or modify it under +    the terms of the Lesser GNU General Public License as published by +    the Free Software Foundation; either version 3 of the License, or +    (at your option) any later version. + +    0MQ is distributed in the hope that it will be useful, +    but WITHOUT ANY WARRANTY; without even the implied warranty of +    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +    Lesser GNU General Public License for more details. + +    You should have received a copy of the Lesser GNU General Public License +    along with this program.  If not, see <http://www.gnu.org/licenses/>. +*/ + +package org.zmq; + +public class Poller { +    static { +        System.loadLibrary("jzmq"); +    } + +    public static final int POLLIN = 1; +    public static final int POLLOUT = 2; +    public static final int POLLERR = 4; + +    /** +     * Class constructor. +     * +     * @param context a 0MQ context previously created. +     */ +    public Poller (Context context, int size) { +        this.context = context; +        this.size = size; +        this.next = 0; + +        this.socket = new Socket[size]; +        this.event = new short[size]; +        this.revent = new short[size]; + +        for (int i = 0; i < size; ++i) { +            this.event[i] = (POLLIN | POLLOUT | POLLERR); +        } +    } + +    public int register (Socket socket) { +        if (next >= size) +            return -1; +        this.socket[next] = socket; +        return next++; +    } +     +    public long getTimeout () { +        return this.timeout; +    } +     +    public void setTimeout (long timeout) { +        this.timeout = timeout; +    } + +    public int getSize () { +        return this.size; +    } + +    public int getNext () { +        return this.next; +    } + +    /** +     * Issue a poll call. +     * @return how many objects where signalled by poll(). +     */ +    public long poll () { +        if (size <= 0 || next <= 0) +            return 0; + +        for (int i = 0; i < next; ++i) { +            revent[i] = 0; +        } + +        return run_poll(next, socket, event, revent, timeout); +    } + +    public boolean pollin(int index) { +        return poll_mask(index, POLLIN); +    } + +    public boolean pollout(int index) { +        return poll_mask(index, POLLOUT); +    } + +    public boolean pollerr(int index) { +        return poll_mask(index, POLLERR); +    } + +    /** +     * Issue a poll call on the specified 0MQ sockets. +     * +     * @param socket an array of 0MQ Socket objects to poll. +     * @param event an array of short values specifying what to poll for. +     * @param revent an array of short values with the results. +     * @param timeout the maximum timeout in microseconds. +     * @return how many objects where signalled by poll(). +     */ +    private native long run_poll(int count, +                                 Socket[] socket, +                                 short[] event, +                                 short[] revent, +                                 long timeout); + +    /** +     * Check whether a specific mask was signalled by latest poll call. +     * +     * @param index the index indicating the socket. +     * @param mask a combination of POLLIN, POLLOUT and POLLERR. +     * @return true if specific socket was signalled as specified. +     */ +    private boolean poll_mask(int index, int mask) { +        if (mask <= 0 || index < 0 || index >= next) +            return false; +        return (revent[index] & mask) > 0; +    } + +    private Context context = null; +    private long timeout = 0; +    private int size = 0; +    private int next = 0; +    private Socket[] socket = null; +    private short[] event = null; +    private short[] revent = null; +} | 
