diff options
-rw-r--r-- | AUTHORS | 1 | ||||
-rw-r--r-- | src/ctx.cpp | 18 | ||||
-rw-r--r-- | src/ctx.hpp | 15 | ||||
-rw-r--r-- | src/object.cpp | 6 | ||||
-rw-r--r-- | src/object.hpp | 4 | ||||
-rw-r--r-- | src/socket_base.cpp | 34 |
6 files changed, 52 insertions, 26 deletions
@@ -16,6 +16,7 @@ Conrad D. Steenberg <conrad.steenberg@caltech.edu> Dhammika Pathirana <dhammika@gmail.com> Dhruva Krishnamurthy <dhruva@ymail.com> Dirk O. Kaar <dok@dok-net.net> +Douglas Creager <douglas.creager@redjack.com> Erich Heine <sophacles@gmail.com> Erik Rigtorp <erik@rigtorp.com> Frank Denis <zeromq@pureftpd.org> diff --git a/src/ctx.cpp b/src/ctx.cpp index 59ba2db..3849497 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -242,13 +242,12 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) return io_threads [result]; } -int zmq::ctx_t::register_endpoint (const char *addr_, - socket_base_t *socket_) +int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) { endpoints_sync.lock (); bool inserted = endpoints.insert (endpoints_t::value_type ( - std::string (addr_), socket_)).second; + std::string (addr_), endpoint_)).second; if (!inserted) { errno = EADDRINUSE; endpoints_sync.unlock (); @@ -265,7 +264,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) endpoints_t::iterator it = endpoints.begin (); while (it != endpoints.end ()) { - if (it->second == socket_) { + if (it->second.socket == socket_) { endpoints_t::iterator to_erase = it; it++; endpoints.erase (to_erase); @@ -277,7 +276,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) endpoints_sync.unlock (); } -zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) +zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) { endpoints_sync.lock (); @@ -285,18 +284,19 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) if (it == endpoints.end ()) { endpoints_sync.unlock (); errno = ECONNREFUSED; - return NULL; + endpoint_t empty = {NULL, options_t()}; + return empty; } - socket_base_t *endpoint = it->second; + endpoint_t *endpoint = &it->second; // Increment the command sequence number of the peer so that it won't // get deallocated until "bind" command is issued by the caller. // The subsequent 'bind' has to be called with inc_seqnum parameter // set to false, so that the seqnum isn't incremented twice. - endpoint->inc_seqnum (); + endpoint->socket->inc_seqnum (); endpoints_sync.unlock (); - return endpoint; + return *endpoint; } void zmq::ctx_t::log (zmq_msg_t *msg_) diff --git a/src/ctx.hpp b/src/ctx.hpp index 0f2dd52..f35aa12 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -34,9 +34,18 @@ #include "mutex.hpp" #include "stdint.hpp" #include "thread.hpp" +#include "options.hpp" namespace zmq { + // Information associated with inproc endpoint. Note that endpoint options + // are registered as well so that the peer can access them without a need + // for synchronisation, handshaking or similar. + struct endpoint_t + { + class socket_base_t *socket; + options_t options; + }; // Context object encapsulates all the global state associated with // the library. @@ -70,9 +79,9 @@ namespace zmq class io_thread_t *choose_io_thread (uint64_t affinity_); // Management of inproc endpoints. - int register_endpoint (const char *addr_, class socket_base_t *socket_); + int register_endpoint (const char *addr_, endpoint_t &endpoint_); void unregister_endpoints (class socket_base_t *socket_); - class socket_base_t *find_endpoint (const char *addr_); + endpoint_t find_endpoint (const char *addr_); // Logging. void log (zmq_msg_t *msg_); @@ -122,7 +131,7 @@ namespace zmq mailbox_t **slots; // List of inproc endpoints within this context. - typedef std::map <std::string, class socket_base_t*> endpoints_t; + typedef std::map <std::string, endpoint_t> endpoints_t; endpoints_t endpoints; // Synchronisation of access to the list of inproc endpoints. diff --git a/src/object.cpp b/src/object.cpp index dd8fc24..63b42b4 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -123,9 +123,9 @@ void zmq::object_t::process_command (command_t &cmd_) deallocate_command (&cmd_); } -int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_) +int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) { - return ctx->register_endpoint (addr_, socket_); + return ctx->register_endpoint (addr_, endpoint_); } void zmq::object_t::unregister_endpoints (socket_base_t *socket_) @@ -133,7 +133,7 @@ void zmq::object_t::unregister_endpoints (socket_base_t *socket_) return ctx->unregister_endpoints (socket_); } -zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) +zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) { return ctx->find_endpoint (addr_); } diff --git a/src/object.hpp b/src/object.hpp index f8cfdda..c81656e 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -46,9 +46,9 @@ namespace zmq // Using following function, socket is able to access global // repository of inproc endpoints. - int register_endpoint (const char *addr_, class socket_base_t *socket_); + int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); void unregister_endpoints (class socket_base_t *socket_); - class socket_base_t *find_endpoint (const char *addr_); + struct endpoint_t find_endpoint (const char *addr_); // Logs an message. void log (zmq_msg_t *msg_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f48b48b..5c21b8f 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -300,8 +300,10 @@ int zmq::socket_base_t::bind (const char *addr_) if (rc != 0) return -1; - if (protocol == "inproc" || protocol == "sys") - return register_endpoint (addr_, this); + if (protocol == "inproc" || protocol == "sys") { + endpoint_t endpoint = {this, options}; + return register_endpoint (addr_, endpoint); + } if (protocol == "tcp" || protocol == "ipc") { @@ -361,24 +363,38 @@ int zmq::socket_base_t::connect (const char *addr_) // as there's no 'reconnect' functionality implemented. Once that // is in place we should follow generic pipe creation algorithm. - // Find the peer socket. - socket_base_t *peer = find_endpoint (addr_); - if (!peer) + // Find the peer endpoint. + endpoint_t peer = find_endpoint (addr_); + if (!peer.socket) return -1; reader_t *inpipe_reader = NULL; writer_t *inpipe_writer = NULL; reader_t *outpipe_reader = NULL; writer_t *outpipe_writer = NULL; - + + // The total HWM for an inproc connection should be the sum of + // the binder's HWM and the connector's HWM. (Similarly for the + // SWAP.) + int64_t hwm; + if (options.hwm == 0 || peer.options.hwm == 0) + hwm = 0; + else + hwm = options.hwm + peer.options.hwm; + int64_t swap; + if (options.swap == 0 && peer.options.swap == 0) + swap = 0; + else + swap = options.swap + peer.options.swap; + // Create inbound pipe, if required. if (options.requires_in) - create_pipe (this, peer, options.hwm, options.swap, + create_pipe (this, peer.socket, hwm, swap, &inpipe_reader, &inpipe_writer); // Create outbound pipe, if required. if (options.requires_out) - create_pipe (peer, this, options.hwm, options.swap, + create_pipe (peer.socket, this, hwm, swap, &outpipe_reader, &outpipe_writer); // Attach the pipes to this socket object. @@ -387,7 +403,7 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach the pipes to the peer socket. Note that peer's seqnum // was incremented in find_endpoint function. We don't need it // increased here. - send_bind (peer, outpipe_reader, inpipe_writer, + send_bind (peer.socket, outpipe_reader, inpipe_writer, options.identity, false); return 0; |