summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/zmq_bind.txt2
-rw-r--r--doc/zmq_connect.txt2
-rw-r--r--include/zmq.h2
-rw-r--r--src/connect_session.cpp15
-rw-r--r--src/ctx.cpp7
-rw-r--r--src/ctx.hpp5
-rw-r--r--src/object.cpp4
-rw-r--r--src/object.hpp2
-rw-r--r--src/socket_base.cpp21
-rw-r--r--src/zmq.cpp2
-rw-r--r--src/zmq_connecter.cpp9
-rw-r--r--src/zmq_listener.cpp9
12 files changed, 60 insertions, 20 deletions
diff --git a/doc/zmq_bind.txt b/doc/zmq_bind.txt
index 7aa5a0b..23c3134 100644
--- a/doc/zmq_bind.txt
+++ b/doc/zmq_bind.txt
@@ -58,6 +58,8 @@ The requested 'address' specifies a nonexistent interface.
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*::
The provided 'socket' was not valid (NULL).
+*EMTHREAD*::
+No I/O thread is available to accomplish the task.
EXAMPLE
diff --git a/doc/zmq_connect.txt b/doc/zmq_connect.txt
index ffcf3b4..a95f716 100644
--- a/doc/zmq_connect.txt
+++ b/doc/zmq_connect.txt
@@ -56,6 +56,8 @@ The requested 'transport' protocol is not compatible with the socket type.
The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*::
The provided 'socket' was not valid (NULL).
+*EMTHREAD*::
+No I/O thread is available to accomplish the task.
EXAMPLE
diff --git a/include/zmq.h b/include/zmq.h
index 8c01477..c5f79d4 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -85,7 +85,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define EFSM (ZMQ_HAUSNUMERO + 51)
#define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
#define ETERM (ZMQ_HAUSNUMERO + 53)
-#define EMTHREAD (ZMQ_HAUSNUMERO + 54) /* Old error code, remove in 3.x */
+#define EMTHREAD (ZMQ_HAUSNUMERO + 54)
/* This function retrieves the errno as it is known to 0MQ library. The goal */
/* of this function is to make the code 100% portable, including where 0MQ */
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index afa80b8..10d19c3 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -43,13 +43,18 @@ void zmq::connect_session_t::process_plug ()
void zmq::connect_session_t::start_connecting ()
{
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create the connecter object.
// Both TCP and IPC transports are using the same infrastructure.
- if (protocol == "tcp" || protocol == "ipc") {
+ if (protocol == "tcp" || protocol == "ipc") {
+
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
- choose_io_thread (options.affinity), this, options,
- protocol.c_str (), address.c_str ());
+ io_thread, this, options, protocol.c_str (), address.c_str ());
zmq_assert (connecter);
launch_child (connecter);
return;
@@ -70,7 +75,7 @@ void zmq::connect_session_t::start_connecting ()
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
- choose_io_thread (options.affinity), options);
+ io_thread, options);
zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
@@ -82,7 +87,7 @@ void zmq::connect_session_t::start_connecting ()
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
- choose_io_thread (options.affinity), options);
+ io_thread, options);
zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 2660e1f..267f7d0 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -64,7 +64,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
}
// In the unused part of the slot array, create a list of empty slots.
- for (uint32_t i = slot_count - 1; i >= io_threads_; i--) {
+ for (int32_t i = (int32_t) slot_count - 1;
+ i >= (int32_t) io_threads_; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
@@ -221,8 +222,10 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
+ if (io_threads.empty ())
+ return NULL;
+
// Find the I/O thread with minimum load.
- zmq_assert (io_threads.size () > 0);
int min_load = -1;
io_threads_t::size_type result = 0;
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 360ca0e..98b4f81 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -65,8 +65,9 @@ namespace zmq
void send_command (uint32_t slot_, const command_t &command_);
// Returns the I/O thread that is the least busy at the moment.
- // Taskset specifies which I/O threads are eligible (0 = all).
- class io_thread_t *choose_io_thread (uint64_t taskset_);
+ // Affinity specifies which I/O threads are eligible (0 = all).
+ // Returns NULL is no I/O thread is available.
+ class io_thread_t *choose_io_thread (uint64_t affinity_);
// Management of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_);
diff --git a/src/object.cpp b/src/object.cpp
index 7b5532b..90c015a 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -142,9 +142,9 @@ void zmq::object_t::log (zmq_msg_t *msg_)
ctx->log (msg_);
}
-zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
+zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{
- return ctx->choose_io_thread (taskset_);
+ return ctx->choose_io_thread (affinity_);
}
void zmq::object_t::zombify_socket (socket_base_t *socket_)
diff --git a/src/object.hpp b/src/object.hpp
index 6b52f4b..bc1b325 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -54,7 +54,7 @@ namespace zmq
void log (zmq_msg_t *msg_);
// Chooses least loaded I/O thread.
- class io_thread_t *choose_io_thread (uint64_t taskset_);
+ class io_thread_t *choose_io_thread (uint64_t affinity_);
// Zombify particular socket. In other words, pass the ownership to
// the context.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index cdad09d..288a627 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -289,8 +289,17 @@ int zmq::socket_base_t::bind (const char *addr_)
return register_endpoint (addr_, this);
if (protocol == "tcp" || protocol == "ipc") {
+
+ // Choose I/O thread to run the listerner in.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ if (!io_thread) {
+ errno = EMTHREAD;
+ return -1;
+ }
+
+ // Create and run the listener.
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
- choose_io_thread (options.affinity), this, options);
+ io_thread, this, options);
zmq_assert (listener);
int rc = listener->set_address (protocol.c_str(), address.c_str ());
if (rc != 0) {
@@ -376,10 +385,16 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0;
}
+ // Choose the I/O thread to run the session in.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ if (!io_thread) {
+ errno = EMTHREAD;
+ return -1;
+ }
+
// Create session.
connect_session_t *session = new (std::nothrow) connect_session_t (
- choose_io_thread (options.affinity), this, options,
- protocol.c_str (), address.c_str ());
+ io_thread, this, options, protocol.c_str (), address.c_str ());
zmq_assert (session);
// If 'immediate connect' feature is required, we'll create the pipes
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 9a4bdec..e93f8b7 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_)
return "The protocol is not compatible with the socket type";
case ETERM:
return "Context was terminated";
+ case EMTHREAD:
+ return "No thread available";
default:
#if defined _MSC_VER
#pragma warning (push)
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 8f8fae2..cfca875 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -77,9 +77,14 @@ void zmq::zmq_connecter_t::out_event ()
return;
}
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (
- choose_io_thread (options.affinity), NULL, session, fd, options);
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
+ session, fd, options);
zmq_assert (init);
launch_sibling (init);
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 4569ac1..78e44e6 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -64,9 +64,14 @@ void zmq::zmq_listener_t::in_event ()
if (fd == retired_fd)
return;
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create and launch an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (
- choose_io_thread (options.affinity), socket, NULL, fd, options);
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
+ NULL, fd, options);
zmq_assert (init);
launch_sibling (init);
}