summaryrefslogtreecommitdiff
path: root/src/socket_base.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r--src/socket_base.cpp79
1 files changed, 47 insertions, 32 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 720e8cd..222b769 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "inproc") {
+ // TODO: inproc connect is specific with respect to creating pipes
+ // as there's no 'reconnect' functionality implemented. Once that
+ // is in place we should follow generic pipe creation algorithm.
+
// Find the peer socket.
socket_base_t *peer = find_endpoint (addr_args.c_str ());
if (!peer)
@@ -171,7 +175,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL, false);
+ in_pipe ? &in_pipe->writer : NULL, options.identity, false);
return 0;
}
@@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options);
zmq_assert (session);
- pipe_t *in_pipe = NULL;
- pipe_t *out_pipe = NULL;
+ // If 'immediate connect' feature is required, we'll created the pipes
+ // to the session straight away. Otherwise, they'll be created by the
+ // session once the connection is established.
+ if (options.immediate_connect) {
- // Create inbound pipe, if required.
- if (options.requires_in) {
- in_pipe = new (std::nothrow) pipe_t (this, session,
- options.hwm, options.lwm);
- zmq_assert (in_pipe);
+ pipe_t *in_pipe = NULL;
+ pipe_t *out_pipe = NULL;
- }
+ // Create inbound pipe, if required.
+ if (options.requires_in) {
+ in_pipe = new (std::nothrow) pipe_t (this, session,
+ options.hwm, options.lwm);
+ zmq_assert (in_pipe);
- // Create outbound pipe, if required.
- if (options.requires_out) {
- out_pipe = new (std::nothrow) pipe_t (session, this,
- options.hwm, options.lwm);
- zmq_assert (out_pipe);
- }
+ }
- // Attach the pipes to the socket object.
- attach_pipes (in_pipe ? &in_pipe->reader : NULL,
- out_pipe ? &out_pipe->writer : NULL);
+ // Create outbound pipe, if required.
+ if (options.requires_out) {
+ out_pipe = new (std::nothrow) pipe_t (session, this,
+ options.hwm, options.lwm);
+ zmq_assert (out_pipe);
+ }
- // Attach the pipes to the session object.
- session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
- in_pipe ? &in_pipe->writer : NULL);
+ // Attach the pipes to the socket object.
+ attach_pipes (in_pipe ? &in_pipe->reader : NULL,
+ out_pipe ? &out_pipe->writer : NULL);
+
+ // Attach the pipes to the session object.
+ session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
+ in_pipe ? &in_pipe->writer : NULL);
+ }
// Activate the session.
send_plug (session);
@@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ // Windows named pipes are not compatible with Winsock API.
+ // There's no UNIX domain socket implementation on OpenVMS.
if (addr_type == "ipc") {
errno = EPROTONOSUPPORT;
return -1;
@@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp")
udp_encapsulation = true;
+ // At this point we'll create message pipes to the session straight
+ // away. There's no point in delaying it as no concept of 'connect'
+ // exists with PGM anyway.
if (options.requires_out) {
// PGM sender.
@@ -267,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_sender);
+ send_attach (session, pgm_sender, blob_t ());
}
else if (options.requires_in) {
@@ -282,7 +297,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
- send_attach (session, pgm_receiver);
+ send_attach (session, pgm_receiver, blob_t ());
}
else
zmq_assert (false);
@@ -454,30 +469,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
-bool zmq::socket_base_t::register_session (const char *name_,
+bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
session_t *session_)
{
sessions_sync.lock ();
- bool registered =
- named_sessions.insert (std::make_pair (name_, session_)).second;
+ bool registered = named_sessions.insert (
+ std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
}
-void zmq::socket_base_t::unregister_session (const char *name_)
+void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
-zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
+zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
-
- named_sessions_t::iterator it = named_sessions.find (name_);
+ named_sessions_t::iterator it = named_sessions.find (peer_identity_);
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
@@ -565,7 +579,8 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects.insert (object_);
}
-void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_)
+void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
+ const blob_t &peer_identity_)
{
attach_pipes (in_pipe_, out_pipe_);
}