From 0e9ab2e8a3f5bc22f2c331c14236a2918a5512a8 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Sat, 21 Nov 2009 20:59:55 +0100 Subject: inproc transport - initial commit --- src/dispatcher.cpp | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) (limited to 'src/dispatcher.cpp') diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 1f6b4f0..1e41ee8 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -20,6 +20,7 @@ #include "../bindings/c/zmq.h" #include "dispatcher.hpp" +#include "socket_base.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" @@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) zmq_assert (erased == 1); pipes_sync.unlock (); } + +int zmq::dispatcher_t::register_endpoint (const char *addr_, + socket_base_t *socket_) +{ + endpoints_sync.lock (); + + bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second; + if (!inserted) { + errno = EADDRINUSE; + endpoints_sync.unlock (); + return -1; + } + + endpoints_sync.unlock (); + return 0; +} + +void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.begin (); + while (it != endpoints.end ()) { + if (it->second == socket_) { + endpoints_t::iterator to_erase = it; + it++; + endpoints.erase (to_erase); + continue; + } + it++; + } + + endpoints_sync.unlock (); +} + +zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.find (addr_); + if (it == endpoints.end ()) { + endpoints_sync.unlock (); + errno = ECONNREFUSED; + return NULL; + } + socket_base_t *endpoint = it->second; + + // Increment the command sequence number of the peer so that it won't + // get deallocated until "bind" command is issued by the caller. + endpoint->inc_seqnum (); + + endpoints_sync.unlock (); + return endpoint; +} + -- cgit v1.2.3