summaryrefslogtreecommitdiff
path: root/src/context.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/context.cpp')
-rw-r--r--src/context.cpp139
1 files changed, 13 insertions, 126 deletions
diff --git a/src/context.cpp b/src/context.cpp
index ab4643e..6b071cf 100644
--- a/src/context.cpp
+++ b/src/context.cpp
@@ -20,15 +20,12 @@
#include "../include/zmq.h"
#include "context.hpp"
+#include "i_api.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "pipe.hpp"
-#include "pipe_reader.hpp"
-#include "pipe_writer.hpp"
-#include "session.hpp"
-#include "i_api.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
@@ -72,37 +69,23 @@ zmq::context_t::context_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
-void zmq::context_t::shutdown ()
-{
- delete this;
-}
-
zmq::context_t::~context_t ()
{
- // Ask I/O threads to terminate.
+ // Close all application theads, sockets, io_objects etc.
+ for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
+ delete app_threads [i];
+
+ // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
+ // thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->stop ();
// Wait till I/O threads actually terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
- io_threads [i]->join ();
-
- // At this point the current thread is the only thread with access to
- // our internal data. Deallocation will be done exclusively in this thread.
- for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
- app_threads [i]->shutdown ();
- for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
- io_threads [i]->shutdown ();
+ delete io_threads [i];
delete [] command_pipes;
- // Deallocate all the pipes, pipe readers and pipe writers.
- for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) {
- delete it->pipe;
- delete it->reader;
- delete it->writer;
- }
-
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int rc = WSACleanup ();
@@ -123,7 +106,11 @@ zmq::i_api *zmq::context_t::create_socket (int type_)
threads_sync.unlock ();
return NULL;
}
- i_api *s = thread->create_socket (type_);
+
+ zmq_assert (false);
+ i_api *s = NULL;
+ //i_api *s = thread->create_socket (type_);
+
threads_sync.unlock ();
return s;
}
@@ -164,103 +151,3 @@ zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_)
return io_threads [result];
}
-
-void zmq::context_t::create_pipe (object_t *reader_parent_,
- object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
- pipe_reader_t **reader_, pipe_writer_t **writer_)
-{
- // Create the pipe, reader & writer triple.
- pipe_t *pipe = new pipe_t;
- zmq_assert (pipe);
- pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe,
- hwm_, lwm_);
- zmq_assert (reader);
- pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader,
- hwm_, lwm_);
- zmq_assert (writer);
- reader->set_peer (writer);
-
- // Store the pipe in the repository.
- pipe_info_t info = {pipe, reader, writer};
- pipes_sync.lock ();
- pipe->set_index (pipes.size ());
- pipes.push_back (info);
- pipes_sync.unlock ();
-
- *reader_ = reader;
- *writer_ = writer;
-}
-
-void zmq::context_t::destroy_pipe (pipe_t *pipe_)
-{
- // Remove the pipe from the repository.
- pipe_info_t info;
- pipes_sync.lock ();
- pipes_t::size_type i = pipe_->get_index ();
- info = pipes [i];
- pipes [i] = pipes.back ();
- pipes.pop_back ();
- pipes_sync.unlock ();
-
- // Deallocate the pipe and associated pipe reader & pipe writer.
- zmq_assert (info.pipe == pipe_);
- delete info.pipe;
- delete info.reader;
- delete info.writer;
-}
-
-int zmq::context_t::register_inproc_endpoint (const char *endpoint_,
- session_t *session_)
-{
- inproc_endpoint_sync.lock ();
- inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
-
- if (it != inproc_endpoints.end ()) {
- inproc_endpoint_sync.unlock ();
- errno = EADDRINUSE;
- return -1;
- }
-
- inproc_endpoints.insert (std::make_pair (endpoint_, session_));
-
- inproc_endpoint_sync.unlock ();
- return 0;
-}
-
-zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_)
-{
- inproc_endpoint_sync.lock ();
- inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
-
- if (it == inproc_endpoints.end ()) {
- inproc_endpoint_sync.unlock ();
- errno = EADDRNOTAVAIL;
- return NULL;
- }
-
- it->second->inc_seqnum ();
- object_t *session = it->second;
-
- inproc_endpoint_sync.unlock ();
- return session;
-}
-
-void zmq::context_t::unregister_inproc_endpoints (session_t *session_)
-{
- inproc_endpoint_sync.lock ();
-
- // Remove the connection from the repository.
- // TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n).
- for (inproc_endpoints_t::iterator it = inproc_endpoints.begin ();
- it != inproc_endpoints.end ();) {
- if (it->second == session_) {
- inproc_endpoints.erase (it);
- it = inproc_endpoints.begin ();
- }
- else
- it++;
- }
-
- inproc_endpoint_sync.unlock ();
-}
-