diff options
author | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-11-21 20:59:55 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@fastmq.commkdir> | 2009-11-21 20:59:55 +0100 |
commit | 0e9ab2e8a3f5bc22f2c331c14236a2918a5512a8 (patch) | |
tree | c91f1e131a4a84a374c466f147958c5971f36ad7 /src | |
parent | 14f2fecdcd9732fe741c211138a4ba327816a937 (diff) |
inproc transport - initial commit
Diffstat (limited to 'src')
-rw-r--r-- | src/dispatcher.cpp | 56 | ||||
-rw-r--r-- | src/dispatcher.hpp | 12 | ||||
-rw-r--r-- | src/object.cpp | 15 | ||||
-rw-r--r-- | src/object.hpp | 6 | ||||
-rw-r--r-- | src/socket_base.cpp | 62 | ||||
-rw-r--r-- | src/socket_base.hpp | 12 | ||||
-rw-r--r-- | src/zmq.cpp | 6 |
7 files changed, 165 insertions, 4 deletions
diff --git a/src/dispatcher.cpp b/src/dispatcher.cpp index 1f6b4f0..1e41ee8 100644 --- a/src/dispatcher.cpp +++ b/src/dispatcher.cpp @@ -20,6 +20,7 @@ #include "../bindings/c/zmq.h" #include "dispatcher.hpp" +#include "socket_base.hpp" #include "app_thread.hpp" #include "io_thread.hpp" #include "platform.hpp" @@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) zmq_assert (erased == 1); pipes_sync.unlock (); } + +int zmq::dispatcher_t::register_endpoint (const char *addr_, + socket_base_t *socket_) +{ + endpoints_sync.lock (); + + bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second; + if (!inserted) { + errno = EADDRINUSE; + endpoints_sync.unlock (); + return -1; + } + + endpoints_sync.unlock (); + return 0; +} + +void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.begin (); + while (it != endpoints.end ()) { + if (it->second == socket_) { + endpoints_t::iterator to_erase = it; + it++; + endpoints.erase (to_erase); + continue; + } + it++; + } + + endpoints_sync.unlock (); +} + +zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_) +{ + endpoints_sync.lock (); + + endpoints_t::iterator it = endpoints.find (addr_); + if (it == endpoints.end ()) { + endpoints_sync.unlock (); + errno = ECONNREFUSED; + return NULL; + } + socket_base_t *endpoint = it->second; + + // Increment the command sequence number of the peer so that it won't + // get deallocated until "bind" command is issued by the caller. + endpoint->inc_seqnum (); + + endpoints_sync.unlock (); + return endpoint; +} + diff --git a/src/dispatcher.hpp b/src/dispatcher.hpp index 23b6a33..8364d4d 100644 --- a/src/dispatcher.hpp +++ b/src/dispatcher.hpp @@ -97,6 +97,11 @@ namespace zmq void register_pipe (class pipe_t *pipe_); void unregister_pipe (class pipe_t *pipe_); + // Management of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + private: ~dispatcher_t (); @@ -149,6 +154,13 @@ namespace zmq // and 'terminated' flag). mutex_t term_sync; + // List of inproc endpoints within this context. + typedef std::map <std::string, class socket_base_t*> endpoints_t; + endpoints_t endpoints; + + // Synchronisation of access to the list of inproc endpoints. + mutex_t endpoints_sync; + dispatcher_t (const dispatcher_t&); void operator = (const dispatcher_t&); }; diff --git a/src/object.cpp b/src/object.cpp index 1433b7b..d24e477 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_) dispatcher->unregister_pipe (pipe_); } +int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) +{ + return dispatcher->register_endpoint (addr_, socket_); +} + +void zmq::object_t::unregister_endpoints (socket_base_t *socket_) +{ + return dispatcher->unregister_endpoints (socket_); +} + +zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) +{ + return dispatcher->find_endpoint (addr_); +} + zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) { return dispatcher->choose_io_thread (taskset_); diff --git a/src/object.hpp b/src/object.hpp index 1954071..6331372 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -49,6 +49,12 @@ namespace zmq protected: + // Using following function, socket is able to access global + // repository of inproc endpoints. + int register_endpoint (const char *addr_, class socket_base_t *socket_); + void unregister_endpoints (class socket_base_t *socket_); + class socket_base_t *find_endpoint (const char *addr_); + // Derived object can use following functions to interact with // global repositories. See dispatcher.hpp for function details. int thread_slot_count (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 6583608..86e1205 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : pending_term_acks (0), ticks (0), app_thread (parent_), - shutting_down (false) + shutting_down (false), + sent_seqnum (0), + processed_seqnum (0) { } @@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") + return register_endpoint (addr_args.c_str (), this); + if (addr_type == "tcp") { zmq_listener_t *listener = new zmq_listener_t ( choose_io_thread (options.affinity), this, options); @@ -126,6 +131,42 @@ int zmq::socket_base_t::connect (const char *addr_) addr_type = addr.substr (0, pos); addr_args = addr.substr (pos + 3); + if (addr_type == "inproc") { + + // Find the peer socket. + socket_base_t *peer = find_endpoint (addr_args.c_str ()); + if (!peer) + return -1; + + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; + + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new pipe_t (this, peer, options.hwm, options.lwm); + zmq_assert (in_pipe); + } + + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new pipe_t (peer, this, options.hwm, options.lwm); + zmq_assert (out_pipe); + } + + // Attach the pipes to this socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL); + + // Attach the pipes to the peer socket. Note that peer's seqnum + // was incremented in find_endpoint function. When this command + // is delivered, peer will consider the seqnum to be processed. + // TODO: Seems that 'session' parameter is unused... + send_bind (peer, NULL, out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL); + + return 0; + } + // Create the session. io_thread_t *io_thread = choose_io_thread (options.affinity); session_t *session = new session_t (io_thread, this, session_name.c_str (), @@ -319,13 +360,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::close () { + shutting_down = true; + + // Let the thread know that the socket is no longer available. app_thread->remove_socket (this); // Pointer to the dispatcher must be retrieved before the socket is // deallocated. Afterwards it is not available. dispatcher_t *dispatcher = get_dispatcher (); - shutting_down = true; + // Unregister all inproc endpoints associated with this socket. + // From this point we are sure that inc_seqnum won't be called again + // on this object. + dispatcher->unregister_endpoints (this); + + // Wait till all undelivered commands are delivered. This should happen + // very quickly. There's no way to wait here for extensive period of time. + while (processed_seqnum != sent_seqnum.get ()) + app_thread->process_commands (true, false); while (true) { @@ -364,6 +416,12 @@ int zmq::socket_base_t::close () return 0; } +void zmq::socket_base_t::inc_seqnum () +{ + // NB: This function may be called from a different thread! + sent_seqnum.add (1); +} + zmq::app_thread_t *zmq::socket_base_t::get_thread () { return app_thread; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 49ff5a5..b6df8c4 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -33,6 +33,7 @@ #include "mutex.hpp" #include "options.hpp" #include "stdint.hpp" +#include "atomic_counter.hpp" namespace zmq { @@ -54,6 +55,11 @@ namespace zmq int recv (zmq_msg_t *msg_, int flags_); int close (); + // When another owned object wants to send command to this object + // it calls this function to let it know it should not shut down + // before the command is delivered. + void inc_seqnum (); + // This function is used by the polling mechanism to determine // whether the socket belongs to the application thread the poll // is called from. @@ -132,6 +138,12 @@ namespace zmq // started. bool shutting_down; + // Sequence number of the last command sent to this object. + atomic_counter_t sent_seqnum; + + // Sequence number of the last command processed by this object. + uint64_t processed_seqnum; + // List of existing sessions. This list is never referenced from within // the socket, instead it is used by I/O objects owned by the session. // As those objects can live in different threads, the access is diff --git a/src/zmq.cpp b/src/zmq.cpp index 7952b61..9b66be8 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_) void *zmq_init (int app_threads_, int io_threads_, int flags_) { - // There should be at least a single thread managed by the dispatcher. - if (app_threads_ <= 0 || io_threads_ <= 0 || + // There should be at least a single application thread managed + // by the dispatcher. There's no need for I/O threads if 0MQ is used + // only for inproc messaging + if (app_threads_ < 1 || io_threads_ < 0 || app_threads_ > 63 || io_threads_ > 63) { errno = EINVAL; return NULL; |