diff options
| author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-09-20 10:14:21 +0200 | 
|---|---|---|
| committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-09-20 10:14:21 +0200 | 
| commit | 50a8b9ea0c4a819073b46449dee8fc839b837ae5 (patch) | |
| tree | a1effc887ebb0e824959b114dd0ed67e788d0507 | |
| parent | edecf75b611cf0e6b1c2658846cff013434edad4 (diff) | |
'flags' parameter added to zmq_init
| -rw-r--r-- | bindings/c/zmq.h | 6 | ||||
| -rw-r--r-- | bindings/cpp/zmq.hpp | 4 | ||||
| -rw-r--r-- | bindings/java/Context.cpp | 4 | ||||
| -rw-r--r-- | bindings/java/org/zmq/Context.java | 8 | ||||
| -rw-r--r-- | bindings/python/pyzmq.cpp | 13 | ||||
| -rw-r--r-- | bindings/ruby/rbzmq.cpp | 11 | ||||
| -rw-r--r-- | perf/c/local_lat.c | 2 | ||||
| -rw-r--r-- | perf/c/local_thr.c | 2 | ||||
| -rw-r--r-- | perf/c/remote_lat.c | 2 | ||||
| -rw-r--r-- | perf/c/remote_thr.c | 2 | ||||
| -rw-r--r-- | perf/java/local_lat.java | 2 | ||||
| -rw-r--r-- | perf/java/local_thr.java | 2 | ||||
| -rw-r--r-- | perf/java/remote_lat.java | 2 | ||||
| -rw-r--r-- | perf/java/remote_thr.java | 2 | ||||
| -rw-r--r-- | perf/ruby/local_lat.rb | 2 | ||||
| -rw-r--r-- | perf/ruby/local_thr.rb | 2 | ||||
| -rw-r--r-- | perf/ruby/remote_lat.rb | 2 | ||||
| -rw-r--r-- | perf/ruby/remote_thr.rb | 2 | ||||
| -rw-r--r-- | src/app_thread.cpp | 3 | ||||
| -rw-r--r-- | src/app_thread.hpp | 3 | ||||
| -rw-r--r-- | src/dispatcher.cpp | 8 | ||||
| -rw-r--r-- | src/dispatcher.hpp | 2 | ||||
| -rw-r--r-- | src/fd_signaler.cpp | 6 | ||||
| -rw-r--r-- | src/io_thread.cpp | 3 | ||||
| -rw-r--r-- | src/io_thread.hpp | 3 | ||||
| -rw-r--r-- | src/zmq.cpp | 8 | 
26 files changed, 65 insertions, 41 deletions
| diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 732ecb9..797d060 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -91,6 +91,10 @@ extern "C" {  //  the peer that the previous recv delivered message from.  #define ZMQ_REP 4 +//  Option specifying that the sockets should be pollable. This may be a little +//  less efficient that raw non-pollable sockets. +#define ZMQ_POLL 1 +  //  Prototype for the message body deallocation functions.  //  It is deliberately defined in the way to comply with standard C free.  typedef void (zmq_free_fn) (void *data); @@ -150,7 +154,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);  //  //  Errors: EINVAL - one of the arguments is less than zero or there are no  //                   threads declared at all. -ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads); +ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags);  //  Deinitialise 0MQ context including all the open sockets. Closing  //  sockets after zmq_term has been called will result in undefined behaviour. diff --git a/bindings/cpp/zmq.hpp b/bindings/cpp/zmq.hpp index 471d1d8..a9e63b5 100644 --- a/bindings/cpp/zmq.hpp +++ b/bindings/cpp/zmq.hpp @@ -180,9 +180,9 @@ namespace zmq      public: -        inline context_t (int app_threads_, int io_threads_) +        inline context_t (int app_threads_, int io_threads_, int flags_ = 0)          { -            ptr = zmq_init (app_threads_, io_threads_); +            ptr = zmq_init (app_threads_, io_threads_, flags_);              if (ptr == NULL)                  throw error_t ();          } diff --git a/bindings/java/Context.cpp b/bindings/java/Context.cpp index 67094e8..d2138c3 100644 --- a/bindings/java/Context.cpp +++ b/bindings/java/Context.cpp @@ -52,7 +52,7 @@ static void raise_exception (JNIEnv *env, int err)  }  JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, jobject obj, -    jint app_threads, jint io_threads) +    jint app_threads, jint io_threads, jint flags)  {      if (ctx_handle_fid == NULL) {          jclass cls = env->GetObjectClass (obj); @@ -62,7 +62,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, jobject obj,          env->DeleteLocalRef (cls);      } -    void *ctx = zmq_init (app_threads, io_threads); +    void *ctx = zmq_init (app_threads, io_threads, flags);      if (ctx == NULL) {          raise_exception (env, errno);          return; diff --git a/bindings/java/org/zmq/Context.java b/bindings/java/org/zmq/Context.java index c63ef60..408c6b0 100644 --- a/bindings/java/org/zmq/Context.java +++ b/bindings/java/org/zmq/Context.java @@ -24,14 +24,16 @@ public class Context {          System.loadLibrary("jzmq");      } +    public static final int POLL = 1; +      /**       * Class constructor.       *       * @param appThreads maximum number of application threads.       * @param ioThreads size of the threads pool to handle I/O operations.       */ -    public Context (int appThreads, int ioThreads) { -        construct (appThreads, ioThreads); +    public Context (int appThreads, int ioThreads, int flags) { +        construct (appThreads, ioThreads, flags);      }      /** @@ -40,7 +42,7 @@ public class Context {      public native long createSocket (int type);      /** Initialize the JNI interface */ -    protected native void construct (int appThreads, int ioThreads); +    protected native void construct (int appThreads, int ioThreads, int flags);      /** Free resources used by JNI driver. */      protected native void finalize (); diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp index 4bb5653..32f30fe 100644 --- a/bindings/python/pyzmq.cpp +++ b/bindings/python/pyzmq.cpp @@ -53,15 +53,16 @@ int context_init (context_t *self, PyObject *args, PyObject *kwdict)  {      int app_threads;      int io_threads; -    static const char *kwlist [] = {"app_threads", "io_threads", NULL}; -    if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii", (char**) kwlist, -          &app_threads, &io_threads)) { +    int flags = 0; +    static const char *kwlist [] = {"app_threads", "io_threads", "flags", NULL}; +    if (!PyArg_ParseTupleAndKeywords (args, kwdict, "ii|i", (char**) kwlist, +          &app_threads, &io_threads, &flags)) {          PyErr_SetString (PyExc_SystemError, "invalid arguments");          return -1;      }      assert (!self->handle); -    self->handle = zmq_init (app_threads, io_threads); +    self->handle = zmq_init (app_threads, io_threads, flags);      if (!self->handle) {          PyErr_SetString (PyExc_SystemError, strerror (errno));          return -1; @@ -522,7 +523,9 @@ PyMODINIT_FUNC initlibpyzmq ()      t = PyInt_FromLong (ZMQ_MCAST_LOOP);      PyDict_SetItemString (dict, "MCAST_LOOP", t);      Py_DECREF (t); - +    t = PyInt_FromLong (ZMQ_POLL); +    PyDict_SetItemString (dict, "POLL", t); +    Py_DECREF (t);  }  #if defined _MSC_VER diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp index bf0d9bc..83bb2b6 100644 --- a/bindings/ruby/rbzmq.cpp +++ b/bindings/ruby/rbzmq.cpp @@ -38,10 +38,11 @@ static VALUE context_alloc (VALUE class_)  }  static VALUE context_initialize (VALUE self_, VALUE app_threads_, -    VALUE io_threads_) +    VALUE io_threads_, VALUE flags_)  {      assert (!DATA_PTR (self_)); -    void *ctx = zmq_init (NUM2INT (app_threads_), NUM2INT (io_threads_)); +    void *ctx = zmq_init (NUM2INT (app_threads_), NUM2INT (io_threads_), +        NUM2INT (flags_));      if (!ctx) {          rb_raise (rb_eRuntimeError, strerror (errno));          return Qnil; @@ -105,8 +106,8 @@ static VALUE socket_setsockopt (VALUE self_, VALUE option_,              rc = zmq_setsockopt (DATA_PTR (self_), NUM2INT (option_),                   (void *) &optval, 4);          } -          break; +      case ZMQ_IDENTITY:      case ZMQ_SUBSCRIBE:      case ZMQ_UNSUBSCRIBE: @@ -236,7 +237,7 @@ extern "C" void Init_librbzmq ()      VALUE context_type = rb_define_class ("Context", rb_cObject);      rb_define_alloc_func (context_type, context_alloc);      rb_define_method (context_type, "initialize", -        (VALUE(*)(...)) context_initialize, 2); +        (VALUE(*)(...)) context_initialize, 3);      VALUE socket_type = rb_define_class ("Socket", rb_cObject);      rb_define_alloc_func (socket_type, socket_alloc); @@ -274,4 +275,6 @@ extern "C" void Init_librbzmq ()      rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));      rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));      rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); + +    rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL));  } diff --git a/perf/c/local_lat.c b/perf/c/local_lat.c index 86d4721..2cbae13 100644 --- a/perf/c/local_lat.c +++ b/perf/c/local_lat.c @@ -42,7 +42,7 @@ int main (int argc, char *argv [])      message_size = atoi (argv [2]);      roundtrip_count = atoi (argv [3]); -    ctx = zmq_init (1, 1); +    ctx = zmq_init (1, 1, 0);      assert (ctx);      s = zmq_socket (ctx, ZMQ_REP); diff --git a/perf/c/local_thr.c b/perf/c/local_thr.c index 9b94330..f9ab720 100644 --- a/perf/c/local_thr.c +++ b/perf/c/local_thr.c @@ -45,7 +45,7 @@ int main (int argc, char *argv [])      message_size = atoi (argv [2]);      message_count = atoi (argv [3]); -    ctx = zmq_init (1, 1); +    ctx = zmq_init (1, 1, 0);      assert (ctx);      s = zmq_socket (ctx, ZMQ_SUB); diff --git a/perf/c/remote_lat.c b/perf/c/remote_lat.c index 15dfc46..52aa071 100644 --- a/perf/c/remote_lat.c +++ b/perf/c/remote_lat.c @@ -46,7 +46,7 @@ int main (int argc, char *argv [])      message_size = atoi (argv [2]);      roundtrip_count = atoi (argv [3]); -    ctx = zmq_init (1, 1); +    ctx = zmq_init (1, 1, 0);      assert (ctx);      s = zmq_socket (ctx, ZMQ_REQ); diff --git a/perf/c/remote_thr.c b/perf/c/remote_thr.c index 8814e0f..fb685cd 100644 --- a/perf/c/remote_thr.c +++ b/perf/c/remote_thr.c @@ -42,7 +42,7 @@ int main (int argc, char *argv [])      message_size = atoi (argv [2]);      message_count = atoi (argv [3]); -    ctx = zmq_init (1, 1); +    ctx = zmq_init (1, 1, 0);      assert (ctx);      s = zmq_socket (ctx, ZMQ_PUB); diff --git a/perf/java/local_lat.java b/perf/java/local_lat.java index e7583cc..873f636 100644 --- a/perf/java/local_lat.java +++ b/perf/java/local_lat.java @@ -33,7 +33,7 @@ class local_lat           int messageSize = Integer.parseInt (args [1]);           int roundtripCount = Integer.parseInt (args [2]); -         org.zmq.Context ctx = new org.zmq.Context (1, 1); +         org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);           org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.REP);           s.bind (bindTo); diff --git a/perf/java/local_thr.java b/perf/java/local_thr.java index 450aed5..2881a08 100644 --- a/perf/java/local_thr.java +++ b/perf/java/local_thr.java @@ -33,7 +33,7 @@ class local_thr          long messageSize = Integer.parseInt (args [1]);          long messageCount = Integer.parseInt (args [2]); -        org.zmq.Context ctx = new org.zmq.Context (1, 1); +        org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);          org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.SUB); diff --git a/perf/java/remote_lat.java b/perf/java/remote_lat.java index 28b2f76..905b82c 100644 --- a/perf/java/remote_lat.java +++ b/perf/java/remote_lat.java @@ -33,7 +33,7 @@ class remote_lat           int messageSize = Integer.parseInt (args [1]);           int roundtripCount = Integer.parseInt (args [2]); -         org.zmq.Context ctx = new org.zmq.Context (1, 1); +         org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);           org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.REQ);           s.connect (connectTo); diff --git a/perf/java/remote_thr.java b/perf/java/remote_thr.java index 9182a8f..f696fc3 100644 --- a/perf/java/remote_thr.java +++ b/perf/java/remote_thr.java @@ -34,7 +34,7 @@ class remote_thr          int messageSize = Integer.parseInt (args [1]);          int messageCount = Integer.parseInt (args [2]); -        org.zmq.Context ctx = new org.zmq.Context (1, 1); +        org.zmq.Context ctx = new org.zmq.Context (1, 1, 0);          org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB); diff --git a/perf/ruby/local_lat.rb b/perf/ruby/local_lat.rb index b71af80..8806ccb 100644 --- a/perf/ruby/local_lat.rb +++ b/perf/ruby/local_lat.rb @@ -27,7 +27,7 @@ bind_to = ARGV[0]  message_size = ARGV[1].to_i  roundtrip_count = ARGV[2].to_i -ctx = Context.new(1, 1) +ctx = Context.new(1, 1, 0)  s = Socket.new(ctx, REP);  s.bind(bind_to); diff --git a/perf/ruby/local_thr.rb b/perf/ruby/local_thr.rb index b916f2d..6d9d13f 100644 --- a/perf/ruby/local_thr.rb +++ b/perf/ruby/local_thr.rb @@ -27,7 +27,7 @@ bind_to = ARGV[0]  message_size = ARGV[1].to_i  message_count = ARGV[2].to_i -ctx = Context.new(1, 1) +ctx = Context.new(1, 1, 0)  s = Socket.new(ctx, SUB);  s.setsockopt(SUBSCRIBE, "*"); diff --git a/perf/ruby/remote_lat.rb b/perf/ruby/remote_lat.rb index 9387fe1..f5b438d 100644 --- a/perf/ruby/remote_lat.rb +++ b/perf/ruby/remote_lat.rb @@ -27,7 +27,7 @@ connect_to = ARGV[0]  message_size = ARGV[1].to_i  roundtrip_count = ARGV[2].to_i -ctx = Context.new(1, 1) +ctx = Context.new(1, 1, 0)  s = Socket.new(ctx, REQ);  s.connect(connect_to); diff --git a/perf/ruby/remote_thr.rb b/perf/ruby/remote_thr.rb index ef93961..760ff88 100644 --- a/perf/ruby/remote_thr.rb +++ b/perf/ruby/remote_thr.rb @@ -27,7 +27,7 @@ connect_to = ARGV[0]  message_size = ARGV[1].to_i  message_count = ARGV[2].to_i -ctx = Context.new(1, 1) +ctx = Context.new(1, 1, 0)  s = Socket.new(ctx, PUB);  #  Add your socket options here. diff --git a/src/app_thread.cpp b/src/app_thread.cpp index f523f40..c152fc8 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,7 +46,8 @@  #define ZMQ_DELAY_COMMANDS  #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, +      int flags_) :      object_t (dispatcher_, thread_slot_),      associated (false),      last_processing_time (0) diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 0f95de9..5fdb92d 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -34,7 +34,8 @@ namespace zmq      {      public: -        app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); +        app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, +            int flags_);          ~app_thread_t (); diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 4c4ec80..5e7ea46 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -30,7 +30,8 @@  #include "windows.h"  #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, +      int flags_) :      sockets (0),      terminated (false)  { @@ -47,7 +48,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :      //  Create application thread proxies.      for (int i = 0; i != app_threads_; i++) { -        app_thread_t *app_thread = new app_thread_t (this, i); +        app_thread_t *app_thread = new app_thread_t (this, i, flags_);          zmq_assert (app_thread);          app_threads.push_back (app_thread);          signalers.push_back (app_thread->get_signaler ()); @@ -55,7 +56,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :      //  Create I/O thread objects.      for (int i = 0; i != io_threads_; i++) { -        io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); +        io_thread_t *io_thread = new io_thread_t (this, i + app_threads_, +            flags_);          zmq_assert (io_thread);          io_threads.push_back (io_thread);          signalers.push_back (io_thread->get_signaler ()); diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index bd1f655..23b6a33 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -50,7 +50,7 @@ namespace zmq          //  Create the dispatcher object. Matrix of pipes to communicate between          //  each socket and each I/O thread is created along with appropriate          //  signalers. -        dispatcher_t (int app_threads_, int io_threads_); +        dispatcher_t (int app_threads_, int io_threads_, int flags_);          //  This function is called when user invokes zmq_term. If there are          //  no more sockets open it'll cause all the infrastructure to be shut diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 862b0fd..41d168d 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -85,6 +85,12 @@ zmq::fd_t zmq::fd_signaler_t::get_fd ()  zmq::fd_signaler_t::fd_signaler_t ()  { +    //  Windows have no 'socketpair' function. +    //  Here we create the socketpair by hand. + +    //  TODO: Check Windows pipe (CreatePipe). It'll presumably be more +    //  efficient than the socketpair. +      struct sockaddr_in addr;      SOCKET listener;      int addrlen = sizeof (addr); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index afac11c..b6521e9 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -32,7 +32,8 @@  #include "dispatcher.hpp"  #include "simple_semaphore.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, +      int flags_) :      object_t (dispatcher_, thread_slot_)  {  #if defined ZMQ_FORCE_SELECT diff --git a/src/io_thread.hpp b/src/io_thread.hpp index f95880a..4015b0c 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -37,7 +37,8 @@ namespace zmq      {      public: -        io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); +        io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, +            int flags_);          //  Clean-up. If the thread was started, it's neccessary to call 'stop'          //  before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/zmq.cpp b/src/zmq.cpp index c567b09..36f30eb 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -173,17 +173,17 @@ int zmq_msg_type (zmq_msg_t *msg_)      return (((const unsigned char*) msg_->content) - offset);  } -void *zmq_init (int app_threads_, int io_threads_) +void *zmq_init (int app_threads_, int io_threads_, int flags_)  {      //  There should be at least a single thread managed by the dispatcher. -    if (app_threads_ < 0 || io_threads_ < 0 || -          app_threads_ + io_threads_ == 0) { +    if (app_threads_ <= 0 || io_threads_ <= 0 || +          app_threads_ > 63 || io_threads_ > 63) {          errno = EINVAL;          return NULL;      }      zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_, -        io_threads_); +        io_threads_, flags_);      zmq_assert (dispatcher);      return (void*) dispatcher;  } | 
