diff options
Diffstat (limited to 'bindings')
-rw-r--r-- | bindings/Makefile.am | 15 | ||||
-rw-r--r-- | bindings/c/zmq.h | 216 | ||||
-rw-r--r-- | bindings/cpp/zmq.hpp | 283 | ||||
-rw-r--r-- | bindings/java/Context.cpp | 96 | ||||
-rw-r--r-- | bindings/java/Makefile.am | 58 | ||||
-rw-r--r-- | bindings/java/Socket.cpp | 272 | ||||
-rw-r--r-- | bindings/java/org/zmq/Context.java | 50 | ||||
-rw-r--r-- | bindings/java/org/zmq/Socket.java | 112 | ||||
-rw-r--r-- | bindings/python/Makefile.am | 7 | ||||
-rw-r--r-- | bindings/python/pyzmq.cpp | 528 | ||||
-rw-r--r-- | bindings/python/setup.py.in | 14 | ||||
-rw-r--r-- | bindings/ruby/Makefile.am | 11 | ||||
-rw-r--r-- | bindings/ruby/extconf.rb | 24 | ||||
-rw-r--r-- | bindings/ruby/rbzmq.cpp | 277 |
14 files changed, 1963 insertions, 0 deletions
diff --git a/bindings/Makefile.am b/bindings/Makefile.am new file mode 100644 index 0000000..77b4ec2 --- /dev/null +++ b/bindings/Makefile.am @@ -0,0 +1,15 @@ +if BUILD_JAVA +DIR_J = java +endif + +if BUILD_PYTHON +DIR_P = python +endif + +if BUILD_RUBY +DIR_R = ruby +endif + +SUBDIRS = $(DIR_J) $(DIR_P) $(DIR_R) +DIST_SUBDIRS = java python ruby + diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h new file mode 100644 index 0000000..732ecb9 --- /dev/null +++ b/bindings/c/zmq.h @@ -0,0 +1,216 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_H_INCLUDED__ +#define __ZMQ_H_INCLUDED__ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <stddef.h> + +#if defined ZMQ_BUILDING_LIBZMQ_WITH_MSVC +#define ZMQ_EXPORT __declspec(dllexport) +#else +#define ZMQ_EXPORT +#endif + +// Maximal size of "Very Small Message". VSMs are passed by value +// to avoid excessive memory allocation/deallocation. +// If VMSs larger than 255 bytes are required, type of 'vsm_size' +// field in zmq_msg_t structure should be modified accordingly. +#define ZMQ_MAX_VSM_SIZE 30 + +// Message & notification types. +#define ZMQ_GAP 1 +#define ZMQ_DELIMITER 31 +#define ZMQ_VSM 32 + +// Socket options. +#define ZMQ_HWM 1 // int64_t +#define ZMQ_LWM 2 // int64_t +#define ZMQ_SWAP 3 // int64_t +#define ZMQ_AFFINITY 4 // int64_t +#define ZMQ_IDENTITY 5 // string +#define ZMQ_SUBSCRIBE 6 // string +#define ZMQ_UNSUBSCRIBE 7 // string +#define ZMQ_RATE 8 // int64_t +#define ZMQ_RECOVERY_IVL 9 // int64_t +#define ZMQ_MCAST_LOOP 10 // int64_t + +// The operation should be performed in non-blocking mode. I.e. if it cannot +// be processed immediately, error should be returned with errno set to EAGAIN. +#define ZMQ_NOBLOCK 1 + +// zmq_send should not flush the message downstream immediately. Instead, it +// should batch ZMQ_NOFLUSH messages and send them downstream only if zmq_flush +// is invoked. This is an optimisation for cases where several messages are +// sent in a single business transaction. However, the effect is measurable +// only in extremely high-perf scenarios (million messages a second or so). +// If that's not your case, use standard flushing send instead. See exchange +// example for illustration of ZMQ_NOFLUSH functionality. +#define ZMQ_NOFLUSH 2 + +// Socket to communicate with a single peer. Allows for a singe connect or a +// single accept. There's no message routing or message filtering involved. +#define ZMQ_P2P 0 + +// Socket to distribute data. Recv fuction is not implemented for this socket +// type. Messages are distributed in fanout fashion to all peers. +#define ZMQ_PUB 1 + +// Socket to subscribe to distributed data. Send function is not implemented +// for this socket type. However, subscribe function can be used to modify the +// message filter. +#define ZMQ_SUB 2 + +// Socket to send requests on and receive replies from. Requests are +// load-balanced among all the peers. This socket type doesn't allow for more +// recv's that there were send's. +#define ZMQ_REQ 3 + +// Socket to receive requests from and send replies to. This socket type allows +// only an alternated sequence of recv's and send's. Each send is routed to +// the peer that the previous recv delivered message from. +#define ZMQ_REP 4 + +// Prototype for the message body deallocation functions. +// It is deliberately defined in the way to comply with standard C free. +typedef void (zmq_free_fn) (void *data); + +// A message. If 'shared' is true, message content pointed to by 'content' +// is shared, i.e. reference counting is used to manage its lifetime +// rather than straighforward malloc/free. struct zmq_msg_content is +// not declared in the API. +struct zmq_msg_t +{ + void *content; + unsigned char shared; + unsigned char vsm_size; + unsigned char vsm_data [ZMQ_MAX_VSM_SIZE]; +}; + +// Initialise an empty message (zero bytes long). +ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg); + +// Initialise a message 'size' bytes long. +// +// Errors: ENOMEM - the size is too large to allocate. +ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size); + +// Initialise a message from an existing buffer. Message isn't copied, +// instead 0MQ infrastructure take ownership of the buffer and call +// deallocation functio (ffn) once it's not needed anymore. +ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data, + size_t size, zmq_free_fn *ffn); + +// Deallocate the message. +ZMQ_EXPORT int zmq_msg_close (struct zmq_msg_t *msg); + +// Move the content of the message from 'src' to 'dest'. The content isn't +// copied, just moved. 'src' is an empty message after the call. Original +// content of 'dest' message is deallocated. +ZMQ_EXPORT int zmq_msg_move (struct zmq_msg_t *dest, struct zmq_msg_t *src); + +// Copy the 'src' message to 'dest'. The content isn't copied, instead +// reference count is increased. Don't modify the message data after the +// call as they are shared between two messages. Original content of 'dest' +// message is deallocated. +ZMQ_EXPORT int zmq_msg_copy (struct zmq_msg_t *dest, struct zmq_msg_t *src); + +// Returns pointer to message data. +ZMQ_EXPORT void *zmq_msg_data (struct zmq_msg_t *msg); + +// Return size of message data (in bytes). +ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg); + +// Returns type of the message. +ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); + +// Initialise 0MQ context. 'app_threads' specifies maximal number +// of application threads that can have open sockets at the same time. +// 'io_threads' specifies the size of thread pool to handle I/O operations. +// +// Errors: EINVAL - one of the arguments is less than zero or there are no +// threads declared at all. +ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads); + +// Deinitialise 0MQ context including all the open sockets. Closing +// sockets after zmq_term has been called will result in undefined behaviour. +ZMQ_EXPORT int zmq_term (void *context); + +// Open a socket. +// +// Errors: EINVAL - invalid socket type. +// EMFILE - the number of application threads entitled to hold open +// sockets at the same time was exceeded. +ZMQ_EXPORT void *zmq_socket (void *context, int type); + +// Close the socket. +ZMQ_EXPORT int zmq_close (void *s); + +// Sets an option on the socket. +// EINVAL - unknown option, a value with incorrect length or an invalid value. +ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, const void *optval_, + size_t optvallen_); + +// Bind the socket to a particular address. +ZMQ_EXPORT int zmq_bind (void *s, const char *addr); + +// Connect the socket to a particular address. +ZMQ_EXPORT int zmq_connect (void *s, const char *addr); + +// Send the message 'msg' to the socket 's'. 'flags' argument can be +// combination of following values: +// ZMQ_NOBLOCK - if message cannot be sent, return immediately. +// ZMQ_NOFLUSH - message won't be sent immediately. It'll be sent with either +// subsequent flushing send or explicit call to zmq_flush +// function. +// +// Errors: EAGAIN - message cannot be sent at the moment (applies only to +// non-blocking send). +// EFAULT - function isn't supported by particular socket type. +ZMQ_EXPORT int zmq_send (void *s, struct zmq_msg_t *msg, int flags); + +// Flush the messages that were send using ZMQ_NOFLUSH flag down the stream. +// +// Errors: FAULT - function isn't supported by particular socket type. +ZMQ_EXPORT int zmq_flush (void *s); + +// Send a message from the socket 's'. 'flags' argument can be combination +// of following values: +// ZMQ_NOBLOCK - if message cannot be received, return immediately. +// +// Errors: EAGAIN - message cannot be received at the moment (applies only to +// non-blocking receive). +// EFAULT - function isn't supported by particular socket type. +ZMQ_EXPORT int zmq_recv (void *s, struct zmq_msg_t *msg, int flags); + +// Helper functions used by perf tests so that they don't have to care +// about minutiae of time-related functions on different OS platforms. +ZMQ_EXPORT void *zmq_stopwatch_start (); +ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); +ZMQ_EXPORT void zmq_sleep (int seconds_); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp new file mode 100644 index 0000000..471d1d8 --- /dev/null +++ b/bindings/cpp/zmq.hpp @@ -0,0 +1,283 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef __ZMQ_HPP_INCLUDED__ +#define __ZMQ_HPP_INCLUDED__ + +#include "zmq.h" + +#include <assert.h> +#include <errno.h> +#include <string.h> +#include <exception> + +namespace zmq +{ + + typedef zmq_free_fn free_fn; + + enum message_type_t + { + message_data = 1 << 0, + message_gap = 1 << ZMQ_GAP, + message_delimiter = 1 << ZMQ_DELIMITER + }; + + // The class masquerades POSIX-style errno error as a C++ exception. + class error_t : public std::exception + { + public: + + error_t () : errnum (errno) {} + + virtual const char *what () const throw () + { +#if defined _MSC_VER +#pragma warning (push) +#pragma warning (disable:4996) +#endif + return strerror (errnum); +#if defined _MSC_VER +#pragma warning (pop) +#endif + } + + private: + + int errnum; + }; + + // A message. Caution: Don't change the body of the message once you've + // copied it - the behaviour is undefined. Don't change the body of the + // received message either - other threads may be accessing it in parallel. + + class message_t : private zmq_msg_t + { + friend class socket_t; + + public: + + // Creates message size_ bytes long. + inline message_t (size_t size_ = 0) + { + int rc = zmq_msg_init_size (this, size_); + if (rc != 0) + throw error_t (); + } + + // Creates message from the supplied buffer. 0MQ takes care of + // deallocating the buffer once it is not needed. The deallocation + // function is supplied in ffn_ parameter. If ffn_ is NULL, no + // deallocation happens - this is useful for sending static buffers. + inline message_t (void *data_, size_t size_, + free_fn *ffn_) + { + int rc = zmq_msg_init_data (this, data_, size_, ffn_); + if (rc != 0) + throw error_t (); + } + + // Destroys the message. + inline ~message_t () + { + int rc = zmq_msg_close (this); + if (rc != 0) + throw error_t (); + } + + // Destroys old content of the message and allocates buffer for the + // new message body. Having this as a separate function allows user + // to reuse once-allocated message for multiple times. + inline void rebuild (size_t size_) + { + int rc = zmq_msg_close (this); + if (rc != 0) + throw error_t (); + rc = zmq_msg_init_size (this, size_); + if (rc != 0) + throw error_t (); + } + + // Same as above, however, the message is rebuilt from the supplied + // buffer. See appropriate constructor for discussion of buffer + // deallocation mechanism. + inline void rebuild (void *data_, size_t size_, free_fn *ffn_) + { + int rc = zmq_msg_close (this); + if (rc != 0) + throw error_t (); + rc = zmq_msg_init_data (this, data_, size_, ffn_); + if (rc != 0) + throw error_t (); + } + + // Moves the message content from one message to the another. If the + // destination message have contained data prior to the operation + // these get deallocated. The source message will contain 0 bytes + // of data after the operation. + inline void move_to (message_t *msg_) + { + int rc = zmq_msg_move (this, (zmq_msg_t*) msg_); + if (rc != 0) + throw error_t (); + } + + // Copies the message content from one message to the another. If the + // destination message have contained data prior to the operation + // these get deallocated. + inline void copy_to (message_t *msg_) + { + int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_); + if (rc != 0) + throw error_t (); + } + + // Returns message type. + inline message_type_t type () + { + return (message_type_t) (1 << zmq_msg_type (this)); + } + + // Returns pointer to message's data buffer. + inline void *data () + { + return zmq_msg_data (this); + } + + // Returns the size of message data buffer. + inline size_t size () + { + return zmq_msg_size (this); + } + + private: + + // Disable implicit message copying, so that users won't use shared + // messages (less efficient) without being aware of the fact. + message_t (const message_t&); + void operator = (const message_t&); + }; + + class context_t + { + friend class socket_t; + + public: + + inline context_t (int app_threads_, int io_threads_) + { + ptr = zmq_init (app_threads_, io_threads_); + if (ptr == NULL) + throw error_t (); + } + + inline ~context_t () + { + int rc = zmq_term (ptr); + assert (rc == 0); + } + + private: + + void *ptr; + + // Disable copying. + context_t (const context_t&); + void operator = (const context_t&); + }; + + class socket_t + { + public: + + inline socket_t (context_t &context_, int type_ = 0) + { + ptr = zmq_socket (context_.ptr, type_); + if (ptr == NULL) + throw error_t (); + } + + inline ~socket_t () + { + int rc = zmq_close (ptr); + if (rc != 0) + throw error_t (); + } + + inline void setsockopt (int option_, const void *optval_, + size_t optvallen_) + { + int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); + if (rc != 0) + throw error_t (); + } + + inline void bind (const char *addr_) + { + int rc = zmq_bind (ptr, addr_); + if (rc != 0) + throw error_t (); + } + + inline void connect (const char *addr_) + { + int rc = zmq_connect (ptr, addr_); + if (rc != 0) + throw error_t (); + } + + inline bool send (message_t &msg_, int flags_ = 0) + { + int rc = zmq_send (ptr, &msg_, flags_); + if (rc == 0) + return true; + if (rc == -1 && errno == EAGAIN) + return false; + throw error_t (); + } + + inline void flush () + { + int rc = zmq_flush (ptr); + if (rc != 0) + throw error_t (); + } + + inline bool recv (message_t *msg_, int flags_ = 0) + { + int rc = zmq_recv (ptr, msg_, flags_); + if (rc == 0) + return true; + if (rc == -1 && errno == EAGAIN) + return false; + throw error_t (); + } + + private: + + void *ptr; + + // Disable copying. + socket_t (const socket_t&); + void operator = (const socket_t&); + }; + +} + +#endif diff --git a/bindings/java/Context.cpp b/bindings/java/Context.cpp new file mode 100644 index 0000000..67094e8 --- /dev/null +++ b/bindings/java/Context.cpp @@ -0,0 +1,96 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <errno.h> + +#include "zmq.h" +#include "org_zmq_Context.h" + +static jfieldID ctx_handle_fid = NULL; + +static void raise_exception (JNIEnv *env, int err) +{ + // Get exception class. + jclass exception_class = env->FindClass ("java/lang/Exception"); + assert (exception_class); + + // Get text description of the exception. +#if defined _MSC_VER +#pragma warning (push) +#pragma warning (disable:4996) +#endif + const char *err_msg = strerror (err); +#if defined _MSC_VER +#pragma warning (pop) +#endif + + // Raise the exception. + int rc = env->ThrowNew (exception_class, err_msg); + assert (rc == 0); + + // Free the local ref. + env->DeleteLocalRef (exception_class); +} + +JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, jobject obj, + jint app_threads, jint io_threads) +{ + if (ctx_handle_fid == NULL) { + jclass cls = env->GetObjectClass (obj); + assert (cls); + ctx_handle_fid = env->GetFieldID (cls, "contextHandle", "J"); + assert (ctx_handle_fid); + env->DeleteLocalRef (cls); + } + + void *ctx = zmq_init (app_threads, io_threads); + if (ctx == NULL) { + raise_exception (env, errno); + return; + } + + env->SetLongField (obj, ctx_handle_fid, (jlong) ctx); +} + +JNIEXPORT void JNICALL Java_org_zmq_Context_finalize (JNIEnv *env, jobject obj) +{ + void *ctx = (void*) env->GetLongField (obj, ctx_handle_fid); + assert (ctx); + + int rc = zmq_term (ctx); + assert (rc == 0); +} + +JNIEXPORT jlong JNICALL Java_org_zmq_Context_createSocket (JNIEnv *env, + jobject obj, jint type) +{ + void *ctx = (void*) env->GetLongField (obj, ctx_handle_fid); + assert (ctx); + + void *s = zmq_socket (ctx, type); + if (s == NULL) { + raise_exception (env, errno); + return -1; + } + + return (jlong) s; +} diff --git a/bindings/java/Makefile.am b/bindings/java/Makefile.am new file mode 100644 index 0000000..c9e430c --- /dev/null +++ b/bindings/java/Makefile.am @@ -0,0 +1,58 @@ +# We do not want to install Jzmq.class file +# user has to copy it to the right location. +#jzmqdir = /tmp + +jarfile = Zmq.jar +jardir = $(datadir)/java + +$(jarfile): $(dist_noinst_JAVA) + $(JAR) cf $(JARFLAGS) $@ org/zmq/*.class + +jar_DATA = $(jarfile) + +dist_noinst_JAVA = \ + org/zmq/Context.java \ + org/zmq/Socket.java + +lib_LTLIBRARIES = libjzmq.la +libjzmq_la_SOURCES = \ + Context.cpp \ + org_zmq_Context.h \ + Socket.cpp \ + org_zmq_Socket.h + +libjzmq_la_CXXFLAGS = -I$(top_srcdir)/src/libzmq \ +@JAVA_INCLUDE@ -I$(top_srcdir)/bindings/c -Wall +libjzmq_la_LDFLAGS = -version-info @JLTVER@ +libjzmq_la_LIBADD = $(top_builddir)/src/libzmq.la + +BUILT_SOURCES = \ + org/zmq/Context.class \ + org_zmq_Context.h \ + org/zmq/Socket.class \ + org_zmq_Socket.h + +CLEANFILES = \ + org/zmq/Context.class \ + org_zmq_Context.h \ + org/zmq/Socket.class \ + org_zmq_Socket.h \ + Zmq.jar + +$(srcdir)/Context.cpp: org_zmq_Context.h + +org_zmq_Context.h: org/zmq/Context.class + $(CLASSPATH_ENV) $(JAVAH) -jni -classpath . org.zmq.Context + +./org/zmq/Context.class: classdist_noinst.stamp + +$(srcdir)/Socket.cpp: org_zmq_Socket.h + +org_zmq_Socket.h: org/zmq/Socket.class + $(CLASSPATH_ENV) $(JAVAH) -jni -classpath . org.zmq.Socket + +./org/zmq/Socket.class: classdist_noinst.stamp + +dist-hook: + -rm $(distdir)/*.h + diff --git a/bindings/java/Socket.cpp b/bindings/java/Socket.cpp new file mode 100644 index 0000000..2274535 --- /dev/null +++ b/bindings/java/Socket.cpp @@ -0,0 +1,272 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include <stdlib.h> +#include <string.h> +#include <assert.h> +#include <errno.h> + +#include "../src/stdint.hpp" + +#include "zmq.h" +#include "org_zmq_Socket.h" + +static jfieldID socket_handle_fid = NULL; +static jmethodID create_socket_mid = NULL; + +static void raise_exception (JNIEnv *env, int err) +{ + // Get exception class. + jclass exception_class = env->FindClass ("java/lang/Exception"); + assert (exception_class); + + // Get text description of the exception. +#if defined _MSC_VER +#pragma warning (push) +#pragma warning (disable:4996) +#endif + const char *err_msg = strerror (err); +#if defined _MSC_VER +#pragma warning (pop) +#endif + + // Raise the exception. + int rc = env->ThrowNew (exception_class, err_msg); + assert (rc == 0); + + // Free the local ref. + env->DeleteLocalRef (exception_class); +} + +JNIEXPORT void JNICALL Java_org_zmq_Socket_construct (JNIEnv *env, jobject obj, + jobject context, jint type) +{ + if (socket_handle_fid == NULL) { + jclass cls = env->GetObjectClass (obj); + assert (cls); + socket_handle_fid = env->GetFieldID (cls, "socketHandle", "J"); + assert (socket_handle_fid); + env->DeleteLocalRef (cls); + } + + if (create_socket_mid == NULL) { + jclass cls = env->FindClass ("org/zmq/Context"); + assert (cls); + create_socket_mid = env->GetMethodID (cls, "createSocket", "(I)J"); + assert (create_socket_mid); + env->DeleteLocalRef (cls); + } + + void *s = (void*) env->CallLongMethod (context, create_socket_mid, type); + if (env->ExceptionCheck ()) + return; + + env->SetLongField (obj, socket_handle_fid, (jlong) s); +} + +JNIEXPORT void JNICALL Java_org_zmq_Socket_finalize (JNIEnv *env, jobject obj) +{ + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + int rc = zmq_close (s); + assert (rc == 0); +} + +JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env, + jobject obj, jint option, jlong optval) +{ + switch (option) { + case ZMQ_HWM: + case ZMQ_LWM: + case ZMQ_SWAP: + case ZMQ_AFFINITY: + case ZMQ_RATE: + case ZMQ_RECOVERY_IVL: + case ZMQ_MCAST_LOOP: + { + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + + int64_t value = optval; + int rc = zmq_setsockopt (s, option, &value, sizeof (value)); + if (rc != 0) + raise_exception (env, errno); + return; + } + default: + raise_exception (env, EINVAL); + return; + } +} + +JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__ILjava_lang_String_2 ( + JNIEnv *env, jobject obj, jint option, jstring optval) +{ + switch (option) { + case ZMQ_IDENTITY: + case ZMQ_SUBSCRIBE: + case ZMQ_UNSUBSCRIBE: + { + if (optval == NULL) { + raise_exception (env, EINVAL); + return; + } + + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + + const char *value = env->GetStringUTFChars (optval, NULL); + assert (value); + int rc = zmq_setsockopt (s, option, value, strlen (value)); + env->ReleaseStringUTFChars (optval, value); + if (rc != 0) + raise_exception (env, errno); + return; + } + default: + raise_exception (env, EINVAL); + return; + } +} + +JNIEXPORT void JNICALL Java_org_zmq_Socket_bind (JNIEnv *env, jobject obj, + jstring addr) +{ + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + + if (addr == NULL) { + raise_exception (env, EINVAL); + return; + } + + const char *c_addr = env->GetStringUTFChars (addr, NULL); + if (c_addr == NULL) { + raise_exception (env, EINVAL); + return; + } + + int rc = zmq_bind (s, c_addr); + env->ReleaseStringUTFChars (addr, c_addr); + + if (rc == -1) + raise_exception (env, errno); +} + +JNIEXPORT void JNICALL Java_org_zmq_Socket_connect (JNIEnv *env, jobject obj, + jstring addr) +{ + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + + if (addr == NULL) { + raise_exception (env, EINVAL); + return; + } + + const char *c_addr = env->GetStringUTFChars (addr, NULL); + if (c_addr == NULL) { + raise_exception (env, EINVAL); + return; + } + + int rc = zmq_connect (s, c_addr); + env->ReleaseStringUTFChars (addr, c_addr); + + if (rc == -1) + raise_exception (env, errno); +} + +JNIEXPORT jboolean JNICALL Java_org_zmq_Socket_send (JNIEnv *env, jobject obj, + jbyteArray msg, jlong flags) +{ + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + + jsize size = env->GetArrayLength (msg); + jbyte *data = env->GetByteArrayElements (msg, 0); + + zmq_msg_t message; + int rc = zmq_msg_init_size (&message, size); + assert (rc == 0); + memcpy (zmq_msg_data (&message), data, size); + + env->ReleaseByteArrayElements (msg, data, 0); + + rc = zmq_send (s, &message, (int) flags); + + if (rc == -1 && errno == EAGAIN) { + rc = zmq_msg_close (&message); + assert (rc == 0); + return JNI_FALSE; + } + + if (rc == -1) { + raise_exception (env, errno); + rc = zmq_msg_close (&message); + assert (rc == 0); + return JNI_FALSE; + } + + rc = zmq_msg_close (&message); + assert (rc == 0); + return JNI_TRUE; +} + +JNIEXPORT void JNICALL Java_org_zmq_Socket_flush (JNIEnv *env, jobject obj) +{ + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + + int rc = zmq_flush (s); + + if (rc == -1) { + raise_exception (env, errno); + return ; + } +} + +JNIEXPORT jbyteArray JNICALL Java_org_zmq_Socket_recv (JNIEnv *env, jobject obj, + jlong flags) +{ + void *s = (void*) env->GetLongField (obj, socket_handle_fid); + assert (s); + + zmq_msg_t message; + zmq_msg_init (&message); + int rc = zmq_recv (s, &message, (int) flags); + + if (rc == -1 && errno == EAGAIN) { + zmq_msg_close (&message); + return NULL; + } + + if (rc == -1) { + raise_exception (env, errno); + zmq_msg_close (&message); + return NULL; + } + + jbyteArray data = env->NewByteArray (zmq_msg_size (&message)); + assert (data); + env->SetByteArrayRegion (data, 0, zmq_msg_size (&message), + (jbyte*) zmq_msg_data (&message)); + + return data; +} diff --git a/bindings/java/org/zmq/Context.java b/bindings/java/org/zmq/Context.java new file mode 100644 index 0000000..c63ef60 --- /dev/null +++ b/bindings/java/org/zmq/Context.java @@ -0,0 +1,50 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +package org.zmq; + +public class Context { + static { + System.loadLibrary("jzmq"); + } + + /** + * Class constructor. + * + * @param appThreads maximum number of application threads. + * @param ioThreads size of the threads pool to handle I/O operations. + */ + public Context (int appThreads, int ioThreads) { + construct (appThreads, ioThreads); + } + + /** + * Int |