summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ip.cpp4
-rw-r--r--src/kqueue.cpp25
-rw-r--r--src/options.cpp4
-rw-r--r--src/pgm_socket.cpp3
4 files changed, 24 insertions, 12 deletions
diff --git a/src/ip.cpp b/src/ip.cpp
index b11a244..8090a8a 100644
--- a/src/ip.cpp
+++ b/src/ip.cpp
@@ -40,7 +40,7 @@ zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
{
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports etc.
-#if defined SOCK_CLOEXEC
+#if defined ZMQ_HAVE_SOCK_CLOEXEC
type_ |= SOCK_CLOEXEC;
#endif
@@ -51,7 +51,7 @@ zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
// If there's no SOCK_CLOEXEC, let's try the second best option. Note that
// race condition can cause socket not to be closed (if fork happens
// between socket creation and this point).
-#if !defined SOCK_CLOEXEC && defined FD_CLOEXEC
+#if !defined ZMQ_HAVE_SOCK_CLOEXEC && defined FD_CLOEXEC
int rc = fcntl (s, F_SETFD, FD_CLOEXEC);
errno_assert (rc != -1);
#endif
diff --git a/src/kqueue.cpp b/src/kqueue.cpp
index bb42d8f..cbf38d1 100644
--- a/src/kqueue.cpp
+++ b/src/kqueue.cpp
@@ -33,6 +33,7 @@
#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"
+#include "likely.hpp"
// NetBSD defines (struct kevent).udata as intptr_t, everyone else
// as void *.
@@ -106,29 +107,37 @@ void zmq::kqueue_t::rm_fd (handle_t handle_)
void zmq::kqueue_t::set_pollin (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
- pe->flag_pollin = true;
- kevent_add (pe->fd, EVFILT_READ, pe);
+ if (likely (!pe->flag_pollin)) {
+ pe->flag_pollin = true;
+ kevent_add (pe->fd, EVFILT_READ, pe);
+ }
}
void zmq::kqueue_t::reset_pollin (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
- pe->flag_pollin = false;
- kevent_delete (pe->fd, EVFILT_READ);
+ if (likely (pe->flag_pollin)) {
+ pe->flag_pollin = false;
+ kevent_delete (pe->fd, EVFILT_READ);
+ }
}
void zmq::kqueue_t::set_pollout (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
- pe->flag_pollout = true;
- kevent_add (pe->fd, EVFILT_WRITE, pe);
+ if (likely (!pe->flag_pollout)) {
+ pe->flag_pollout = true;
+ kevent_add (pe->fd, EVFILT_WRITE, pe);
+ }
}
void zmq::kqueue_t::reset_pollout (handle_t handle_)
{
poll_entry_t *pe = (poll_entry_t*) handle_;
- pe->flag_pollout = false;
- kevent_delete (pe->fd, EVFILT_WRITE);
+ if (likely (pe->flag_pollout)) {
+ pe->flag_pollout = false;
+ kevent_delete (pe->fd, EVFILT_WRITE);
+ }
}
void zmq::kqueue_t::start ()
diff --git a/src/options.cpp b/src/options.cpp
index 89cf429..8a3e527 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -24,8 +24,8 @@
#include "err.hpp"
zmq::options_t::options_t () :
- sndhwm (0),
- rcvhwm (0),
+ sndhwm (1000),
+ rcvhwm (1000),
affinity (0),
rate (100),
recovery_ivl (10000),
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 5bf49d0..378370c 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -222,6 +222,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
goto err_abort;
} else {
const int send_only = 1,
+ max_rte = (int) ((options.rate * 1000) / 8),
txw_max_tpdu = (int) pgm_max_tpdu,
txw_sqns = compute_sqns (txw_max_tpdu),
ambient_spm = pgm_secs (30),
@@ -237,6 +238,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
&send_only, sizeof (send_only)) ||
+ !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
+ &max_rte, sizeof (max_rte)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
&txw_sqns, sizeof (txw_sqns)) ||
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,