diff options
| -rw-r--r-- | doc/zmq_bind.txt | 2 | ||||
| -rw-r--r-- | doc/zmq_connect.txt | 2 | ||||
| -rw-r--r-- | include/zmq.h | 2 | ||||
| -rw-r--r-- | src/connect_session.cpp | 15 | ||||
| -rw-r--r-- | src/ctx.cpp | 7 | ||||
| -rw-r--r-- | src/ctx.hpp | 5 | ||||
| -rw-r--r-- | src/object.cpp | 4 | ||||
| -rw-r--r-- | src/object.hpp | 2 | ||||
| -rw-r--r-- | src/socket_base.cpp | 21 | ||||
| -rw-r--r-- | src/zmq.cpp | 2 | ||||
| -rw-r--r-- | src/zmq_connecter.cpp | 9 | ||||
| -rw-r--r-- | src/zmq_listener.cpp | 9 | 
12 files changed, 60 insertions, 20 deletions
| diff --git a/doc/zmq_bind.txt b/doc/zmq_bind.txt index 7aa5a0b..23c3134 100644 --- a/doc/zmq_bind.txt +++ b/doc/zmq_bind.txt @@ -58,6 +58,8 @@ The requested 'address' specifies a nonexistent interface.  The 0MQ 'context' associated with the specified 'socket' was terminated.  *EFAULT*::  The provided 'socket' was not valid (NULL). +*EMTHREAD*:: +No I/O thread is available to accomplish the task.  EXAMPLE diff --git a/doc/zmq_connect.txt b/doc/zmq_connect.txt index ffcf3b4..a95f716 100644 --- a/doc/zmq_connect.txt +++ b/doc/zmq_connect.txt @@ -56,6 +56,8 @@ The requested 'transport' protocol is not compatible with the socket type.  The 0MQ 'context' associated with the specified 'socket' was terminated.  *EFAULT*::  The provided 'socket' was not valid (NULL). +*EMTHREAD*:: +No I/O thread is available to accomplish the task.  EXAMPLE diff --git a/include/zmq.h b/include/zmq.h index 8c01477..c5f79d4 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -85,7 +85,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);  #define EFSM (ZMQ_HAUSNUMERO + 51)  #define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)  #define ETERM (ZMQ_HAUSNUMERO + 53) -#define EMTHREAD (ZMQ_HAUSNUMERO + 54)  /*  Old error code, remove in 3.x     */ +#define EMTHREAD (ZMQ_HAUSNUMERO + 54)  /*  This function retrieves the errno as it is known to 0MQ library. The goal */  /*  of this function is to make the code 100% portable, including where 0MQ   */ diff --git a/src/connect_session.cpp b/src/connect_session.cpp index afa80b8..10d19c3 100644 --- a/src/connect_session.cpp +++ b/src/connect_session.cpp @@ -43,13 +43,18 @@ void zmq::connect_session_t::process_plug ()  void zmq::connect_session_t::start_connecting ()  { +    //  Choose I/O thread to run connecter in. Given that we are already +    //  running in an I/O thread, there must be at least one available. +    io_thread_t *io_thread = choose_io_thread (options.affinity); +    zmq_assert (io_thread); +      //  Create the connecter object.      //  Both TCP and IPC transports are using the same infrastructure. -    if (protocol == "tcp" || protocol == "ipc") {         +    if (protocol == "tcp" || protocol == "ipc") { +          zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( -            choose_io_thread (options.affinity), this, options, -            protocol.c_str (), address.c_str ()); +            io_thread, this, options, protocol.c_str (), address.c_str ());          zmq_assert (connecter);          launch_child (connecter);          return; @@ -70,7 +75,7 @@ void zmq::connect_session_t::start_connecting ()              //  PGM sender.              pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t ( -                choose_io_thread (options.affinity), options); +                io_thread, options);              zmq_assert (pgm_sender);              int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); @@ -82,7 +87,7 @@ void zmq::connect_session_t::start_connecting ()              //  PGM receiver.              pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t ( -                choose_io_thread (options.affinity), options); +                io_thread, options);              zmq_assert (pgm_receiver);              int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); diff --git a/src/ctx.cpp b/src/ctx.cpp index 2660e1f..267f7d0 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -64,7 +64,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :      }      //  In the unused part of the slot array, create a list of empty slots. -    for (uint32_t i = slot_count - 1; i >= io_threads_; i--) { +    for (int32_t i = (int32_t) slot_count - 1; +          i >= (int32_t) io_threads_; i--) {          empty_slots.push_back (i);          slots [i] = NULL;      } @@ -221,8 +222,10 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)  zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)  { +    if (io_threads.empty ()) +        return NULL; +      //  Find the I/O thread with minimum load. -    zmq_assert (io_threads.size () > 0);      int min_load = -1;      io_threads_t::size_type result = 0;      for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { diff --git a/src/ctx.hpp b/src/ctx.hpp index 360ca0e..98b4f81 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -65,8 +65,9 @@ namespace zmq          void send_command (uint32_t slot_, const command_t &command_);          //  Returns the I/O thread that is the least busy at the moment. -        //  Taskset specifies which I/O threads are eligible (0 = all). -        class io_thread_t *choose_io_thread (uint64_t taskset_); +        //  Affinity specifies which I/O threads are eligible (0 = all). +        //  Returns NULL is no I/O thread is available. +        class io_thread_t *choose_io_thread (uint64_t affinity_);          //  Management of inproc endpoints.          int register_endpoint (const char *addr_, class socket_base_t *socket_); diff --git a/src/object.cpp b/src/object.cpp index 7b5532b..90c015a 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -142,9 +142,9 @@ void zmq::object_t::log (zmq_msg_t *msg_)      ctx->log (msg_);  } -zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) +zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)  { -    return ctx->choose_io_thread (taskset_); +    return ctx->choose_io_thread (affinity_);  }  void zmq::object_t::zombify_socket (socket_base_t *socket_) diff --git a/src/object.hpp b/src/object.hpp index 6b52f4b..bc1b325 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -54,7 +54,7 @@ namespace zmq          void log (zmq_msg_t *msg_);          //  Chooses least loaded I/O thread. -        class io_thread_t *choose_io_thread (uint64_t taskset_); +        class io_thread_t *choose_io_thread (uint64_t affinity_);          //  Zombify particular socket. In other words, pass the ownership to          //  the context. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index cdad09d..288a627 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -289,8 +289,17 @@ int zmq::socket_base_t::bind (const char *addr_)          return register_endpoint (addr_, this);      if (protocol == "tcp" || protocol == "ipc") { + +        //  Choose I/O thread to run the listerner in. +        io_thread_t *io_thread = choose_io_thread (options.affinity); +        if (!io_thread) { +            errno = EMTHREAD; +            return -1; +        } + +        //  Create and run the listener.          zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( -            choose_io_thread (options.affinity), this, options); +            io_thread, this, options);          zmq_assert (listener);          int rc = listener->set_address (protocol.c_str(), address.c_str ());          if (rc != 0) { @@ -376,10 +385,16 @@ int zmq::socket_base_t::connect (const char *addr_)          return 0;      } +    //  Choose the I/O thread to run the session in. +    io_thread_t *io_thread = choose_io_thread (options.affinity); +    if (!io_thread) { +        errno = EMTHREAD; +        return -1; +    } +      //  Create session.      connect_session_t *session = new (std::nothrow) connect_session_t ( -        choose_io_thread (options.affinity), this, options, -        protocol.c_str (), address.c_str ()); +        io_thread, this, options, protocol.c_str (), address.c_str ());      zmq_assert (session);      //  If 'immediate connect' feature is required, we'll create the pipes diff --git a/src/zmq.cpp b/src/zmq.cpp index 9a4bdec..e93f8b7 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_)          return "The protocol is not compatible with the socket type";      case ETERM:          return "Context was terminated"; +    case EMTHREAD: +        return "No thread available";      default:  #if defined _MSC_VER  #pragma warning (push) diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index 8f8fae2..cfca875 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -77,9 +77,14 @@ void zmq::zmq_connecter_t::out_event ()          return;      } +    //  Choose I/O thread to run connecter in. Given that we are already +    //  running in an I/O thread, there must be at least one available. +    io_thread_t *io_thread = choose_io_thread (options.affinity); +    zmq_assert (io_thread); +      //  Create an init object.  -    zmq_init_t *init = new (std::nothrow) zmq_init_t ( -        choose_io_thread (options.affinity), NULL, session, fd, options); +    zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL, +        session, fd, options);      zmq_assert (init);      launch_sibling (init); diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 4569ac1..78e44e6 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -64,9 +64,14 @@ void zmq::zmq_listener_t::in_event ()      if (fd == retired_fd)          return; +    //  Choose I/O thread to run connecter in. Given that we are already +    //  running in an I/O thread, there must be at least one available. +    io_thread_t *io_thread = choose_io_thread (options.affinity); +    zmq_assert (io_thread); +      //  Create and launch an init object.  -    zmq_init_t *init = new (std::nothrow) zmq_init_t ( -        choose_io_thread (options.affinity), socket, NULL, fd, options); +    zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket, +        NULL, fd, options);      zmq_assert (init);      launch_sibling (init);  } | 
