summaryrefslogtreecommitdiff
path: root/src/dispatcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dispatcher.cpp')
-rw-r--r--src/dispatcher.cpp56
1 files changed, 56 insertions, 0 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp
index 1f6b4f0..1e41ee8 100644
--- a/src/dispatcher.cpp
+++ b/src/dispatcher.cpp
@@ -20,6 +20,7 @@
#include "../bindings/c/zmq.h"
#include "dispatcher.hpp"
+#include "socket_base.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
zmq_assert (erased == 1);
pipes_sync.unlock ();
}
+
+int zmq::dispatcher_t::register_endpoint (const char *addr_,
+ socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second;
+ if (!inserted) {
+ errno = EADDRINUSE;
+ endpoints_sync.unlock ();
+ return -1;
+ }
+
+ endpoints_sync.unlock ();
+ return 0;
+}
+
+void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.begin ();
+ while (it != endpoints.end ()) {
+ if (it->second == socket_) {
+ endpoints_t::iterator to_erase = it;
+ it++;
+ endpoints.erase (to_erase);
+ continue;
+ }
+ it++;
+ }
+
+ endpoints_sync.unlock ();
+}
+
+zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
+{
+ endpoints_sync.lock ();
+
+ endpoints_t::iterator it = endpoints.find (addr_);
+ if (it == endpoints.end ()) {
+ endpoints_sync.unlock ();
+ errno = ECONNREFUSED;
+ return NULL;
+ }
+ socket_base_t *endpoint = it->second;
+
+ // Increment the command sequence number of the peer so that it won't
+ // get deallocated until "bind" command is issued by the caller.
+ endpoint->inc_seqnum ();
+
+ endpoints_sync.unlock ();
+ return endpoint;
+}
+