summaryrefslogtreecommitdiff
path: root/src/app_thread.cpp
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@fastmq.commkdir>2009-08-08 16:01:58 +0200
committerMartin Sustrik <sustrik@fastmq.commkdir>2009-08-08 16:01:58 +0200
commita8b410e66c3c75809c8e9c01dd3e35c579f02347 (patch)
tree7af63906dce0216f86e5ff0767efaddfd6492cfd /src/app_thread.cpp
parent0b5cc026fbe7ccc6de66907be29471562a2d344d (diff)
lockfree interaction patter for 3 theads implemented
Diffstat (limited to 'src/app_thread.cpp')
-rw-r--r--src/app_thread.cpp37
1 files changed, 28 insertions, 9 deletions
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 23a055a..3f76970 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <algorithm>
+
#include "../include/zmq.h"
#if defined ZMQ_HAVE_WINDOWS
@@ -26,10 +28,12 @@
#endif
#include "app_thread.hpp"
-#include "context.hpp"
+#include "i_api.hpp"
+#include "dispatcher.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
+#include "socket_base.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -39,8 +43,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
- object_t (context_, thread_slot_),
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+ object_t (dispatcher_, thread_slot_),
tid (0),
last_processing_time (0)
{
@@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
zmq::app_thread_t::~app_thread_t ()
{
- // Ask all the sockets to start termination, then wait till it is complete.
- for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
- (*it)->stop ();
+ // Destroy all the sockets owned by this application thread.
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
delete *it;
-
- delete this;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
@@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_)
for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) {
command_t cmd;
- while (context->read (i, get_thread_slot (), &cmd))
+ while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
}
}
}
}
+
+zmq::i_api *zmq::app_thread_t::create_socket (int type_)
+{
+ // TODO: type is ignored for the time being.
+ socket_base_t *s = new socket_base_t (this);
+ zmq_assert (s);
+ sockets.push_back (s);
+ return s;
+}
+
+void zmq::app_thread_t::remove_socket (i_api *socket_)
+{
+ // TODO: To speed this up we can possibly use the system where each socket
+ // holds its index (see I/O scheduler implementation).
+ sockets_t::iterator it = std::find (sockets.begin (), sockets.end (),
+ socket_);
+ zmq_assert (it != sockets.end ());
+ sockets.erase (it);
+}