summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--man/Makefile.am2
-rw-r--r--man/convert2html.sh1
-rwxr-xr-xman/convert2pdf.sh2
-rw-r--r--man/man7/zmq.73
-rw-r--r--man/man7/zmq_inproc.73
-rw-r--r--man/man7/zmq_ipc.734
-rw-r--r--man/man7/zmq_pgm.71
-rw-r--r--man/man7/zmq_tcp.71
-rw-r--r--man/man7/zmq_udp.71
-rw-r--r--src/ip.cpp17
-rw-r--r--src/ip.hpp9
-rw-r--r--src/socket_base.cpp23
-rw-r--r--src/tcp_connecter.cpp117
-rw-r--r--src/tcp_connecter.hpp4
-rw-r--r--src/tcp_listener.cpp166
-rw-r--r--src/tcp_listener.hpp10
-rw-r--r--src/zmq_connecter.cpp10
-rw-r--r--src/zmq_connecter.hpp7
-rw-r--r--src/zmq_engine.cpp9
-rw-r--r--src/zmq_engine.hpp6
-rw-r--r--src/zmq_init.cpp4
-rw-r--r--src/zmq_init.hpp3
-rw-r--r--src/zmq_listener.cpp6
-rw-r--r--src/zmq_listener.hpp4
24 files changed, 323 insertions, 120 deletions
diff --git a/man/Makefile.am b/man/Makefile.am
index a292082..26867ab 100644
--- a/man/Makefile.am
+++ b/man/Makefile.am
@@ -7,7 +7,7 @@ dist_man_MANS = man1/zmq_forwarder.1 man1/zmq_streamer.1 man1/zmq_queue.1 \
man3/zmq_msg_data.3 man3/zmq_msg_size.3 man3/zmq_strerror.3 \
man7/zmq.7 man7/zmq_cpp.7 man7/zmq_python.7 man7/zmq_ruby.7 \
man7/zmq_cl.7 man7/zmq_tcp.7 man7/zmq_udp.7 man7/zmq_pgm.7 \
- man7/zmq_inproc.7
+ man7/zmq_inproc.7 man7/zmq_ipc.7
distclean-local:
-rm *.pdf
diff --git a/man/convert2html.sh b/man/convert2html.sh
index 26f443f..b92e8f6 100644
--- a/man/convert2html.sh
+++ b/man/convert2html.sh
@@ -51,4 +51,5 @@ groff -man -Thtml man7/zmq_tcp.7 > man7/zmq_tcp.7.html
groff -man -Thtml man7/zmq_udp.7 > man7/zmq_udp.7.html
groff -man -Thtml man7/zmq_pgm.7 > man7/zmq_pgm.7.html
groff -man -Thtml man7/zmq_inproc.7 > man7/zmq_inproc.7.html
+groff -man -Thtml man7/zmq_ipc.7 > man7/zmq_ipc.7.html
diff --git a/man/convert2pdf.sh b/man/convert2pdf.sh
index f8ab00c..dcd2901 100755
--- a/man/convert2pdf.sh
+++ b/man/convert2pdf.sh
@@ -83,4 +83,6 @@ groff -man -Tps man7/zmq_pgm.7 > man7/zmq_pgm.7.ps
ps2pdf man7/zmq_pgm.7.ps zmq_pgm.pdf
groff -man -Tps man7/zmq_inproc.7 > man7/zmq_inproc.7.ps
ps2pdf man7/zmq_inproc.7.ps zmq_inproc.pdf
+groff -man -Tps man7/zmq_ipc.7 > man7/zmq_ipc.7.ps
+ps2pdf man7/zmq_ipc.7.ps zmq_ipc.pdf
diff --git a/man/man7/zmq.7 b/man/man7/zmq.7
index 740726c..61543a7 100644
--- a/man/man7/zmq.7
+++ b/man/man7/zmq.7
@@ -112,6 +112,9 @@ UDP reliable multicast transport:
PGM reliable multicast transport:
.BR zmq_pgm(7)
+Inter-process transport:
+.BR zmq_ipc (7)
+
In-process (inter-thread) transport:
.BR zmq_inproc(7)
diff --git a/man/man7/zmq_inproc.7 b/man/man7/zmq_inproc.7
index bc09f90..0c84641 100644
--- a/man/man7/zmq_inproc.7
+++ b/man/man7/zmq_inproc.7
@@ -3,7 +3,7 @@
In-process (inter-thread) tranport for 0MQ
.SH SYNOPSIS
-In-process transport is optimised for passing messages betweem threads in the
+In-process transport is optimised for passing messages between threads in the
same process.
Messages are passed directly from one application thread to
@@ -32,6 +32,7 @@ wire format specification.
.SH "SEE ALSO"
+.BR zmq_ipc (7)
.BR zmq_tcp (7)
.BR zmq_udp (7)
.BR zmq_pgm (7)
diff --git a/man/man7/zmq_ipc.7 b/man/man7/zmq_ipc.7
new file mode 100644
index 0000000..28e57d0
--- /dev/null
+++ b/man/man7/zmq_ipc.7
@@ -0,0 +1,34 @@
+.TH zmq_ipc 7 "" "(c)2007-2010 iMatix Corporation" "0MQ User Manuals"
+.SH NAME
+Inter-process tranport for 0MQ
+.SH SYNOPSIS
+
+In-process transport is optimised for passing messages between processes on the
+same physical machine.
+
+.SH CONNECTION STRING
+
+Connection string for inproc transport is "inproc://" followed by a file name.
+The file will be used as placeholder for a message endpoint. (UNIX domain
+sockets associate a file with the listening socket in a similar way.)
+
+.nf
+ ipc:///tmp/my_ipc_endpoint
+ ipc:///tmp/prices.ipc
+.fi
+
+.SH WIRE FORMAT
+
+IPC transport doesn't transfer messages across the network thus there is no need
+for a wire format specification.
+
+.SH "SEE ALSO"
+
+.BR zmq_inproc (7)
+.BR zmq_tcp (7)
+.BR zmq_udp (7)
+.BR zmq_pgm (7)
+
+.SH AUTHOR
+Martin Sustrik <sustrik at 250bpm dot com>
+
diff --git a/man/man7/zmq_pgm.7 b/man/man7/zmq_pgm.7
index 68af978..39f639b 100644
--- a/man/man7/zmq_pgm.7
+++ b/man/man7/zmq_pgm.7
@@ -80,6 +80,7 @@ Following example shows how messages are arranged in subsequent packets:
.BR zmq_udp (7)
.BR zmq_tcp (7)
+.BR zmq_ipc (7)
.BR zmq_inproc (7)
.BR zmq_setsockopt (3)
diff --git a/man/man7/zmq_tcp.7 b/man/man7/zmq_tcp.7
index f5504c8..41a116b 100644
--- a/man/man7/zmq_tcp.7
+++ b/man/man7/zmq_tcp.7
@@ -72,6 +72,7 @@ Binary layout of a larger message:
.BR zmq_udp (7)
.BR zmq_pgm (7)
+.BR zmq_ipc (7)
.BR zmq_inproc (7)
.SH AUTHOR
diff --git a/man/man7/zmq_udp.7 b/man/man7/zmq_udp.7
index 151a6d4..d0bf46c 100644
--- a/man/man7/zmq_udp.7
+++ b/man/man7/zmq_udp.7
@@ -37,6 +37,7 @@ Same as with PGM transport except for UDP packet headers.
.BR zmq_pgm (7)
.BR zmq_tcp (7)
+.BR zmq_ipc (7)
.BR zmq_inproc (7)
.SH AUTHOR
diff --git a/src/ip.cpp b/src/ip.cpp
index 50af2ce..d5bb05d 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -309,3 +309,20 @@ int zmq::resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_)
return 0;
}
+
+#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
+
+int zmq::resolve_local_path (sockaddr_un *addr_, const char *path_)
+{
+ if (strlen (path_) >= sizeof (addr_->sun_path))
+ {
+ errno = ENAMETOOLONG;
+ return -1;
+ }
+ strcpy (addr_->sun_path, path_);
+ addr_->sun_family = AF_LOCAL;
+ return 0;
+}
+
+#endif
+
diff --git a/src/ip.hpp b/src/ip.hpp
index 8a0a34f..16c1f62 100644
--- a/src/ip.hpp
+++ b/src/ip.hpp
@@ -32,6 +32,10 @@
#include <netdb.h>
#endif
+#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
+#include <sys/un.h>
+#endif
+
namespace zmq
{
@@ -42,6 +46,11 @@ namespace zmq
// This function resolves a string in <hostname>:<port-number> format.
// Hostname can be either the name of the host or its IP address.
int resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_);
+
+#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
+ // This function sets up the sockaddr_un structure with the pathname_
+ int resolve_local_path( sockaddr_un * addr_, const char* pathname_);
+#endif
}
#endif
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ad68fdb..d35c8c0 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -87,11 +87,19 @@ int zmq::socket_base_t::bind (const char *addr_)
if (addr_type == "inproc")
return register_endpoint (addr_args.c_str (), this);
- if (addr_type == "tcp") {
+ if (addr_type == "tcp" || addr_type == "ipc") {
+
+#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ if (addr_type == "ipc") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
+
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
choose_io_thread (options.affinity), this, options);
zmq_assert (listener);
- int rc = listener->set_address (addr_args.c_str ());
+ int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ());
if (rc != 0)
return -1;
@@ -202,7 +210,14 @@ int zmq::socket_base_t::connect (const char *addr_)
send_plug (session);
send_own (this, session);
- if (addr_type == "tcp") {
+ if (addr_type == "tcp" || addr_type == "ipc") {
+
+#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
+ if (addr_type == "ipc") {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+#endif
// Create the connecter object. Supply it with the session name
// so that it can bind the new connection to the session once
@@ -211,7 +226,7 @@ int zmq::socket_base_t::connect (const char *addr_)
choose_io_thread (options.affinity), this, options,
session->get_ordinal (), false);
zmq_assert (connecter);
- int rc = connecter->set_address (addr_args.c_str ());
+ int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ());
if (rc != 0) {
delete connecter;
return -1;
diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp
index 5dcebba..aea8a12 100644
--- a/src/tcp_connecter.cpp
+++ b/src/tcp_connecter.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <string.h>
+
#include <string>
#include "tcp_connecter.hpp"
@@ -38,10 +40,13 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
close ();
}
-int zmq::tcp_connecter_t::set_address (const char *addr_)
+int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
{
- // Convert the hostname into sockaddr_in structure.
- return resolve_ip_hostname (&addr, addr_);
+ if (strcmp (protocol_, "tcp") == 0)
+ return resolve_ip_hostname ((sockaddr_in*) &addr, addr_);
+
+ errno = EPROTONOSUPPORT;
+ return -1;
}
int zmq::tcp_connecter_t::open ()
@@ -67,7 +72,7 @@ int zmq::tcp_connecter_t::open ()
wsa_assert (rc != SOCKET_ERROR);
// Connect to the remote peer.
- rc = ::connect (s, (sockaddr*) &addr, sizeof addr);
+ rc = ::connect (s, (sockaddr*) &addr, sizeof (sockaddr_in));
// Connect was successfull immediately.
if (rc == 0)
@@ -143,58 +148,94 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
close ();
}
-int zmq::tcp_connecter_t::set_address (const char *addr_)
+int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
{
- // Convert the hostname into sockaddr_in structure.
- return resolve_ip_hostname (&addr, addr_);
+ if (strcmp (protocol_, "tcp") == 0)
+ return resolve_ip_hostname ((struct sockaddr_in*)&addr, addr_);
+ else if (strcmp (protocol_, "ipc") == 0)
+ return resolve_local_path (( struct sockaddr_un*)&addr, addr_);
+
+ errno = EPROTONOSUPPORT;
+ return -1;
}
int zmq::tcp_connecter_t::open ()
{
zmq_assert (s == retired_fd);
+ struct sockaddr *sa = (struct sockaddr*) &addr;
- // Create the socket.
- s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (s == -1)
- return -1;
+ if (AF_INET == sa->sa_family) {
- // Set to non-blocking mode.
- int flags = fcntl (s, F_GETFL, 0);
- if (flags == -1)
- flags = 0;
- int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
- errno_assert (rc != -1);
+ // Create the socket.
+ s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (s == -1)
+ return -1;
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof (int));
- errno_assert (rc == 0);
+ // Set to non-blocking mode.
+ int flags = fcntl (s, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ int rc = fcntl (s, F_SETFL, flags | O_NONBLOCK);
+ errno_assert (rc != -1);
+
+ // Disable Nagle's algorithm.
+ int flag = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELAY, (char*) &flag, sizeof (int));
+ errno_assert (rc == 0);
#ifdef ZMQ_HAVE_OPENVMS
- // Disable delayed acknowledgements.
- flag = 1;
- rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof (int));
- errno_assert (rc != SOCKET_ERROR);
+ // Disable delayed acknowledgements.
+ flag = 1;
+ rc = setsockopt (s, IPPROTO_TCP, TCP_NODELACK, (char*) &flag, sizeof (int));
+ errno_assert (rc != SOCKET_ERROR);
#endif
- // Connect to the remote peer.
- rc = ::connect (s, (sockaddr*) &addr, sizeof (addr));
+ // Connect to the remote peer.
+ rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
- // Connect was successfull immediately.
- if (rc == 0)
- return 0;
+ // Connect was successfull immediately.
+ if (rc == 0)
+ return 0;
- // Asynchronous connect was launched.
- if (rc == -1 && errno == EINPROGRESS) {
- errno = EAGAIN;
+ // Asynchronous connect was launched.
+ if (rc == -1 && errno == EINPROGRESS) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ // Error occured.
+ int err = errno;
+ close ();
+ errno = err;
+ return -1;
+ }
+ else if (AF_LOCAL == sa->sa_family) {
+ s = socket (AF_LOCAL, SOCK_STREAM, 0);
+ if (s == -1)
+ return -1;
+
+ // Set the non-blocking flag.
+ int flag = fcntl (s, F_GETFL, 0);
+ if (flag == -1)
+ flag = 0;
+ int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+ errno_assert (rc != -1);
+
+ // Connect to the remote peer.
+ rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
+
+ // Connect was successfull immediately.
+ if (rc == 0)
+ return 0;
+
+ // Error occured.
+ int err = errno;
+ close ();
+ errno = err;
return -1;
}
- // Error occured.
- int err = errno;
- close ();
- errno = err;
- return -1;
+ zmq_assert (false);
}
int zmq::tcp_connecter_t::close ()
diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp
index d397296..cc4434f 100644
--- a/src/tcp_connecter.hpp
+++ b/src/tcp_connecter.hpp
@@ -35,8 +35,8 @@ namespace zmq
tcp_connecter_t ();
~tcp_connecter_t ();
- // Set IP address/port to connect to.
- int set_address (const char *addr_);
+ // Set address to connect to.
+ int set_address (const char *protocol, const char *addr_);
// Open TCP connecting socket. Address is in
// <hostname>:<port-number> format. Returns -1 in case of error,
diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp
index 9087405..2b30a8e 100644
--- a/src/tcp_listener.cpp
+++ b/src/tcp_listener.cpp
@@ -39,10 +39,16 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
-int zmq::tcp_listener_t::set_address (const char *addr_)
+int zmq::tcp_listener_t::set_address (cosnt char *protocol_, const char *addr_)
{
+ // IPC protocol is not supported on Windows platform.
+ if (strcmp (protocol_, "tcp") != 0 ) {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
+
// Convert the interface into sockaddr_in structure.
- int rc = resolve_ip_interface (&addr, addr_);
+ int rc = resolve_ip_interface ((sockaddr_in*) &addr, addr_);
if (rc != 0)
return rc;
@@ -65,7 +71,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
wsa_assert (rc != SOCKET_ERROR);
// Bind the socket to the network interface and port.
- rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
if (rc == SOCKET_ERROR) {
wsa_error_to_errno ();
return -1;
@@ -131,6 +137,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
+#include <sys/un.h>
zmq::tcp_listener_t::tcp_listener_t () :
s (retired_fd)
@@ -144,45 +151,91 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
-int zmq::tcp_listener_t::set_address (const char *addr_)
+int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
{
- // Convert the interface into sockaddr_in structure.
- int rc = resolve_ip_interface (&addr, addr_);
- if (rc != 0)
- return rc;
-
- // Create a listening socket.
- s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (s == -1)
- return -1;
-
- // Allow reusing of the address.
- int flag = 1;
- rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
- errno_assert (rc == 0);
-
- // Set the non-blocking flag.
- flag = fcntl (s, F_GETFL, 0);
- if (flag == -1)
- flag = 0;
- rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
- errno_assert (rc != -1);
-
- // Bind the socket to the network interface and port.
- rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
- if (rc != 0) {
- close ();
- return -1;
+ if (strcmp (protocol_, "tcp") == 0 ) {
+
+ // Convert the interface into sockaddr_in structure.
+ int rc = resolve_ip_interface ((struct sockaddr_in*) &addr, addr_);
+ if (rc != 0)
+ return -1;
+
+ // Create a listening socket.
+ s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (s == -1)
+ return -1;
+
+ // Allow reusing of the address.
+ int flag = 1;
+ rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
+ errno_assert (rc == 0);
+
+ // Set the non-blocking flag.
+ flag = fcntl (s, F_GETFL, 0);
+ if (flag == -1)
+ flag = 0;
+ rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+ errno_assert (rc != -1);
+
+ // Bind the socket to the network interface and port.
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
+
+ // Listen for incomming connections.
+ rc = listen (s, tcp_connection_backlog);
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
+
+ return 0;
}
-
- // Listen for incomming connections.
- rc = listen (s, tcp_connection_backlog);
- if (rc != 0) {
- close ();
- return -1;
+ else if (strcmp (protocol_, "ipc") == 0) {
+
+ // Get rid of the file associated with the UNIX domain socket that
+ // may have been left behind by the previous run of the application.
+ ::unlink (addr_);
+
+ // Convert the address into sockaddr_un structure.
+ int rc = resolve_local_path ((struct sockaddr_un*) &addr, addr_);
+ if (rc != 0)
+ return -1;
+
+ // Create a listening socket.
+ s = socket (AF_LOCAL, SOCK_STREAM, 0);
+ if (s == -1)
+ return -1;
+
+ // Set the non-blocking flag.
+ int flag = fcntl (s, F_GETFL, 0);
+ if (flag == -1)
+ flag = 0;
+ rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
+ errno_assert (rc != -1);
+
+ // Bind the socket to the file path.
+ rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
+
+ // Listen for incomming connections.
+ rc = listen (s, tcp_connection_backlog);
+ if (rc != 0) {
+ close ();
+ return -1;
+ }
+
+ return 0;
}
-
- return 0;
+ else {
+ errno = EPROTONOSUPPORT;
+ return -1;
+ }
}
int zmq::tcp_listener_t::close ()
@@ -192,6 +245,17 @@ int zmq::tcp_listener_t::close ()
if (rc != 0)
return -1;
s = retired_fd;
+
+ // If there's an underlying UNIX domain socket, get rid of the file it
+ // is associated with.
+ struct sockaddr *sa = (struct sockaddr*) &addr;
+ if (AF_LOCAL == sa->sa_family) {
+ struct sockaddr_un *sun = (struct sockaddr_un*) &addr;
+ rc = ::unlink(sun->sun_path);
+ if (rc != 0)
+ return -1;
+ }
+
return 0;
}
@@ -239,19 +303,23 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
- // Disable Nagle's algorithm.
- int flag = 1;
- rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
- sizeof (int));
- errno_assert (rc == 0);
+ struct sockaddr *sa = (struct sockaddr*) &addr;
+ if (AF_INET == sa->sa_family) {
+
+ // Disable Nagle's algorithm.
+ int flag = 1;
+ rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
+ sizeof (int));
+ errno_assert (rc == 0);
#ifdef ZMQ_HAVE_OPENVMS
- // Disable delayed acknowledgements.
- flag = 1;
- rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
- sizeof (int));
- errno_assert (rc != SOCKET_ERROR);
+ // Disable delayed acknowledgements.
+ flag = 1;
+ rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELACK, (char*) &flag,
+ sizeof (int));
+ errno_assert (rc != SOCKET_ERROR);
#endif
+ }
return sock;
}
diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp
index 1dfe288..87748d0 100644
--- a/src/tcp_listener.hpp
+++ b/src/tcp_listener.hpp
@@ -35,10 +35,8 @@ namespace zmq
tcp_listener_t ();
~tcp_listener_t ();
- // Start listening on the interface. Address is in
- // <interface-name>:<port-number> format. Interface name may be '*'
- // to bind to all the interfaces.
- int set_address (const char *addr_);
+ // Start listening on the interface.
+ int set_address (const char *protocol_, const char *addr_);
// Close the listening socket.
int close ();
@@ -54,8 +52,8 @@ namespace zmq
private:
- // IP address/port to listen on.
- sockaddr_in addr;
+ // Address to listen on.
+ sockaddr_storage addr;
// Underlying socket.
fd_t s;
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index d4c2727..ebd7572 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -41,11 +41,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
{
}
-int zmq::zmq_connecter_t::set_address (const char *address_)
+int zmq::zmq_connecter_t::set_address (const char *protocol_,
+ const char *address_)
{
- int rc = tcp_connecter.set_address (address_);
+ int rc = tcp_connecter.set_address (protocol_, address_);
if (rc != 0)
return rc;
+ protocol = protocol_;
address = address_;
return 0;
}
@@ -91,7 +93,8 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (
choose_io_thread (options.affinity), owner,
- fd, options, true, address.c_str (), session_ordinal);
+ fd, options, true, protocol.c_str (), address.c_str (),
+ session_ordinal);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
@@ -128,7 +131,6 @@ void zmq::zmq_connecter_t::start_connecting ()
}
// Handle any other error condition by eventual reconnect.
- tcp_connecter.close ();
wait = true;
add_timer ();
}
diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp
index c3a42a9..328dd6a 100644
--- a/src/zmq_connecter.hpp
+++ b/src/zmq_connecter.hpp
@@ -39,8 +39,8 @@ namespace zmq
const options_t &options_, uint64_t session_ordinal_, bool wait_);
~zmq_connecter_t ();
- // Set IP address to connect to.
- int set_address (const char *address_);
+ // Set address to connect to.
+ int set_address (const char *protocol_, const char *address_);
private:
@@ -75,7 +75,8 @@ namespace zmq
// Associated socket options.
options_t options;
- // Address to connect to.
+ // Protocol and address to connect to.
+ std::string protocol;
std::string address;
zmq_connecter_t (const zmq_connecter_t&);
diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index a79c0bd..dace6ae 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -27,7 +27,8 @@
#include "err.hpp"
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
- const options_t &options_, bool reconnect_, const char *address_) :
+ const options_t &options_, bool reconnect_,
+ const char *protocol_, const char *address_) :
io_object_t (parent_),
inpos (NULL),
insize (0),
@@ -39,8 +40,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
options (options_),
reconnect (reconnect_)
{
- if (reconnect)
+ if (reconnect) {
+ protocol = protocol_;
address = address_;
+ }
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
@@ -166,7 +169,7 @@ void zmq::zmq_engine_t::error ()
inout->get_io_thread (), inout->get_owner (),
options, inout->get_ordinal (), true);
zmq_assert (reconnecter);
- reconnecter->set_address (address.c_str ());
+ reconnecter->set_address (protocol.c_str(), address.c_str ());
}
inout->detach (reconnecter);
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index d26e304..ddd0931 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -22,6 +22,8 @@
#include <stddef.h>
+#include <string>
+
#include "i_engine.hpp"
#include "io_object.hpp"
#include "tcp_socket.hpp"
@@ -37,7 +39,8 @@ namespace zmq
public:
zmq_engine_t (class io_thread_t *parent_, fd_t fd_,
- const options_t &options_, bool reconnect_, const char *address_);
+ const options_t &options_, bool reconnect_,
+ const char *protocol_, const char *address_);
~zmq_engine_t ();
// i_engine interface implementation.
@@ -70,6 +73,7 @@ namespace zmq
options_t options;
bool reconnect;
+ std::string protocol;
std::string address;
zmq_engine_t (const zmq_engine_t&);
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index c602e1d..fe6aea2 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -25,7 +25,7 @@
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options_, bool reconnect_,
- const char *address_, uint64_t session_ordinal_) :
+ const char *protocol_, const char *address_, uint64_t session_ordinal_) :
owned_t (parent_, owner_),
sent (false),
received (false),
@@ -34,7 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
{
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
- reconnect_, address_);
+ reconnect_, protocol_, address_);
zmq_assert (engine);
}
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 414adfe..df14293 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -41,7 +41,8 @@ namespace zmq
zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options_, bool reconnect_,
- const char *address_, uint64_t session_ordinal_);
+ const char *protocol_, const char *address_,
+ uint64_t session_ordinal_);
~zmq_init_t ();
private:
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 9ccd82b..d7cf292 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -36,9 +36,9 @@ zmq::zmq_listener_t::~zmq_listener_t ()
{
}
-int zmq::zmq_listener_t::set_address (const char *addr_)
+int zmq::zmq_listener_t::set_address (const char *protocol_, const char *addr_)
{
- return tcp_listener.set_address (addr_);
+ return tcp_listener.set_address (protocol_, addr_);
}
void zmq::zmq_listener_t::process_plug ()
@@ -65,7 +65,7 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new (std::nothrow) zmq_init_t (
- io_thread, owner, fd, options, false, NULL, 0);
+ io_thread, owner, fd, options, false, NULL, NULL, 0);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp
index 6f8cdc9..c82a280 100644
--- a/src/zmq_listener.hpp
+++ b/src/zmq_listener.hpp
@@ -36,8 +36,8 @@ namespace zmq
zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_);
- // Set IP address to listen on.
- int set_address (const char *addr_);
+ // Set address to listen on.
+ int set_address (const char* protocol_, const char *addr_);
private: