summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2010-09-09 08:25:00 +0200
committerMartin Sustrik <sustrik@250bpm.com>2010-09-09 08:25:00 +0200
commita68e6739f4248e25a0f9a64c89729f55dfacb842 (patch)
tree4686061e52137cfa373587162f59536cd3f55d32 /src
parent47e87b7e4b8dd6a0cfbc1f30ffcb06edfa83c853 (diff)
when no I/O threads are available error is raised instead of assertion
Diffstat (limited to 'src')
-rw-r--r--src/connect_session.cpp15
-rw-r--r--src/ctx.cpp7
-rw-r--r--src/ctx.hpp5
-rw-r--r--src/object.cpp4
-rw-r--r--src/object.hpp2
-rw-r--r--src/socket_base.cpp21
-rw-r--r--src/zmq.cpp2
-rw-r--r--src/zmq_connecter.cpp9
-rw-r--r--src/zmq_listener.cpp9
9 files changed, 55 insertions, 19 deletions
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index afa80b8..10d19c3 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -43,13 +43,18 @@ void zmq::connect_session_t::process_plug ()
void zmq::connect_session_t::start_connecting ()
{
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create the connecter object.
// Both TCP and IPC transports are using the same infrastructure.
- if (protocol == "tcp" || protocol == "ipc") {
+ if (protocol == "tcp" || protocol == "ipc") {
+
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
- choose_io_thread (options.affinity), this, options,
- protocol.c_str (), address.c_str ());
+ io_thread, this, options, protocol.c_str (), address.c_str ());
zmq_assert (connecter);
launch_child (connecter);
return;
@@ -70,7 +75,7 @@ void zmq::connect_session_t::start_connecting ()
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
- choose_io_thread (options.affinity), options);
+ io_thread, options);
zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
@@ -82,7 +87,7 @@ void zmq::connect_session_t::start_connecting ()
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
- choose_io_thread (options.affinity), options);
+ io_thread, options);
zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 2660e1f..267f7d0 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -64,7 +64,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
}
// In the unused part of the slot array, create a list of empty slots.
- for (uint32_t i = slot_count - 1; i >= io_threads_; i--) {
+ for (int32_t i = (int32_t) slot_count - 1;
+ i >= (int32_t) io_threads_; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
@@ -221,8 +222,10 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
+ if (io_threads.empty ())
+ return NULL;
+
// Find the I/O thread with minimum load.
- zmq_assert (io_threads.size () > 0);
int min_load = -1;
io_threads_t::size_type result = 0;
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 360ca0e..98b4f81 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -65,8 +65,9 @@ namespace zmq
void send_command (uint32_t slot_, const command_t &command_);
// Returns the I/O thread that is the least busy at the moment.
- // Taskset specifies which I/O threads are eligible (0 = all).
- class io_thread_t *choose_io_thread (uint64_t taskset_);
+ // Affinity specifies which I/O threads are eligible (0 = all).
+ // Returns NULL is no I/O thread is available.
+ class io_thread_t *choose_io_thread (uint64_t affinity_);
// Management of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_);
diff --git a/src/object.cpp b/src/object.cpp
index 7b5532b..90c015a 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -142,9 +142,9 @@ void zmq::object_t::log (zmq_msg_t *msg_)
ctx->log (msg_);
}
-zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
+zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{
- return ctx->choose_io_thread (taskset_);
+ return ctx->choose_io_thread (affinity_);
}
void zmq::object_t::zombify_socket (socket_base_t *socket_)
diff --git a/src/object.hpp b/src/object.hpp
index 6b52f4b..bc1b325 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -54,7 +54,7 @@ namespace zmq
void log (zmq_msg_t *msg_);
// Chooses least loaded I/O thread.
- class io_thread_t *choose_io_thread (uint64_t taskset_);
+ class io_thread_t *choose_io_thread (uint64_t affinity_);
// Zombify particular socket. In other words, pass the ownership to
// the context.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index cdad09d..288a627 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -289,8 +289,17 @@ int zmq::socket_base_t::bind (const char *addr_)
return register_endpoint (addr_, this);
if (protocol == "tcp" || protocol == "ipc") {
+
+ // Choose I/O thread to run the listerner in.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ if (!io_thread) {
+ errno = EMTHREAD;
+ return -1;
+ }
+
+ // Create and run the listener.
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
- choose_io_thread (options.affinity), this, options);
+ io_thread, this, options);
zmq_assert (listener);
int rc = listener->set_address (protocol.c_str(), address.c_str ());
if (rc != 0) {
@@ -376,10 +385,16 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0;
}
+ // Choose the I/O thread to run the session in.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ if (!io_thread) {
+ errno = EMTHREAD;
+ return -1;
+ }
+
// Create session.
connect_session_t *session = new (std::nothrow) connect_session_t (
- choose_io_thread (options.affinity), this, options,
- protocol.c_str (), address.c_str ());
+ io_thread, this, options, protocol.c_str (), address.c_str ());
zmq_assert (session);
// If 'immediate connect' feature is required, we'll create the pipes
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 9a4bdec..e93f8b7 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_)
return "The protocol is not compatible with the socket type";
case ETERM:
return "Context was terminated";
+ case EMTHREAD:
+ return "No thread available";
default:
#if defined _MSC_VER
#pragma warning (push)
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 8f8fae2..cfca875 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -77,9 +77,14 @@ void zmq::zmq_connecter_t::out_event ()
return;
}
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (
- choose_io_thread (options.affinity), NULL, session, fd, options);
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
+ session, fd, options);
zmq_assert (init);
launch_sibling (init);
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 4569ac1..78e44e6 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -64,9 +64,14 @@ void zmq::zmq_listener_t::in_event ()
if (fd == retired_fd)
return;
+ // Choose I/O thread to run connecter in. Given that we are already
+ // running in an I/O thread, there must be at least one available.
+ io_thread_t *io_thread = choose_io_thread (options.affinity);
+ zmq_assert (io_thread);
+
// Create and launch an init object.
- zmq_init_t *init = new (std::nothrow) zmq_init_t (
- choose_io_thread (options.affinity), socket, NULL, fd, options);
+ zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
+ NULL, fd, options);
zmq_assert (init);
launch_sibling (init);
}