summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--AUTHORS1
-rw-r--r--src/ctx.cpp18
-rw-r--r--src/ctx.hpp15
-rw-r--r--src/object.cpp6
-rw-r--r--src/object.hpp4
-rw-r--r--src/socket_base.cpp34
6 files changed, 52 insertions, 26 deletions
diff --git a/AUTHORS b/AUTHORS
index fd03cc7..3943d94 100644
--- a/AUTHORS
+++ b/AUTHORS
@@ -16,6 +16,7 @@ Conrad D. Steenberg <conrad.steenberg@caltech.edu>
Dhammika Pathirana <dhammika@gmail.com>
Dhruva Krishnamurthy <dhruva@ymail.com>
Dirk O. Kaar <dok@dok-net.net>
+Douglas Creager <douglas.creager@redjack.com>
Erich Heine <sophacles@gmail.com>
Erik Rigtorp <erik@rigtorp.com>
Frank Denis <zeromq@pureftpd.org>
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 59ba2db..3849497 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -242,13 +242,12 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
return io_threads [result];
}
-int zmq::ctx_t::register_endpoint (const char *addr_,
- socket_base_t *socket_)
+int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
{
endpoints_sync.lock ();
bool inserted = endpoints.insert (endpoints_t::value_type (
- std::string (addr_), socket_)).second;
+ std::string (addr_), endpoint_)).second;
if (!inserted) {
errno = EADDRINUSE;
endpoints_sync.unlock ();
@@ -265,7 +264,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
endpoints_t::iterator it = endpoints.begin ();
while (it != endpoints.end ()) {
- if (it->second == socket_) {
+ if (it->second.socket == socket_) {
endpoints_t::iterator to_erase = it;
it++;
endpoints.erase (to_erase);
@@ -277,7 +276,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
endpoints_sync.unlock ();
}
-zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
+zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{
endpoints_sync.lock ();
@@ -285,18 +284,19 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
if (it == endpoints.end ()) {
endpoints_sync.unlock ();
errno = ECONNREFUSED;
- return NULL;
+ endpoint_t empty = {NULL, options_t()};
+ return empty;
}
- socket_base_t *endpoint = it->second;
+ endpoint_t *endpoint = &it->second;
// Increment the command sequence number of the peer so that it won't
// get deallocated until "bind" command is issued by the caller.
// The subsequent 'bind' has to be called with inc_seqnum parameter
// set to false, so that the seqnum isn't incremented twice.
- endpoint->inc_seqnum ();
+ endpoint->socket->inc_seqnum ();
endpoints_sync.unlock ();
- return endpoint;
+ return *endpoint;
}
void zmq::ctx_t::log (zmq_msg_t *msg_)
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 0f2dd52..f35aa12 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -34,9 +34,18 @@
#include "mutex.hpp"
#include "stdint.hpp"
#include "thread.hpp"
+#include "options.hpp"
namespace zmq
{
+ // Information associated with inproc endpoint. Note that endpoint options
+ // are registered as well so that the peer can access them without a need
+ // for synchronisation, handshaking or similar.
+ struct endpoint_t
+ {
+ class socket_base_t *socket;
+ options_t options;
+ };
// Context object encapsulates all the global state associated with
// the library.
@@ -70,9 +79,9 @@ namespace zmq
class io_thread_t *choose_io_thread (uint64_t affinity_);
// Management of inproc endpoints.
- int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_);
- class socket_base_t *find_endpoint (const char *addr_);
+ endpoint_t find_endpoint (const char *addr_);
// Logging.
void log (zmq_msg_t *msg_);
@@ -122,7 +131,7 @@ namespace zmq
mailbox_t **slots;
// List of inproc endpoints within this context.
- typedef std::map <std::string, class socket_base_t*> endpoints_t;
+ typedef std::map <std::string, endpoint_t> endpoints_t;
endpoints_t endpoints;
// Synchronisation of access to the list of inproc endpoints.
diff --git a/src/object.cpp b/src/object.cpp
index dd8fc24..63b42b4 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -123,9 +123,9 @@ void zmq::object_t::process_command (command_t &cmd_)
deallocate_command (&cmd_);
}
-int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
+int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
{
- return ctx->register_endpoint (addr_, socket_);
+ return ctx->register_endpoint (addr_, endpoint_);
}
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
@@ -133,7 +133,7 @@ void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
return ctx->unregister_endpoints (socket_);
}
-zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
+zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
{
return ctx->find_endpoint (addr_);
}
diff --git a/src/object.hpp b/src/object.hpp
index f8cfdda..c81656e 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -46,9 +46,9 @@ namespace zmq
// Using following function, socket is able to access global
// repository of inproc endpoints.
- int register_endpoint (const char *addr_, class socket_base_t *socket_);
+ int register_endpoint (const char *addr_, struct endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_);
- class socket_base_t *find_endpoint (const char *addr_);
+ struct endpoint_t find_endpoint (const char *addr_);
// Logs an message.
void log (zmq_msg_t *msg_);
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index f48b48b..5c21b8f 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -300,8 +300,10 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0)
return -1;
- if (protocol == "inproc" || protocol == "sys")
- return register_endpoint (addr_, this);
+ if (protocol == "inproc" || protocol == "sys") {
+ endpoint_t endpoint = {this, options};
+ return register_endpoint (addr_, endpoint);
+ }
if (protocol == "tcp" || protocol == "ipc") {
@@ -361,24 +363,38 @@ int zmq::socket_base_t::connect (const char *addr_)
// as there's no 'reconnect' functionality implemented. Once that
// is in place we should follow generic pipe creation algorithm.
- // Find the peer socket.
- socket_base_t *peer = find_endpoint (addr_);
- if (!peer)
+ // Find the peer endpoint.
+ endpoint_t peer = find_endpoint (addr_);
+ if (!peer.socket)
return -1;
reader_t *inpipe_reader = NULL;
writer_t *inpipe_writer = NULL;
reader_t *outpipe_reader = NULL;
writer_t *outpipe_writer = NULL;
-
+
+ // The total HWM for an inproc connection should be the sum of
+ // the binder's HWM and the connector's HWM. (Similarly for the
+ // SWAP.)
+ int64_t hwm;
+ if (options.hwm == 0 || peer.options.hwm == 0)
+ hwm = 0;
+ else
+ hwm = options.hwm + peer.options.hwm;
+ int64_t swap;
+ if (options.swap == 0 && peer.options.swap == 0)
+ swap = 0;
+ else
+ swap = options.swap + peer.options.swap;
+
// Create inbound pipe, if required.
if (options.requires_in)
- create_pipe (this, peer, options.hwm, options.swap,
+ create_pipe (this, peer.socket, hwm, swap,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
- create_pipe (peer, this, options.hwm, options.swap,
+ create_pipe (peer.socket, this, hwm, swap,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to this socket object.
@@ -387,7 +403,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. We don't need it
// increased here.
- send_bind (peer, outpipe_reader, inpipe_writer,
+ send_bind (peer.socket, outpipe_reader, inpipe_writer,
options.identity, false);
return 0;