summaryrefslogtreecommitdiff
path: root/src/pgm_sender.cpp
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-28 18:06:06 +0200
committermalosek <malosek@fastmq.com>2009-09-28 18:06:06 +0200
commit39d915ded8ccb612ae1f9aaefcd93f349f4c8f4c (patch)
tree1da94b99d5072345e53d58ba5e304887ee7f1c1f /src/pgm_sender.cpp
parentcf6cc0128ff4d26e0059f399bbb8342ce259b996 (diff)
PGM2 sender
Diffstat (limited to 'src/pgm_sender.cpp')
-rw-r--r--src/pgm_sender.cpp26
1 files changed, 23 insertions, 3 deletions
diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp
index 423865b..51dfbf1 100644
--- a/src/pgm_sender.cpp
+++ b/src/pgm_sender.cpp
@@ -68,13 +68,21 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
{
// Alocate 2 fds for PGM socket.
- int downlink_socket_fd;
- int uplink_socket_fd;
+ int downlink_socket_fd = 0;
+ int uplink_socket_fd = 0;
+#ifdef ZMQ_HAVE_OPENPGM2
+ int rdata_notify_fd = 0;
+#endif
encoder.set_inout (inout_);
// Fill fds from PGM transport.
+#ifdef ZMQ_HAVE_OPENPGM1
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd);
+#elif ZMQ_HAVE_OPENPGM2
+ pgm_socket.get_sender_fds
+ (&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd);
+#endif
// Add downlink_socket_fd into poller.
handle = add_fd (downlink_socket_fd);
@@ -82,9 +90,17 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
// Add uplink_socket_fd into the poller.
uplink_handle = add_fd (uplink_socket_fd);
+ // Add rdata_notify_fd into the poller.
+#ifdef ZMQ_HAVE_OPENPGM2
+ rdata_notify_handle = add_fd (rdata_notify_fd);
+#endif
+
// Set POLLIN. We wont never want to stop polling for uplink = we never
// want to stop porocess NAKs.
set_pollin (uplink_handle);
+#ifdef ZMQ_HAVE_OPENPGM2
+ set_pollin (rdata_notify_handle);
+#endif
// Set POLLOUT for downlink_socket_handle.
set_pollout (handle);
@@ -96,6 +112,9 @@ void zmq::pgm_sender_t::unplug ()
{
rm_fd (handle);
rm_fd (uplink_handle);
+#ifdef ZMQ_HAVE_OPENPGM2
+ rm_fd (rdata_notify_handle);
+#endif
encoder.set_inout (NULL);
inout = NULL;
}
@@ -167,11 +186,12 @@ void zmq::pgm_sender_t::out_event ()
zmq_log (1, "pgm rate limit reached, %s(%i)\n", __FILE__, __LINE__);
}
+#ifdef ZMQ_HAVE_OPENPGM1
// After sending data slice is owned by tx window.
if (nbytes) {
out_buffer = NULL;
}
-
+#endif
write_pos += nbytes;
}