diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/connect_session.cpp | 15 | ||||
-rw-r--r-- | src/ctx.cpp | 7 | ||||
-rw-r--r-- | src/ctx.hpp | 5 | ||||
-rw-r--r-- | src/object.cpp | 4 | ||||
-rw-r--r-- | src/object.hpp | 2 | ||||
-rw-r--r-- | src/socket_base.cpp | 21 | ||||
-rw-r--r-- | src/zmq.cpp | 2 | ||||
-rw-r--r-- | src/zmq_connecter.cpp | 9 | ||||
-rw-r--r-- | src/zmq_listener.cpp | 9 |
9 files changed, 55 insertions, 19 deletions
diff --git a/src/connect_session.cpp b/src/connect_session.cpp index afa80b8..10d19c3 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -43,13 +43,18 @@ void zmq::connect_session_t::process_plug () void zmq::connect_session_t::start_connecting () { + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + // Create the connecter object. // Both TCP and IPC transports are using the same infrastructure. - if (protocol == "tcp" || protocol == "ipc") { + if (protocol == "tcp" || protocol == "ipc") { + zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( - choose_io_thread (options.affinity), this, options, - protocol.c_str (), address.c_str ()); + io_thread, this, options, protocol.c_str (), address.c_str ()); zmq_assert (connecter); launch_child (connecter); return; @@ -70,7 +75,7 @@ void zmq::connect_session_t::start_connecting () // PGM sender. pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( - choose_io_thread (options.affinity), options); + io_thread, options); zmq_assert (pgm_sender); int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); @@ -82,7 +87,7 @@ void zmq::connect_session_t::start_connecting () // PGM receiver. pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( - choose_io_thread (options.affinity), options); + io_thread, options); zmq_assert (pgm_receiver); int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); diff --git a/src/ctx.cpp b/src/ctx.cpp index 2660e1f..267f7d0 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -64,7 +64,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : } // In the unused part of the slot array, create a list of empty slots. - for (uint32_t i = slot_count - 1; i >= io_threads_; i--) { + for (int32_t i = (int32_t) slot_count - 1; + i >= (int32_t) io_threads_; i--) { empty_slots.push_back (i); slots [i] = NULL; } @@ -221,8 +222,10 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) { + if (io_threads.empty ()) + return NULL; + // Find the I/O thread with minimum load. - zmq_assert (io_threads.size () > 0); int min_load = -1; io_threads_t::size_type result = 0; for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { diff --git a/src/ctx.hpp b/src/ctx.hpp index 360ca0e..98b4f81 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -65,8 +65,9 @@ namespace zmq void send_command (uint32_t slot_, const command_t &command_); // Returns the I/O thread that is the least busy at the moment. - // Taskset specifies which I/O threads are eligible (0 = all). - class io_thread_t *choose_io_thread (uint64_t taskset_); + // Affinity specifies which I/O threads are eligible (0 = all). + // Returns NULL is no I/O thread is available. + class io_thread_t *choose_io_thread (uint64_t affinity_); // Management of inproc endpoints. int register_endpoint (const char *addr_, class socket_base_t *socket_); diff --git a/src/object.cpp b/src/object.cpp index 7b5532b..90c015a 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -142,9 +142,9 @@ void zmq::object_t::log (zmq_msg_t *msg_) ctx->log (msg_); } -zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) { - return ctx->choose_io_thread (taskset_); + return ctx->choose_io_thread (affinity_); } void zmq::object_t::zombify_socket (socket_base_t *socket_) diff --git a/src/object.hpp b/src/object.hpp index 6b52f4b..bc1b325 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -54,7 +54,7 @@ namespace zmq void log (zmq_msg_t *msg_); // Chooses least loaded I/O thread. - class io_thread_t *choose_io_thread (uint64_t taskset_); + class io_thread_t *choose_io_thread (uint64_t affinity_); // Zombify particular socket. In other words, pass the ownership to // the context. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index cdad09d..288a627 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -289,8 +289,17 @@ int zmq::socket_base_t::bind (const char *addr_) return register_endpoint (addr_, this); if (protocol == "tcp" || protocol == "ipc") { + + // Choose I/O thread to run the listerner in. + io_thread_t *io_thread = choose_io_thread (options.affinity); + if (!io_thread) { + errno = EMTHREAD; + return -1; + } + + // Create and run the listener. zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( - choose_io_thread (options.affinity), this, options); + io_thread, this, options); zmq_assert (listener); int rc = listener->set_address (protocol.c_str(), address.c_str ()); if (rc != 0) { @@ -376,10 +385,16 @@ int zmq::socket_base_t::connect (const char *addr_) return 0; } + // Choose the I/O thread to run the session in. + io_thread_t *io_thread = choose_io_thread (options.affinity); + if (!io_thread) { + errno = EMTHREAD; + return -1; + } + // Create session. connect_session_t *session = new (std::nothrow) connect_session_t ( - choose_io_thread (options.affinity), this, options, - protocol.c_str (), address.c_str ()); + io_thread, this, options, protocol.c_str (), address.c_str ()); zmq_assert (session); // If 'immediate connect' feature is required, we'll create the pipes diff --git a/src/zmq.cpp b/src/zmq.cpp index 9a4bdec..e93f8b7 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_) return "The protocol is not compatible with the socket type"; case ETERM: return "Context was terminated"; + case EMTHREAD: + return "No thread available"; default: #if defined _MSC_VER #pragma warning (push) diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 8f8fae2..cfca875 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -77,9 +77,14 @@ void zmq::zmq_connecter_t::out_event () return; } + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + // Create an init object. - zmq_init_t *init = new (std::nothrow) zmq_init_t ( - choose_io_thread (options.affinity), NULL, session, fd, options); + zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL, + session, fd, options); zmq_assert (init); launch_sibling (init); diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 4569ac1..78e44e6 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -64,9 +64,14 @@ void zmq::zmq_listener_t::in_event () if (fd == retired_fd) return; + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + // Create and launch an init object. - zmq_init_t *init = new (std::nothrow) zmq_init_t ( - choose_io_thread (options.affinity), socket, NULL, fd, options); + zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket, + NULL, fd, options); zmq_assert (init); launch_sibling (init); } |