diff options
| -rw-r--r-- | man/Makefile.am | 2 | ||||
| -rw-r--r-- | man/convert2html.sh | 1 | ||||
| -rwxr-xr-x | man/convert2pdf.sh | 2 | ||||
| -rw-r--r-- | man/man7/zmq.7 | 3 | ||||
| -rw-r--r-- | man/man7/zmq_inproc.7 | 3 | ||||
| -rw-r--r-- | man/man7/zmq_ipc.7 | 34 | ||||
| -rw-r--r-- | man/man7/zmq_pgm.7 | 1 | ||||
| -rw-r--r-- | man/man7/zmq_tcp.7 | 1 | ||||
| -rw-r--r-- | man/man7/zmq_udp.7 | 1 | ||||
| -rw-r--r-- | src/ip.cpp | 17 | ||||
| -rw-r--r-- | src/ip.hpp | 9 | ||||
| -rw-r--r-- | src/socket_base.cpp | 23 | ||||
| -rw-r--r-- | src/tcp_connecter.cpp | 117 | ||||
| -rw-r--r-- | src/tcp_connecter.hpp | 4 | ||||
| -rw-r--r-- | src/tcp_listener.cpp | 166 | ||||
| -rw-r--r-- | src/tcp_listener.hpp | 10 | ||||
| -rw-r--r-- | src/zmq_connecter.cpp | 10 | ||||
| -rw-r--r-- | src/zmq_connecter.hpp | 7 | ||||
| -rw-r--r-- | src/zmq_engine.cpp | 9 | ||||
| -rw-r--r-- | src/zmq_engine.hpp | 6 | ||||
| -rw-r--r-- | src/zmq_init.cpp | 4 | ||||
| -rw-r--r-- | src/zmq_init.hpp | 3 | ||||
| -rw-r--r-- | src/zmq_listener.cpp | 6 | ||||
| -rw-r--r-- | src/zmq_listener.hpp | 4 | 
24 files changed, 323 insertions, 120 deletions
| diff --git a/man/Makefile.am b/man/Makefile.am index a292082..26867ab 100644 --- a/man/Makefile.am +++ b/man/Makefile.am @@ -7,7 +7,7 @@ dist_man_MANS = man1/zmq_forwarder.1 man1/zmq_streamer.1 man1/zmq_queue.1 \      man3/zmq_msg_data.3 man3/zmq_msg_size.3 man3/zmq_strerror.3 \      man7/zmq.7 man7/zmq_cpp.7 man7/zmq_python.7 man7/zmq_ruby.7 \      man7/zmq_cl.7 man7/zmq_tcp.7 man7/zmq_udp.7 man7/zmq_pgm.7 \ -    man7/zmq_inproc.7 +    man7/zmq_inproc.7 man7/zmq_ipc.7  distclean-local:  		-rm  *.pdf diff --git a/man/convert2html.sh b/man/convert2html.sh index 26f443f..b92e8f6 100644 --- a/man/convert2html.sh +++ b/man/convert2html.sh @@ -51,4 +51,5 @@ groff -man -Thtml man7/zmq_tcp.7 > man7/zmq_tcp.7.html  groff -man -Thtml man7/zmq_udp.7 > man7/zmq_udp.7.html  groff -man -Thtml man7/zmq_pgm.7 > man7/zmq_pgm.7.html  groff -man -Thtml man7/zmq_inproc.7 > man7/zmq_inproc.7.html +groff -man -Thtml man7/zmq_ipc.7 > man7/zmq_ipc.7.html diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh index f8ab00c..dcd2901 100755 --- a/man/convert2pdf.sh +++ b/man/convert2pdf.sh @@ -83,4 +83,6 @@ groff -man -Tps man7/zmq_pgm.7 > man7/zmq_pgm.7.ps  ps2pdf man7/zmq_pgm.7.ps zmq_pgm.pdf  groff -man -Tps man7/zmq_inproc.7 > man7/zmq_inproc.7.ps  ps2pdf man7/zmq_inproc.7.ps zmq_inproc.pdf +groff -man -Tps man7/zmq_ipc.7 > man7/zmq_ipc.7.ps +ps2pdf man7/zmq_ipc.7.ps zmq_ipc.pdf diff --git a/man/man7/zmq.7 b/man/man7/zmq.7 index 740726c..61543a7 100644 --- a/man/man7/zmq.7 +++ b/man/man7/zmq.7 @@ -112,6 +112,9 @@ UDP reliable multicast transport:  PGM reliable multicast transport:  .BR zmq_pgm(7) +Inter-process transport: +.BR zmq_ipc (7) +  In-process (inter-thread) transport:  .BR zmq_inproc(7) diff --git a/man/man7/zmq_inproc.7 b/man/man7/zmq_inproc.7 index bc09f90..0c84641 100644 --- a/man/man7/zmq_inproc.7 +++ b/man/man7/zmq_inproc.7 @@ -3,7 +3,7 @@  In-process (inter-thread) tranport for 0MQ  .SH SYNOPSIS -In-process transport is optimised for passing messages betweem threads in the +In-process transport is optimised for passing messages between threads in the  same process.  Messages are passed directly from one application thread to @@ -32,6 +32,7 @@ wire format specification.  .SH "SEE ALSO" +.BR zmq_ipc (7)  .BR zmq_tcp (7)  .BR zmq_udp (7)  .BR zmq_pgm (7) diff --git a/man/man7/zmq_ipc.7 b/man/man7/zmq_ipc.7 new file mode 100644 index 0000000..28e57d0 --- /dev/null +++ b/man/man7/zmq_ipc.7 @@ -0,0 +1,34 @@ +.TH zmq_ipc 7 "" "(c)2007-2010 iMatix Corporation" "0MQ User Manuals" +.SH NAME +Inter-process tranport for 0MQ +.SH SYNOPSIS + +In-process transport is optimised for passing messages between processes on the +same physical machine. + +.SH CONNECTION STRING + +Connection string for inproc transport is "inproc://" followed by a file name. +The file will be used as placeholder for a message endpoint. (UNIX domain +sockets associate a file with the listening socket in a similar way.) + +.nf +    ipc:///tmp/my_ipc_endpoint +    ipc:///tmp/prices.ipc +.fi + +.SH WIRE FORMAT + +IPC transport doesn't transfer messages across the network thus there is no need +for a wire format specification. + +.SH "SEE ALSO" + +.BR zmq_inproc (7) +.BR zmq_tcp (7) +.BR zmq_udp (7) +.BR zmq_pgm (7) + +.SH AUTHOR +Martin Sustrik <sustrik at 250bpm dot com> + diff --git a/man/man7/zmq_pgm.7 b/man/man7/zmq_pgm.7 index 68af978..39f639b 100644 --- a/man/man7/zmq_pgm.7 +++ b/man/man7/zmq_pgm.7 @@ -80,6 +80,7 @@ Following example shows how messages are arranged in subsequent packets:  .BR zmq_udp (7)  .BR zmq_tcp (7) +.BR zmq_ipc (7)  .BR zmq_inproc (7)  .BR zmq_setsockopt (3) diff --git a/man/man7/zmq_tcp.7 b/man/man7/zmq_tcp.7 index f5504c8..41a116b 100644 --- a/man/man7/zmq_tcp.7 +++ b/man/man7/zmq_tcp.7 @@ -72,6 +72,7 @@ Binary layout of a larger message:  .BR zmq_udp (7)  .BR zmq_pgm (7) +.BR zmq_ipc (7)  .BR zmq_inproc (7)  .SH AUTHOR diff --git a/man/man7/zmq_udp.7 b/man/man7/zmq_udp.7 index 151a6d4..d0bf46c 100644 --- a/man/man7/zmq_udp.7 +++ b/man/man7/zmq_udp.7 @@ -37,6 +37,7 @@ Same as with PGM transport except for UDP packet headers.  .BR zmq_pgm (7)  .BR zmq_tcp (7) +.BR zmq_ipc (7)  .BR zmq_inproc (7)  .SH AUTHOR @@ -309,3 +309,20 @@ int zmq::resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_)      return 0;  } + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS + +int zmq::resolve_local_path (sockaddr_un *addr_, const char *path_) +{ +    if (strlen (path_) >= sizeof (addr_->sun_path)) +    { +        errno = ENAMETOOLONG; +        return -1; +    } +    strcpy (addr_->sun_path, path_); +    addr_->sun_family = AF_LOCAL; +    return 0; +} + +#endif + @@ -32,6 +32,10 @@  #include <netdb.h>  #endif +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +#include <sys/un.h> +#endif +  namespace zmq  { @@ -42,6 +46,11 @@ namespace zmq      //  This function resolves a string in <hostname>:<port-number> format.      //  Hostname can be either the name of the host or its IP address.      int resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_); + +#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS +    // This function sets up the sockaddr_un structure with the pathname_ +    int resolve_local_path( sockaddr_un * addr_, const char* pathname_); +#endif  }  #endif  diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ad68fdb..d35c8c0 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -87,11 +87,19 @@ int zmq::socket_base_t::bind (const char *addr_)      if (addr_type == "inproc")          return register_endpoint (addr_args.c_str (), this); -    if (addr_type == "tcp") { +    if (addr_type == "tcp" || addr_type == "ipc") { + +#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +        if (addr_type == "ipc") { +            errno = EPROTONOSUPPORT; +            return -1; +        } +#endif +          zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (              choose_io_thread (options.affinity), this, options);          zmq_assert (listener); -        int rc = listener->set_address (addr_args.c_str ()); +        int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ());          if (rc != 0)              return -1; @@ -202,7 +210,14 @@ int zmq::socket_base_t::connect (const char *addr_)      send_plug (session);      send_own (this, session); -    if (addr_type == "tcp") { +    if (addr_type == "tcp" || addr_type == "ipc") { + +#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS +        if (addr_type == "ipc") { +            errno = EPROTONOSUPPORT; +            return -1; +        } +#endif          //  Create the connecter object. Supply it with the session name          //  so that it can bind the new connection to the session once @@ -211,7 +226,7 @@ int zmq::socket_base_t::connect (const char *addr_)              choose_io_thread (options.affinity), this, options,              session->get_ordinal (), false);          zmq_assert (connecter); -        int rc = connecter->set_address (addr_args.c_str ()); +        int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ());          if (rc != 0) {              delete connecter;              return -1; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 5dcebba..aea8a12 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -17,6 +17,8 @@      along with this program.  If not, see <http://www.gnu.org/licenses/>.  */ +#include <string.h> +  #include <string>  #include "tcp_connecter.hpp" @@ -38,10 +40,13 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()          close ();  } -int zmq::tcp_connecter_t::set_address (const char *addr_) +int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)  { -    //  Convert the hostname into sockaddr_in structure. -    return resolve_ip_hostname (&addr, addr_); +    if (strcmp (protocol_, "tcp") == 0) +        return resolve_ip_hostname ((sockaddr_in*) &addr, addr_); + +    errno = EPROTONOSUPPORT; +    return -1;      }  int zmq::tcp_connecter_t::open () @@ -67,7 +72,7 @@ int zmq::tcp_connecter_t::open ()      wsa_assert (rc != SOCKET_ERROR);      //  Connect to the remote peer. -    rc = ::connect (s, (sockaddr*) &addr, sizeof addr); +    rc = ::connect (s, (sockaddr*) &addr, sizeof (sockaddr_in));      //  Connect was successfull immediately.      if (rc == 0) @@ -143,58 +148,94 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()          close ();  } -int zmq::tcp_connecter_t::set_address (const char *addr_) +int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)  { -    //  Convert the hostname into sockaddr_in structure. -    return resolve_ip_hostname (&addr, addr_); +    if (strcmp (protocol_, "tcp") == 0) +        return resolve_ip_hostname ((struct sockaddr_in*)&addr, addr_); +    else if (strcmp (protocol_, "ipc") == 0) +        return resolve_local_path (( struct sockaddr_un*)&addr, addr_); + +    errno = EPROTONOSUPPORT; +    return -1;  }  int zmq::tcp_connecter_t::open ()  {      zmq_assert (s == retired_fd); +    struct sockaddr *sa = (struct sockaddr*) &addr; -    //  Create the socket. -    s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); -    if (s == -1) -        return -1; +    if (AF_INET == sa->sa_family) { -    // Set to non-blocking mode. -    int flags = fcntl (s, F_GETFL, 0); -    if (flags == -1)  -        flags = 0; -    int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); -    errno_assert (rc != -1); +        //  Create the socket. +        s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); +        if (s == -1) +            return -1; -    //  Disable Nagle's algorithm. -    int flag = 1; -    rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof (int)); -    errno_assert (rc == 0); +        // Set to non-blocking mode. +        int flags = fcntl (s, F_GETFL, 0); +        if (flags == -1)  +            flags = 0; +        int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK); +        errno_assert (rc != -1); + +        //  Disable Nagle's algorithm. +        int flag = 1; +        rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof (int)); +        errno_assert (rc == 0);  #ifdef ZMQ_HAVE_OPENVMS -    //  Disable delayed acknowledgements. -    flag = 1; -    rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof (int)); -    errno_assert (rc != SOCKET_ERROR); +        //  Disable delayed acknowledgements. +        flag = 1; +        rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof (int)); +        errno_assert (rc != SOCKET_ERROR);  #endif -    //  Connect to the remote peer. -    rc = ::connect (s, (sockaddr*) &addr, sizeof (addr)); +        //  Connect to the remote peer. +        rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_in)); -    //  Connect was successfull immediately. -    if (rc == 0) -        return 0; +        //  Connect was successfull immediately. +        if (rc == 0) +            return 0; -    //  Asynchronous connect was launched. -    if (rc == -1 && errno == EINPROGRESS) { -        errno = EAGAIN; +        //  Asynchronous connect was launched. +        if (rc == -1 && errno == EINPROGRESS) { +            errno = EAGAIN; +            return -1; +        } + +        //  Error occured. +        int err = errno; +        close (); +        errno = err; +        return -1; +    } +    else if (AF_LOCAL == sa->sa_family) { +        s = socket (AF_LOCAL, SOCK_STREAM, 0); +        if (s == -1) +            return -1; + +        //  Set the non-blocking flag. +        int flag = fcntl (s, F_GETFL, 0); +        if (flag == -1)  +            flag = 0; +        int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); +        errno_assert (rc != -1); + +        //  Connect to the remote peer. +        rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); + +        //  Connect was successfull immediately. +        if (rc == 0) +            return 0; + +        //  Error occured. +        int err = errno; +        close (); +        errno = err;          return -1;      } -    //  Error occured. -    int err = errno; -    close (); -    errno = err; -    return -1; +    zmq_assert (false);  }  int zmq::tcp_connecter_t::close () diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index d397296..cc4434f 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -35,8 +35,8 @@ namespace zmq          tcp_connecter_t ();          ~tcp_connecter_t (); -        //  Set IP address/port to connect to. -        int set_address (const char *addr_); +        //  Set address to connect to. +        int set_address (const char *protocol, const char *addr_);          //  Open TCP connecting socket. Address is in          //  <hostname>:<port-number> format. Returns -1 in case of error, diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 9087405..2b30a8e 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -39,10 +39,16 @@ zmq::tcp_listener_t::~tcp_listener_t ()          close ();  } -int zmq::tcp_listener_t::set_address (const char *addr_) +int zmq::tcp_listener_t::set_address (cosnt char *protocol_, const char *addr_)  { +    //  IPC protocol is not supported on Windows platform. +    if (strcmp (protocol_, "tcp") != 0 ) { +        errno = EPROTONOSUPPORT; +        return -1; +    } +      //  Convert the interface into sockaddr_in structure. -    int rc = resolve_ip_interface (&addr, addr_); +    int rc = resolve_ip_interface ((sockaddr_in*) &addr, addr_);      if (rc != 0)          return rc; @@ -65,7 +71,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)      wsa_assert (rc != SOCKET_ERROR);      //  Bind the socket to the network interface and port. -    rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); +    rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));      if (rc == SOCKET_ERROR) {          wsa_error_to_errno ();          return -1; @@ -131,6 +137,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()  #include <netinet/in.h>  #include <netdb.h>  #include <fcntl.h> +#include <sys/un.h>  zmq::tcp_listener_t::tcp_listener_t () :      s (retired_fd) @@ -144,45 +151,91 @@ zmq::tcp_listener_t::~tcp_listener_t ()          close ();  } -int zmq::tcp_listener_t::set_address (const char *addr_) +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)  { -    //  Convert the interface into sockaddr_in structure. -    int rc = resolve_ip_interface (&addr, addr_); -    if (rc != 0) -        return rc; - -    //  Create a listening socket. -    s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); -    if (s == -1) -        return -1; - -    //  Allow reusing of the address. -    int flag = 1; -    rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); -    errno_assert (rc == 0); - -    //  Set the non-blocking flag. -    flag = fcntl (s, F_GETFL, 0); -    if (flag == -1)  -        flag = 0; -    rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); -    errno_assert (rc != -1); - -    //  Bind the socket to the network interface and port. -    rc = bind (s, (struct sockaddr*) &addr, sizeof (addr)); -    if (rc != 0) { -        close (); -        return -1; +    if (strcmp (protocol_, "tcp") == 0 ) { + +        //  Convert the interface into sockaddr_in structure. +        int rc = resolve_ip_interface ((struct sockaddr_in*) &addr, addr_); +        if (rc != 0) +            return -1; + +        //  Create a listening socket. +        s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); +        if (s == -1) +            return -1; + +        //  Allow reusing of the address. +        int flag = 1; +        rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); +        errno_assert (rc == 0); + +        //  Set the non-blocking flag. +        flag = fcntl (s, F_GETFL, 0); +        if (flag == -1)  +            flag = 0; +        rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); +        errno_assert (rc != -1); + +        //  Bind the socket to the network interface and port. +        rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in)); +        if (rc != 0) { +            close (); +            return -1; +        } + +        //  Listen for incomming connections. +        rc = listen (s, tcp_connection_backlog); +        if (rc != 0) { +            close (); +            return -1; +        } + +        return 0;      } -               -    //  Listen for incomming connections. -    rc = listen (s, tcp_connection_backlog); -    if (rc != 0) { -        close (); -        return -1; +    else if (strcmp (protocol_, "ipc") == 0) { + +        //  Get rid of the file associated with the UNIX domain socket that +        //  may have been left behind by the previous run of the application. +        ::unlink (addr_); + +        //  Convert the address into sockaddr_un structure. +        int rc = resolve_local_path ((struct sockaddr_un*) &addr, addr_); +        if (rc != 0) +            return -1; + +        //  Create a listening socket. +        s = socket (AF_LOCAL, SOCK_STREAM, 0); +        if (s == -1) +            return -1; + +        //  Set the non-blocking flag. +        int flag = fcntl (s, F_GETFL, 0); +        if (flag == -1)  +            flag = 0; +        rc = fcntl (s, F_SETFL, flag | O_NONBLOCK); +        errno_assert (rc != -1); + +        //  Bind the socket to the file path. +        rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un)); +        if (rc != 0) { +            close (); +            return -1; +        } + +        //  Listen for incomming connections. +        rc = listen (s, tcp_connection_backlog); +        if (rc != 0) { +            close (); +            return -1; +        } + +        return 0;      } - -    return 0; +    else { +        errno = EPROTONOSUPPORT; +        return -1; +    }      }  int zmq::tcp_listener_t::close () @@ -192,6 +245,17 @@ int zmq::tcp_listener_t::close ()      if (rc != 0)          return -1;      s = retired_fd; + +    //  If there's an underlying UNIX domain socket, get rid of the file it +    //  is associated with. +    struct sockaddr *sa = (struct sockaddr*) &addr; +    if (AF_LOCAL == sa->sa_family) { +        struct sockaddr_un *sun = (struct sockaddr_un*) &addr; +        rc = ::unlink(sun->sun_path); +        if (rc != 0) +            return -1; +    } +      return 0;  } @@ -239,19 +303,23 @@ zmq::fd_t zmq::tcp_listener_t::accept ()      int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK);      errno_assert (rc != -1); -    //  Disable Nagle's algorithm. -    int flag = 1; -    rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, -        sizeof (int)); -    errno_assert (rc == 0); +    struct sockaddr *sa = (struct sockaddr*) &addr; +    if (AF_INET == sa->sa_family) { + +        //  Disable Nagle's algorithm. +        int flag = 1; +        rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, +            sizeof (int)); +        errno_assert (rc == 0);  #ifdef ZMQ_HAVE_OPENVMS -    //  Disable delayed acknowledgements. -    flag = 1; -    rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, -        sizeof (int)); -    errno_assert (rc != SOCKET_ERROR); +        //  Disable delayed acknowledgements. +        flag = 1; +        rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, +            sizeof (int)); +        errno_assert (rc != SOCKET_ERROR);  #endif +    }      return sock;  } diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 1dfe288..87748d0 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -35,10 +35,8 @@ namespace zmq          tcp_listener_t ();          ~tcp_listener_t (); -        //  Start listening on the interface. Address is in -        //  <interface-name>:<port-number> format. Interface name may be '*' -        //  to bind to all the interfaces. -        int set_address (const char *addr_); +        //  Start listening on the interface. +        int set_address (const char *protocol_, const char *addr_);          //  Close the listening socket.          int close (); @@ -54,8 +52,8 @@ namespace zmq      private: -        //  IP address/port to listen on. -        sockaddr_in addr; +        //  Address to listen on. +        sockaddr_storage addr;          //  Underlying socket.          fd_t s; diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp index d4c2727..ebd7572 100644 --- a/src/zmq_connecter.cpp +++ b/src/zmq_connecter.cpp @@ -41,11 +41,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()  {  } -int zmq::zmq_connecter_t::set_address (const char *address_) +int zmq::zmq_connecter_t::set_address (const char *protocol_, +    const char *address_)  { -     int rc = tcp_connecter.set_address (address_); +     int rc = tcp_connecter.set_address (protocol_, address_);       if (rc != 0)           return rc; +     protocol = protocol_;       address = address_;       return 0;  } @@ -91,7 +93,8 @@ void zmq::zmq_connecter_t::out_event ()      //  Create an init object.       zmq_init_t *init = new (std::nothrow) zmq_init_t (          choose_io_thread (options.affinity), owner, -        fd, options, true, address.c_str (), session_ordinal); +        fd, options, true, protocol.c_str (), address.c_str (), +        session_ordinal);      zmq_assert (init);      send_plug (init);      send_own (owner, init); @@ -128,7 +131,6 @@ void zmq::zmq_connecter_t::start_connecting ()      }      //  Handle any other error condition by eventual reconnect. -    tcp_connecter.close ();      wait = true;      add_timer ();  } diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp index c3a42a9..328dd6a 100644 --- a/src/zmq_connecter.hpp +++ b/src/zmq_connecter.hpp @@ -39,8 +39,8 @@ namespace zmq              const options_t &options_, uint64_t session_ordinal_, bool wait_);          ~zmq_connecter_t (); -        //  Set IP address to connect to. -        int set_address (const char *address_); +        //  Set address to connect to. +        int set_address (const char *protocol_, const char *address_);      private: @@ -75,7 +75,8 @@ namespace zmq          //  Associated socket options.          options_t options; -        //  Address to connect to. +        //  Protocol and address to connect to. +        std::string protocol;          std::string address;          zmq_connecter_t (const zmq_connecter_t&); diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index a79c0bd..dace6ae 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -27,7 +27,8 @@  #include "err.hpp"  zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_, -      const options_t &options_, bool reconnect_, const char *address_) : +      const options_t &options_, bool reconnect_,  +      const char *protocol_, const char *address_) :      io_object_t (parent_),      inpos (NULL),      insize (0), @@ -39,8 +40,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,      options (options_),      reconnect (reconnect_)  { -    if (reconnect) +    if (reconnect) { +        protocol = protocol_;          address = address_; +    }      //  Initialise the underlying socket.      int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); @@ -166,7 +169,7 @@ void zmq::zmq_engine_t::error ()              inout->get_io_thread (), inout->get_owner (),              options, inout->get_ordinal (), true);          zmq_assert (reconnecter); -        reconnecter->set_address (address.c_str ()); +        reconnecter->set_address (protocol.c_str(), address.c_str ());      }      inout->detach (reconnecter); diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index d26e304..ddd0931 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -22,6 +22,8 @@  #include <stddef.h> +#include <string> +  #include "i_engine.hpp"  #include "io_object.hpp"  #include "tcp_socket.hpp" @@ -37,7 +39,8 @@ namespace zmq      public:          zmq_engine_t (class io_thread_t *parent_, fd_t fd_, -            const options_t &options_, bool reconnect_, const char *address_); +            const options_t &options_, bool reconnect_,  +            const char *protocol_, const char *address_);          ~zmq_engine_t ();          //  i_engine interface implementation. @@ -70,6 +73,7 @@ namespace zmq          options_t options;          bool reconnect; +        std::string protocol;          std::string address;          zmq_engine_t (const zmq_engine_t&); diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index c602e1d..fe6aea2 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -25,7 +25,7 @@  zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,        fd_t fd_, const options_t &options_, bool reconnect_, -      const char *address_, uint64_t session_ordinal_) : +      const char *protocol_, const char *address_, uint64_t session_ordinal_) :      owned_t (parent_, owner_),      sent (false),      received (false), @@ -34,7 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,  {      //  Create the engine object for this connection.      engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options, -        reconnect_, address_); +        reconnect_, protocol_, address_);      zmq_assert (engine);  } diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 414adfe..df14293 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -41,7 +41,8 @@ namespace zmq          zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_,              fd_t fd_, const options_t &options_, bool reconnect_, -            const char *address_, uint64_t session_ordinal_); +            const char *protocol_, const char *address_, +            uint64_t session_ordinal_);          ~zmq_init_t ();      private: diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp index 9ccd82b..d7cf292 100644 --- a/src/zmq_listener.cpp +++ b/src/zmq_listener.cpp @@ -36,9 +36,9 @@ zmq::zmq_listener_t::~zmq_listener_t ()  {  } -int zmq::zmq_listener_t::set_address (const char *addr_) +int zmq::zmq_listener_t::set_address (const char *protocol_, const char *addr_)  { -     return tcp_listener.set_address (addr_); +     return tcp_listener.set_address (protocol_, addr_);  }  void zmq::zmq_listener_t::process_plug () @@ -65,7 +65,7 @@ void zmq::zmq_listener_t::in_event ()      //  Create an init object.       io_thread_t *io_thread = choose_io_thread (options.affinity);      zmq_init_t *init = new (std::nothrow) zmq_init_t ( -        io_thread, owner, fd, options, false, NULL, 0); +        io_thread, owner, fd, options, f | 
