diff options
Diffstat (limited to 'src/socket_base.cpp')
-rw-r--r-- | src/socket_base.cpp | 87 |
1 files changed, 51 insertions, 36 deletions
diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ef563e5..871f9e9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -141,6 +141,10 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "inproc") { + // TODO: inproc connect is specific with respect to creating pipes + // as there's no 'reconnect' functionality implemented. Once that + // is in place we should follow generic pipe creation algorithm. + // Find the peer socket. socket_base_t *peer = find_endpoint (addr_args.c_str ()); if (!peer) @@ -165,13 +169,13 @@ int zmq::socket_base_t::connect (const char *addr_) // Attach the pipes to this socket object. attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + out_pipe ? &out_pipe->writer : NULL, blob_t ()); // Attach the pipes to the peer socket. Note that peer's seqnum // was incremented in find_endpoint function. The callee is notified // about the fact via the last parameter. send_bind (peer, out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL, false); + in_pipe ? &in_pipe->writer : NULL, options.identity, false); return 0; } @@ -182,31 +186,37 @@ int zmq::socket_base_t::connect (const char *addr_) this, options); zmq_assert (session); - pipe_t *in_pipe = NULL; - pipe_t *out_pipe = NULL; + // If 'immediate connect' feature is required, we'll created the pipes + // to the session straight away. Otherwise, they'll be created by the + // session once the connection is established. + if (options.immediate_connect) { - // Create inbound pipe, if required. - if (options.requires_in) { - in_pipe = new (std::nothrow) pipe_t (this, session, - options.hwm, options.lwm); - zmq_assert (in_pipe); + pipe_t *in_pipe = NULL; + pipe_t *out_pipe = NULL; - } + // Create inbound pipe, if required. + if (options.requires_in) { + in_pipe = new (std::nothrow) pipe_t (this, session, + options.hwm, options.lwm); + zmq_assert (in_pipe); - // Create outbound pipe, if required. - if (options.requires_out) { - out_pipe = new (std::nothrow) pipe_t (session, this, - options.hwm, options.lwm); - zmq_assert (out_pipe); - } + } - // Attach the pipes to the socket object. - attach_pipes (in_pipe ? &in_pipe->reader : NULL, - out_pipe ? &out_pipe->writer : NULL); + // Create outbound pipe, if required. + if (options.requires_out) { + out_pipe = new (std::nothrow) pipe_t (session, this, + options.hwm, options.lwm); + zmq_assert (out_pipe); + } - // Attach the pipes to the session object. - session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, - in_pipe ? &in_pipe->writer : NULL); + // Attach the pipes to the socket object. + attach_pipes (in_pipe ? &in_pipe->reader : NULL, + out_pipe ? &out_pipe->writer : NULL, blob_t ()); + + // Attach the pipes to the session object. + session->attach_pipes (out_pipe ? &out_pipe->reader : NULL, + in_pipe ? &in_pipe->writer : NULL, blob_t ()); + } // Activate the session. send_plug (session); @@ -215,6 +225,8 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "tcp" || addr_type == "ipc") { #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + // Windows named pipes are not compatible with Winsock API. + // There's no UNIX domain socket implementation on OpenVMS. if (addr_type == "ipc") { errno = EPROTONOSUPPORT; return -1; @@ -254,6 +266,9 @@ int zmq::socket_base_t::connect (const char *addr_) if (addr_type == "udp") udp_encapsulation = true; + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with PGM anyway. if (options.requires_out) { // PGM sender. @@ -267,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_sender); + send_attach (session, pgm_sender, blob_t ()); } else if (options.requires_in) { @@ -282,7 +297,7 @@ int zmq::socket_base_t::connect (const char *addr_) return -1; } - send_attach (session, pgm_receiver); + send_attach (session, pgm_receiver, blob_t ()); } else zmq_assert (false); @@ -456,30 +471,29 @@ bool zmq::socket_base_t::has_out () return xhas_out (); } -bool zmq::socket_base_t::register_session (const char *name_, +bool zmq::socket_base_t::register_session (const blob_t &peer_identity_, session_t *session_) { sessions_sync.lock (); - bool registered = - named_sessions.insert (std::make_pair (name_, session_)).second; + bool registered = named_sessions.insert ( + std::make_pair (peer_identity_, session_)).second; sessions_sync.unlock (); return registered; } -void zmq::socket_base_t::unregister_session (const char *name_) +void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_) { sessions_sync.lock (); - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); zmq_assert (it != named_sessions.end ()); named_sessions.erase (it); sessions_sync.unlock (); } -zmq::session_t *zmq::socket_base_t::find_session (const char *name_) +zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) { sessions_sync.lock (); - - named_sessions_t::iterator it = named_sessions.find (name_); + named_sessions_t::iterator it = named_sessions.find (peer_identity_); if (it == named_sessions.end ()) { sessions_sync.unlock (); return NULL; @@ -541,13 +555,13 @@ void zmq::socket_base_t::revive (reader_t *pipe_) } void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, - class writer_t *outpipe_) + class writer_t *outpipe_, const blob_t &peer_identity_) { if (inpipe_) inpipe_->set_endpoint (this); if (outpipe_) outpipe_->set_endpoint (this); - xattach_pipes (inpipe_, outpipe_); + xattach_pipes (inpipe_, outpipe_, peer_identity_); } void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) @@ -567,9 +581,10 @@ void zmq::socket_base_t::process_own (owned_t *object_) io_objects.insert (object_); } -void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_) +void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, + const blob_t &peer_identity_) { - attach_pipes (in_pipe_, out_pipe_); + attach_pipes (in_pipe_, out_pipe_, peer_identity_); } void zmq::socket_base_t::process_term_req (owned_t *object_) |