summaryrefslogtreecommitdiff
path: root/src/pgm_socket.cpp
diff options
context:
space:
mode:
authormalosek <malosek@fastmq.com>2009-09-28 18:06:06 +0200
committermalosek <malosek@fastmq.com>2009-09-28 18:06:06 +0200
commit39d915ded8ccb612ae1f9aaefcd93f349f4c8f4c (patch)
tree1da94b99d5072345e53d58ba5e304887ee7f1c1f /src/pgm_socket.cpp
parentcf6cc0128ff4d26e0059f399bbb8342ce259b996 (diff)
PGM2 sender
Diffstat (limited to 'src/pgm_socket.cpp')
-rw-r--r--src/pgm_socket.cpp85
1 files changed, 67 insertions, 18 deletions
diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp
index 65a80a5..dbfe52d 100644
--- a/src/pgm_socket.cpp
+++ b/src/pgm_socket.cpp
@@ -39,7 +39,7 @@
#include "uuid.hpp"
//#define PGM_SOCKET_DEBUG
-//#define PGM_SOCKET_DEBUG_LEVEL 1
+//#define PGM_SOCKET_DEBUG_LEVEL 4
// level 1 = key behaviour
// level 2 = processing flow
@@ -275,7 +275,7 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
- rc = pgm_transport_set_recv_only (g_transport, false);
+ rc = pgm_transport_set_recv_only (g_transport, true, false);
if (rc != pgm_ok) {
errno = EINVAL;
return -1;
@@ -511,8 +511,18 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,
// Get fds and store them into user allocated memory.
// sender_fd is from pgm_transport->send_sock.
// receive_fd_ is from transport->recv_sock.
-int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
+int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
+ int *rdata_notify_fd_)
{
+#ifdef ZMQ_HAVE_OPENPGM1
+ zmq_assert (send_fd_);
+ zmq_assert (receive_fd_);
+ zmq_assert (!rdata_notify_fd_);
+#elif ZMQ_HAVE_OPENPGM2
+ zmq_assert (send_fd_);
+ zmq_assert (receive_fd_);
+ zmq_assert (rdata_notify_fd_);
+#endif
// Preallocate pollfds array.
int fds_array_size = pgm_sender_fd_count;
@@ -530,8 +540,14 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
zmq_assert (rc == pgm_sender_fd_count);
// Store pfds into user allocated space.
+#ifdef ZMQ_HAVE_OPENPGM1
*receive_fd_ = fds [0].fd;
*send_fd_ = fds [1].fd;
+#elif ZMQ_HAVE_OPENPGM2
+ *receive_fd_ = fds [0].fd;
+ *rdata_notify_fd_ = fds [1].fd;
+ *send_fd_ = fds [2].fd;
+#endif
delete [] fds;
@@ -542,10 +558,9 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
- ssize_t nbytes = 0;
-
#ifdef ZMQ_HAVE_OPENPGM1
+ ssize_t nbytes = 0;
iovec iov = {data_,data_len_};
nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,
@@ -561,13 +576,30 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
// now. We have to call write_one_pkt again.
nbytes = nbytes == -1 ? 0 : nbytes;
- zmq_log (4, "wrote %iB, %s(%i)\n", (int)nbytes, __FILE__, __LINE__);
+#elif ZMQ_HAVE_OPENPGM2
+
+ size_t nbytes = 0;
+
+ PGMIOStatus status = pgm_send (g_transport, data_, data_len_, &nbytes);
+
+ if (nbytes != data_len_) {
+ zmq_log (1, "status %i, data_len %i, wrote %iB, %s(%i)\n",
+ (int) status, (int) data_len_, (int) nbytes, __FILE__, __LINE__);
+
+ zmq_assert (status == PGM_IO_STATUS_AGAIN2);
+ zmq_assert (nbytes == 0);
+ }
+#endif
+
+ zmq_log (4, "wrote %i/%iB, %s(%i)\n", (int) nbytes, (int) data_len_,
+ __FILE__, __LINE__);
// We have to write all data as one packet.
if (nbytes > 0) {
- zmq_assert (nbytes == (ssize_t)data_len_);
+ zmq_log (1, "data sent %i, %s(%i)\n", (int) nbytes,
+ __FILE__, __LINE__);
+ zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
}
-#endif
return nbytes;
}
@@ -603,15 +635,17 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_)
// content via pgm_transport_send() calls or unused with pgm_packetv_free1().
void *zmq::pgm_socket_t::get_buffer (size_t *size_)
{
-#ifdef ZMQ_HAVE_OPENPGM1
// Store size.
*size_ = get_max_tsdu_size ();
- // Allocate one packet.
+#ifdef ZMQ_HAVE_OPENPGM1
+ // Allocate one packet in tx window.
return pgm_packetv_alloc (g_transport, false);
#elif ZMQ_HAVE_OPENPGM2
- zmq_assert (false);
-
+ // Allocate buffer.
+ unsigned char *apdu_buff = new unsigned char [*size_];
+ zmq_assert (apdu_buff);
+ return apdu_buff;
#endif
}
@@ -622,7 +656,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
#ifdef ZMQ_HAVE_OPENPGM1
pgm_packetv_free1 (g_transport, data_, false);
#elif ZMQ_HAVE_OPENPGM2
- zmq_assert (false);
+ delete [] (unsigned char*) data_;
#endif
}
@@ -718,6 +752,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
status, (int) nbytes_rec, __FILE__, __LINE__);
+ zmq_assert (false);
+
nbytes_rec = 0;
return -1;
}
@@ -767,21 +803,34 @@ void zmq::pgm_socket_t::process_upstream (void)
{
zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__);
- ssize_t dummy_bytes = 0;
+ pgm_msgv_t dummy_msg;
#ifdef ZMQ_HAVE_OPENPGM1
+ ssize_t dummy_bytes = 0;
+
// We acctually do not want to read any data here we are going to
// process NAK.
- pgm_msgv_t dummy_msg;
dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg,
1, MSG_DONTWAIT);
+
+ // No data should be returned.
+ zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
+
#elif defined ZMQ_HAVE_OPENPGM2
- zmq_assert (false);
+ size_t dummy_bytes = 0;
+ GError *pgm_error = NULL;
+
+ PGMIOStatus status = pgm_recvmsgv (g_transport, &dummy_msg,
+ 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error);
+
+ zmq_log (1, "upstream status %i, nbytes %i, %s(%i)\n",
+ (int) status, (int) dummy_bytes, __FILE__, __LINE__);
+
+ // No data should be returned.
+ zmq_assert (dummy_bytes == 0 && status == PGM_IO_STATUS_AGAIN);
#endif
- // No data should be returned.
- zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
}
#endif