From 39d915ded8ccb612ae1f9aaefcd93f349f4c8f4c Mon Sep 17 00:00:00 2001 From: malosek Date: Mon, 28 Sep 2009 18:06:06 +0200 Subject: PGM2 sender --- src/pgm_socket.cpp | 85 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 18 deletions(-) (limited to 'src/pgm_socket.cpp') diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 65a80a5..dbfe52d 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -39,7 +39,7 @@ #include "uuid.hpp" //#define PGM_SOCKET_DEBUG -//#define PGM_SOCKET_DEBUG_LEVEL 1 +//#define PGM_SOCKET_DEBUG_LEVEL 4 // level 1 = key behaviour // level 2 = processing flow @@ -275,7 +275,7 @@ int zmq::pgm_socket_t::open_transport (void) // Set transport->can_send_data = FALSE. // Note that NAKs are still generated by the transport. - rc = pgm_transport_set_recv_only (g_transport, false); + rc = pgm_transport_set_recv_only (g_transport, true, false); if (rc != pgm_ok) { errno = EINVAL; return -1; @@ -511,8 +511,18 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_, // Get fds and store them into user allocated memory. // sender_fd is from pgm_transport->send_sock. // receive_fd_ is from transport->recv_sock. -int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) +int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, + int *rdata_notify_fd_) { +#ifdef ZMQ_HAVE_OPENPGM1 + zmq_assert (send_fd_); + zmq_assert (receive_fd_); + zmq_assert (!rdata_notify_fd_); +#elif ZMQ_HAVE_OPENPGM2 + zmq_assert (send_fd_); + zmq_assert (receive_fd_); + zmq_assert (rdata_notify_fd_); +#endif // Preallocate pollfds array. int fds_array_size = pgm_sender_fd_count; @@ -530,8 +540,14 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) zmq_assert (rc == pgm_sender_fd_count); // Store pfds into user allocated space. +#ifdef ZMQ_HAVE_OPENPGM1 *receive_fd_ = fds [0].fd; *send_fd_ = fds [1].fd; +#elif ZMQ_HAVE_OPENPGM2 + *receive_fd_ = fds [0].fd; + *rdata_notify_fd_ = fds [1].fd; + *send_fd_ = fds [2].fd; +#endif delete [] fds; @@ -542,10 +558,9 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { - ssize_t nbytes = 0; - #ifdef ZMQ_HAVE_OPENPGM1 + ssize_t nbytes = 0; iovec iov = {data_,data_len_}; nbytes = pgm_transport_send_packetv (g_transport, &iov, 1, @@ -561,13 +576,30 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // now. We have to call write_one_pkt again. nbytes = nbytes == -1 ? 0 : nbytes; - zmq_log (4, "wrote %iB, %s(%i)\n", (int)nbytes, __FILE__, __LINE__); +#elif ZMQ_HAVE_OPENPGM2 + + size_t nbytes = 0; + + PGMIOStatus status = pgm_send (g_transport, data_, data_len_, &nbytes); + + if (nbytes != data_len_) { + zmq_log (1, "status %i, data_len %i, wrote %iB, %s(%i)\n", + (int) status, (int) data_len_, (int) nbytes, __FILE__, __LINE__); + + zmq_assert (status == PGM_IO_STATUS_AGAIN2); + zmq_assert (nbytes == 0); + } +#endif + + zmq_log (4, "wrote %i/%iB, %s(%i)\n", (int) nbytes, (int) data_len_, + __FILE__, __LINE__); // We have to write all data as one packet. if (nbytes > 0) { - zmq_assert (nbytes == (ssize_t)data_len_); + zmq_log (1, "data sent %i, %s(%i)\n", (int) nbytes, + __FILE__, __LINE__); + zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); } -#endif return nbytes; } @@ -603,15 +635,17 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) // content via pgm_transport_send() calls or unused with pgm_packetv_free1(). void *zmq::pgm_socket_t::get_buffer (size_t *size_) { -#ifdef ZMQ_HAVE_OPENPGM1 // Store size. *size_ = get_max_tsdu_size (); - // Allocate one packet. +#ifdef ZMQ_HAVE_OPENPGM1 + // Allocate one packet in tx window. return pgm_packetv_alloc (g_transport, false); #elif ZMQ_HAVE_OPENPGM2 - zmq_assert (false); - + // Allocate buffer. + unsigned char *apdu_buff = new unsigned char [*size_]; + zmq_assert (apdu_buff); + return apdu_buff; #endif } @@ -622,7 +656,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_) #ifdef ZMQ_HAVE_OPENPGM1 pgm_packetv_free1 (g_transport, data_, false); #elif ZMQ_HAVE_OPENPGM2 - zmq_assert (false); + delete [] (unsigned char*) data_; #endif } @@ -718,6 +752,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", status, (int) nbytes_rec, __FILE__, __LINE__); + zmq_assert (false); + nbytes_rec = 0; return -1; } @@ -767,21 +803,34 @@ void zmq::pgm_socket_t::process_upstream (void) { zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__); - ssize_t dummy_bytes = 0; + pgm_msgv_t dummy_msg; #ifdef ZMQ_HAVE_OPENPGM1 + ssize_t dummy_bytes = 0; + // We acctually do not want to read any data here we are going to // process NAK. - pgm_msgv_t dummy_msg; dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg, 1, MSG_DONTWAIT); + + // No data should be returned. + zmq_assert (dummy_bytes == -1 && errno == EAGAIN); + #elif defined ZMQ_HAVE_OPENPGM2 - zmq_assert (false); + size_t dummy_bytes = 0; + GError *pgm_error = NULL; + + PGMIOStatus status = pgm_recvmsgv (g_transport, &dummy_msg, + 1, MSG_DONTWAIT, &dummy_bytes, &pgm_error); + + zmq_log (1, "upstream status %i, nbytes %i, %s(%i)\n", + (int) status, (int) dummy_bytes, __FILE__, __LINE__); + + // No data should be returned. + zmq_assert (dummy_bytes == 0 && status == PGM_IO_STATUS_AGAIN); #endif - // No data should be returned. - zmq_assert (dummy_bytes == -1 && errno == EAGAIN); } #endif -- cgit v1.2.3