From 2bb57ac57ace37203c505ff17147210feca34d73 Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Fri, 15 Jan 2010 14:11:39 +0100 Subject: ZMQII-39: Implement IPC transport --- man/Makefile.am | 2 +- man/convert2html.sh | 1 + man/convert2pdf.sh | 2 + man/man7/zmq.7 | 3 + man/man7/zmq_inproc.7 | 3 +- man/man7/zmq_ipc.7 | 34 +++++++++++ man/man7/zmq_pgm.7 | 1 + man/man7/zmq_tcp.7 | 1 + man/man7/zmq_udp.7 | 1 + src/ip.cpp | 17 ++++++ src/ip.hpp | 9 +++ src/socket_base.cpp | 23 +++++-- src/tcp_connecter.cpp | 117 +++++++++++++++++++++++------------ src/tcp_connecter.hpp | 4 +- src/tcp_listener.cpp | 166 +++++++++++++++++++++++++++++++++++--------------- src/tcp_listener.hpp | 10 ++- src/zmq_connecter.cpp | 10 +-- src/zmq_connecter.hpp | 7 ++- src/zmq_engine.cpp | 9 ++- src/zmq_engine.hpp | 6 +- src/zmq_init.cpp | 4 +- src/zmq_init.hpp | 3 +- src/zmq_listener.cpp | 6 +- src/zmq_listener.hpp | 4 +- 24 files changed, 323 insertions(+), 120 deletions(-) create mode 100644 man/man7/zmq_ipc.7 diff --git a/man/Makefile.am b/man/Makefile.am index a292082..26867ab 100644 --- a/man/Makefile.am +++ b/man/Makefile.am @@ -7,7 +7,7 @@ dist_man_MANS = man1/zmq_forwarder.1 man1/zmq_streamer.1 man1/zmq_queue.1 \ man3/zmq_msg_data.3 man3/zmq_msg_size.3 man3/zmq_strerror.3 \ man7/zmq.7 man7/zmq_cpp.7 man7/zmq_python.7 man7/zmq_ruby.7 \ man7/zmq_cl.7 man7/zmq_tcp.7 man7/zmq_udp.7 man7/zmq_pgm.7 \ - man7/zmq_inproc.7 + man7/zmq_inproc.7 man7/zmq_ipc.7 distclean-local: -rm *.pdf diff --git a/man/convert2html.sh b/man/convert2html.sh index 26f443f..b92e8f6 100644 --- a/man/convert2html.sh +++ b/man/convert2html.sh @@ -51,4 +51,5 @@ groff -man -Thtml man7/zmq_tcp.7 > man7/zmq_tcp.7.html groff -man -Thtml man7/zmq_udp.7 > man7/zmq_udp.7.html groff -man -Thtml man7/zmq_pgm.7 > man7/zmq_pgm.7.html groff -man -Thtml man7/zmq_inproc.7 > man7/zmq_inproc.7.html +groff -man -Thtml man7/zmq_ipc.7 > man7/zmq_ipc.7.html diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh index f8ab00c..dcd2901 100755 --- a/man/convert2pdf.sh +++ b/man/convert2pdf.sh @@ -83,4 +83,6 @@ groff -man -Tps man7/zmq_pgm.7 > man7/zmq_pgm.7.ps ps2pdf man7/zmq_pgm.7.ps zmq_pgm.pdf groff -man -Tps man7/zmq_inproc.7 > man7/zmq_inproc.7.ps ps2pdf man7/zmq_inproc.7.ps zmq_inproc.pdf +groff -man -Tps man7/zmq_ipc.7 > man7/zmq_ipc.7.ps +ps2pdf man7/zmq_ipc.7.ps zmq_ipc.pdf diff --git a/man/man7/zmq.7 b/man/man7/zmq.7 index 740726c..61543a7 100644 --- a/man/man7/zmq.7 +++ b/man/man7/zmq.7 @@ -112,6 +112,9 @@ UDP reliable multicast transport: PGM reliable multicast transport: .BR zmq_pgm(7) +Inter-process transport: +.BR zmq_ipc (7) + In-process (inter-thread) transport: .BR zmq_inproc(7) diff --git a/man/man7/zmq_inproc.7 b/man/man7/zmq_inproc.7 index bc09f90..0c84641 100644 --- a/man/man7/zmq_inproc.7 +++ b/man/man7/zmq_inproc.7 @@ -3,7 +3,7 @@ In-process (inter-thread) tranport for 0MQ .SH SYNOPSIS -In-process transport is optimised for passing messages betweem threads in the +In-process transport is optimised for passing messages between threads in the same process. Messages are passed directly from one application thread to @@ -32,6 +32,7 @@ wire format specification. .SH "SEE ALSO" +.BR zmq_ipc (7) .BR zmq_tcp (7) .BR zmq_udp (7) .BR zmq_pgm (7) diff --git a/man/man7/zmq_ipc.7 b/man/man7/zmq_ipc.7 new file mode 100644 index 0000000..28e57d0 --- /dev/null +++ b/man/man7/zmq_ipc.7 @@ -0,0 +1,34 @@ +.TH zmq_ipc 7 "" "(c)2007-2010 iMatix Corporation" "0MQ User Manuals" +.SH NAME +Inter-process tranport for 0MQ +.SH SYNOPSIS + +In-process transport is optimised for passing messages between processes on the +same physical machine. + +.SH CONNECTION STRING + +Connection string for inproc transport is "inproc://" followed by a file name. +The file will be used as placeholder for a message endpoint. (UNIX domain +sockets associate a file with the listening socket in a similar way.) + +.nf + ipc:///tmp/my_ipc_endpoint + ipc:///tmp/prices.ipc +.fi + +.SH WIRE FORMAT + +IPC transport doesn't transfer messages across the network thus there is no need +for a wire format specification. + +.SH "SEE ALSO" + +.BR zmq_inproc (7) +.BR zmq_tcp (7) +.BR zmq_udp (7) +.BR zmq_pgm (7) + +.SH AUTHOR +Martin Sustrik + diff --git a/man/man7/zmq_pgm.7 b/man/man7/zmq_pgm.7 index 68af978..39f639b 100644 --- a/man/man7/zmq_pgm.7 +++ b/man/man7/zmq_pgm.7 @@ -80,6 +80,7 @@ Following example shows how messages are arranged in subsequent packets: .BR zmq_udp (7) .BR zmq_tcp (7) +.BR zmq_ipc (7) .BR zmq_inproc (7) .BR zmq_setsockopt (3) diff --git a/man/man7/zmq_tcp.7 b/man/man7/zmq_tcp.7 index f5504c8..41a116b 100644 --- a/man/man7/zmq_tcp.7 +++ b/man/man7/zmq_tcp.7 @@ -72,6 +72,7 @@ Binary layout of a larger message: .BR zmq_udp (7) .BR zmq_pgm (7) +.BR zmq_ipc (7) .BR zmq_inproc (7) .SH AUTHOR diff --git a/man/man7/zmq_udp.7 b/man/man7/zmq_udp.7 index 151a6d4..d0bf46c 100644 --- a/man/man7/zmq_udp.7 +++ b/man/man7/zmq_udp.7 @@ -37,6 +37,7 @@ Same as with PGM transport except for UDP packet headers. .BR zmq_pgm (7) .BR zmq_tcp (7) +.BR zmq_ipc (7) .BR zmq_inproc (7) .SH AUTHOR diff --git a/src/ip.cpp b/src/ip.cpp index 50af2ce..d5bb05d 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -309,3 +309,20 @@ int zmq::resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_) return 0; } + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + +int zmq::resolve_local_path (sockaddr_un *addr_, const char *path_) +{ + if (strlen (path_) >= sizeof (addr_->sun_path)) + { + errno = ENAMETOOLONG; + return -1; + } + strcpy (addr_->sun_path, path_); + addr_->sun_family = AF_LOCAL; + return 0; +} + +#endif + diff --git a/src/ip.hpp b/src/ip.hpp index 8a0a34f..16c1f62 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -32,6 +32,10 @@ #include #endif +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#include +#endif + namespace zmq { @@ -42,6 +46,11 @@ namespace zmq // This function resolves a string in : format. // Hostname can be either the name of the host or its IP address. int resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_); + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + // This function sets up the sockaddr_un structure with the pathname_ + int resolve_local_path( sockaddr_un * addr_, const char* pathname_); +#endif } #endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ad68fdb..d35c8c0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -87,11 +87,19 @@ int zmq::socket_base_t::bind (const char *addr_) if (addr_type == "inproc") return register_endpoint (addr_args.c_str (), this); - if (addr_type == "tcp") { + if (addr_type == "tcp" || addr_type == "ipc") { + +#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + if (addr_type == "ipc") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif + zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( choose_io_thread (options.affinity), this, options); zmq_assert (listener); - int rc = listener->set_address (addr_args.c_str ()); + int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ()); if (rc != 0) return -1; @@ -202,7 +210,14 @@ int zmq::socket_base_t::connect (const char *addr_) send_plug (session); send_own (this, session); - if (addr_type == "tcp") { + if (addr_type == "tcp" || addr_type == "ipc") { + +#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS + if (addr_type == "ipc") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif // Create the connecter object. Supply it with the session name // so that it can bind the new connection to the session once @@ -211,7 +226,7 @@ int zmq::socket_base_t::connect (const char *addr_) choose_io_thread (options.affinity), this, options, session->get_ordinal (), false); zmq_assert (connecter); - int rc = connecter->set_address (addr_args.c_str ()); + int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ()); if (rc != 0) { delete connecter; return -1; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 5dcebba..aea8a12 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -17,6 +17,8 @@ along with this program. If not, see . */ +#include + #include #include "tcp_connecter.hpp" @@ -38,10 +40,13 @@ zmq::tcp_connecter_t::~tcp_connecter_t () close (); } -int zmq::tcp_connecter_t::set_address (const char *addr_) +int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_) { - // Convert the hostname into sockaddr_in structure. - return resolve_ip_hostname (&addr, addr_); + if (strcmp (protocol_, "tcp") == 0) + return resolve_ip_hostname ((sockaddr_in*) &addr, addr_); + + errno = EPROTONOSUPPORT; + return -1; } int zmq::tcp_connecter_t::open () @@ -67,7 +72,7 @@ int zmq::tcp_connecter_t::open () wsa_assert (rc != SOCKET_ERROR); // Connect to the remote peer. - rc = ::connect (s, (sockaddr*) &addr, sizeof addr); + rc = ::connect (s, (sockaddr*) &addr, sizeof (sockaddr_in)); // Connect was successfull immediately. if (rc == 0) @@ -143,58 +148,94 @@ zmq::tcp_connecter_t::~tcp_connecter_t () close (); } -int zmq::tcp_connecter_t::set_address (const char *addr_) +int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_) { - // Convert the hostname into sockaddr_in structure. - return resolve_ip_hostname (&addr, addr_); + if (strcmp (protocol_, "tcp") == 0) + return resolve_ip_hostname ((struct sockaddr_in*)&addr, addr_); + else if (strcmp (protocol_, "ipc") == 0) + return resolve_local_path (( struct sockaddr_un*)&addr, addr_); + + errno = EPROTONOSUPPORT; + return -1; } int zmq::tcp_connecter_t::open () { zmq_assert (s == retired_fd); + struct sockaddr *sa = (struct sockaddr*) &addr; - // Create the socket. - s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (s == -1) - return -1; + if (AF_INET == sa->sa_family) { - // Set to non-blocking mode. - int flags = fcntl (s, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); + // Create the socket. + s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (s == -1) + return -1; - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof (int)); - errno_assert (rc == 0); + // Set to non-blocking mode. + int flags = fcntl (s, F_GETFL, 0); + if (flags == -1) + flags = 0; + int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc != -1); + + // Disable Nagle's algorithm. + int flag = 1; + rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof (int)); + errno_assert (rc == 0); #ifdef ZMQ_HAVE_OPENVMS - // Disable delayed acknowledgements. - flag = 1; - rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof (int)); - errno_assert (rc != SOCKET_ERROR); + // Disable delayed acknowledgements. + flag = 1; + rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof (int)); + errno_assert (rc != SOCKET_ERROR); #endif - // Connect to the remote peer. - rc = ::connect (s, (sockaddr*) &addr, sizeof (addr)); + // Connect to the remote peer. + rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_in)); - // Connect was successfull immediately. - if (rc == 0) - return 0; + // Connect was successfull immediately. + if (rc == 0) + return 0; - // Asynchronous connect was launched. - if (rc == -1 && errno == EINPROGRESS) { - errno = EAGAIN; + // Asynchronous connect was launched. + if (rc == -1 && errno == EINPROGRESS) { + errno = EAGAIN; + return -1; + } + + // Error occured. + int err = errno; + close (); + errno = err; + return -1; + } + else if (AF_LOCAL == sa->sa_family) { + s = socket (AF_LOCAL, SOCK_STREAM, 0); + if (s == -1) + return -1; + + // Set the non-blocking flag. + int flag = fcntl (s, F_GETFL, 0); + if (flag == -1) + flag = 0; + int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); + errno_assert (rc != -1); + + // Connect to the remote peer. + rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); + + // Connect was successfull immediately. + if (rc == 0) + return 0; + + // Error occured. + int err = errno; + close (); + errno = err; return -1; } - // Error occured. - int err = errno; - close (); - errno = err; - return -1; + zmq_assert (false); } int zmq::tcp_connecter_t::close () diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index d397296..cc4434f 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -35,8 +35,8 @@ namespace zmq tcp_connecter_t (); ~tcp_connecter_t (); - // Set IP address/port to connect to. - int set_address (const char *addr_); + // Set address to connect to. + int set_address (const char *protocol, const char *addr_); // Open TCP connecting socket. Address is in // : format. Returns -1 in case of error, diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 9087405..2b30a8e 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -39,10 +39,16 @@ zmq::tcp_listener_t::~tcp_listener_t () close (); } -int zmq::tcp_listener_t::set_address (const char *addr_) +int zmq::tcp_listener_t::set_address (cosnt char *protocol_, const char *addr_) { + // IPC protocol is not supported on Windows platform. + if (strcmp (protocol_, "tcp") != 0 ) { + errno = EPROTONOSUPPORT; + return -1; + } + // Convert the interface into sockaddr_in structure. - int rc = resolve_ip_interface (&addr, addr_); + int rc = resolve_ip_interface ((sockaddr_in*) &addr, addr_); if (rc != 0) return rc; @@ -65,7 +71,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) wsa_assert (rc != SOCKET_ERROR); // Bind the socket to the network interface and port. - rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); + rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in)); if (rc == SOCKET_ERROR) { wsa_error_to_errno (); return -1; @@ -131,6 +137,7 @@ zmq::fd_t zmq::tcp_listener_t::accept () #include #include #include +#include zmq::tcp_listener_t::tcp_listener_t () : s (retired_fd) @@ -144,45 +151,91 @@ zmq::tcp_listener_t::~tcp_listener_t () close (); } -int zmq::tcp_listener_t::set_address (const char *addr_) +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) { - // Convert the interface into sockaddr_in structure. - int rc = resolve_ip_interface (&addr, addr_); - if (rc != 0) - return rc; - - // Create a listening socket. - s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - if (s == -1) - return -1; - - // Allow reusing of the address. - int flag = 1; - rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); - errno_assert (rc == 0); - - // Set the non-blocking flag. - flag = fcntl (s, F_GETFL, 0); - if (flag == -1) - flag = 0; - rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); - errno_assert (rc != -1); - - // Bind the socket to the network interface and port. - rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); - if (rc != 0) { - close (); - return -1; + if (strcmp (protocol_, "tcp") == 0 ) { + + // Convert the interface into sockaddr_in structure. + int rc = resolve_ip_interface ((struct sockaddr_in*) &addr, addr_); + if (rc != 0) + return -1; + + // Create a listening socket. + s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (s == -1) + return -1; + + // Allow reusing of the address. + int flag = 1; + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + errno_assert (rc == 0); + + // Set the non-blocking flag. + flag = fcntl (s, F_GETFL, 0); + if (flag == -1) + flag = 0; + rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); + errno_assert (rc != -1); + + // Bind the socket to the network interface and port. + rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in)); + if (rc != 0) { + close (); + return -1; + } + + // Listen for incomming connections. + rc = listen (s, tcp_connection_backlog); + if (rc != 0) { + close (); + return -1; + } + + return 0; } - - // Listen for incomming connections. - rc = listen (s, tcp_connection_backlog); - if (rc != 0) { - close (); - return -1; + else if (strcmp (protocol_, "ipc") == 0) { + + // Get rid of the file associated with the UNIX domain socket that + // may have been left behind by the previous run of the application. + ::unlink (addr_); + + // Convert the address into sockaddr_un structure. + int rc = resolve_local_path ((struct sockaddr_un*) &addr, addr_); + if (rc != 0) + return -1; + + // Create a listening socket. + s = socket (AF_LOCAL, SOCK_STREAM, 0); + if (s == -1) + return -1; + + // Set the non-blocking flag. + int flag = fcntl (s, F_GETFL, 0); + if (flag == -1) + flag = 0; + rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); + errno_assert (rc != -1); + + // Bind the socket to the file path. + rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); + if (rc != 0) { + close (); + return -1; + } + + // Listen for incomming connections. + rc = listen (s, tcp_connection_backlog); + if (rc != 0) { + close (); + return -1; + } + + return 0; } - - return 0; + else { + errno = EPROTONOSUPPORT; + return -1; + } } int zmq::tcp_listener_t::close () @@ -192,6 +245,17 @@ int zmq::tcp_listener_t::close () if (rc != 0) return -1; s = retired_fd; + + // If there's an underlying UNIX domain socket, get rid of the file it + // is associated with. + struct sockaddr *sa = (struct sockaddr*) &addr; + if (AF_LOCAL == sa->sa_family) { + struct sockaddr_un *sun = (struct sockaddr_un*) &addr; + rc = ::unlink(sun->sun_path); + if (rc != 0) + return -1; + } + return 0; } @@ -239,19 +303,23 @@ zmq::fd_t zmq::tcp_listener_t::accept () int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK); errno_assert (rc != -1); - // Disable Nagle's algorithm. - int flag = 1; - rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, - sizeof (int)); - errno_assert (rc == 0); + struct sockaddr *sa = (struct sockaddr*) &addr; + if (AF_INET == sa->sa_family) { + + // Disable Nagle's algorithm. + int flag = 1; + rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, + sizeof (int)); + errno_assert (rc == 0); #ifdef ZMQ_HAVE_OPENVMS - // Disable delayed acknowledgements. - flag = 1; - rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, - sizeof (int)); - errno_assert (rc != SOCKET_ERROR); + // Disable delayed acknowledgements. + flag = 1; + rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, + sizeof (int)); + errno_assert (rc != SOCKET_ERROR); #endif + } return sock; } diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 1dfe288..87748d0 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -35,10 +35,8 @@ namespace zmq tcp_listener_t (); ~tcp_listener_t (); - // Start listening on the interface. Address is in - // : format. Interface name may be '*' - // to bind to all the interfaces. - int set_address (const char *addr_); + // Start listening on the interface. + int set_address (const char *protocol_, const char *addr_); // Close the listening socket. int close (); @@ -54,8 +52,8 @@ namespace zmq private: - // IP address/port to listen on. - sockaddr_in addr; + // Address to listen on. + sockaddr_storage addr; // Underlying socket. fd_t s; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index d4c2727..ebd7572 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -41,11 +41,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t () { } -int zmq::zmq_connecter_t::set_address (const char *address_) +int zmq::zmq_connecter_t::set_address (const char *protocol_, + const char *address_) { - int rc = tcp_connecter.set_address (address_); + int rc = tcp_connecter.set_address (protocol_, address_); if (rc != 0) return rc; + protocol = protocol_; address = address_; return 0; } @@ -91,7 +93,8 @@ void zmq::zmq_connecter_t::out_event () // Create an init object. zmq_init_t *init = new (std::nothrow) zmq_init_t ( choose_io_thread (options.affinity), owner, - fd, options, true, address.c_str (), session_ordinal); + fd, options, true, protocol.c_str (), address.c_str (), + session_ordinal); zmq_assert (init); send_plug (init); send_own (owner, init); @@ -128,7 +131,6 @@ void zmq::zmq_connecter_t::start_connecting () } // Handle any other error condition by eventual reconnect. - tcp_connecter.close (); wait = true; add_timer (); } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index c3a42a9..328dd6a 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -39,8 +39,8 @@ namespace zmq const options_t &options_, uint64_t session_ordinal_, bool wait_); ~zmq_connecter_t (); - // Set IP address to connect to. - int set_address (const char *address_); + // Set address to connect to. + int set_address (const char *protocol_, const char *address_); private: @@ -75,7 +75,8 @@ namespace zmq // Associated socket options. options_t options; - // Address to connect to. + // Protocol and address to connect to. + std::string protocol; std::string address; zmq_connecter_t (const zmq_connecter_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index a79c0bd..dace6ae 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -27,7 +27,8 @@ #include "err.hpp" zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, - const options_t &options_, bool reconnect_, const char *address_) : + const options_t &options_, bool reconnect_, + const char *protocol_, const char *address_) : io_object_t (parent_), inpos (NULL), insize (0), @@ -39,8 +40,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, options (options_), reconnect (reconnect_) { - if (reconnect) + if (reconnect) { + protocol = protocol_; address = address_; + } // Initialise the underlying socket. int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); @@ -166,7 +169,7 @@ void zmq::zmq_engine_t::error () inout->get_io_thread (), inout->get_owner (), options, inout->get_ordinal (), true); zmq_assert (reconnecter); - reconnecter->set_address (address.c_str ()); + reconnecter->set_address (protocol.c_str(), address.c_str ()); } inout->detach (reconnecter); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index d26e304..ddd0931 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -22,6 +22,8 @@ #include +#include + #include "i_engine.hpp" #include "io_object.hpp" #include "tcp_socket.hpp" @@ -37,7 +39,8 @@ namespace zmq public: zmq_engine_t (class io_thread_t *parent_, fd_t fd_, - const options_t &options_, bool reconnect_, const char *address_); + const options_t &options_, bool reconnect_, + const char *protocol_, const char *address_); ~zmq_engine_t (); // i_engine interface implementation. @@ -70,6 +73,7 @@ namespace zmq options_t options; bool reconnect; + std::string protocol; std::string address; zmq_engine_t (const zmq_engine_t&); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index c602e1d..fe6aea2 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -25,7 +25,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, fd_t fd_, const options_t &options_, bool reconnect_, - const char *address_, uint64_t session_ordinal_) : + const char *protocol_, const char *address_, uint64_t session_ordinal_) : owned_t (parent_, owner_), sent (false), received (false), @@ -34,7 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_, { // Create the engine object for this connection. engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, - reconnect_, address_); + reconnect_, protocol_, address_); zmq_assert (engine); } diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 414adfe..df14293 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -41,7 +41,8 @@ namespace zmq zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_, const options_t &options_, bool reconnect_, - const char *address_, uint64_t session_ordinal_); + const char *protocol_, const char *address_, + uint64_t session_ordinal_); ~zmq_init_t (); private: diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 9ccd82b..d7cf292 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -36,9 +36,9 @@ zmq::zmq_listener_t::~zmq_listener_t () { } -int zmq::zmq_listener_t::set_address (const char *addr_) +int zmq::zmq_listener_t::set_address (const char *protocol_, const char *addr_) { - return tcp_listener.set_address (addr_); + return tcp_listener.set_address (protocol_, addr_); } void zmq::zmq_listener_t::process_plug () @@ -65,7 +65,7 @@ void zmq::zmq_listener_t::in_event () // Create an init object. io_thread_t *io_thread = choose_io_thread (options.affinity); zmq_init_t *init = new (std::nothrow) zmq_init_t ( - io_thread, owner, fd, options, false, NULL, 0); + io_thread, owner, fd, options, false, NULL, NULL, 0); zmq_assert (init); send_plug (init); send_own (owner, init); diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp index 6f8cdc9..c82a280 100644 --- a/src/zmq_listener.hpp +++ b/src/zmq_listener.hpp @@ -36,8 +36,8 @@ namespace zmq zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_, const options_t &options_); - // Set IP address to listen on. - int set_address (const char *addr_); + // Set address to listen on. + int set_address (const char* protocol_, const char *addr_); private: -- cgit v1.2.3