From 4a7aad06d95701cf232198093ce396dcdbb53e5b Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 16 Feb 2012 10:01:47 +0900 Subject: ZeroMQ renamed to Crossroads Signed-off-by: Martin Sustrik --- src/pgm_socket.cpp | 146 ++++++++++++++++++++++++++--------------------------- 1 file changed, 73 insertions(+), 73 deletions(-) (limited to 'src/pgm_socket.cpp') diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 0274ee4..eeaafa0 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -1,17 +1,17 @@ /* - Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2009-2012 250bpm s.r.o. Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2010-2011 Miru Limited Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - This file is part of 0MQ. + This file is part of Crossroads project. - 0MQ is free software; you can redistribute it and/or modify it under + Crossroads is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. - 0MQ is distributed in the hope that it will be useful, + Crossroads is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. @@ -22,13 +22,13 @@ #include "platform.hpp" -#ifdef ZMQ_HAVE_OPENPGM +#ifdef XS_HAVE_OPENPGM -#ifdef ZMQ_HAVE_WINDOWS +#ifdef XS_HAVE_WINDOWS #include "windows.hpp" #endif -#ifdef ZMQ_HAVE_LINUX +#ifdef XS_HAVE_LINUX #include #endif @@ -47,7 +47,7 @@ #define MSG_ERRQUEUE 0x2000 #endif -zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : +xs::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : sock (NULL), options (options_), receiver (receiver_), @@ -64,10 +64,10 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : // e.g. eth0;239.192.0.1:7500 // link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000 // ;[fe80::1%en0]:7500 -int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) +int xs::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { // Can not open transport before destroying old one. - zmq_assert (sock == NULL); + xs_assert (sock == NULL); // Parse port number, start from end for IPv6 const char *port_delim = strrchr (network_, ':'); @@ -86,7 +86,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) memset (network, '\0', sizeof (network)); memcpy (network, network_, port_delim - network_); - zmq_assert (options.rate > 0); + xs_assert (options.rate > 0); // Zero counter used in msgrecv. nbytes_rec = 0; @@ -102,7 +102,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) { // Invalid parameters don't set pgm_error_t. - zmq_assert (pgm_error != NULL); + xs_assert (pgm_error != NULL); if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( // NB: cannot catch EAI_BADFLAGS. @@ -113,10 +113,10 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; // Fatal OpenPGM internal error. - zmq_assert (false); + xs_assert (false); } - zmq_assert (res != NULL); + xs_assert (res != NULL); // Pick up detected IP family. sa_family = res->ai_send_addrs[0].gsr_group.ss_family; @@ -127,7 +127,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) &pgm_error)) { // Invalid parameters don't set pgm_error_t. - zmq_assert (pgm_error != NULL); + xs_assert (pgm_error != NULL); if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( pgm_error->code != PGM_ERROR_BADF && pgm_error->code != PGM_ERROR_FAULT && @@ -138,7 +138,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; // Fatal OpenPGM internal error. - zmq_assert (false); + xs_assert (false); } // All options are of data type int @@ -155,7 +155,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) &pgm_error)) { // Invalid parameters don't set pgm_error_t. - zmq_assert (pgm_error != NULL); + xs_assert (pgm_error != NULL); if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( pgm_error->code != PGM_ERROR_BADF && pgm_error->code != PGM_ERROR_FAULT && @@ -166,7 +166,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; // Fatal OpenPGM internal error. - zmq_assert (false); + xs_assert (false); } } @@ -280,7 +280,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) &if_req, sizeof (if_req), &pgm_error)) { // Invalid parameters don't set pgm_error_t. - zmq_assert (pgm_error != NULL); + xs_assert (pgm_error != NULL); if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET || pgm_error->domain == PGM_ERROR_DOMAIN_IF) && ( pgm_error->code != PGM_ERROR_INVAL && @@ -291,7 +291,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) goto err_abort; // Fatal OpenPGM internal error. - zmq_assert (false); + xs_assert (false); } // Join IP multicast groups. @@ -335,18 +335,18 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) if (!pgm_connect (sock, &pgm_error)) { // Invalid parameters don't set pgm_error_t. - zmq_assert (pgm_error != NULL); + xs_assert (pgm_error != NULL); goto err_abort; } // For receiver transport preallocate pgm_msgv array. if (receiver) { - zmq_assert (in_batch_size > 0); + xs_assert (in_batch_size > 0); size_t max_tsdu_size = get_max_tsdu_size (); pgm_msgv_len = (int) in_batch_size / max_tsdu_size; if ((int) in_batch_size % max_tsdu_size) pgm_msgv_len++; - zmq_assert (pgm_msgv_len); + xs_assert (pgm_msgv_len); pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); alloc_assert (pgm_msgv); @@ -371,7 +371,7 @@ err_abort: return -1; } -zmq::pgm_socket_t::~pgm_socket_t () +xs::pgm_socket_t::~pgm_socket_t () { if (pgm_msgv) free (pgm_msgv); @@ -381,26 +381,26 @@ zmq::pgm_socket_t::~pgm_socket_t () // Get receiver fds. receive_fd_ is signaled for incoming packets, // waiting_pipe_fd_ is signaled for state driven events and data. -void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, +void xs::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, fd_t *waiting_pipe_fd_) { socklen_t socklen; bool rc; - zmq_assert (receive_fd_); - zmq_assert (waiting_pipe_fd_); + xs_assert (receive_fd_); + xs_assert (waiting_pipe_fd_); socklen = sizeof (*receive_fd_); rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); - zmq_assert (rc); - zmq_assert (socklen == sizeof (*receive_fd_)); + xs_assert (rc); + xs_assert (socklen == sizeof (*receive_fd_)); socklen = sizeof (*waiting_pipe_fd_); rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_, &socklen); - zmq_assert (rc); - zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); + xs_assert (rc); + xs_assert (socklen == sizeof (*waiting_pipe_fd_)); } // Get fds and store them into user allocated memory. @@ -408,44 +408,44 @@ void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, // receive_fd_ is for incoming back-channel protocol packets. // rdata_notify_fd_ is raised for waiting repair transmissions. // pending_notify_fd_ is for state driven events. -void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, +void xs::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_) { socklen_t socklen; bool rc; - zmq_assert (send_fd_); - zmq_assert (receive_fd_); - zmq_assert (rdata_notify_fd_); - zmq_assert (pending_notify_fd_); + xs_assert (send_fd_); + xs_assert (receive_fd_); + xs_assert (rdata_notify_fd_); + xs_assert (pending_notify_fd_); socklen = sizeof (*send_fd_); rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen); - zmq_assert (rc); - zmq_assert (socklen == sizeof (*receive_fd_)); + xs_assert (rc); + xs_assert (socklen == sizeof (*receive_fd_)); socklen = sizeof (*receive_fd_); rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, &socklen); - zmq_assert (rc); - zmq_assert (socklen == sizeof (*receive_fd_)); + xs_assert (rc); + xs_assert (socklen == sizeof (*receive_fd_)); socklen = sizeof (*rdata_notify_fd_); rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_, &socklen); - zmq_assert (rc); - zmq_assert (socklen == sizeof (*rdata_notify_fd_)); + xs_assert (rc); + xs_assert (socklen == sizeof (*rdata_notify_fd_)); socklen = sizeof (*pending_notify_fd_); rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, pending_notify_fd_, &socklen); - zmq_assert (rc); - zmq_assert (socklen == sizeof (*pending_notify_fd_)); + xs_assert (rc); + xs_assert (socklen == sizeof (*pending_notify_fd_)); } // Send one APDU, transmit window owned memory. // data_len_ must be less than one TPDU. -size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) +size_t xs::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { size_t nbytes = 0; @@ -453,10 +453,10 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // We have to write all data as one packet. if (nbytes > 0) { - zmq_assert (status == PGM_IO_STATUS_NORMAL); - zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); + xs_assert (status == PGM_IO_STATUS_NORMAL); + xs_assert ((ssize_t) nbytes == (ssize_t) data_len_); } else { - zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || + xs_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); if (status == PGM_IO_STATUS_RATE_LIMITED) @@ -471,7 +471,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) return nbytes; } -long zmq::pgm_socket_t::get_rx_timeout () +long xs::pgm_socket_t::get_rx_timeout () { if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && last_rx_status != PGM_IO_STATUS_TIMER_PENDING) @@ -482,14 +482,14 @@ long zmq::pgm_socket_t::get_rx_timeout () const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : PGM_TIME_REMAIN, &tv, &optlen); - zmq_assert (rc); + xs_assert (rc); const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); return timeout; } -long zmq::pgm_socket_t::get_tx_timeout () +long xs::pgm_socket_t::get_tx_timeout () { if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED) return -1; @@ -498,7 +498,7 @@ long zmq::pgm_socket_t::get_tx_timeout () socklen_t optlen = sizeof (tv); const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); - zmq_assert (rc); + xs_assert (rc); const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); @@ -506,20 +506,20 @@ long zmq::pgm_socket_t::get_tx_timeout () } // Return max TSDU size without fragmentation from current PGM transport. -size_t zmq::pgm_socket_t::get_max_tsdu_size () +size_t xs::pgm_socket_t::get_max_tsdu_size () { int max_tsdu = 0; socklen_t optlen = sizeof (max_tsdu); bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen); - zmq_assert (rc); - zmq_assert (optlen == sizeof (max_tsdu)); + xs_assert (rc); + xs_assert (optlen == sizeof (max_tsdu)); return (size_t) max_tsdu; } // pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len. // In subsequent calls data from pgm_msgv structure are returned. -ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) +ssize_t xs::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) { size_t raw_data_len = 0; @@ -540,9 +540,9 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) if (nbytes_rec == nbytes_processed) { // Check program flow. - zmq_assert (pgm_msgv_processed == 0); - zmq_assert (nbytes_processed == 0); - zmq_assert (nbytes_rec == 0); + xs_assert (pgm_msgv_processed == 0); + xs_assert (nbytes_processed == 0); + xs_assert (nbytes_rec == 0); // Receive a vector of Application Protocol Domain Unit's (APDUs) // from the transport. @@ -552,7 +552,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error); // Invalid parameters. - zmq_assert (status != PGM_IO_STATUS_ERROR); + xs_assert (status != PGM_IO_STATUS_ERROR); last_rx_status = status; @@ -560,7 +560,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. if (status == PGM_IO_STATUS_TIMER_PENDING) { - zmq_assert (nbytes_rec == 0); + xs_assert (nbytes_rec == 0); // In case if no RDATA/ODATA caused POLLIN 0 is // returned. @@ -572,7 +572,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Send SPMR, NAK, ACK is rate limited. if (status == PGM_IO_STATUS_RATE_LIMITED) { - zmq_assert (nbytes_rec == 0); + xs_assert (nbytes_rec == 0); // In case if no RDATA/ODATA caused POLLIN 0 is returned. nbytes_rec = 0; @@ -583,7 +583,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // No peers and hence no incoming packets. if (status == PGM_IO_STATUS_WOULD_BLOCK) { - zmq_assert (nbytes_rec == 0); + xs_assert (nbytes_rec == 0); // In case if no RDATA/ODATA caused POLLIN 0 is returned. nbytes_rec = 0; @@ -606,18 +606,18 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) return -1; } - zmq_assert (status == PGM_IO_STATUS_NORMAL); + xs_assert (status == PGM_IO_STATUS_NORMAL); } else { - zmq_assert (pgm_msgv_processed <= pgm_msgv_len); + xs_assert (pgm_msgv_processed <= pgm_msgv_len); } - // Zero byte payloads are valid in PGM, but not 0MQ protocol. - zmq_assert (nbytes_rec > 0); + // Zero byte payloads are valid in PGM, but not Crossroads protocol. + xs_assert (nbytes_rec > 0); // Only one APDU per pgm_msgv_t structure is allowed. - zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); + xs_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); struct pgm_sk_buff_t* skb = pgm_msgv [pgm_msgv_processed].msgv_skb [0]; @@ -631,13 +631,13 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Move the the next pgm_msgv_t structure. pgm_msgv_processed++; - zmq_assert (pgm_msgv_processed <= pgm_msgv_len); + xs_assert (pgm_msgv_processed <= pgm_msgv_len); nbytes_processed +=raw_data_len; return raw_data_len; } -void zmq::pgm_socket_t::process_upstream () +void xs::pgm_socket_t::process_upstream () { pgm_msgv_t dummy_msg; @@ -648,10 +648,10 @@ void zmq::pgm_socket_t::process_upstream () 1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error); // Invalid parameters. - zmq_assert (status != PGM_IO_STATUS_ERROR); + xs_assert (status != PGM_IO_STATUS_ERROR); // No data should be returned. - zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || + xs_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK)); @@ -665,7 +665,7 @@ void zmq::pgm_socket_t::process_upstream () errno = EAGAIN; } -int zmq::pgm_socket_t::compute_sqns (int tpdu_) +int xs::pgm_socket_t::compute_sqns (int tpdu_) { // Convert rate into B/ms. uint64_t rate = uint64_t (options.rate) / 8; -- cgit v1.2.3