summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/zmq.h70
-rw-r--r--src/Makefile.am15
-rw-r--r--src/app_thread.cpp195
-rw-r--r--src/app_thread.hpp88
-rw-r--r--src/config.hpp5
-rw-r--r--src/ctx.cpp249
-rw-r--r--src/ctx.hpp94
-rw-r--r--src/fq.cpp29
-rw-r--r--src/fq.hpp17
-rw-r--r--src/i_endpoint.hpp43
-rw-r--r--src/io_thread.cpp5
-rw-r--r--src/io_thread.hpp2
-rw-r--r--src/lb.cpp21
-rw-r--r--src/lb.hpp13
-rw-r--r--src/object.cpp29
-rw-r--r--src/object.hpp14
-rw-r--r--src/owned.cpp10
-rw-r--r--src/owned.hpp9
-rw-r--r--src/pair.cpp72
-rw-r--r--src/pair.hpp26
-rw-r--r--src/pipe.cpp250
-rw-r--r--src/pipe.hpp111
-rw-r--r--src/pub.cpp62
-rw-r--r--src/pub.hpp21
-rw-r--r--src/pull.cpp48
-rw-r--r--src/pull.hpp16
-rw-r--r--src/push.cpp50
-rw-r--r--src/push.hpp16
-rw-r--r--src/rep.cpp81
-rw-r--r--src/rep.hpp30
-rw-r--r--src/req.cpp65
-rw-r--r--src/req.hpp32
-rw-r--r--src/semaphore.hpp135
-rw-r--r--src/session.cpp62
-rw-r--r--src/session.hpp26
-rw-r--r--src/socket_base.cpp495
-rw-r--r--src/socket_base.hpp100
-rw-r--r--src/sub.cpp40
-rw-r--r--src/sub.hpp11
-rw-r--r--src/thread.cpp20
-rw-r--r--src/thread.hpp9
-rw-r--r--src/xrep.cpp65
-rw-r--r--src/xrep.hpp26
-rw-r--r--src/xreq.cpp37
-rw-r--r--src/xreq.hpp10
-rw-r--r--src/zmq.cpp7
-rw-r--r--src/zmq_encoder.cpp2
47 files changed, 1427 insertions, 1406 deletions
diff --git a/include/zmq.h b/include/zmq.h
index bce1215..075ff3c 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -82,7 +82,6 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#endif
/* Native 0MQ error codes. */
-#define EMTHREAD (ZMQ_HAUSNUMERO + 50)
#define EFSM (ZMQ_HAUSNUMERO + 51)
#define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
#define ETERM (ZMQ_HAUSNUMERO + 53)
@@ -152,36 +151,39 @@ ZMQ_EXPORT int zmq_term (void *context);
/******************************************************************************/
/* Socket types. */
-#define ZMQ_PAIR 0
-#define ZMQ_PUB 1
-#define ZMQ_SUB 2
-#define ZMQ_REQ 3
-#define ZMQ_REP 4
-#define ZMQ_XREQ 5
-#define ZMQ_XREP 6
-#define ZMQ_PULL 7
-#define ZMQ_PUSH 8
-#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */
-#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */
+#define ZMQ_PAIR 0
+#define ZMQ_PUB 1
+#define ZMQ_SUB 2
+#define ZMQ_REQ 3
+#define ZMQ_REP 4
+#define ZMQ_XREQ 5
+#define ZMQ_XREP 6
+#define ZMQ_PULL 7
+#define ZMQ_PUSH 8
+
+/* Deprecated aliases, to be removed in release 3.x */
+#define ZMQ_UPSTREAM ZMQ_PULL
+#define ZMQ_DOWNSTREAM ZMQ_PUSH
/* Socket options. */
-#define ZMQ_HWM 1
-/* ZMQ_LWM 2 no longer supported */
-#define ZMQ_SWAP 3
-#define ZMQ_AFFINITY 4
-#define ZMQ_IDENTITY 5
-#define ZMQ_SUBSCRIBE 6
-#define ZMQ_UNSUBSCRIBE 7
-#define ZMQ_RATE 8
+#define ZMQ_HWM 1
+#define ZMQ_SWAP 3
+#define ZMQ_AFFINITY 4
+#define ZMQ_IDENTITY 5
+#define ZMQ_SUBSCRIBE 6
+#define ZMQ_UNSUBSCRIBE 7
+#define ZMQ_RATE 8
#define ZMQ_RECOVERY_IVL 9
-#define ZMQ_MCAST_LOOP 10
-#define ZMQ_SNDBUF 11
-#define ZMQ_RCVBUF 12
-#define ZMQ_RCVMORE 13
+#define ZMQ_MCAST_LOOP 10
+#define ZMQ_SNDBUF 11
+#define ZMQ_RCVBUF 12
+#define ZMQ_RCVMORE 13
+#define ZMQ_FD 14
+#define ZMQ_EVENTS 15
/* Send/recv options. */
-#define ZMQ_NOBLOCK 1
-#define ZMQ_SNDMORE 2
+#define ZMQ_NOBLOCK 1
+#define ZMQ_SNDMORE 2
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
@@ -198,9 +200,9 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
/* I/O multiplexing. */
/******************************************************************************/
-#define ZMQ_POLLIN 1
-#define ZMQ_POLLOUT 2
-#define ZMQ_POLLERR 4
+#define ZMQ_POLLIN 1
+#define ZMQ_POLLOUT 2
+#define ZMQ_POLLERR 4
typedef struct
{
@@ -217,17 +219,15 @@ typedef struct
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
-/* Devices */
+/* Devices - Experimental. */
/******************************************************************************/
-#define ZMQ_QUEUE 1
-#define ZMQ_FORWARDER 2
-#define ZMQ_STREAMER 3
+#define ZMQ_STREAMER 1
+#define ZMQ_FORWARDER 2
+#define ZMQ_QUEUE 3
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket);
-#undef ZMQ_EXPORT
-
#ifdef __cplusplus
}
#endif
diff --git a/src/Makefile.am b/src/Makefile.am
index 19a80d0..937372f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -49,7 +49,7 @@ endif
nodist_libzmq_la_SOURCES = $(pgm_sources)
-libzmq_la_SOURCES = app_thread.hpp \
+libzmq_la_SOURCES = \
atomic_counter.hpp \
atomic_ptr.hpp \
blob.hpp \
@@ -58,7 +58,6 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
- push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@@ -69,7 +68,6 @@ libzmq_la_SOURCES = app_thread.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
- i_endpoint.hpp \
i_engine.hpp \
i_poll_events.hpp \
kqueue.hpp \
@@ -91,10 +89,13 @@ libzmq_la_SOURCES = app_thread.hpp \
pair.hpp \
prefix_tree.hpp \
pub.hpp \
+ pull.hpp \
+ push.hpp \
queue.hpp \
rep.hpp \
req.hpp \
select.hpp \
+ semaphore.hpp \
session.hpp \
signaler.hpp \
socket_base.hpp \
@@ -105,7 +106,6 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
- pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@@ -121,11 +121,9 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \
- app_thread.cpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
- push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
@@ -139,13 +137,15 @@ libzmq_la_SOURCES = app_thread.hpp \
object.cpp \
options.cpp \
owned.cpp \
+ pair.cpp \
pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
- pair.cpp \
prefix_tree.cpp \
pipe.cpp \
poll.cpp \
+ pull.cpp \
+ push.cpp \
pub.cpp \
queue.cpp \
rep.cpp \
@@ -160,7 +160,6 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
- pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
deleted file mode 100644
index ac59464..0000000
--- a/src/app_thread.cpp
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#include <new>
-#include <algorithm>
-
-#include "../include/zmq.h"
-
-#include "platform.hpp"
-
-#if defined ZMQ_HAVE_WINDOWS
-#include "windows.hpp"
-#if defined _MSC_VER
-#include <intrin.h>
-#endif
-#else
-#include <unistd.h>
-#endif
-
-#include "app_thread.hpp"
-#include "ctx.hpp"
-#include "err.hpp"
-#include "pipe.hpp"
-#include "config.hpp"
-#include "socket_base.hpp"
-#include "pair.hpp"
-#include "pub.hpp"
-#include "sub.hpp"
-#include "req.hpp"
-#include "rep.hpp"
-#include "xreq.hpp"
-#include "xrep.hpp"
-#include "pull.hpp"
-#include "push.hpp"
-
-// If the RDTSC is available we use it to prevent excessive
-// polling for commands. The nice thing here is that it will work on any
-// system with x86 architecture and gcc or MSVC compiler.
-#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
- (defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
-#define ZMQ_DELAY_COMMANDS
-#endif
-
-zmq::app_thread_t::app_thread_t (ctx_t *ctx_,
- uint32_t thread_slot_) :
- object_t (ctx_, thread_slot_),
- last_processing_time (0),
- terminated (false)
-{
-}
-
-zmq::app_thread_t::~app_thread_t ()
-{
- zmq_assert (sockets.empty ());
-}
-
-void zmq::app_thread_t::stop ()
-{
- send_stop ();
-}
-
-zmq::signaler_t *zmq::app_thread_t::get_signaler ()
-{
- return &signaler;
-}
-
-bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
-{
- bool received;
- command_t cmd;
- if (block_) {
- received = signaler.recv (&cmd, true);
- zmq_assert (received);
- }
- else {
-
-#if defined ZMQ_DELAY_COMMANDS
- // Optimised version of command processing - it doesn't have to check
- // for incoming commands each time. It does so only if certain time
- // elapsed since last command processing. Command delay varies
- // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
- // etc. The optimisation makes sense only on platforms where getting
- // a timestamp is a very cheap operation (tens of nanoseconds).
- if (throttle_) {
-
- // Get timestamp counter.
-#if defined __GNUC__
- uint32_t low;
- uint32_t high;
- __asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
- uint64_t current_time = (uint64_t) high << 32 | low;
-#elif defined _MSC_VER
- uint64_t current_time = __rdtsc ();
-#else
-#error
-#endif
-
- // Check whether certain time have elapsed since last command
- // processing.
- if (current_time - last_processing_time <= max_command_delay)
- return !terminated;
- last_processing_time = current_time;
- }
-#endif
-
- // Check whether there are any commands pending for this thread.
- received = signaler.recv (&cmd, false);
- }
-
- // Process all the commands available at the moment.
- while (received) {
- cmd.destination->process_command (cmd);
- received = signaler.recv (&cmd, false);
- }
-
- return !terminated;
-}
-
-zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
-{
- socket_base_t *s = NULL;
- switch (type_) {
- case ZMQ_PAIR:
- s = new (std::nothrow) pair_t (this);
- break;
- case ZMQ_PUB:
- s = new (std::nothrow) pub_t (this);
- break;
- case ZMQ_SUB:
- s = new (std::nothrow) sub_t (this);
- break;
- case ZMQ_REQ:
- s = new (std::nothrow) req_t (this);
- break;
- case ZMQ_REP:
- s = new (std::nothrow) rep_t (this);
- break;
- case ZMQ_XREQ:
- s = new (std::nothrow) xreq_t (this);
- break;
- case ZMQ_XREP:
- s = new (std::nothrow) xrep_t (this);
- break;
- case ZMQ_PULL:
- s = new (std::nothrow) pull_t (this);
- break;
- case ZMQ_PUSH:
- s = new (std::nothrow) push_t (this);
- break;
- default:
- if (sockets.empty ())
- get_ctx ()->no_sockets (this);
- errno = EINVAL;
- return NULL;
- }
- zmq_assert (s);
-
- sockets.push_back (s);
-
- return s;
-}
-
-void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
-{
- sockets.erase (socket_);
- if (sockets.empty ())
- get_ctx ()->no_sockets (this);
-}
-
-void zmq::app_thread_t::process_stop ()
-{
- terminated = true;
-}
-
-bool zmq::app_thread_t::is_terminated ()
-{
- return terminated;
-}
-
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
deleted file mode 100644
index f0deaab..0000000
--- a/src/app_thread.hpp
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- Copyright (c) 2007-2010 iMatix Corporation
-
- This file is part of 0MQ.
-
- 0MQ is free software; you can redistribute it and/or modify it under
- the terms of the Lesser GNU General Public License as published by
- the Free Software Foundation; either version 3 of the License, or
- (at your option) any later version.
-
- 0MQ is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- Lesser GNU General Public License for more details.
-
- You should have received a copy of the Lesser GNU General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
-*/
-
-#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__
-#define __ZMQ_APP_THREAD_HPP_INCLUDED__
-
-#include <vector>
-
-#include "stdint.hpp"
-#include "object.hpp"
-#include "yarray.hpp"
-#include "signaler.hpp"
-
-namespace zmq
-{
-
- class app_thread_t : public object_t
- {
- public:
-
- app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
-
- ~app_thread_t ();
-
- // Interrupt blocking call if the app thread is stuck in one.
- // This function is is called from a different thread!
- void stop ();
-
- // Returns signaler associated with this application thread.
- signaler_t *get_signaler ();
-
- // Processes commands sent to this thread (if any). If 'block' is
- // set to true, returns only after at least one command was processed.
- // If throttle argument is true, commands are processed at most once
- // in a predefined time period. The function returns false is the
- // associated context was terminated, true otherwise.
- bool process_commands (bool block_, bool throttle_);
-
- // Create a socket of a specified type.
- class socket_base_t *create_socket (int type_);
-
- // Unregister the socket from the app_thread (called by socket itself).
- void remove_socket (class socket_base_t *socket_);
-
- // Returns true is the associated context was already terminated.
- bool is_terminated ();
-
- private:
-
- // Command handlers.
- void process_stop ();
-
- // All the sockets created from this application thread.
- typedef yarray_t <socket_base_t> sockets_t;
- sockets_t sockets;
-
- // App thread's signaler object.
- signaler_t signaler;
-
- // Timestamp of when commands were processed the last time.
- uint64_t last_processing_time;
-
- // If true, 'stop' command was already received.
- bool terminated;
-
- app_thread_t (const app_thread_t&);
- void operator = (const app_thread_t&);
- };
-
-}
-
-#endif
diff --git a/src/config.hpp b/src/config.hpp
index 2c0ac2d..83e612a 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -27,9 +27,8 @@ namespace zmq
enum
{
- // Maximal number of OS threads that can own 0MQ sockets
- // at the same time.
- max_app_threads = 512,
+ // Maximum number of sockets that can be opened at the same time.
+ max_sockets = 512,
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 397f692..91157a5 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -24,7 +24,6 @@
#include "ctx.hpp"
#include "socket_base.hpp"
-#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
@@ -32,11 +31,12 @@
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
+#else
+#include "unistd.h"
#endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
- sockets (0),
- terminated (false)
+ no_sockets_notify (false)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
@@ -50,44 +50,32 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
#endif
// Initialise the array of signalers.
- signalers_count = max_app_threads + io_threads_;
- signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count);
- zmq_assert (signalers);
- memset (signalers, 0, sizeof (signaler_t*) * signalers_count);
+ slot_count = max_sockets + io_threads_;
+ slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count);
+ zmq_assert (slots);
// Create I/O thread objects and launch them.
for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
- signalers [i] = io_thread->get_signaler ();
+ slots [i] = io_thread->get_signaler ();
io_thread->start ();
}
-}
-
-int zmq::ctx_t::term ()
-{
- // First send stop command to application threads so that any
- // blocking calls are interrupted.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- app_threads [i].app_thread->stop ();
-
- // Then mark context as terminated.
- term_sync.lock ();
- zmq_assert (!terminated);
- terminated = true;
- bool destroy = (sockets == 0);
- term_sync.unlock ();
-
- // If there are no sockets open, destroy the context immediately.
- if (destroy)
- delete this;
- return 0;
+ // In the unused part of the slot array, create a list of empty slots.
+ for (uint32_t i = slot_count - 1; i >= io_threads_; i--) {
+ empty_slots.push_back (i);
+ slots [i] = NULL;
+ }
}
zmq::ctx_t::~ctx_t ()
{
+ // Check that there are no remaining open or zombie sockets.
+ zmq_assert (sockets.empty ());
+ zmq_assert (zombies.empty ());
+
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
@@ -97,18 +85,10 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
- // Close all application theads, sockets, io_objects etc.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- delete app_threads [i].app_thread;
-
- // Deallocate all the orphaned pipes.
- while (!pipes.empty ())
- delete *pipes.begin ();
-
- // Deallocate the array of pointers to signalers. No special work is
+ // Deallocate the array of slot. No special work is
// needed as signalers themselves were deallocated with their
- // corresponding (app_/io_) thread objects.
- free (signalers);
+ // corresponding io_thread/socket objects.
+ free (slots);
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
@@ -117,110 +97,113 @@ zmq::ctx_t::~ctx_t ()
#endif
}
-zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
+int zmq::ctx_t::term ()
{
- app_threads_sync.lock ();
-
- // Find whether the calling thread has app_thread_t object associated
- // already. At the same time find an unused app_thread_t so that it can
- // be used if there's no associated object for the calling thread.
- // Check whether thread ID is already assigned. If so, return it.
- app_threads_t::size_type unused = app_threads.size ();
- app_threads_t::size_type current;
- for (current = 0; current != app_threads.size (); current++) {
- if (app_threads [current].associated &&
- thread_t::equal (thread_t::id (), app_threads [current].tid))
- break;
- if (!app_threads [current].associated)
- unused = current;
+ // First send stop command to sockets so that any
+ // blocking calls are interrupted.
+ for (sockets_t::size_type i = 0; i != sockets.size (); i++)
+ sockets [i]->stop ();
+
+ // Find out whether there are any open sockets to care about.
+ // If so, sleep till they are closed. Note that we can use
+ // no_sockets_notify safely out of the critical section as once set
+ // its value is never changed again.
+ slot_sync.lock ();
+ if (!sockets.empty ())
+ no_sockets_notify = true;
+ slot_sync.unlock ();
+ if (no_sockets_notify)
+ no_sockets_sync.wait ();
+
+ // At this point there's only one application thread (this one) remaining.
+ // We don't even have to synchronise access to data.
+ zmq_assert (sockets.empty ());
+
+ // Get rid of remaining zombie sockets.
+ while (!zombies.empty ()) {
+ dezombify ();
+
+ // Sleep for 1ms not to end up busy-looping in the case the I/O threads
+ // are still busy sending data. We can possibly add a grand poll here
+ // (polling for fds associated with all the zombie sockets), but it's
+ // probably not worth of implementing it.
+#if defined ZMQ_HAVE_WINDOWS
+ Sleep (1);
+#else
+ usleep (1000);
+#endif
}
- // If no app_thread_t is associated with the calling thread,
- // associate it with one of the unused app_thread_t objects.
- if (current == app_threads.size ()) {
+ // Deallocate the resources.
+ delete this;
- // If all the existing app_threads are already used, create one more.
- if (unused == app_threads.size ()) {
-
- // If max_app_threads limit was reached, return error.
- if (app_threads.size () == max_app_threads) {
- app_threads_sync.unlock ();
- errno = EMTHREAD;
- return NULL;
- }
+ return 0;
+}
- // Create the new application thread proxy object.
- app_thread_info_t info;
- memset (&info, 0, sizeof (info));
- info.associated = false;
- info.app_thread = new (std::nothrow) app_thread_t (this,
- io_threads.size () + app_threads.size ());
- zmq_assert (info.app_thread);
- signalers [io_threads.size () + app_threads.size ()] =
- info.app_thread->get_signaler ();
- app_threads.push_back (info);
- }
+zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
+{
+ slot_sync.lock ();
- // Incidentally, this works both when there is an unused app_thread
- // and when a new one is created.
- current = unused;
+ // Free the slots, if possible.
+ dezombify ();
- // Associate the selected app_thread with the OS thread.
- app_threads [current].associated = true;
- app_threads [current].tid = thread_t::id ();
+ // If max_sockets limit was reached, return error.
+ if (empty_slots.empty ()) {
+ slot_sync.unlock ();
+ errno = EMFILE;
+ return NULL;
}
- app_thread_t *thread = app_threads [current].app_thread;
- app_threads_sync.unlock ();
+ // Choose a slot for the socket.
+ uint32_t slot = empty_slots.back ();
+ empty_slots.pop_back ();
- socket_base_t *s = thread->create_socket (type_);
- if (!s)
+ // Create the socket and register its signaler.
+ socket_base_t *s = socket_base_t::create (type_, this, slot);
+ if (!s) {
+ empty_slots.push_back (slot);
+ slot_sync.unlock ();
return NULL;
+ }
+ sockets.push_back (s);
+ slots [slot] = s->get_signaler ();
- term_sync.lock ();
- s