summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-09-20 10:14:21 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-09-20 10:14:21 +0200
commit50a8b9ea0c4a819073b46449dee8fc839b837ae5 (patch)
treea1effc887ebb0e824959b114dd0ed67e788d0507
parentedecf75b611cf0e6b1c2658846cff013434edad4 (diff)
'flags' parameter added to zmq_init
-rw-r--r--bindings/c/zmq.h6
-rw-r--r--bindings/cpp/zmq.hpp4
-rw-r--r--bindings/java/Context.cpp4
-rw-r--r--bindings/java/org/zmq/Context.java8
-rw-r--r--bindings/python/pyzmq.cpp13
-rw-r--r--bindings/ruby/rbzmq.cpp11
-rw-r--r--perf/c/local_lat.c2
-rw-r--r--perf/c/local_thr.c2
-rw-r--r--perf/c/remote_lat.c2
-rw-r--r--perf/c/remote_thr.c2
-rw-r--r--perf/java/local_lat.java2
-rw-r--r--perf/java/local_thr.java2
-rw-r--r--perf/java/remote_lat.java2
-rw-r--r--perf/java/remote_thr.java2
-rw-r--r--perf/ruby/local_lat.rb2
-rw-r--r--perf/ruby/local_thr.rb2
-rw-r--r--perf/ruby/remote_lat.rb2
-rw-r--r--perf/ruby/remote_thr.rb2
-rw-r--r--src/app_thread.cpp3
-rw-r--r--src/app_thread.hpp3
-rw-r--r--src/dispatcher.cpp8
-rw-r--r--src/dispatcher.hpp2
-rw-r--r--src/fd_signaler.cpp6
-rw-r--r--src/io_thread.cpp3
-rw-r--r--src/io_thread.hpp3
-rw-r--r--src/zmq.cpp8
26 files changed, 65 insertions, 41 deletions
diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h
index 732ecb9..797d060 100644
--- a/bindings/c/zmq.h
+++ b/bindings/c/zmq.h
@@ -91,6 +91,10 @@ extern "C" {
// the peer that the previous recv delivered message from.
#define ZMQ_REP 4
+// Option specifying that the sockets should be pollable. This may be a little
+// less efficient that raw non-pollable sockets.
+#define ZMQ_POLL 1
+
// Prototype for the message body deallocation functions.
// It is deliberately defined in the way to comply with standard C free.
typedef void (zmq_free_fn) (void *data);
@@ -150,7 +154,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
//
// Errors: EINVAL - one of the arguments is less than zero or there are no
// threads declared at all.
-ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads);
+ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags);
// Deinitialise 0MQ context including all the open sockets. Closing
// sockets after zmq_term has been called will result in undefined behaviour.
diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp
index 471d1d8..a9e63b5 100644
--- a/bindings/cpp/zmq.hpp
+++ b/bindings/cpp/zmq.hpp
@@ -180,9 +180,9 @@ namespace zmq
public:
- inline context_t (int app_threads_, int io_threads_)
+ inline context_t (int app_threads_, int io_threads_, int flags_ = 0)
{
- ptr = zmq_init (app_threads_, io_threads_);
+ ptr = zmq_init (app_threads_, io_threads_, flags_);
if (ptr == NULL)
throw error_t ();
}
diff --git a/bindings/java/Context.cpp b/bindings/java/Context.cpp
index 67094e8..d2138c3 100644
--- a/bindings/java/Context.cpp
+++ b/bindings/java/Context.cpp
@@ -52,7 +52,7 @@ static void raise_exception (JNIEnv *env, int err)
}
JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, jobject obj,
- jint app_threads, jint io_threads)
+ jint app_threads, jint io_threads, jint flags)
{
if (ctx_handle_fid == NULL) {
jclass cls = env->GetObjectClass (obj);
@@ -62,7 +62,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, jobject obj,
env->DeleteLocalRef (cls);
}
- void *ctx = zmq_init (app_threads, io_threads);
+ void *ctx = zmq_init (app_threads, io_threads, flags);
if (ctx == NULL) {
raise_exception (env, errno);
return;
diff --git a/bindings/java/org/zmq/Context.java b/bindings/java/org/zmq/Context.java
index c63ef60..408c6b0 100644
--- a/bindings/java/org/zmq/Context.java
+++ b/bindings/java/org/zmq/Context.java
@@ -24,14 +24,16 @@ public class Context {
System.loadLibrary("jzmq");
}
+ public static final int POLL = 1;
+
/**
* Class constructor.
*
* @param appThreads maximum number of application threads.
* @param ioThreads size of the threads pool to handle I/O operations.
*/
- public Context (int appThreads, int ioThreads) {
- construct (appThreads, ioThreads);
+ public Context (int appThreads, int ioThreads, int flags) {
+ construct (appThreads, ioThreads, flags);
}
/**
@@ -40,7 +42,7 @@ public class Context {
public native long createSocket (int type);
/** Initialize the JNI interface */
- protected native void construct (int appThreads, int ioThreads);
+ protected native void construct (int appThreads, int ioThreads, int flags);
/** Free resources used by JNI driver. */
protected native void finalize ();
diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp
index 4bb5653..32f30fe 100644
--- a/bindings/python/pyzmq.cpp
+++ b/bindings/python/pyzmq.cpp
@@ -53,15 +53,16 @@ int context_init (context_t *self, PyObject *args, PyObject *kwdict)
{
int app_threads;
int io_threads;
- static const char *kwlist [] = {"app_threads", "io_threads", NULL};
- if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist,
- &app_threads, &io_threads)) {
+ int flags = 0;
+ static const char *kwlist [] = {"app_threads", "io_threads", "flags", NULL};
+ if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii|i", (char**) kwlist,
+ &app_threads, &io_threads, &flags)) {
PyErr_SetString (PyExc_SystemError, "invalid arguments");
return -1;
}
assert (!self->handle);
- self->handle = zmq_init (app_threads, io_threads);
+ self->handle = zmq_init (app_threads, io_threads, flags);
if (!self->handle) {
PyErr_SetString (PyExc_SystemError, strerror (errno));
return -1;
@@ -522,7 +523,9 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_MCAST_LOOP);
PyDict_SetItemString (dict, "MCAST_LOOP", t);
Py_DECREF (t);
-
+ t = PyInt_FromLong (ZMQ_POLL);
+ PyDict_SetItemString (dict, "POLL", t);
+ Py_DECREF (t);
}
#if defined _MSC_VER
diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp
index bf0d9bc..83bb2b6 100644
--- a/bindings/ruby/rbzmq.cpp
+++ b/bindings/ruby/rbzmq.cpp
@@ -38,10 +38,11 @@ static VALUE context_alloc (VALUE class_)
}
static VALUE context_initialize (VALUE self_, VALUE app_threads_,
- VALUE io_threads_)
+ VALUE io_threads_, VALUE flags_)
{
assert (!DATA_PTR (self_));
- void *ctx = zmq_init (NUM2INT (app_threads_), NUM2INT (io_threads_));
+ void *ctx = zmq_init (NUM2INT (app_threads_), NUM2INT (io_threads_),
+ NUM2INT (flags_));
if (!ctx) {
rb_raise (rb_eRuntimeError, strerror (errno));
return Qnil;
@@ -105,8 +106,8 @@ static VALUE socket_setsockopt (VALUE self_, VALUE option_,
rc = zmq_setsockopt (DATA_PTR (self_), NUM2INT (option_),
(void *) &optval, 4);
}
-
break;
+
case ZMQ_IDENTITY:
case ZMQ_SUBSCRIBE:
case ZMQ_UNSUBSCRIBE:
@@ -236,7 +237,7 @@ extern "C" void Init_librbzmq ()
VALUE context_type = rb_define_class ("Context", rb_cObject);
rb_define_alloc_func (context_type, context_alloc);
rb_define_method (context_type, "initialize",
- (VALUE(*)(...)) context_initialize, 2);
+ (VALUE(*)(...)) context_initialize, 3);
VALUE socket_type = rb_define_class ("Socket", rb_cObject);
rb_define_alloc_func (socket_type, socket_alloc);
@@ -274,4 +275,6 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));
rb_define_global_const ("REP", INT2NUM (ZMQ_REP));
+
+ rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL));
}
diff --git a/perf/c/local_lat.c b/perf/c/local_lat.c
index 86d4721..2cbae13 100644
--- a/perf/c/local_lat.c
+++ b/perf/c/local_lat.c
@@ -42,7 +42,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
roundtrip_count = atoi (argv [3]);
- ctx = zmq_init (1, 1);
+ ctx = zmq_init (1, 1, 0);
assert (ctx);
s = zmq_socket (ctx, ZMQ_REP);
diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c
index 9b94330..f9ab720 100644
--- a/perf/c/local_thr.c
+++ b/perf/c/local_thr.c
@@ -45,7 +45,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
message_count = atoi (argv [3]);
- ctx = zmq_init (1, 1);
+ ctx = zmq_init (1, 1, 0);
assert (ctx);
s = zmq_socket (ctx, ZMQ_SUB);
diff --git a/perf/c/remote_lat.c b/perf/c/remote_lat.c
index 15dfc46..52aa071 100644
--- a/perf/c/remote_lat.c
+++ b/perf/c/remote_lat.c
@@ -46,7 +46,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
roundtrip_count = atoi (argv [3]);
- ctx = zmq_init (1, 1);
+ ctx = zmq_init (1, 1, 0);
assert (ctx);
s = zmq_socket (ctx, ZMQ_REQ);
diff --git a/perf/c/remote_thr.c b/perf/c/remote_thr.c
index 8814e0f..fb685cd 100644
--- a/perf/c/remote_thr.c
+++ b/perf/c/remote_thr.c
@@ -42,7 +42,7 @@ int main (int argc, char *argv [])
message_size = atoi (argv [2]);
message_count = atoi (argv [3]);
- ctx = zmq_init (1, 1);
+ ctx = zmq_init (1, 1, 0);
assert (ctx);
s = zmq_socket (ctx, ZMQ_PUB);
diff --git a/perf/java/local_lat.java b/perf/java/local_lat.java
index e7583cc..873f636 100644
--- a/perf/java/local_lat.java
+++ b/perf/java/local_lat.java
@@ -33,7 +33,7 @@ class local_lat
int messageSize = Integer.parseInt (args [1]);
int roundtripCount = Integer.parseInt (args [2]);
- org.zmq.Context ctx = new org.zmq.Context (1, 1);
+ org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.REP);
s.bind (bindTo);
diff --git a/perf/java/local_thr.java b/perf/java/local_thr.java
index 450aed5..2881a08 100644
--- a/perf/java/local_thr.java
+++ b/perf/java/local_thr.java
@@ -33,7 +33,7 @@ class local_thr
long messageSize = Integer.parseInt (args [1]);
long messageCount = Integer.parseInt (args [2]);
- org.zmq.Context ctx = new org.zmq.Context (1, 1);
+ org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.SUB);
diff --git a/perf/java/remote_lat.java b/perf/java/remote_lat.java
index 28b2f76..905b82c 100644
--- a/perf/java/remote_lat.java
+++ b/perf/java/remote_lat.java
@@ -33,7 +33,7 @@ class remote_lat
int messageSize = Integer.parseInt (args [1]);
int roundtripCount = Integer.parseInt (args [2]);
- org.zmq.Context ctx = new org.zmq.Context (1, 1);
+ org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.REQ);
s.connect (connectTo);
diff --git a/perf/java/remote_thr.java b/perf/java/remote_thr.java
index 9182a8f..f696fc3 100644
--- a/perf/java/remote_thr.java
+++ b/perf/java/remote_thr.java
@@ -34,7 +34,7 @@ class remote_thr
int messageSize = Integer.parseInt (args [1]);
int messageCount = Integer.parseInt (args [2]);
- org.zmq.Context ctx = new org.zmq.Context (1, 1);
+ org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB);
diff --git a/perf/ruby/local_lat.rb b/perf/ruby/local_lat.rb
index b71af80..8806ccb 100644
--- a/perf/ruby/local_lat.rb
+++ b/perf/ruby/local_lat.rb
@@ -27,7 +27,7 @@ bind_to = ARGV[0]
message_size = ARGV[1].to_i
roundtrip_count = ARGV[2].to_i
-ctx = Context.new(1, 1)
+ctx = Context.new(1, 1, 0)
s = Socket.new(ctx, REP);
s.bind(bind_to);
diff --git a/perf/ruby/local_thr.rb b/perf/ruby/local_thr.rb
index b916f2d..6d9d13f 100644
--- a/perf/ruby/local_thr.rb
+++ b/perf/ruby/local_thr.rb
@@ -27,7 +27,7 @@ bind_to = ARGV[0]
message_size = ARGV[1].to_i
message_count = ARGV[2].to_i
-ctx = Context.new(1, 1)
+ctx = Context.new(1, 1, 0)
s = Socket.new(ctx, SUB);
s.setsockopt(SUBSCRIBE, "*");
diff --git a/perf/ruby/remote_lat.rb b/perf/ruby/remote_lat.rb
index 9387fe1..f5b438d 100644
--- a/perf/ruby/remote_lat.rb
+++ b/perf/ruby/remote_lat.rb
@@ -27,7 +27,7 @@ connect_to = ARGV[0]
message_size = ARGV[1].to_i
roundtrip_count = ARGV[2].to_i
-ctx = Context.new(1, 1)
+ctx = Context.new(1, 1, 0)
s = Socket.new(ctx, REQ);
s.connect(connect_to);
diff --git a/perf/ruby/remote_thr.rb b/perf/ruby/remote_thr.rb
index ef93961..760ff88 100644
--- a/perf/ruby/remote_thr.rb
+++ b/perf/ruby/remote_thr.rb
@@ -27,7 +27,7 @@ connect_to = ARGV[0]
message_size = ARGV[1].to_i
message_count = ARGV[2].to_i
-ctx = Context.new(1, 1)
+ctx = Context.new(1, 1, 0)
s = Socket.new(ctx, PUB);
# Add your socket options here.
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index f523f40..c152fc8 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -46,7 +46,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
+ int flags_) :
object_t (dispatcher_, thread_slot_),
associated (false),
last_processing_time (0)
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 0f95de9..5fdb92d 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -34,7 +34,8 @@ namespace zmq
{
public:
- app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_,
+ int flags_);
~app_thread_t ();
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 4c4ec80..5e7ea46 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -30,7 +30,8 @@
#include "windows.h"
#endif
-zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
+zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
+ int flags_) :
sockets (0),
terminated (false)
{
@@ -47,7 +48,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
// Create application thread proxies.
for (int i = 0; i != app_threads_; i++) {
- app_thread_t *app_thread = new app_thread_t (this, i);
+ app_thread_t *app_thread = new app_thread_t (this, i, flags_);
zmq_assert (app_thread);
app_threads.push_back (app_thread);
signalers.push_back (app_thread->get_signaler ());
@@ -55,7 +56,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
// Create I/O thread objects.
for (int i = 0; i != io_threads_; i++) {
- io_thread_t *io_thread = new io_thread_t (this, i + app_threads_);
+ io_thread_t *io_thread = new io_thread_t (this, i + app_threads_,
+ flags_);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers.push_back (io_thread->get_signaler ());
diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp
index bd1f655..23b6a33 100644
--- a/src/dispatcher.hpp
+++ b/src/dispatcher.hpp
@@ -50,7 +50,7 @@ namespace zmq
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
- dispatcher_t (int app_threads_, int io_threads_);
+ dispatcher_t (int app_threads_, int io_threads_, int flags_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp
index 862b0fd..41d168d 100644
--- a/src/fd_signaler.cpp
+++ b/src/fd_signaler.cpp
@@ -85,6 +85,12 @@ zmq::fd_t zmq::fd_signaler_t::get_fd ()
zmq::fd_signaler_t::fd_signaler_t ()
{
+ // Windows have no 'socketpair' function.
+ // Here we create the socketpair by hand.
+
+ // TODO: Check Windows pipe (CreatePipe). It'll presumably be more
+ // efficient than the socketpair.
+
struct sockaddr_in addr;
SOCKET listener;
int addrlen = sizeof (addr);
diff --git a/src/io_thread.cpp b/src/io_thread.cpp
index afac11c..b6521e9 100644
--- a/src/io_thread.cpp
+++ b/src/io_thread.cpp
@@ -32,7 +32,8 @@
#include "dispatcher.hpp"
#include "simple_semaphore.hpp"
-zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
+ int flags_) :
object_t (dispatcher_, thread_slot_)
{
#if defined ZMQ_FORCE_SELECT
diff --git a/src/io_thread.hpp b/src/io_thread.hpp
index f95880a..4015b0c 100644
--- a/src/io_thread.hpp
+++ b/src/io_thread.hpp
@@ -37,7 +37,8 @@ namespace zmq
{
public:
- io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
+ io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_,
+ int flags_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
diff --git a/src/zmq.cpp b/src/zmq.cpp
index c567b09..36f30eb 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -173,17 +173,17 @@ int zmq_msg_type (zmq_msg_t *msg_)
return (((const unsigned char*) msg_->content) - offset);
}
-void *zmq_init (int app_threads_, int io_threads_)
+void *zmq_init (int app_threads_, int io_threads_, int flags_)
{
// There should be at least a single thread managed by the dispatcher.
- if (app_threads_ < 0 || io_threads_ < 0 ||
- app_threads_ + io_threads_ == 0) {
+ if (app_threads_ <= 0 || io_threads_ <= 0 ||
+ app_threads_ > 63 || io_threads_ > 63) {
errno = EINVAL;
return NULL;
}
zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_,
- io_threads_);
+ io_threads_, flags_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}