From 50a8b9ea0c4a819073b46449dee8fc839b837ae5 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sun, 20 Sep 2009 10:14:21 +0200 Subject: 'flags' parameter added to zmq_init --- src/app_thread.cpp | 3 ++- src/app_thread.hpp | 3 ++- src/dispatcher.cpp | 8 +++++--- src/dispatcher.hpp | 2 +- src/fd_signaler.cpp | 6 ++++++ src/io_thread.cpp | 3 ++- src/io_thread.hpp | 3 ++- src/zmq.cpp | 8 ++++---- 8 files changed, 24 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/app_thread.cpp b/src/app_thread.cpp index f523f40..c152fc8 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -46,7 +46,8 @@ #define ZMQ_DELAY_COMMANDS #endif -zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, + int flags_) : object_t (dispatcher_, thread_slot_), associated (false), last_processing_time (0) diff --git a/src/app_thread.hpp b/src/app_thread.hpp index 0f95de9..5fdb92d 100644 --- a/src/app_thread.hpp +++ b/src/app_thread.hpp @@ -34,7 +34,8 @@ namespace zmq { public: - app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, + int flags_); ~app_thread_t (); diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 4c4ec80..5e7ea46 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -30,7 +30,8 @@ #include "windows.h" #endif -zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : +zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, + int flags_) : sockets (0), terminated (false) { @@ -47,7 +48,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : // Create application thread proxies. for (int i = 0; i != app_threads_; i++) { - app_thread_t *app_thread = new app_thread_t (this, i); + app_thread_t *app_thread = new app_thread_t (this, i, flags_); zmq_assert (app_thread); app_threads.push_back (app_thread); signalers.push_back (app_thread->get_signaler ()); @@ -55,7 +56,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) : // Create I/O thread objects. for (int i = 0; i != io_threads_; i++) { - io_thread_t *io_thread = new io_thread_t (this, i + app_threads_); + io_thread_t *io_thread = new io_thread_t (this, i + app_threads_, + flags_); zmq_assert (io_thread); io_threads.push_back (io_thread); signalers.push_back (io_thread->get_signaler ()); diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index bd1f655..23b6a33 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -50,7 +50,7 @@ namespace zmq // Create the dispatcher object. Matrix of pipes to communicate between // each socket and each I/O thread is created along with appropriate // signalers. - dispatcher_t (int app_threads_, int io_threads_); + dispatcher_t (int app_threads_, int io_threads_, int flags_); // This function is called when user invokes zmq_term. If there are // no more sockets open it'll cause all the infrastructure to be shut diff --git a/src/fd_signaler.cpp b/src/fd_signaler.cpp index 862b0fd..41d168d 100644 --- a/src/fd_signaler.cpp +++ b/src/fd_signaler.cpp @@ -85,6 +85,12 @@ zmq::fd_t zmq::fd_signaler_t::get_fd () zmq::fd_signaler_t::fd_signaler_t () { + // Windows have no 'socketpair' function. + // Here we create the socketpair by hand. + + // TODO: Check Windows pipe (CreatePipe). It'll presumably be more + // efficient than the socketpair. + struct sockaddr_in addr; SOCKET listener; int addrlen = sizeof (addr); diff --git a/src/io_thread.cpp b/src/io_thread.cpp index afac11c..b6521e9 100644 --- a/src/io_thread.cpp +++ b/src/io_thread.cpp @@ -32,7 +32,8 @@ #include "dispatcher.hpp" #include "simple_semaphore.hpp" -zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : +zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_, + int flags_) : object_t (dispatcher_, thread_slot_) { #if defined ZMQ_FORCE_SELECT diff --git a/src/io_thread.hpp b/src/io_thread.hpp index f95880a..4015b0c 100644 --- a/src/io_thread.hpp +++ b/src/io_thread.hpp @@ -37,7 +37,8 @@ namespace zmq { public: - io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); + io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_, + int flags_); // Clean-up. If the thread was started, it's neccessary to call 'stop' // before invoking destructor. Otherwise the destructor would hang up. diff --git a/src/zmq.cpp b/src/zmq.cpp index c567b09..36f30eb 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -173,17 +173,17 @@ int zmq_msg_type (zmq_msg_t *msg_) return (((const unsigned char*) msg_->content) - offset); } -void *zmq_init (int app_threads_, int io_threads_) +void *zmq_init (int app_threads_, int io_threads_, int flags_) { // There should be at least a single thread managed by the dispatcher. - if (app_threads_ < 0 || io_threads_ < 0 || - app_threads_ + io_threads_ == 0) { + if (app_threads_ <= 0 || io_threads_ <= 0 || + app_threads_ > 63 || io_threads_ > 63) { errno = EINVAL; return NULL; } zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_, - io_threads_); + io_threads_, flags_); zmq_assert (dispatcher); return (void*) dispatcher; } -- cgit v1.2.3