Diffstat (limited to '3rdparty/openpgm-svn-r1135/pgm/source.c')
-rw-r--r-- | 3rdparty/openpgm-svn-r1135/pgm/source.c | 2345 |
1 file changed, 2345 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1135/pgm/source.c b/3rdparty/openpgm-svn-r1135/pgm/source.c
new file mode 100644
index 0000000..bea0f33
--- /dev/null
+++ b/3rdparty/openpgm-svn-r1135/pgm/source.c
@@ -0,0 +1,2345 @@
+/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
+ *
+ * PGM source socket.
+ *
+ * Copyright (c) 2006-2010 Miru Limited.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#define __STDC_FORMAT_MACROS
+#ifdef _MSC_VER
+#   include <pgm/wininttypes.h>
+#else
+#   include <inttypes.h>
+#endif
+#include <errno.h>
+#include <impl/i18n.h>
+#include <impl/framework.h>
+#include <impl/socket.h>
+#include <impl/source.h>
+#include <impl/sqn_list.h>
+#include <impl/packet_parse.h>
+#include <impl/net.h>
+
+
+//#define SOURCE_DEBUG
+
+#ifndef SOURCE_DEBUG
+#   define PGM_DISABLE_ASSERT
+#endif
+
+#if !defined(ENOBUFS) && defined(WSAENOBUFS)
+#   define ENOBUFS WSAENOBUFS
+#endif
+
+
+/* locals */
+static inline bool peer_is_source (const pgm_peer_t*) PGM_GNUC_CONST;
+static inline bool peer_is_peer (const pgm_peer_t*) PGM_GNUC_CONST;
+static void reset_heartbeat_spm (pgm_sock_t*const, const pgm_time_t);
+static bool send_ncf (pgm_sock_t*const restrict, const struct sockaddr*const restrict, const struct sockaddr*const restrict, const uint32_t, const bool);
+static bool send_ncf_list (pgm_sock_t*const restrict, const struct sockaddr*const restrict, const struct sockaddr*const restrict, struct pgm_sqn_list_t*const restrict, const bool);
+static int send_odata (pgm_sock_t*const restrict, struct pgm_sk_buff_t*const restrict, size_t*restrict);
+static int send_odata_copy (pgm_sock_t*const restrict, const void*restrict, const uint16_t, size_t*restrict);
+static int send_odatav (pgm_sock_t*const restrict, const struct pgm_iovec*const restrict, const unsigned, size_t*restrict);
+static bool send_rdata (pgm_sock_t*restrict, struct pgm_sk_buff_t*restrict);
+
+
+static inline
+unsigned
+_pgm_popcount (
+    uint32_t        n
+    )
+{
+#if (__GNUC__ > 3) || (__GNUC__ == 3 && __GNUC_MINOR__ >= 4)
+    return __builtin_popcount (n);
+#else
+/* MIT HAKMEM 169 */
+    const uint32_t t = n - ((n >> 1) & 033333333333)
+                         - ((n >> 2) & 011111111111);
+    return ((t + (t >> 3)) & 030707070707) % 63;
+#endif
+}
+
+static inline
+bool
+peer_is_source (
+    const pgm_peer_t*   peer
+    )
+{
+    return (NULL == peer);
+}
+
+static inline
+bool
+peer_is_peer (
+    const pgm_peer_t*   peer
+    )
+{
+    return (NULL != peer);
+}
+
+static inline
+void
+reset_spmr_timer (
+    pgm_peer_t* const   peer
+    )
+{
+    peer->spmr_expiry = 0;
+}
+
+static inline
+size_t
+source_max_tsdu (
+    const pgm_sock_t*   sock,
+    const bool          can_fragment
+    )
+{
+    size_t max_tsdu = can_fragment ? sock->max_tsdu_fragment : sock->max_tsdu;
+    if (sock->use_var_pktlen /* OPT_VAR_PKT_LEN */)
+        max_tsdu -= sizeof (uint16_t);
+    return max_tsdu;
+}
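
The HAKMEM fallback above works because C precedence binds `+` tighter than `&`: each octal digit of `t` accumulates a 3-bit population count, which the final modulo-63 step sums. A standalone sketch (hypothetical test harness, not part of this file; the builtin comparison assumes GCC/Clang) to sanity-check it:

/* hakmem_check.c -- sanity-check HAKMEM 169 against __builtin_popcount. */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

static unsigned
hakmem_popcount (uint32_t n)
{
    /* each 3-bit octal group of t ends up holding its own bit count */
    const uint32_t t = n - ((n >> 1) & 033333333333)
                         - ((n >> 2) & 011111111111);
    return ((t + (t >> 3)) & 030707070707) % 63;
}

int
main (void)
{
    const uint32_t tests[] = { 0, 1, 0xff, 0x80000000, 0xdeadbeef, 0xffffffff };
    for (unsigned i = 0; i < sizeof tests / sizeof tests[0]; i++)
        assert (hakmem_popcount (tests[i]) == (unsigned)__builtin_popcount (tests[i]));
    puts ("ok");
    return 0;
}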
+/* prototype of function to send pro-active parity NAKs.
+ */
+static
+bool
+pgm_schedule_proactive_nak (
+    pgm_sock_t*     sock,
+    uint32_t        nak_tg_sqn      /* transmission group (shifted) */
+    )
+{
+    pgm_return_val_if_fail (NULL != sock, FALSE);
+    const bool status = pgm_txw_retransmit_push (sock->window,
+                                                 nak_tg_sqn | sock->rs_proactive_h,
+                                                 TRUE /* is_parity */,
+                                                 sock->tg_sqn_shift);
+    return status;
+}
+
+/* a deferred request for RDATA, now processed in the timer thread.  we check the transmit
+ * window to see if the packet exists and forward on, maintaining a lock until the queue is
+ * empty.
+ *
+ * returns TRUE on success, returns FALSE if operation would block.
+ */
+
+bool
+pgm_on_deferred_nak (
+    pgm_sock_t* const   sock
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+
+/* We can flush queue and block all odata, or process one set, or process each
+ * sequence number individually.
+ */
+
+/* parity packets are re-numbered across the transmission group with index h, sharing the space
+ * with the original packets.  beyond the transmission group size (k), the PGM option OPT_PARITY_GRP
+ * provides the extra offset value.
+ */
+
+/* peek from the retransmit queue so we can eliminate duplicate NAKs up until the repair packet
+ * has been retransmitted.
+ */
+    pgm_spinlock_lock (&sock->txw_spinlock);
+    struct pgm_sk_buff_t* skb = pgm_txw_retransmit_try_peek (sock->window);
+    if (skb) {
+        skb = pgm_skb_get (skb);
+        pgm_spinlock_unlock (&sock->txw_spinlock);
+        if (!send_rdata (sock, skb)) {
+            pgm_free_skb (skb);
+            pgm_notify_send (&sock->rdata_notify);
+            return FALSE;
+        }
+        pgm_free_skb (skb);
+/* now remove sequence number from retransmit queue, re-enabling NAK processing for this sequence number */
+        pgm_txw_retransmit_remove_head (sock->window);
+    } else
+        pgm_spinlock_unlock (&sock->txw_spinlock);
+    return TRUE;
+}
+
+/* SPMR: if received by multicast, cancel our own pending SPMR; if received by unicast, send an SPM.
+ *
+ * rate limited to 1/IHB_MIN per TSI (13.4).
+ *
+ * if SPMR was valid, returns TRUE, if invalid returns FALSE.
+ */
+
+bool
+pgm_on_spmr (
+    pgm_sock_t*           const restrict sock,
+    pgm_peer_t*           const restrict peer,  /* maybe NULL if socket is source */
+    struct pgm_sk_buff_t* const restrict skb
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != skb);
+
+    pgm_debug ("pgm_on_spmr (sock:%p peer:%p skb:%p)",
+        (void*)sock, (void*)peer, (void*)skb);
+
+    if (PGM_UNLIKELY(!pgm_verify_spmr (skb))) {
+        pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed SPMR rejected."));
+        return FALSE;
+    }
+
+    if (peer_is_source (peer)) {
+        const bool send_status = pgm_send_spm (sock, 0);
+        if (PGM_UNLIKELY(!send_status)) {
+            pgm_trace (PGM_LOG_ROLE_NETWORK,_("Failed to send SPM on SPM-Request."));
+        }
+    } else {
+        pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Suppressing SPMR due to peer multicast SPMR."));
+        reset_spmr_timer (peer);
+    }
+    return TRUE;
+}
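
pgm_on_deferred_nak() leaves the head of the retransmit queue in place when send_rdata() would block, so the timer thread can simply retry once the socket becomes writable. A toy model of that drive-until-drained contract (all names invented for illustration, not OpenPGM API):

/* deferred_nak_loop.c -- shape of a timer-thread drain loop.  service_one()
 * stands in for pgm_on_deferred_nak(): it returns false when the underlying
 * send would block, leaving the queued item for a later retry.
 */
#include <stdbool.h>
#include <stdio.h>

static int pending = 3;         /* queued repair requests */
static int budget  = 2;         /* sends before the socket "blocks" */

static bool
service_one (void)
{
    if (pending == 0) return true;      /* nothing to do */
    if (budget == 0)  return false;     /* would block: head stays queued */
    budget--; pending--;                /* RDATA sent, dequeue head */
    return true;
}

int
main (void)
{
    while (pending > 0) {
        if (!service_one ()) {
            puts ("would block: wait for writability, then retry");
            budget = 2;                 /* pretend poll() reported writable */
        }
    }
    puts ("retransmit queue drained");
    return 0;
}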
+/* Process opt_pgmcc_feedback PGM option that ships attached to ACK or NAK.
+ * Contents are used to elect the best ACKer.
+ *
+ * returns TRUE if peer is the elected ACKer.
+ */
+
+static
+bool
+on_opt_pgmcc_feedback (
+    pgm_sock_t*                        const restrict sock,
+    const struct pgm_sk_buff_t*        const restrict skb,
+    const struct pgm_opt_pgmcc_feedback*     restrict opt_pgmcc_feedback
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != skb);
+    pgm_assert (NULL != opt_pgmcc_feedback);
+
+    const uint32_t opt_tstamp = ntohl (opt_pgmcc_feedback->opt_tstamp);
+    const uint16_t opt_loss_rate = ntohs (opt_pgmcc_feedback->opt_loss_rate);
+
+    const uint32_t rtt = pgm_to_msecs (skb->tstamp) - opt_tstamp;
+    const uint64_t peer_loss = rtt * rtt * opt_loss_rate;
+
+    struct sockaddr_storage peer_nla;
+    pgm_nla_to_sockaddr (&opt_pgmcc_feedback->opt_nla_afi, (struct sockaddr*)&peer_nla);
+
+/* ACKer elections */
+    if (PGM_UNLIKELY(pgm_sockaddr_is_addr_unspecified ((const struct sockaddr*)&sock->acker_nla)))
+    {
+        pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("Elected first ACKer"));
+        memcpy (&sock->acker_nla, &peer_nla, pgm_sockaddr_storage_len (&peer_nla));
+    }
+    else if (peer_loss > sock->acker_loss &&
+             0 != pgm_sockaddr_cmp ((const struct sockaddr*)&peer_nla, (const struct sockaddr*)&sock->acker_nla))
+    {
+        pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("Elected new ACKer"));
+        memcpy (&sock->acker_nla, &peer_nla, pgm_sockaddr_storage_len (&peer_nla));
+    }
+
+/* update ACKer state */
+    if (0 == pgm_sockaddr_cmp ((const struct sockaddr*)&peer_nla, (const struct sockaddr*)&sock->acker_nla))
+    {
+        sock->acker_loss = peer_loss;
+        return TRUE;
+    }
+
+    return FALSE;
+}
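
The election metric above is rtt² × loss_rate: the receiver on the worst network path wins and becomes the ACKer. A small illustration with made-up numbers (widened to 64-bit before multiplying, which this sketch does explicitly to sidestep 32-bit overflow):

/* acker_election.c -- compare two receivers by the rtt^2 * loss_rate
 * congestion metric; the larger product identifies the worst path.
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

static uint64_t
congestion_metric (uint32_t rtt_ms, uint16_t loss_rate)
{
    return (uint64_t)rtt_ms * rtt_ms * loss_rate;
}

int
main (void)
{
    const uint64_t a = congestion_metric (12, 40);      /* fast LAN receiver */
    const uint64_t b = congestion_metric (95, 220);     /* lossy WAN receiver */
    printf ("A=%" PRIu64 " B=%" PRIu64 " -> elect %s\n",
            a, b, (b > a) ? "B" : "A");
    return 0;
}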
+/* NAK requesting RDATA transmission for a sending sock, only valid if
+ * sequence number(s) still in transmission window.
+ *
+ * we can potentially have different IP versions for the NAK packet to the send group.
+ *
+ * TODO: fix IPv6 AFIs
+ *
+ * take in a NAK and pass off to an asynchronous queue for another thread to process
+ *
+ * if NAK is valid, returns TRUE.  on error, FALSE is returned.
+ */
+
+bool
+pgm_on_nak (
+    pgm_sock_t*           const restrict sock,
+    struct pgm_sk_buff_t* const restrict skb
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != skb);
+
+    pgm_debug ("pgm_on_nak (sock:%p skb:%p)",
+        (const void*)sock, (const void*)skb);
+
+    const bool is_parity = skb->pgm_header->pgm_options & PGM_OPT_PARITY;
+    if (is_parity) {
+        sock->cumulative_stats[PGM_PC_SOURCE_PARITY_NAKS_RECEIVED]++;
+        if (!sock->use_ondemand_parity) {
+            pgm_trace (PGM_LOG_ROLE_NETWORK,_("Parity NAK rejected as on-demand parity is not enabled."));
+            sock->cumulative_stats[PGM_PC_SOURCE_MALFORMED_NAKS]++;
+            return FALSE;
+        }
+    } else
+        sock->cumulative_stats[PGM_PC_SOURCE_SELECTIVE_NAKS_RECEIVED]++;
+
+    if (PGM_UNLIKELY(!pgm_verify_nak (skb))) {
+        pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed NAK rejected."));
+        sock->cumulative_stats[PGM_PC_SOURCE_MALFORMED_NAKS]++;
+        return FALSE;
+    }
+
+    const struct pgm_nak*  nak  = (struct pgm_nak*) skb->data;
+    const struct pgm_nak6* nak6 = (struct pgm_nak6*)skb->data;
+
+/* NAK_SRC_NLA contains our sock unicast NLA */
+    struct sockaddr_storage nak_src_nla;
+    pgm_nla_to_sockaddr (&nak->nak_src_nla_afi, (struct sockaddr*)&nak_src_nla);
+    if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&nak_src_nla, (struct sockaddr*)&sock->send_addr) != 0))
+    {
+        char saddr[INET6_ADDRSTRLEN];
+        pgm_sockaddr_ntop ((struct sockaddr*)&nak_src_nla, saddr, sizeof(saddr));
+        pgm_trace (PGM_LOG_ROLE_NETWORK,_("NAK rejected for unmatched NLA: %s"), saddr);
+        sock->cumulative_stats[PGM_PC_SOURCE_MALFORMED_NAKS]++;
+        return FALSE;
+    }
+
+/* NAK_GRP_NLA contains our sock multicast group */
+    struct sockaddr_storage nak_grp_nla;
+    pgm_nla_to_sockaddr ((AF_INET6 == nak_src_nla.ss_family) ? &nak6->nak6_grp_nla_afi : &nak->nak_grp_nla_afi, (struct sockaddr*)&nak_grp_nla);
+    if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&nak_grp_nla, (struct sockaddr*)&sock->send_gsr.gsr_group) != 0))
+    {
+        char sgroup[INET6_ADDRSTRLEN];
+        pgm_sockaddr_ntop ((struct sockaddr*)&nak_grp_nla, sgroup, sizeof(sgroup));
+        pgm_trace (PGM_LOG_ROLE_NETWORK,_("NAK rejected as targeted for different multicast group: %s"), sgroup);
+        sock->cumulative_stats[PGM_PC_SOURCE_MALFORMED_NAKS]++;
+        return FALSE;
+    }
+
+/* create queue object */
+    struct pgm_sqn_list_t sqn_list;
+    sqn_list.sqn[0] = ntohl (nak->nak_sqn);
+    sqn_list.len = 1;
+
+    pgm_debug ("nak_sqn %" PRIu32, sqn_list.sqn[0]);
+
+/* check NAK list */
+    const uint32_t* nak_list = NULL;
+    uint_fast8_t nak_list_len = 0;
+    if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT)
+    {
+        const struct pgm_opt_length* opt_len = (AF_INET6 == nak_src_nla.ss_family) ?
+                                                    (const struct pgm_opt_length*)(nak6 + 1) :
+                                                    (const struct pgm_opt_length*)(nak + 1);
+        if (PGM_UNLIKELY(opt_len->opt_type != PGM_OPT_LENGTH)) {
+            pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed NAK rejected."));
+            sock->cumulative_stats[PGM_PC_SOURCE_MALFORMED_NAKS]++;
+            return FALSE;
+        }
+        if (PGM_UNLIKELY(opt_len->opt_length != sizeof(struct pgm_opt_length))) {
+            pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed NAK rejected."));
+            sock->cumulative_stats[PGM_PC_SOURCE_MALFORMED_NAKS]++;
+            return FALSE;
+        }
+/* TODO: check for > 16 options & past packet end */
+        const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len;
+        do {
+            opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length);
+            if ((opt_header->opt_type & PGM_OPT_MASK) == PGM_OPT_NAK_LIST) {
+                nak_list = ((const struct pgm_opt_nak_list*)(opt_header + 1))->opt_sqn;
+                nak_list_len = ( opt_header->opt_length - sizeof(struct pgm_opt_header) - sizeof(uint8_t) ) / sizeof(uint32_t);
+                break;
+            }
+        } while (!(opt_header->opt_type & PGM_OPT_END));
+    }
+
+/* nak list numbers */
+    if (PGM_UNLIKELY(nak_list_len > 63)) {
+        pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed NAK rejected on too long sequence list."));
+        return FALSE;
+    }
+
+    for (uint_fast8_t i = 0; i < nak_list_len; i++)
+    {
+        sqn_list.sqn[sqn_list.len++] = ntohl (*nak_list);
+        nak_list++;
+    }
+
+/* send NAK confirm packet immediately, then defer to timer thread for a.s.a.p
+ * delivery of the actual RDATA packets.  blocking send for NCF is ignored as RDATA
+ * broadcast will be sent later.
+ */
+    if (nak_list_len)
+        send_ncf_list (sock, (struct sockaddr*)&nak_src_nla, (struct sockaddr*)&nak_grp_nla, &sqn_list, is_parity);
+    else
+        send_ncf (sock, (struct sockaddr*)&nak_src_nla, (struct sockaddr*)&nak_grp_nla, sqn_list.sqn[0], is_parity);
+
+/* queue retransmit requests */
+    for (uint_fast8_t i = 0; i < sqn_list.len; i++) {
+        const bool push_status = pgm_txw_retransmit_push (sock->window, sqn_list.sqn[i], is_parity, sock->tg_sqn_shift);
+        if (PGM_UNLIKELY(!push_status)) {
+            pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Failed to push retransmit request for #%" PRIu32), sqn_list.sqn[i]);
+        }
+    }
+    return TRUE;
+}
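
pgm_on_nak() walks the option list by starting at OPT_LENGTH and hopping `opt_length` bytes per record until one carries the end flag. A simplified standalone walk (toy layout and constants that mirror, not reproduce, the wire format):

/* opt_walk.c -- walking a chained option list the way pgm_on_nak() does:
 * first record is the overall-length option, then each record's length
 * field points at the next, until a type with the END bit set.
 */
#include <stdint.h>
#include <stdio.h>

#define OPT_END   0x80          /* toy flag, not the PGM constant */
#define OPT_MASK  0x7f

struct opt_header { uint8_t opt_type; uint8_t opt_length; };

int
main (void)
{
    /* length record (4 bytes) | option 0x01 (4 bytes) | option 0x02+END (6 bytes) */
    const uint8_t pkt[] = { 0x00, 4, 0x00, 14,
                            0x01, 4, 0xab, 0xcd,
                            0x02 | OPT_END, 6, 1, 2, 3, 4 };
    const struct opt_header* opt = (const struct opt_header*)pkt;
    do {
        opt = (const struct opt_header*)((const char*)opt + opt->opt_length);
        printf ("option 0x%02x, %u bytes%s\n",
                opt->opt_type & OPT_MASK, opt->opt_length,
                (opt->opt_type & OPT_END) ? " (end)" : "");
    } while (!(opt->opt_type & OPT_END));
    return 0;
}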
+/* Null-NAK, or N-NAK propagated by a DLR for hand waving excitement
+ *
+ * if NNAK is valid, returns TRUE.  on error, FALSE is returned.
+ */
+
+bool
+pgm_on_nnak (
+    pgm_sock_t*           const restrict sock,
+    struct pgm_sk_buff_t* const restrict skb
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != skb);
+
+    pgm_debug ("pgm_on_nnak (sock:%p skb:%p)",
+        (void*)sock, (void*)skb);
+
+    sock->cumulative_stats[PGM_PC_SOURCE_SELECTIVE_NNAK_PACKETS_RECEIVED]++;
+
+    if (PGM_UNLIKELY(!pgm_verify_nnak (skb))) {
+        sock->cumulative_stats[PGM_PC_SOURCE_NNAK_ERRORS]++;
+        return FALSE;
+    }
+
+    const struct pgm_nak*  nnak  = (struct pgm_nak*) skb->data;
+    const struct pgm_nak6* nnak6 = (struct pgm_nak6*)skb->data;
+
+/* NAK_SRC_NLA contains our sock unicast NLA */
+    struct sockaddr_storage nnak_src_nla;
+    pgm_nla_to_sockaddr (&nnak->nak_src_nla_afi, (struct sockaddr*)&nnak_src_nla);
+
+    if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&nnak_src_nla, (struct sockaddr*)&sock->send_addr) != 0))
+    {
+        sock->cumulative_stats[PGM_PC_SOURCE_NNAK_ERRORS]++;
+        return FALSE;
+    }
+
+/* NAK_GRP_NLA contains our sock multicast group */
+    struct sockaddr_storage nnak_grp_nla;
+    pgm_nla_to_sockaddr ((AF_INET6 == nnak_src_nla.ss_family) ? &nnak6->nak6_grp_nla_afi : &nnak->nak_grp_nla_afi, (struct sockaddr*)&nnak_grp_nla);
+    if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&nnak_grp_nla, (struct sockaddr*)&sock->send_gsr.gsr_group) != 0))
+    {
+        sock->cumulative_stats[PGM_PC_SOURCE_NNAK_ERRORS]++;
+        return FALSE;
+    }
+
+/* check NNAK list */
+    uint_fast8_t nnak_list_len = 0;
+    if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT)
+    {
+        const struct pgm_opt_length* opt_len = (AF_INET6 == nnak_src_nla.ss_family) ?
+                                                    (const struct pgm_opt_length*)(nnak6 + 1) :
+                                                    (const struct pgm_opt_length*)(nnak + 1);
+        if (PGM_UNLIKELY(opt_len->opt_type != PGM_OPT_LENGTH)) {
+            sock->cumulative_stats[PGM_PC_SOURCE_NNAK_ERRORS]++;
+            return FALSE;
+        }
+        if (PGM_UNLIKELY(opt_len->opt_length != sizeof(struct pgm_opt_length))) {
+            sock->cumulative_stats[PGM_PC_SOURCE_NNAK_ERRORS]++;
+            return FALSE;
+        }
+/* TODO: check for > 16 options & past packet end */
+        const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len;
+        do {
+            opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length);
+            if ((opt_header->opt_type & PGM_OPT_MASK) == PGM_OPT_NAK_LIST) {
+                nnak_list_len = ( opt_header->opt_length - sizeof(struct pgm_opt_header) - sizeof(uint8_t) ) / sizeof(uint32_t);
+                break;
+            }
+        } while (!(opt_header->opt_type & PGM_OPT_END));
+    }
+
+    sock->cumulative_stats[PGM_PC_SOURCE_SELECTIVE_NNAKS_RECEIVED] += 1 + nnak_list_len;
+    return TRUE;
+}
+
+/* ACK, sent upstream by one selected ACKER for congestion control feedback.
+ *
+ * if ACK is valid, returns TRUE.  on error, FALSE is returned.
+ */
+
+bool
+pgm_on_ack (
+    pgm_sock_t*           const restrict sock,
+    struct pgm_sk_buff_t* const restrict skb
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != skb);
+
+    pgm_debug ("pgm_on_ack (sock:%p skb:%p)",
+        (const void*)sock, (const void*)skb);
+
+    sock->cumulative_stats[PGM_PC_SOURCE_ACK_PACKETS_RECEIVED]++;
+
+    if (PGM_UNLIKELY(!pgm_verify_ack (skb))) {
+        sock->cumulative_stats[PGM_PC_SOURCE_ACK_ERRORS]++;
+        return FALSE;
+    }
+
+    if (!sock->use_pgmcc)
+        return FALSE;
+
+    const struct pgm_ack* ack = (struct pgm_ack*)skb->data;
+    bool is_acker = FALSE;
+
+/* check PGMCC feedback option for new elections */
+    if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT)
+    {
+        const struct pgm_opt_length* opt_len = (const struct pgm_opt_length*)(ack + 1);
+        if (PGM_UNLIKELY(opt_len->opt_type != PGM_OPT_LENGTH)) {
+            pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed ACK rejected."));
+            return FALSE;
+        }
+        if (PGM_UNLIKELY(opt_len->opt_length != sizeof(struct pgm_opt_length))) {
+            pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed ACK rejected."));
+            return FALSE;
+        }
+        const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len;
+        do {
+            opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length);
+            if ((opt_header->opt_type & PGM_OPT_MASK) == PGM_OPT_PGMCC_FEEDBACK) {
+                const struct pgm_opt_pgmcc_feedback* opt_pgmcc_feedback = (const struct pgm_opt_pgmcc_feedback*)(opt_header + 1);
+                is_acker = on_opt_pgmcc_feedback (sock, skb, opt_pgmcc_feedback);
+                break;  /* ignore other options */
+            }
+        } while (!(opt_header->opt_type & PGM_OPT_END));
+    }
+
+/* ignore ACKs from other receivers or sessions */
+    if (!is_acker)
+        return TRUE;
+
+/* reset ACK expiration */
+    sock->next_crqst = 0;
+
+/* count new ACK sequences */
+    const uint32_t ack_rx_max = ntohl (ack->ack_rx_max);
+    const int32_t delta = ack_rx_max - sock->ack_rx_max;
+/* ignore older ACKs when multiple active ACKers */
+    if (pgm_uint32_gt (ack_rx_max, sock->ack_rx_max))
+        sock->ack_rx_max = ack_rx_max;
+    uint32_t ack_bitmap = ntohl (ack->ack_bitmap);
+    if (delta > 32)         sock->ack_bitmap = 0;           /* sequence jump ahead beyond past bitmap */
+    else if (delta > 0)     sock->ack_bitmap <<= delta;     /* immediate sequence */
+    else if (delta > -32)   ack_bitmap <<= -delta;          /* repair sequence scoped by bitmap */
+    else                    ack_bitmap = 0;                 /* old sequence */
+    unsigned new_acks = _pgm_popcount (ack_bitmap & ~sock->ack_bitmap);
+    sock->ack_bitmap |= ack_bitmap;
+
+    if (0 == new_acks)
+        return TRUE;
+
+    const bool is_congestion_limited = (sock->tokens < pgm_fp8 (1));
+
+/* after loss detection cancel any further manipulation of the window
+ * until feedback is received for the next transmitted packet.
+ */
+    if (sock->is_congested)
+    {
+        if (pgm_uint32_lte (ack_rx_max, sock->suspended_sqn))
+        {
+            pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("PGMCC window token manipulation suspended due to congestion (T:%u W:%u)"),
+                pgm_fp8tou (sock->tokens), pgm_fp8tou (sock->cwnd_size));
+            const uint_fast32_t token_inc = pgm_fp8mul (pgm_fp8 (new_acks), pgm_fp8 (1) + pgm_fp8div (pgm_fp8 (1), sock->cwnd_size));
+            sock->tokens = MIN( sock->tokens + token_inc, sock->cwnd_size );
+            goto notify_tx;
+        }
+        sock->is_congested = FALSE;
+    }
+
+/* count outstanding lost sequences */
+    const unsigned total_lost = _pgm_popcount (~sock->ack_bitmap);
+
+/* no detected data loss at ACKer, increase congestion window size */
+    if (0 == total_lost)
+    {
+        new_acks += sock->acks_after_loss;
+        sock->acks_after_loss = 0;
+        uint_fast32_t n = pgm_fp8 (new_acks);
+        uint_fast32_t token_inc = 0;
+
+/* slow-start phase, exponential increase to SSTHRESH */
+        if (sock->cwnd_size < sock->ssthresh) {
+            const uint_fast32_t d = MIN( n, sock->ssthresh - sock->cwnd_size );
+            n -= d;
+            token_inc = d + d;
+            sock->cwnd_size += d;
+        }
+
+        const uint_fast32_t iw = pgm_fp8div (pgm_fp8 (1), sock->cwnd_size);
+
+/* linear window increase */
+        token_inc += pgm_fp8mul (n, pgm_fp8 (1) + iw);
+        sock->cwnd_size += pgm_fp8mul (n, iw);
+        sock->tokens = MIN( sock->tokens + token_inc, sock->cwnd_size );
+//      pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("PGMCC++ (T:%u W:%u)"),
+//          pgm_fp8tou (sock->tokens), pgm_fp8tou (sock->cwnd_size));
+    }
+    else
+    {
+/* Look for an unacknowledged data packet which is followed by at least three
+ * acknowledged data packets, then the packet is assumed to be lost and PGMCC
+ * reacts by halving the window.
+ *
+ * Common value will be 0xfffffff7.
+ */
+        sock->acks_after_loss += new_acks;
+        if (sock->acks_after_loss >= 3)
+        {
+            sock->acks_after_loss = 0;
+            sock->suspended_sqn = ack_rx_max;
+            sock->is_congested = TRUE;
+            sock->cwnd_size = pgm_fp8div (sock->cwnd_size, pgm_fp8 (2));
+            if (sock->cwnd_size > sock->tokens)
+                sock->tokens = 0;
+            else
+                sock->tokens -= sock->cwnd_size;
+            sock->ack_bitmap = 0xffffffff;
+            pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("PGMCC congestion, halving window size (T:%u W:%u)"),
+                pgm_fp8tou (sock->tokens), pgm_fp8tou (sock->cwnd_size));
+        }
+    }
+
+/* token is now available so notify tx thread that transmission time is available */
+notify_tx:
+    if (is_congestion_limited &&
+        sock->tokens >= pgm_fp8 (1))
+    {
+        pgm_notify_send (&sock->ack_notify);
+    }
+    return TRUE;
+}
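
The bitmap bookkeeping above keeps a 32-deep ACK history anchored at the highest sequence number seen; positive deltas shift the local history forward, negative deltas rescope the incoming report. A self-contained model of that merge (invented sequence numbers):

/* ack_bitmap.c -- sliding 32-bit ACK history in the style of pgm_on_ack():
 * shift local state for newer sequences, shift the incoming bitmap for
 * older reports, and count the previously unseen bits.
 */
#include <stdint.h>
#include <stdio.h>

static uint32_t local_bitmap = 0;
static uint32_t local_rx_max = 0;

static unsigned
merge_ack (uint32_t ack_rx_max, uint32_t ack_bitmap)
{
    const int32_t delta = (int32_t)(ack_rx_max - local_rx_max);
    if (delta > 0) local_rx_max = ack_rx_max;
    if      (delta >  32) local_bitmap   = 0;       /* jumped past history */
    else if (delta >   0) local_bitmap <<= delta;   /* newer sequences */
    else if (delta > -32) ack_bitmap   <<= -delta;  /* older report, rescope */
    else                  ack_bitmap     = 0;       /* too old, ignore */
    const unsigned new_acks = (unsigned)__builtin_popcount (ack_bitmap & ~local_bitmap);
    local_bitmap |= ack_bitmap;
    return new_acks;
}

int
main (void)
{
    printf ("%u new\n", merge_ack (100, 0xffffffff));   /* 32 new */
    printf ("%u new\n", merge_ack (102, 0xffffffff));   /* 2 new */
    printf ("%u new\n", merge_ack (101, 0xffffffff));   /* 0 new, older report */
    return 0;
}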
+/* ambient/heartbeat SPMs
+ *
+ * heartbeat: ihb_tmr decaying between ihb_min and ihb_max 2x after last packet
+ *
+ * on success, TRUE is returned, if operation would block, FALSE is returned.
+ */
+
+bool
+pgm_send_spm (
+    pgm_sock_t* const   sock,
+    const int           flags
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != sock->window);
+
+    pgm_debug ("pgm_send_spm (sock:%p flags:%d)",
+        (const void*)sock, flags);
+
+    size_t tpdu_length = sizeof(struct pgm_header);
+    if (AF_INET == sock->send_gsr.gsr_group.ss_family)
+        tpdu_length += sizeof(struct pgm_spm);
+    else
+        tpdu_length += sizeof(struct pgm_spm6);
+    if (sock->use_proactive_parity ||
+        sock->use_ondemand_parity ||
+        sock->is_pending_crqst ||
+        PGM_OPT_FIN == flags)
+    {
+        tpdu_length += sizeof(struct pgm_opt_length);
+/* forward error correction */
+        if (sock->use_proactive_parity ||
+            sock->use_ondemand_parity)
+            tpdu_length += sizeof(struct pgm_opt_header) +
+                           sizeof(struct pgm_opt_parity_prm);
+/* congestion report request */
+        if (sock->is_pending_crqst)
+            tpdu_length += sizeof(struct pgm_opt_header) +
+                           sizeof(struct pgm_opt_crqst);
+/* end of session */
+        if (PGM_OPT_FIN == flags)
+            tpdu_length += sizeof(struct pgm_opt_header) +
+                           sizeof(struct pgm_opt_fin);
+    }
+    char buf[ tpdu_length ];
+    if (PGM_UNLIKELY(pgm_mem_gc_friendly))
+        memset (buf, 0, tpdu_length);
+    struct pgm_header* header = (struct pgm_header*)buf;
+    struct pgm_spm*  spm  = (struct pgm_spm *)(header + 1);
+    struct pgm_spm6* spm6 = (struct pgm_spm6*)(header + 1);
+    memcpy (header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+    header->pgm_sport       = sock->tsi.sport;
+    header->pgm_dport       = sock->dport;
+    header->pgm_type        = PGM_SPM;
+    header->pgm_options     = 0;
+    header->pgm_tsdu_length = 0;
+
+/* SPM */
+    spm->spm_sqn      = htonl (sock->spm_sqn);
+    spm->spm_trail    = htonl (pgm_txw_trail_atomic (sock->window));
+    spm->spm_lead     = htonl (pgm_txw_lead_atomic (sock->window));
+    spm->spm_reserved = 0;
+/* our nla */
+    pgm_sockaddr_to_nla ((struct sockaddr*)&sock->send_addr, (char*)&spm->spm_nla_afi);
+
+/* PGM options */
+    if (sock->use_proactive_parity ||
+        sock->use_ondemand_parity ||
+        sock->is_pending_crqst ||
+        PGM_OPT_FIN == flags)
+    {
+        struct pgm_opt_length* opt_len;
+        struct pgm_opt_header *opt_header, *last_opt_header;
+        uint16_t opt_total_length;
+
+        if (AF_INET == sock->send_gsr.gsr_group.ss_family)
+            opt_header = (struct pgm_opt_header*)(spm + 1);
+        else
+            opt_header = (struct pgm_opt_header*)(spm6 + 1);
+        header->pgm_options |= PGM_OPT_PRESENT;
+        opt_len              = (struct pgm_opt_length*)opt_header;
+        opt_len->opt_type    = PGM_OPT_LENGTH;
+        opt_len->opt_length  = sizeof(struct pgm_opt_length);
+        opt_total_length     = sizeof(struct pgm_opt_length);
+        last_opt_header = opt_header = (struct pgm_opt_header*)(opt_len + 1);
+
+/* OPT_PARITY_PRM */
+        if (sock->use_proactive_parity ||
+            sock->use_ondemand_parity)
+        {
+            header->pgm_options |= PGM_OPT_NETWORK;
+            opt_total_length += sizeof(struct pgm_opt_header) +
+                                sizeof(struct pgm_opt_parity_prm);
+            opt_header->opt_type   = PGM_OPT_PARITY_PRM;
+            opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_parity_prm);
+            struct pgm_opt_parity_prm* opt_parity_prm = (struct pgm_opt_parity_prm*)(opt_header + 1);
+            opt_parity_prm->opt_reserved = (sock->use_proactive_parity ? PGM_PARITY_PRM_PRO : 0) |
+                                           (sock->use_ondemand_parity ? PGM_PARITY_PRM_OND : 0);
+            opt_parity_prm->parity_prm_tgs = htonl (sock->rs_k);
+            last_opt_header = opt_header;
+            opt_header = (struct pgm_opt_header*)(opt_parity_prm + 1);
+        }
+
+/* OPT_CRQST */
+        if (sock->is_pending_crqst)
+        {
+            header->pgm_options |= PGM_OPT_NETWORK;
+            opt_total_length += sizeof(struct pgm_opt_header) +
+                                sizeof(struct pgm_opt_crqst);
+            opt_header->opt_type   = PGM_OPT_CRQST;
+            opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_crqst);
+            struct pgm_opt_crqst* opt_crqst = (struct pgm_opt_crqst*)(opt_header + 1);
+/* request receiver worst path report, OPT_CR_RX_WP */
+            opt_crqst->opt_reserved = PGM_OPT_CRQST_RXP;
+            sock->is_pending_crqst = FALSE;
+            last_opt_header = opt_header;
+            opt_header = (struct pgm_opt_header*)(opt_crqst + 1);
+        }
+
+/* OPT_FIN */
+        if (PGM_OPT_FIN == flags)
+        {
+            opt_total_length += sizeof(struct pgm_opt_header) +
+                                sizeof(struct pgm_opt_fin);
+            opt_header->opt_type   = PGM_OPT_FIN;
+            opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_fin);
+            struct pgm_opt_fin* opt_fin = (struct pgm_opt_fin*)(opt_header + 1);
+            opt_fin->opt_reserved = 0;
+            last_opt_header = opt_header;
+            opt_header = (struct pgm_opt_header*)(opt_fin + 1);
+        }
+
+        last_opt_header->opt_type |= PGM_OPT_END;
+        opt_len->opt_total_length = htons (opt_total_length);
+    }
+
+/* checksum optional for SPMs */
+    header->pgm_checksum = 0;
+    header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+    const ssize_t sent = pgm_sendto (sock,
+                                     flags != PGM_OPT_SYN && sock->is_controlled_spm,  /* rate limited */
+                                     TRUE,  /* with router alert */
+                                     buf,
+                                     tpdu_length,
+                                     (struct sockaddr*)&sock->send_gsr.gsr_group,
+                                     pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+    if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno)) {
+        sock->blocklen = tpdu_length;
+        return FALSE;
+    }
+/* advance SPM sequence only on successful transmission */
+    sock->spm_sqn++;
+    pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length);
+    return TRUE;
+}
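
pgm_csum_partial()/pgm_csum_fold() above implement the usual one's-complement Internet checksum: wide accumulation, end-around folding to 16 bits, then a final complement. A minimal sketch of the same idea (no odd-offset handling, unlike the real routines):

/* csum_sketch.c -- one's-complement checksum: sum 16-bit words into a
 * 32-bit accumulator, fold the carries back in, complement the result.
 */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

static uint32_t
csum_partial (const void* data, size_t len, uint32_t sum)
{
    const uint8_t* p = data;
    while (len > 1) { sum += (uint16_t)((p[0] << 8) | p[1]); p += 2; len -= 2; }
    if (len) sum += (uint16_t)(p[0] << 8);      /* trailing odd byte */
    return sum;
}

static uint16_t
csum_fold (uint32_t sum)
{
    while (sum >> 16) sum = (sum & 0xffff) + (sum >> 16);   /* end-around carry */
    return (uint16_t)~sum;
}

int
main (void)
{
    const uint8_t pkt[] = { 0x45, 0x00, 0x00, 0x1c, 0xde, 0xad };
    printf ("checksum 0x%04x\n", csum_fold (csum_partial (pkt, sizeof pkt, 0)));
    return 0;
}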
+/* send a NAK confirm (NCF) message with provided sequence number list.
+ *
+ * on success, TRUE is returned, returns FALSE if operation would block.
+ */
+
+static
+bool
+send_ncf (
+    pgm_sock_t*            const restrict sock,
+    const struct sockaddr* const restrict nak_src_nla,
+    const struct sockaddr* const restrict nak_grp_nla,
+    const uint32_t                        sequence,
+    const bool                            is_parity   /* send parity NCF */
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != nak_src_nla);
+    pgm_assert (NULL != nak_grp_nla);
+    pgm_assert (nak_src_nla->sa_family == nak_grp_nla->sa_family);
+
+#ifdef SOURCE_DEBUG
+    char saddr[INET6_ADDRSTRLEN], gaddr[INET6_ADDRSTRLEN];
+    pgm_sockaddr_ntop (nak_src_nla, saddr, sizeof(saddr));
+    pgm_sockaddr_ntop (nak_grp_nla, gaddr, sizeof(gaddr));
+    pgm_debug ("send_ncf (sock:%p nak-src-nla:%s nak-grp-nla:%s sequence:%" PRIu32" is-parity:%s)",
+        (void*)sock,
+        saddr,
+        gaddr,
+        sequence,
+        is_parity ? "TRUE": "FALSE"
+        );
+#endif
+
+    size_t tpdu_length = sizeof(struct pgm_header);
+    tpdu_length += (AF_INET == nak_src_nla->sa_family) ? sizeof(struct pgm_nak) : sizeof(struct pgm_nak6);
+    char buf[ tpdu_length ];
+    struct pgm_header* header = (struct pgm_header*)buf;
+    struct pgm_nak*  ncf  = (struct pgm_nak *)(header + 1);
+    struct pgm_nak6* ncf6 = (struct pgm_nak6*)(header + 1);
+    memcpy (header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+    header->pgm_sport       = sock->tsi.sport;
+    header->pgm_dport       = sock->dport;
+    header->pgm_type        = PGM_NCF;
+    header->pgm_options     = is_parity ? PGM_OPT_PARITY : 0;
+    header->pgm_tsdu_length = 0;
+
+/* NCF */
+    ncf->nak_sqn = htonl (sequence);
+
+/* source nla */
+    pgm_sockaddr_to_nla (nak_src_nla, (char*)&ncf->nak_src_nla_afi);
+
+/* group nla */
+    pgm_sockaddr_to_nla (nak_grp_nla, (AF_INET6 == nak_src_nla->sa_family) ? (char*)&ncf6->nak6_grp_nla_afi : (char*)&ncf->nak_grp_nla_afi );
+    header->pgm_checksum = 0;
+    header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+    const ssize_t sent = pgm_sendto (sock,
+                                     FALSE,  /* not rate limited */
+                                     TRUE,   /* with router alert */
+                                     buf,
+                                     tpdu_length,
+                                     (struct sockaddr*)&sock->send_gsr.gsr_group,
+                                     pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+    if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno))
+        return FALSE;
+    pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length);
+    return TRUE;
+}
+
+/* An NCF packet with an OPT_NAK_LIST option extension
+ *
+ * on success, TRUE is returned.  on error, FALSE is returned.
+ */
+
+static
+bool
+send_ncf_list (
+    pgm_sock_t*            const restrict sock,
+    const struct sockaddr* const restrict nak_src_nla,
+    const struct sockaddr* const restrict nak_grp_nla,
+    struct pgm_sqn_list_t* const restrict sqn_list,    /* will change to network-order */
+    const bool                            is_parity    /* send parity NCF */
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != nak_src_nla);
+    pgm_assert (NULL != nak_grp_nla);
+    pgm_assert (sqn_list->len > 1);
+    pgm_assert (sqn_list->len <= 63);
+    pgm_assert (nak_src_nla->sa_family == nak_grp_nla->sa_family);
+
+#ifdef SOURCE_DEBUG
+    char saddr[INET6_ADDRSTRLEN], gaddr[INET6_ADDRSTRLEN];
+    char list[1024];
+    pgm_sockaddr_ntop (nak_src_nla, saddr, sizeof(saddr));
+    pgm_sockaddr_ntop (nak_grp_nla, gaddr, sizeof(gaddr));
+    sprintf (list, "%" PRIu32, sqn_list->sqn[0]);
+    for (uint_fast8_t i = 1; i < sqn_list->len; i++) {
+        char sequence[ 2 + strlen("4294967295") ];
+        sprintf (sequence, " %" PRIu32, sqn_list->sqn[i]);
+        strcat (list, sequence);
+    }
+    pgm_debug ("send_ncf_list (sock:%p nak-src-nla:%s nak-grp-nla:%s sqn-list:[%s] is-parity:%s)",
+        (void*)sock,
+        saddr,
+        gaddr,
+        list,
+        is_parity ? "TRUE": "FALSE"
+        );
+#endif
+
+    size_t tpdu_length = sizeof(struct pgm_header) +
+                         sizeof(struct pgm_opt_length) +     /* includes header */
+                         sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_nak_list) +
+                         ( (sqn_list->len-1) * sizeof(uint32_t) );
+    tpdu_length += (AF_INET == nak_src_nla->sa_family) ? sizeof(struct pgm_nak) : sizeof(struct pgm_nak6);
+    char buf[ tpdu_length ];
+    struct pgm_header* header = (struct pgm_header*)buf;
+    struct pgm_nak*  ncf  = (struct pgm_nak *)(header + 1);
+    struct pgm_nak6* ncf6 = (struct pgm_nak6*)(header + 1);
+    memcpy (header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+    header->pgm_sport       = sock->tsi.sport;
+    header->pgm_dport       = sock->dport;
+    header->pgm_type        = PGM_NCF;
+    header->pgm_options     = is_parity ? (PGM_OPT_PRESENT | PGM_OPT_NETWORK | PGM_OPT_PARITY) : (PGM_OPT_PRESENT | PGM_OPT_NETWORK);
+    header->pgm_tsdu_length = 0;
+/* NCF */
+    ncf->nak_sqn = htonl (sqn_list->sqn[0]);
+
+/* source nla */
+    pgm_sockaddr_to_nla (nak_src_nla, (char*)&ncf->nak_src_nla_afi);
+
+/* group nla */
+    pgm_sockaddr_to_nla (nak_grp_nla, (AF_INET6 == nak_src_nla->sa_family) ? (char*)&ncf6->nak6_grp_nla_afi : (char*)&ncf->nak_grp_nla_afi );
+
+/* OPT_NAK_LIST */
+    struct pgm_opt_length* opt_len = (AF_INET6 == nak_src_nla->sa_family) ? (struct pgm_opt_length*)(ncf6 + 1) : (struct pgm_opt_length*)(ncf + 1);
+    opt_len->opt_type         = PGM_OPT_LENGTH;
+    opt_len->opt_length       = sizeof(struct pgm_opt_length);
+    opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+                                        sizeof(struct pgm_opt_header) +
+                                        sizeof(struct pgm_opt_nak_list) +
+                                        ( (sqn_list->len-1) * sizeof(uint32_t) ) );
+    struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+    opt_header->opt_type   = PGM_OPT_NAK_LIST | PGM_OPT_END;
+    opt_header->opt_length = sizeof(struct pgm_opt_header) +
+                             sizeof(struct pgm_opt_nak_list) +
+                             ( (sqn_list->len-1) * sizeof(uint32_t) );
+    struct pgm_opt_nak_list* opt_nak_list = (struct pgm_opt_nak_list*)(opt_header + 1);
+    opt_nak_list->opt_reserved = 0;
+/* to network-order */
+    for (uint_fast8_t i = 1; i < sqn_list->len; i++)
+        opt_nak_list->opt_sqn[i-1] = htonl (sqn_list->sqn[i]);
+
+    header->pgm_checksum = 0;
+    header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+    const ssize_t sent = pgm_sendto (sock,
+                                     FALSE,  /* not rate limited */
+                                     TRUE,   /* with router alert */
+                                     buf,
+                                     tpdu_length,
+                                     (struct sockaddr*)&sock->send_gsr.gsr_group,
+                                     pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+    if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno))
+        return FALSE;
+    pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length);
+    return TRUE;
+}
+
+/* cancel any pending heartbeat SPM and schedule a new one
+ */
+
+static
+void
+reset_heartbeat_spm (
+    pgm_sock_t*const    sock,
+    const pgm_time_t    now
+    )
+{
+    pgm_mutex_lock (&sock->timer_mutex);
+    const pgm_time_t next_poll = sock->next_poll;
+    const pgm_time_t spm_heartbeat_interval = sock->spm_heartbeat_interval[ sock->spm_heartbeat_state = 1 ];
+    sock->next_heartbeat_spm = now + spm_heartbeat_interval;
+    if (pgm_time_after( next_poll, sock->next_heartbeat_spm ))
+    {
+        sock->next_poll = sock->next_heartbeat_spm;
+        if (!sock->is_pending_read) {
+            pgm_notify_send (&sock->pending_notify);
+            sock->is_pending_read = TRUE;
+        }
+    }
+    pgm_mutex_unlock (&sock->timer_mutex);
+}
+
+/* state helper for resuming sends
+ */
+#define STATE(x)    (sock->pkt_dontwait_state.x)
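
send_ncf_list() sizes its TPDU so the first sequence number travels in the NCF header itself and only len−1 extras go into OPT_NAK_LIST. A worked sizing calculation with placeholder struct sizes (the real sizes come from the wire-format headers, not these constants):

/* ncf_sizing.c -- the send_ncf_list() size formula with invented sizes. */
#include <stdint.h>
#include <stdio.h>

enum {                      /* placeholder sizes, not the real wire structs */
    HDR = 16,               /* struct pgm_header */
    NAK = 20,               /* struct pgm_nak (IPv4) */
    OPT_LEN = 4,            /* struct pgm_opt_length */
    OPT_HDR = 2,            /* struct pgm_opt_header */
    OPT_NAK_LIST = 5        /* reserved byte plus first list entry */
};

static unsigned
ncf_list_tpdu_length (unsigned sqn_count)   /* 2..63 sequence numbers */
{
    return HDR + NAK + OPT_LEN + OPT_HDR + OPT_NAK_LIST
         + (sqn_count - 1) * (unsigned)sizeof (uint32_t);
}

int
main (void)
{
    for (unsigned n = 2; n <= 16; n <<= 1)
        printf ("%2u sqns -> %u byte TPDU\n", n, ncf_list_tpdu_length (n));
    return 0;
}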
+/* send one PGM data packet, transmit window owned memory.
+ *
+ * On success, returns PGM_IO_STATUS_NORMAL and the number of data bytes pushed
+ * into the transmit window and attempted to send to the socket layer is saved
+ * into bytes_written.  On non-blocking sockets, PGM_IO_STATUS_WOULD_BLOCK is
+ * returned if the send would block.  PGM_IO_STATUS_RATE_LIMITED is returned if
+ * the packet sizes would exceed the current rate limit.
+ *
+ * NB: always returns success if data is pushed into the transmit window, even if
+ * sendto() fails; we don't want the application to try again as that is the
+ * reliable socket's role.
+ */
+
+static
+int
+send_odata (
+    pgm_sock_t*           const restrict sock,
+    struct pgm_sk_buff_t* const restrict skb,
+    size_t*                     restrict bytes_written
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != skb);
+    pgm_assert (skb->len <= sock->max_tsdu);
+
+    pgm_debug ("send_odata (sock:%p skb:%p bytes-written:%p)",
+        (void*)sock, (void*)skb, (void*)bytes_written);
+
+    const uint16_t    tsdu_length  = skb->len;
+    const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0;
+    const size_t      tpdu_length  = tsdu_length + pgm_pkt_offset (FALSE, pgmcc_family);
+
+/* continue if send would block */
+    if (sock->is_apdu_eagain) {
+        STATE(skb)->tstamp = pgm_time_update_now();
+        goto retry_send;
+    }
+
+/* add PGM header to skbuff */
+    STATE(skb) = pgm_skb_get(skb);
+    STATE(skb)->sock = sock;
+    STATE(skb)->tstamp = pgm_time_update_now();
+
+    STATE(skb)->pgm_header = (struct pgm_header*)STATE(skb)->head;
+    STATE(skb)->pgm_data   = (struct pgm_data*)(STATE(skb)->pgm_header + 1);
+    memcpy (STATE(skb)->pgm_header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+    STATE(skb)->pgm_header->pgm_sport       = sock->tsi.sport;
+    STATE(skb)->pgm_header->pgm_dport       = sock->dport;
+    STATE(skb)->pgm_header->pgm_type        = PGM_ODATA;
+    STATE(skb)->pgm_header->pgm_options     = sock->use_pgmcc ? PGM_OPT_PRESENT : 0;
+    STATE(skb)->pgm_header->pgm_tsdu_length = htons (tsdu_length);
+
+/* ODATA */
+    STATE(skb)->pgm_data->data_sqn   = htonl (pgm_txw_next_lead(sock->window));
+    STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window));
+
+    STATE(skb)->pgm_header->pgm_checksum = 0;
+    void* data = STATE(skb)->pgm_data + 1;
+    if (sock->use_pgmcc) {
+        struct pgm_opt_length* opt_len = data;
+        opt_len->opt_type         = PGM_OPT_LENGTH;
+        opt_len->opt_length       = sizeof(struct pgm_opt_length);
+        opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+                                            sizeof(struct pgm_opt_header) +
+                                            ((AF_INET6 == sock->acker_nla.ss_family) ?
+                                                sizeof(struct pgm_opt6_pgmcc_data) :
+                                                sizeof(struct pgm_opt_pgmcc_data)) );
+        struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+        opt_header->opt_type   = PGM_OPT_PGMCC_DATA | PGM_OPT_END;
+        opt_header->opt_length = sizeof(struct pgm_opt_header) +
+                                 ((AF_INET6 == sock->acker_nla.ss_family) ?
+                                     sizeof(struct pgm_opt6_pgmcc_data) :
+                                     sizeof(struct pgm_opt_pgmcc_data));
+        struct pgm_opt_pgmcc_data*  pgmcc_data  = (struct pgm_opt_pgmcc_data*)(opt_header + 1);
+        struct pgm_opt6_pgmcc_data* pgmcc_data6 = (struct pgm_opt6_pgmcc_data*)(opt_header + 1);
+
+        pgmcc_data->opt_tstamp = htonl (pgm_to_msecs (STATE(skb)->tstamp));
+/* acker nla */
+        pgm_sockaddr_to_nla ((struct sockaddr*)&sock->acker_nla, (char*)&pgmcc_data->opt_nla_afi);
+        if (AF_INET6 == sock->acker_nla.ss_family)
+            data = (char*)pgmcc_data6 + sizeof(struct pgm_opt6_pgmcc_data);
+        else
+            data = (char*)pgmcc_data + sizeof(struct pgm_opt_pgmcc_data);
+    }
+    const size_t pgm_header_len = (char*)data - (char*)STATE(skb)->pgm_header;
+    const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0);
+    STATE(unfolded_odata) = pgm_csum_partial (data, tsdu_length, 0);
+    STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len));
+
+/* add to transmit window, skb::data set to payload */
+    pgm_spinlock_lock (&sock->txw_spinlock);
+    pgm_txw_add (sock->window, STATE(skb));
+    pgm_spinlock_unlock (&sock->txw_spinlock);
+
+/* the transmit window MUST check the user count to ensure it does not
+ * attempt to send a repair-data packet based on in transit original data.
+ */
+
+    ssize_t sent;
+retry_send:
+
+/* congestion control */
+    if (sock->use_pgmcc &&
+        sock->tokens < pgm_fp8 (1))
+    {
+//      pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("Token limit reached."));
+        sock->is_apdu_eagain = TRUE;
+        sock->blocklen = tpdu_length;
+        return PGM_IO_STATUS_CONGESTION;    /* peer expiration to re-elect ACKer */
+    }
+
+    sent = pgm_sendto (sock,
+                       sock->is_controlled_odata,   /* rate limited */
+                       FALSE,                       /* regular socket */
+                       STATE(skb)->head,
+                       tpdu_length,
+                       (struct sockaddr*)&sock->send_gsr.gsr_group,
+                       pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+    if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno)) {
+        sock->is_apdu_eagain = TRUE;
+        sock->blocklen = tpdu_length;
+        if (EAGAIN == errno) {
+            if (sock->use_pgmcc)
+                pgm_notify_clear (&sock->ack_notify);
+            return PGM_IO_STATUS_WOULD_BLOCK;
+        }
+        return PGM_IO_STATUS_RATE_LIMITED;
+    }
+
+/* save unfolded odata for retransmissions */
+    pgm_txw_set_unfolded_checksum (STATE(skb), STATE(unfolded_odata));
+
+    sock->is_apdu_eagain = FALSE;
+    reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+    if (sock->use_pgmcc) {
+        sock->tokens -= pgm_fp8 (1);
+        sock->ack_expiry = STATE(skb)->tstamp + sock->ack_expiry_ivl;
+    }
+
+    if (PGM_LIKELY((size_t)sent == tpdu_length)) {
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += tsdu_length;
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT] ++;
+        pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length + sock->iphdr_len);
+    }
+
+/* check for end of transmission group */
+    if (sock->use_proactive_parity) {
+        const uint32_t odata_sqn   = ntohl (STATE(skb)->pgm_data->data_sqn);
+        const uint32_t tg_sqn_mask = 0xffffffff << sock->tg_sqn_shift;
+        if (!((odata_sqn + 1) & ~tg_sqn_mask))
+            pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask);
+    }
+
+/* remove applications reference to skbuff */
+    pgm_free_skb (STATE(skb));
+    if (bytes_written)
+        *bytes_written = tsdu_length;
+    return PGM_IO_STATUS_NORMAL;
+}
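
The token accounting above uses the library's fp8 fixed-point helpers. Assuming fp8 means 8 fractional bits (an inference from the name, not verified against framework.h; the helpers below are reimplementations for illustration), the ACK-clocked token update looks roughly like:

/* fp8_tokens.c -- sketch of the PGMCC token arithmetic in 24.8 fixed point.
 * ASSUMPTION: 8 fractional bits; these helpers are not the library's.
 */
#include <stdint.h>
#include <stdio.h>

typedef uint_fast32_t fp8;
#define FP8(n)          ((fp8)(n) << 8)
static fp8 fp8mul (fp8 a, fp8 b) { return (a * b) >> 8; }
static fp8 fp8div (fp8 a, fp8 b) { return (a << 8) / b; }
static unsigned fp8tou (fp8 a)   { return (unsigned)(a >> 8); }

int
main (void)
{
    fp8 tokens = FP8 (4);
    fp8 cwnd   = FP8 (8);
    tokens -= FP8 (1);                      /* one data packet sent */
/* per new ACK: tokens += 1 + 1/W, capped at W */
    const fp8 inc = fp8mul (FP8 (1), FP8 (1) + fp8div (FP8 (1), cwnd));
    tokens = (tokens + inc < cwnd) ? tokens + inc : cwnd;
    printf ("tokens=%u.%02u cwnd=%u\n",
            fp8tou (tokens), (unsigned)((tokens & 0xff) * 100 >> 8), fp8tou (cwnd));
    return 0;
}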
+/* send one PGM original data packet, caller-owned memory.
+ *
+ * on success, returns PGM_IO_STATUS_NORMAL, on block for non-blocking sockets
+ * returns PGM_IO_STATUS_WOULD_BLOCK, returns PGM_IO_STATUS_RATE_LIMITED if
+ * packet size exceeds the current rate limit.
+ */
+
+static
+int
+send_odata_copy (
+    pgm_sock_t* const restrict sock,
+    const void*       restrict tsdu,
+    const uint16_t             tsdu_length,
+    size_t*           restrict bytes_written
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (tsdu_length <= sock->max_tsdu);
+    if (PGM_LIKELY(tsdu_length)) pgm_assert (NULL != tsdu);
+
+    pgm_debug ("send_odata_copy (sock:%p tsdu:%p tsdu_length:%u bytes-written:%p)",
+        (void*)sock, tsdu, tsdu_length, (void*)bytes_written);
+
+    const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0;
+    const size_t      tpdu_length  = tsdu_length + pgm_pkt_offset (FALSE, pgmcc_family);
+
+/* continue if blocked mid-apdu, updating timestamp */
+    if (sock->is_apdu_eagain) {
+        STATE(skb)->tstamp = pgm_time_update_now();
+        goto retry_send;
+    }
+
+    STATE(skb) = pgm_alloc_skb (sock->max_tpdu);
+    STATE(skb)->sock = sock;
+    STATE(skb)->tstamp = pgm_time_update_now();
+    pgm_skb_reserve (STATE(skb), pgm_pkt_offset (FALSE, pgmcc_family));
+    pgm_skb_put (STATE(skb), tsdu_length);
+
+    STATE(skb)->pgm_header = (struct pgm_header*)STATE(skb)->head;
+    STATE(skb)->pgm_data   = (struct pgm_data*)(STATE(skb)->pgm_header + 1);
+    memcpy (STATE(skb)->pgm_header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+    STATE(skb)->pgm_header->pgm_sport       = sock->tsi.sport;
+    STATE(skb)->pgm_header->pgm_dport       = sock->dport;
+    STATE(skb)->pgm_header->pgm_type        = PGM_ODATA;
+    STATE(skb)->pgm_header->pgm_options     = sock->use_pgmcc ? PGM_OPT_PRESENT : 0;
+    STATE(skb)->pgm_header->pgm_tsdu_length = htons (tsdu_length);
+
+/* ODATA */
+    STATE(skb)->pgm_data->data_sqn   = htonl (pgm_txw_next_lead(sock->window));
+    STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window));
+
+    STATE(skb)->pgm_header->pgm_checksum = 0;
+    void* data = STATE(skb)->pgm_data + 1;
+    if (sock->use_pgmcc) {
+        struct pgm_opt_length* opt_len = data;
+        opt_len->opt_type         = PGM_OPT_LENGTH;
+        opt_len->opt_length       = sizeof(struct pgm_opt_length);
+        opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+                                            sizeof(struct pgm_opt_header) +
+                                            ((AF_INET6 == sock->acker_nla.ss_family) ?
+                                                sizeof(struct pgm_opt6_pgmcc_data) :
+                                                sizeof(struct pgm_opt_pgmcc_data)) );
+        struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+        opt_header->opt_type   = PGM_OPT_PGMCC_DATA | PGM_OPT_END;
+        opt_header->opt_length = sizeof(struct pgm_opt_header) +
+                                 ((AF_INET6 == sock->acker_nla.ss_family) ?
+                                     sizeof(struct pgm_opt6_pgmcc_data) :
+                                     sizeof(struct pgm_opt_pgmcc_data));
+        struct pgm_opt_pgmcc_data* pgmcc_data = (struct pgm_opt_pgmcc_data*)(opt_header + 1);
+/* unused */
+//      struct pgm_opt6_pgmcc_data* pgmcc_data6 = (struct pgm_opt6_pgmcc_data*)(opt_header + 1);
+
+        pgmcc_data->opt_reserved = 0;
+        pgmcc_data->opt_tstamp   = htonl (pgm_to_msecs (STATE(skb)->tstamp));
+/* acker nla */
+        pgm_sockaddr_to_nla ((struct sockaddr*)&sock->acker_nla, (char*)&pgmcc_data->opt_nla_afi);
+        data = (char*)opt_header + opt_header->opt_length;
+    }
+    const size_t pgm_header_len = (char*)data - (char*)STATE(skb)->pgm_header;
+    const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0);
+    STATE(unfolded_odata) = pgm_csum_partial_copy (tsdu, data, tsdu_length, 0);
+    STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len));
+
+/* add to transmit window, skb::data set to payload */
+    pgm_spinlock_lock (&sock->txw_spinlock);
+    pgm_txw_add (sock->window, STATE(skb));
+    pgm_spinlock_unlock (&sock->txw_spinlock);
+
+    ssize_t sent;
+retry_send:
+
+/* congestion control */
+    if (sock->use_pgmcc &&
+        sock->tokens < pgm_fp8 (1))
+    {
+//      pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("Token limit reached."));
+        sock->is_apdu_eagain = TRUE;
+        sock->blocklen = tpdu_length;
+        return PGM_IO_STATUS_CONGESTION;
+    }
+
+    sent = pgm_sendto (sock,
+                       sock->is_controlled_odata,   /* rate limited */
+                       FALSE,                       /* regular socket */
+                       STATE(skb)->head,
+                       tpdu_length,
+                       (struct sockaddr*)&sock->send_gsr.gsr_group,
+                       pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+    if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno)) {
+        sock->is_apdu_eagain = TRUE;
+        sock->blocklen = tpdu_length;
+        if (EAGAIN == errno) {
+            if (sock->use_pgmcc)
+                pgm_notify_clear (&sock->ack_notify);
+            return PGM_IO_STATUS_WOULD_BLOCK;
+        }
+        return PGM_IO_STATUS_RATE_LIMITED;
+    }
+
+    if (sock->use_pgmcc) {
+        sock->tokens -= pgm_fp8 (1);
+        pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("PGMCC tokens-- (T:%u W:%u)"),
+            pgm_fp8tou (sock->tokens), pgm_fp8tou (sock->cwnd_size));
+        sock->ack_expiry = STATE(skb)->tstamp + sock->ack_expiry_ivl;
+    }
+
+/* save unfolded odata for retransmissions */
+    pgm_txw_set_unfolded_checksum (STATE(skb), STATE(unfolded_odata));
+
+    sock->is_apdu_eagain = FALSE;
+    reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+
+    if (PGM_LIKELY((size_t)sent == tpdu_length)) {
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += tsdu_length;
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT] ++;
+        pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length + sock->iphdr_len);
+    }
+
+/* check for end of transmission group */
+    if (sock->use_proactive_parity) {
+        const uint32_t odata_sqn   = ntohl (STATE(skb)->pgm_data->data_sqn);
+        const uint32_t tg_sqn_mask = 0xffffffff << sock->tg_sqn_shift;
+        if (!((odata_sqn + 1) & ~tg_sqn_mask))
+            pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask);
+    }
+
+/* return data payload length sent */
+    if (bytes_written)
+        *bytes_written = tsdu_length;
+    return PGM_IO_STATUS_NORMAL;
+}
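
send_odata_copy() builds the packet skb-style: reserve headroom, append the payload, then write the header into the headroom so head..tail forms one contiguous TPDU. A toy buffer showing the pointer dance (not the real pgm_sk_buff_t):

/* skb_layout.c -- reserve-then-put buffer construction. */
#include <stdio.h>
#include <string.h>

struct toy_skb { char buf[64]; char* head; char* data; char* tail; };

int
main (void)
{
    struct toy_skb skb;
    skb.head = skb.data = skb.tail = skb.buf;
    skb.data = skb.tail = skb.data + 16;    /* reserve: 16-byte headroom */
    memcpy (skb.tail, "payload", 7);        /* put: append payload bytes */
    skb.tail += 7;
    memcpy (skb.head, "HDR", 3);            /* header fills the headroom */
    printf ("headroom=%td payload=%td tpdu=%td\n",
            skb.data - skb.head, skb.tail - skb.data, skb.tail - skb.head);
    return 0;
}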
+/* send one PGM original data packet, caller-owned scatter/gather io vector
+ *
+ *    ⎢ DATA₀ ⎢
+ *    ⎢ DATA₁ ⎢ → send_odatav() → ⎢ TSDU₀ ⎢ → libc
+ *    ⎢   ⋮   ⎢
+ *
+ * on success, returns PGM_IO_STATUS_NORMAL, on block for non-blocking sockets
+ * returns PGM_IO_STATUS_WOULD_BLOCK, returns PGM_IO_STATUS_RATE_LIMITED if
+ * packet size exceeds the current rate limit.
+ */
+
+static
+int
+send_odatav (
+    pgm_sock_t*             const restrict sock,
+    const struct pgm_iovec* const restrict vector,
+    const unsigned                         count,   /* number of items in vector */
+    size_t*                       restrict bytes_written
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (count <= PGM_MAX_FRAGMENTS);
+    if (PGM_LIKELY(count)) pgm_assert (NULL != vector);
+
+    pgm_debug ("send_odatav (sock:%p vector:%p count:%u bytes-written:%p)",
+        (const void*)sock, (const void*)vector, count, (const void*)bytes_written);
+
+    if (PGM_UNLIKELY(0 == count))
+        return send_odata_copy (sock, NULL, 0, bytes_written);
+
+/* continue if blocked on send */
+    if (sock->is_apdu_eagain)
+        goto retry_send;
+
+    STATE(tsdu_length) = 0;
+    for (unsigned i = 0; i < count; i++)
+    {
+#ifdef TRANSPORT_DEBUG
+        if (PGM_LIKELY(vector[i].iov_len)) {
+            pgm_assert( vector[i].iov_base );
+        }
+#endif
+        STATE(tsdu_length) += vector[i].iov_len;
+    }
+    pgm_return_val_if_fail (STATE(tsdu_length) <= sock->max_tsdu, PGM_IO_STATUS_ERROR);
+
+    STATE(skb) = pgm_alloc_skb (sock->max_tpdu);
+    STATE(skb)->sock = sock;
+    STATE(skb)->tstamp = pgm_time_update_now();
+    const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0;
+    pgm_skb_reserve (STATE(skb), pgm_pkt_offset (FALSE, pgmcc_family));
+    pgm_skb_put (STATE(skb), STATE(tsdu_length));
+
+    STATE(skb)->pgm_header = (struct pgm_header*)STATE(skb)->data;
+    STATE(skb)->pgm_data   = (struct pgm_data*)(STATE(skb)->pgm_header + 1);
+    memcpy (STATE(skb)->pgm_header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+    STATE(skb)->pgm_header->pgm_sport       = sock->tsi.sport;
+    STATE(skb)->pgm_header->pgm_dport       = sock->dport;
+    STATE(skb)->pgm_header->pgm_type        = PGM_ODATA;
+    STATE(skb)->pgm_header->pgm_options     = 0;
+    STATE(skb)->pgm_header->pgm_tsdu_length = htons (STATE(tsdu_length));
+
+/* ODATA */
+    STATE(skb)->pgm_data->data_sqn   = htonl (pgm_txw_next_lead(sock->window));
+    STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window));
+
+    STATE(skb)->pgm_header->pgm_checksum = 0;
+    const size_t pgm_header_len = (char*)(STATE(skb)->pgm_data + 1) - (char*)STATE(skb)->pgm_header;
+    const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0);
+
+/* unroll first iteration to make friendly branch prediction */
+    char* dst = (char*)(STATE(skb)->pgm_data + 1);
+    STATE(unfolded_odata) = pgm_csum_partial_copy ((const char*)vector[0].iov_base, dst, vector[0].iov_len, 0);
+
+/* iterate over one or more vector elements to perform scatter/gather checksum & copy */
+    for (unsigned i = 1; i < count; i++) {
+        dst += vector[i-1].iov_len;
+        const uint32_t unfolded_element = pgm_csum_partial_copy ((const char*)vector[i].iov_base, dst, vector[i].iov_len, 0);
+        STATE(unfolded_odata) = pgm_csum_block_add (STATE(unfolded_odata), unfolded_element, vector[i-1].iov_len);
+    }
+
+    STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len));
+
+/* add to transmit window, skb::data set to payload */
+    pgm_spinlock_lock (&sock->txw_spinlock);
+    pgm_txw_add (sock->window, STATE(skb));
+    pgm_spinlock_unlock (&sock->txw_spinlock);
+
+    ssize_t sent;
+    size_t  tpdu_length;
+retry_send:
+    pgm_assert ((char*)STATE(skb)->tail > (char*)STATE(skb)->head);
+    tpdu_length = (char*)STATE(skb)->tail - (char*)STATE(skb)->head;
+    sent = pgm_sendto (sock,
+                       sock->is_controlled_odata,   /* rate limited */
+                       FALSE,                       /* regular socket */
+                       STATE(skb)->head,
+                       tpdu_length,
+                       (struct sockaddr*)&sock->send_gsr.gsr_group,
+                       pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+    if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno)) {
+        sock->is_apdu_eagain = TRUE;
+        sock->blocklen = tpdu_length;
+        if (EAGAIN == errno) {
+            if (sock->use_pgmcc)
+                pgm_notify_clear (&sock->ack_notify);
+            return PGM_IO_STATUS_WOULD_BLOCK;
+        }
+        return PGM_IO_STATUS_RATE_LIMITED;
+    }
+
+/* save unfolded odata for retransmissions */
+    pgm_txw_set_unfolded_checksum (STATE(skb), STATE(unfolded_odata));
+
+    sock->is_apdu_eagain = FALSE;
+    reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+
+    if (PGM_LIKELY((size_t)sent == STATE(skb)->len)) {
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += STATE(tsdu_length);
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT] ++;
+        pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length + sock->iphdr_len);
+    }
+
+/* check for end of transmission group */
+    if (sock->use_proactive_parity) {
+        const uint32_t odata_sqn   = ntohl (STATE(skb)->pgm_data->data_sqn);
+        const uint32_t tg_sqn_mask = 0xffffffff << sock->tg_sqn_shift;
+        if (!((odata_sqn + 1) & ~tg_sqn_mask))
+            pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask);
+    }
+
+/* return data payload length sent */
+    if (bytes_written)
+        *bytes_written = STATE(tsdu_length);
+    return PGM_IO_STATUS_NORMAL;
+}
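
send_odatav() checksums each iovec while copying it, then merges the partial sums instead of re-reading the assembled packet. A reduced demonstration (even-length fragments only, so no odd-offset byte swap is needed; the real pgm_csum_block_add handles odd offsets):

/* gather_csum.c -- checksum-while-copy over an iovec array, then verify
 * the merged partial sums match a single pass over the gathered buffer.
 */
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

struct iovec_ { const void* iov_base; size_t iov_len; };

static uint32_t
csum_copy (const void* src, void* dst, size_t len, uint32_t sum)
{
    const uint8_t* p = src;
    memcpy (dst, src, len);
    for (size_t i = 0; i + 1 < len; i += 2)
        sum += (uint16_t)((p[i] << 8) | p[i+1]);
    return sum;
}

int
main (void)
{
    const struct iovec_ iov[2] = { { "abcd", 4 }, { "efgh", 4 } };
    char out[8], scratch[8];
    uint32_t sum = 0;
    char* dst = out;
    for (unsigned i = 0; i < 2; i++) {          /* gather + accumulate */
        sum = csum_copy (iov[i].iov_base, dst, iov[i].iov_len, sum);
        dst += iov[i].iov_len;
    }
    assert (sum == csum_copy (out, scratch, sizeof out, 0));    /* one pass agrees */
    return 0;
}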
+/* send PGM original data, caller-owned memory.  if larger than the maximum TPDU
+ * size it will be fragmented.
+ *
+ * on success, returns PGM_IO_STATUS_NORMAL, on block for non-blocking sockets
+ * returns PGM_IO_STATUS_WOULD_BLOCK, returns PGM_IO_STATUS_RATE_LIMITED if
+ * packet size exceeds the current rate limit.
+ */
+
+static
+int
+send_apdu (
+    pgm_sock_t* const restrict sock,
+    const void*       restrict apdu,
+    const size_t               apdu_length,
+    size_t*           restrict bytes_written
+    )
+{
+    size_t   bytes_sent      = 0;   /* counted at IP layer */
+    unsigned packets_sent    = 0;   /* IP packets */
+    size_t   data_bytes_sent = 0;
+    const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0;
+
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != apdu);
+
+/* continue if blocked mid-apdu */
+    if (sock->is_apdu_eagain)
+        goto retry_send;
+
+/* if non-blocking calculate total wire size and check rate limit */
+    STATE(is_rate_limited) = FALSE;
+    if (sock->is_nonblocking && sock->is_controlled_odata)
+    {
+        const size_t header_length = pgm_pkt_offset (TRUE, pgmcc_family);
+        size_t tpdu_length = 0;
+        size_t offset_     = 0;
+        do {
+            const uint16_t tsdu_length = MIN( source_max_tsdu (sock, TRUE), apdu_length - offset_ );
+            tpdu_length += sock->iphdr_len + header_length + tsdu_length;
+            offset_     += tsdu_length;
+        } while (offset_ < apdu_length);
+
+/* calculation includes one iphdr length already */
+        if (!pgm_rate_check (&sock->rate_control,
+                             tpdu_length - sock->iphdr_len,
+                             sock->is_nonblocking))
+        {
+            sock->blocklen = tpdu_length;
+            return PGM_IO_STATUS_RATE_LIMITED;
+        }
+        STATE(is_rate_limited) = TRUE;
+    }
+
+    STATE(data_bytes_offset) = 0;
+    STATE(first_sqn)         = pgm_txw_next_lead(sock->window);
+
+    do {
+/* retrieve packet storage from transmit window */
+        size_t header_length = pgm_pkt_offset (TRUE, pgmcc_family);
+        STATE(tsdu_length) = MIN( source_max_tsdu (sock, TRUE), apdu_length - STATE(data_bytes_offset) );
+
+        STATE(skb) = pgm_alloc_skb (sock->max_tpdu);
+        STATE(skb)->sock = sock;
+        STATE(skb)->tstamp = pgm_time_update_now();
+        pgm_skb_reserve (STATE(skb), header_length);
+        pgm_skb_put (STATE(skb), STATE(tsdu_length));
+
+        STATE(skb)->pgm_header = (struct pgm_header*)STATE(skb)->head;
+        STATE(skb)->pgm_data   = (struct pgm_data*)(STATE(skb)->pgm_header + 1);
+        memcpy (STATE(skb)->pgm_header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+        STATE(skb)->pgm_header->pgm_sport       = sock->tsi.sport;
+        STATE(skb)->pgm_header->pgm_dport       = sock->dport;
+        STATE(skb)->pgm_header->pgm_type        = PGM_ODATA;
+        STATE(skb)->pgm_header->pgm_options     = PGM_OPT_PRESENT;
+        STATE(skb)->pgm_header->pgm_tsdu_length = htons (STATE(tsdu_length));
+
+/* ODATA */
+        STATE(skb)->pgm_data->data_sqn   = htonl (pgm_txw_next_lead(sock->window));
+        STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window));
+
+/* OPT_LENGTH */
+        struct pgm_opt_length* opt_len = (struct pgm_opt_length*)(STATE(skb)->pgm_data + 1);
+        opt_len->opt_type         = PGM_OPT_LENGTH;
+        opt_len->opt_length       = sizeof(struct pgm_opt_length);
+        opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+                                            sizeof(struct pgm_opt_header) +
+                                            sizeof(struct pgm_opt_fragment) );
+/* OPT_FRAGMENT */
+        struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+        opt_header->opt_type   = PGM_OPT_FRAGMENT | PGM_OPT_END;
+        opt_header->opt_length = sizeof(struct pgm_opt_header) +
+                                 sizeof(struct pgm_opt_fragment);
+        STATE(skb)->pgm_opt_fragment = (struct pgm_opt_fragment*)(opt_header + 1);
+        STATE(skb)->pgm_opt_fragment->opt_reserved = 0;
+        STATE(skb)->pgm_opt_fragment->opt_sqn      = htonl (STATE(first_sqn));
+        STATE(skb)->pgm_opt_fragment->opt_frag_off = htonl (STATE(data_bytes_offset));
+        STATE(skb)->pgm_opt_fragment->opt_frag_len = htonl (apdu_length);
+
+/* TODO: the assembly checksum & copy routine is faster than memcpy & pgm_cksum on >= opteron hardware */
+        STATE(skb)->pgm_header->pgm_checksum = 0;
+        const size_t pgm_header_len = (char*)(STATE(skb)->pgm_opt_fragment + 1) - (char*)STATE(skb)->pgm_header;
+        const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0);
+        STATE(unfolded_odata) = pgm_csum_partial_copy ((const char*)apdu + STATE(data_bytes_offset), STATE(skb)->pgm_opt_fragment + 1, STATE(tsdu_length), 0);
+        STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len));
+
+/* add to transmit window, skb::data set to payload */
+        pgm_spinlock_lock (&sock->txw_spinlock);
+        pgm_txw_add (sock->window, STATE(skb));
+        pgm_spinlock_unlock (&sock->txw_spinlock);
+
+        ssize_t sent;
+        size_t  tpdu_length;
+retry_send:
+        pgm_assert ((char*)STATE(skb)->tail > (char*)STATE(skb)->head);
+        tpdu_length = (char*)STATE(skb)->tail - (char*)STATE(skb)->head;
+        sent = pgm_sendto (sock,
+                           !STATE(is_rate_limited),     /* rate limit on blocking */
+                           FALSE,                       /* regular socket */
+                           STATE(skb)->head,
+                           tpdu_length,
+                           (struct sockaddr*)&sock->send_gsr.gsr_group,
+                           pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+        if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno)) {
+            sock->is_apdu_eagain = TRUE;
+            sock->blocklen = tpdu_length;
+            goto blocked;
+        }
+
+/* save unfolded odata for retransmissions */
+        pgm_txw_set_unfolded_checksum (STATE(skb), STATE(unfolded_odata));
+
+        if (PGM_LIKELY((size_t)sent == tpdu_length)) {
+            bytes_sent += tpdu_length + sock->iphdr_len;    /* as counted at IP layer */
+            packets_sent++;                                 /* IP packets */
+            data_bytes_sent += STATE(tsdu_length);
+        }
+
+        STATE(data_bytes_offset) += STATE(tsdu_length);
+
+/* check for end of transmission group */
+        if (sock->use_proactive_parity) {
+            const uint32_t odata_sqn   = ntohl (STATE(skb)->pgm_data->data_sqn);
+            const uint32_t tg_sqn_mask = 0xffffffff << sock->tg_sqn_shift;
+            if (!((odata_sqn + 1) & ~tg_sqn_mask))
+                pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask);
+        }
+
+    } while ( STATE(data_bytes_offset) < apdu_length);
+    pgm_assert( STATE(data_bytes_offset) == apdu_length );
+
+    sock->is_apdu_eagain = FALSE;
+    reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+
+    pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], bytes_sent);
+    sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT]  += packets_sent;
+    sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += data_bytes_sent;
+    if (bytes_written)
+        *bytes_written = apdu_length;
+    return PGM_IO_STATUS_NORMAL;
+
+blocked:
+    if (bytes_sent) {
+        reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+        pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], bytes_sent);
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT]  += packets_sent;
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += data_bytes_sent;
+    }
+    if (EAGAIN == errno) {
+        if (sock->use_pgmcc)
+            pgm_notify_clear (&sock->ack_notify);
+        return PGM_IO_STATUS_WOULD_BLOCK;
+    }
+    return PGM_IO_STATUS_RATE_LIMITED;
+}
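
send_apdu() slices the APDU into max-TSDU fragments, each tagged with an OPT_FRAGMENT carrying the first sequence number, this fragment's byte offset, and the total APDU length so receivers can reassemble. A worked example with invented numbers:

/* apdu_frag.c -- the fragmentation walk performed by send_apdu(). */
#include <stdio.h>

int
main (void)
{
    const unsigned apdu_length = 4100, max_tsdu_fragment = 1400;
    unsigned offset = 0, sqn = 1000;
    const unsigned first_sqn = sqn;
    do {
        const unsigned tsdu = (apdu_length - offset < max_tsdu_fragment)
                            ? apdu_length - offset : max_tsdu_fragment;
        printf ("sqn %u: opt_sqn=%u opt_frag_off=%u opt_frag_len=%u (%u payload bytes)\n",
                sqn++, first_sqn, offset, apdu_length, tsdu);
        offset += tsdu;
    } while (offset < apdu_length);     /* 1400 + 1400 + 1300 bytes */
    return 0;
}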
+int
+pgm_send (
+    pgm_sock_t* const restrict sock,
+    const void*       restrict apdu,
+    const size_t      apdu_length,
+    size_t*           restrict bytes_written
+    )
+{
+    pgm_debug ("pgm_send (sock:%p apdu:%p apdu-length:%zu bytes-written:%p)",
+        (void*)sock, apdu, apdu_length, (void*)bytes_written);
+
+/* parameters */
+    pgm_return_val_if_fail (NULL != sock, PGM_IO_STATUS_ERROR);
+    if (PGM_LIKELY(apdu_length)) pgm_return_val_if_fail (NULL != apdu, PGM_IO_STATUS_ERROR);
+
+/* shutdown */
+    if (PGM_UNLIKELY(!pgm_rwlock_reader_trylock (&sock->lock)))
+        pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+
+/* state */
+    if (PGM_UNLIKELY(!sock->is_bound ||
+        sock->is_destroyed ||
+        apdu_length > sock->max_apdu))
+    {
+        pgm_rwlock_reader_unlock (&sock->lock);
+        pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+    }
+
+/* source */
+    pgm_mutex_lock (&sock->source_mutex);
+
+/* pass on non-fragment calls */
+    if (apdu_length <= sock->max_tsdu)
+    {
+        const int status = send_odata_copy (sock, apdu, apdu_length, bytes_written);
+        pgm_mutex_unlock (&sock->source_mutex);
+        pgm_rwlock_reader_unlock (&sock->lock);
+        return status;
+    }
+
+    const int status = send_apdu (sock, apdu, apdu_length, bytes_written);
+    pgm_mutex_unlock (&sock->source_mutex);
+    pgm_rwlock_reader_unlock (&sock->lock);
+    return status;
+}
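+
+/* Example: a minimal retry loop over pgm_send().  This is an illustrative
+ * sketch only, not part of the PGM API; `sock` is assumed to be a created,
+ * bound, non-blocking PGM socket, `buf`/`len` a caller-owned APDU, and the
+ * wait primitives are elided.
+ *
+ *    size_t bytes_written;
+ *    int status = pgm_send (sock, buf, len, &bytes_written);
+ *    while (PGM_IO_STATUS_WOULD_BLOCK == status ||
+ *           PGM_IO_STATUS_RATE_LIMITED == status)
+ *    {
+ *        // wait for socket writability or the rate-limit timeout, then
+ *        // retry; the socket resumes a blocked APDU mid-stream via
+ *        // is_apdu_eagain rather than restarting it.
+ *        status = pgm_send (sock, buf, len, &bytes_written);
+ *    }
+ *    if (PGM_IO_STATUS_NORMAL != status)
+ *        ;    // PGM_IO_STATUS_ERROR: invalid arguments or socket state
+ */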
+
+/* send PGM original data, callee-owned scatter/gather IO vector.  if the
+ * data is larger than the maximum TPDU size it will be fragmented.
+ *
+ * is_one_apdu = true:
+ *
+ *    ⎢ DATA₀ ⎢
+ *    ⎢ DATA₁ ⎢ → pgm_sendv() →  ⎢ ⋯ TSDU₁ TSDU₀ ⎢ → libc
+ *    ⎢  ⋮   ⎢
+ *
+ * is_one_apdu = false:
+ *
+ *    ⎢ APDU₀ ⎢                  ⎢ ⋯ TSDU₁,₀ TSDU₀,₀ ⎢
+ *    ⎢ APDU₁ ⎢ → pgm_sendv() →  ⎢ ⋯ TSDU₁,₁ TSDU₀,₁ ⎢ → libc
+ *    ⎢  ⋮   ⎢                  ⎢    ⋮       ⋮      ⎢
+ *
+ * on success, returns PGM_IO_STATUS_NORMAL; on block for non-blocking sockets
+ * returns PGM_IO_STATUS_WOULD_BLOCK; returns PGM_IO_STATUS_RATE_LIMITED if
+ * the send would exceed the current rate limit.
+ */
+
+int
+pgm_sendv (
+    pgm_sock_t*             const restrict sock,
+    const struct pgm_iovec* const restrict vector,
+    const unsigned          count,          /* number of items in vector */
+    const bool              is_one_apdu,    /* true = vector = apdu, false = vector::iov_base = apdu */
+    size_t*                 restrict bytes_written
+    )
+{
+    pgm_debug ("pgm_sendv (sock:%p vector:%p count:%u is-one-apdu:%s bytes-written:%p)",
+        (const void*)sock,
+        (const void*)vector,
+        count,
+        is_one_apdu ? "TRUE" : "FALSE",
+        (const void*)bytes_written);
+
+    pgm_return_val_if_fail (NULL != sock, PGM_IO_STATUS_ERROR);
+    pgm_return_val_if_fail (count <= PGM_MAX_FRAGMENTS, PGM_IO_STATUS_ERROR);
+    if (PGM_LIKELY(count)) pgm_return_val_if_fail (NULL != vector, PGM_IO_STATUS_ERROR);
+    if (PGM_UNLIKELY(!pgm_rwlock_reader_trylock (&sock->lock)))
+        pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+    if (PGM_UNLIKELY(!sock->is_bound ||
+        sock->is_destroyed))
+    {
+        pgm_rwlock_reader_unlock (&sock->lock);
+        pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+    }
+
+    pgm_mutex_lock (&sock->source_mutex);
+
+/* pass on zero length as cannot count vector lengths */
+    if (PGM_UNLIKELY(0 == count))
+    {
+        const int status = send_odata_copy (sock, NULL, count, bytes_written);
+        pgm_mutex_unlock (&sock->source_mutex);
+        pgm_rwlock_reader_unlock (&sock->lock);
+        return status;
+    }
+
+    size_t bytes_sent = 0;
+    unsigned packets_sent = 0;
+    size_t data_bytes_sent = 0;
+    const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0;
+
+/* continue if blocked mid-apdu */
+    if (sock->is_apdu_eagain) {
+        if (is_one_apdu) {
+            if (STATE(apdu_length) <= sock->max_tsdu)
+            {
+                const int status = send_odatav (sock, vector, count, bytes_written);
+                pgm_mutex_unlock (&sock->source_mutex);
+                pgm_rwlock_reader_unlock (&sock->lock);
+                return status;
+            }
+            else
+                goto retry_one_apdu_send;
+        } else {
+            goto retry_send;
+        }
+    }
+
+/* calculate (total) APDU length */
+    STATE(apdu_length) = 0;
+    for (unsigned i = 0; i < count; i++)
+    {
+#ifdef TRANSPORT_DEBUG
+        if (PGM_LIKELY(vector[i].iov_len)) {
+            pgm_assert( vector[i].iov_base );
+        }
+#endif
+        if (!is_one_apdu &&
+            vector[i].iov_len > sock->max_apdu)
+        {
+            pgm_mutex_unlock (&sock->source_mutex);
+            pgm_rwlock_reader_unlock (&sock->lock);
+            pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+        }
+        STATE(apdu_length) += vector[i].iov_len;
+    }
+
+/* pass on non-fragment calls */
+    if (is_one_apdu) {
+        if (STATE(apdu_length) <= sock->max_tsdu) {
+            const int status = send_odatav (sock, vector, count, bytes_written);
+            pgm_mutex_unlock (&sock->source_mutex);
+            pgm_rwlock_reader_unlock (&sock->lock);
+            return status;
+        } else if (STATE(apdu_length) > sock->max_apdu) {
+            pgm_mutex_unlock (&sock->source_mutex);
+            pgm_rwlock_reader_unlock (&sock->lock);
+            pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+        }
+    }
+
+/* if non-blocking calculate total wire size and check rate limit */
+    STATE(is_rate_limited) = FALSE;
+    if (sock->is_nonblocking && sock->is_controlled_odata)
+    {
+        const size_t header_length = pgm_pkt_offset (TRUE, pgmcc_family);
+        size_t tpdu_length = 0;
+        size_t offset_ = 0;
+        do {
+            const uint16_t tsdu_length = MIN( source_max_tsdu (sock, TRUE), STATE(apdu_length) - offset_ );
+            tpdu_length += sock->iphdr_len + header_length + tsdu_length;
+            offset_ += tsdu_length;
+        } while (offset_ < STATE(apdu_length));
+
+/* calculation includes one iphdr length already */
+        if (!pgm_rate_check (&sock->rate_control,
+                             tpdu_length - sock->iphdr_len,
+                             sock->is_nonblocking))
+        {
+            sock->blocklen = tpdu_length;
+            pgm_mutex_unlock (&sock->source_mutex);
+            pgm_rwlock_reader_unlock (&sock->lock);
+            return PGM_IO_STATUS_RATE_LIMITED;
+        }
+        STATE(is_rate_limited) = TRUE;
+    }
+
+/* non-fragmented packets can be forwarded onto basic send() */
+    if (!is_one_apdu)
+    {
+        for (STATE(data_pkt_offset) = 0; STATE(data_pkt_offset) < count; STATE(data_pkt_offset)++)
+        {
+            size_t wrote_bytes;
+            int status;
+retry_send:
+            status = send_apdu (sock,
+                                vector[STATE(data_pkt_offset)].iov_base,
+                                vector[STATE(data_pkt_offset)].iov_len,
+                                &wrote_bytes);
+            switch (status) {
+            case PGM_IO_STATUS_NORMAL:
+                break;
+            case PGM_IO_STATUS_WOULD_BLOCK:
+            case PGM_IO_STATUS_RATE_LIMITED:
+                sock->is_apdu_eagain = TRUE;
+                pgm_mutex_unlock (&sock->source_mutex);
+                pgm_rwlock_reader_unlock (&sock->lock);
+                return status;
+            case PGM_IO_STATUS_ERROR:
+                pgm_mutex_unlock (&sock->source_mutex);
+                pgm_rwlock_reader_unlock (&sock->lock);
+                return status;
+            default:
+                pgm_assert_not_reached();
+            }
+            data_bytes_sent += wrote_bytes;
+        }
+
+        sock->is_apdu_eagain = FALSE;
+        if (bytes_written)
+            *bytes_written = data_bytes_sent;
+        pgm_mutex_unlock (&sock->source_mutex);
+        pgm_rwlock_reader_unlock (&sock->lock);
+        return PGM_IO_STATUS_NORMAL;
+    }
+
+    STATE(data_bytes_offset) = 0;
+    STATE(vector_index) = 0;
+    STATE(vector_offset) = 0;
+
+    STATE(first_sqn) = pgm_txw_next_lead(sock->window);
+
+    do {
+/* retrieve packet storage from transmit window */
+        size_t header_length = pgm_pkt_offset (TRUE, pgmcc_family);
+        STATE(tsdu_length) = MIN( source_max_tsdu (sock, TRUE), STATE(apdu_length) - STATE(data_bytes_offset) );
+        STATE(skb) = pgm_alloc_skb (sock->max_tpdu);
+        STATE(skb)->sock = sock;
+        STATE(skb)->tstamp = pgm_time_update_now();
+        pgm_skb_reserve (STATE(skb), header_length);
+        pgm_skb_put (STATE(skb), STATE(tsdu_length));
+
+        STATE(skb)->pgm_header = (struct pgm_header*)STATE(skb)->head;
+        STATE(skb)->pgm_data = (struct pgm_data*)(STATE(skb)->pgm_header + 1);
+        memcpy (STATE(skb)->pgm_header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+        STATE(skb)->pgm_header->pgm_sport = sock->tsi.sport;
+        STATE(skb)->pgm_header->pgm_dport = sock->dport;
+        STATE(skb)->pgm_header->pgm_type = PGM_ODATA;
+        STATE(skb)->pgm_header->pgm_options = PGM_OPT_PRESENT;
+        STATE(skb)->pgm_header->pgm_tsdu_length = htons (STATE(tsdu_length));
+
+/* ODATA */
+        STATE(skb)->pgm_data->data_sqn = htonl (pgm_txw_next_lead(sock->window));
+        STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window));
+
+/* OPT_LENGTH */
+        struct pgm_opt_length* opt_len = (struct pgm_opt_length*)(STATE(skb)->pgm_data + 1);
+        opt_len->opt_type = PGM_OPT_LENGTH;
+        opt_len->opt_length = sizeof(struct pgm_opt_length);
+        opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+                                            sizeof(struct pgm_opt_header) +
+                                            sizeof(struct pgm_opt_fragment) );
+/* OPT_FRAGMENT */
+        struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+        opt_header->opt_type = PGM_OPT_FRAGMENT | PGM_OPT_END;
+        opt_header->opt_length = sizeof(struct pgm_opt_header) +
+                                 sizeof(struct pgm_opt_fragment);
+        STATE(skb)->pgm_opt_fragment = (struct pgm_opt_fragment*)(opt_header + 1);
+        STATE(skb)->pgm_opt_fragment->opt_reserved = 0;
+        STATE(skb)->pgm_opt_fragment->opt_sqn = htonl (STATE(first_sqn));
+        STATE(skb)->pgm_opt_fragment->opt_frag_off = htonl (STATE(data_bytes_offset));
+        STATE(skb)->pgm_opt_fragment->opt_frag_len = htonl (STATE(apdu_length));
+
+/* checksum & copy */
+        STATE(skb)->pgm_header->pgm_checksum = 0;
+        const size_t pgm_header_len = (char*)(STATE(skb)->pgm_opt_fragment + 1) - (char*)STATE(skb)->pgm_header;
+        const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0);
+
+/* iterate over one or more vector elements to perform scatter/gather checksum & copy
+ *
+ * STATE(vector_index)   - index into application scatter/gather vector
+ * STATE(vector_offset)  - current offset into current vector element
+ * STATE(unfolded_odata) - checksum accumulator
+ */
+        const char* src = (const char*)vector[STATE(vector_index)].iov_base + STATE(vector_offset);
+        char* dst = (char*)(STATE(skb)->pgm_opt_fragment + 1);
+        size_t src_length = vector[STATE(vector_index)].iov_len - STATE(vector_offset);
+        size_t dst_length = 0;
+        size_t copy_length = MIN( STATE(tsdu_length), src_length );
+        STATE(unfolded_odata) = pgm_csum_partial_copy (src, dst, copy_length, 0);
+
+        for (;;)
+        {
+            if (copy_length == src_length) {
+/* application packet complete */
+                STATE(vector_index)++;
+                STATE(vector_offset) = 0;
+            } else {
+/* data still remaining */
+                STATE(vector_offset) += copy_length;
+            }
+
+            dst_length += copy_length;
+
+/* sock packet complete */
+            if (dst_length == STATE(tsdu_length))
+                break;
+
+            src = (const char*)vector[STATE(vector_index)].iov_base + STATE(vector_offset);
+            dst += copy_length;
+            src_length = vector[STATE(vector_index)].iov_len - STATE(vector_offset);
+            copy_length = MIN( STATE(tsdu_length) - dst_length, src_length );
+            const uint32_t unfolded_element = pgm_csum_partial_copy (src, dst, copy_length, 0);
+            STATE(unfolded_odata) = pgm_csum_block_add (STATE(unfolded_odata), unfolded_element, dst_length);
+        }
+
+        STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len));
+
+/* add to transmit window, skb::data set to payload */
+        pgm_spinlock_lock (&sock->txw_spinlock);
+        pgm_txw_add (sock->window, STATE(skb));
+        pgm_spinlock_unlock (&sock->txw_spinlock);
+
+        ssize_t sent;
+        size_t tpdu_length;
+retry_one_apdu_send:
+        tpdu_length = (char*)STATE(skb)->tail - (char*)STATE(skb)->head;
+        sent = pgm_sendto (sock,
+                           !STATE(is_rate_limited),    /* rate limit on blocking */
+                           FALSE,                      /* regular socket */
+                           STATE(skb)->head,
+                           tpdu_length,
+                           (struct sockaddr*)&sock->send_gsr.gsr_group,
+                           pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+        if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno)) {
+            sock->is_apdu_eagain = TRUE;
+            sock->blocklen = tpdu_length;
+            goto blocked;
+        }
+
+/* save unfolded odata for retransmissions */
+        pgm_txw_set_unfolded_checksum (STATE(skb), STATE(unfolded_odata));
+
+        if (PGM_LIKELY((size_t)sent == tpdu_length)) {
+            bytes_sent += tpdu_length + sock->iphdr_len;    /* as counted at IP layer */
+            packets_sent++;                                 /* IP packets */
+            data_bytes_sent += STATE(tsdu_length);
+        }
+
+        STATE(data_bytes_offset) += STATE(tsdu_length);
+
+/* check for end of transmission group */
+        if (sock->use_proactive_parity) {
+            const uint32_t odata_sqn = ntohl (STATE(skb)->pgm_data->data_sqn);
+            const uint32_t tg_sqn_mask = 0xffffffff << sock->tg_sqn_shift;
+            if (!((odata_sqn + 1) & ~tg_sqn_mask))
+                pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask);
+        }
+
+    } while ( STATE(data_bytes_offset) < STATE(apdu_length) );
+    pgm_assert( STATE(data_bytes_offset) == STATE(apdu_length) );
+
+    sock->is_apdu_eagain = FALSE;
+    reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+
+    pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], bytes_sent);
+    sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT] += packets_sent;
+    sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += data_bytes_sent;
+    if (bytes_written)
+        *bytes_written = STATE(apdu_length);
+    pgm_mutex_unlock (&sock->source_mutex);
+    pgm_rwlock_reader_unlock (&sock->lock);
+    return PGM_IO_STATUS_NORMAL;
+
+blocked:
+    if (bytes_sent) {
+        reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+        pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], bytes_sent);
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT] += packets_sent;
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += data_bytes_sent;
+    }
+    pgm_mutex_unlock (&sock->source_mutex);
+    pgm_rwlock_reader_unlock (&sock->lock);
+    if (EAGAIN == errno) {
+        if (sock->use_pgmcc)
+            pgm_notify_clear (&sock->ack_notify);
+        return PGM_IO_STATUS_WOULD_BLOCK;
+    }
+    return PGM_IO_STATUS_RATE_LIMITED;
+}
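+
+/* Example: sending one APDU scattered across two caller buffers with
+ * pgm_sendv().  An illustrative sketch only; `part1`/`part2` and their
+ * lengths are hypothetical names, and `sock` is assumed to be a created
+ * and bound PGM socket.
+ *
+ *    struct pgm_iovec iov[2];
+ *    size_t bytes_written;
+ *    iov[0].iov_base = part1;    iov[0].iov_len = part1_len;
+ *    iov[1].iov_base = part2;    iov[1].iov_len = part2_len;
+ *    // is_one_apdu = TRUE: both elements are checksummed, copied, and
+ *    // fragmented as a single APDU; with FALSE each element would be
+ *    // sent as an independent APDU through the basic send path.
+ *    const int status = pgm_sendv (sock, iov, 2, TRUE, &bytes_written);
+ */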
+
+/* send PGM original data, transmit-window-owned scatter/gather IO vector.
+ *
+ *    ⎢ TSDU₀ ⎢
+ *    ⎢ TSDU₁ ⎢ → pgm_send_skbv() →  ⎢ ⋯ TSDU₁ TSDU₀ ⎢ → libc
+ *    ⎢  ⋮   ⎢
+ *
+ * on success, returns PGM_IO_STATUS_NORMAL; on block for non-blocking sockets
+ * returns PGM_IO_STATUS_WOULD_BLOCK; returns PGM_IO_STATUS_RATE_LIMITED if
+ * the send would exceed the current rate limit.
+ */
+
+int
+pgm_send_skbv (
+    pgm_sock_t*            const restrict sock,
+    struct pgm_sk_buff_t** const restrict vector,    /* array of skb pointers vs. array of skbs */
+    const unsigned         count,
+    const bool             is_one_apdu,    /* true: vector = apdu, false: vector::iov_base = apdu */
+    size_t*                restrict bytes_written
+    )
+{
+    pgm_debug ("pgm_send_skbv (sock:%p vector:%p count:%u is-one-apdu:%s bytes-written:%p)",
+        (const void*)sock,
+        (const void*)vector,
+        count,
+        is_one_apdu ? "TRUE" : "FALSE",
+        (const void*)bytes_written);
+
+    pgm_return_val_if_fail (NULL != sock, PGM_IO_STATUS_ERROR);
+    pgm_return_val_if_fail (count <= PGM_MAX_FRAGMENTS, PGM_IO_STATUS_ERROR);
+    if (PGM_LIKELY(count)) pgm_return_val_if_fail (NULL != vector, PGM_IO_STATUS_ERROR);
+    if (PGM_UNLIKELY(!pgm_rwlock_reader_trylock (&sock->lock)))
+        pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+    if (PGM_UNLIKELY(!sock->is_bound ||
+        sock->is_destroyed))
+    {
+        pgm_rwlock_reader_unlock (&sock->lock);
+        pgm_return_val_if_reached (PGM_IO_STATUS_ERROR);
+    }
+
+    pgm_mutex_lock (&sock->source_mutex);
+
+/* pass on zero length as cannot count vector lengths */
+    if (PGM_UNLIKELY(0 == count))
+    {
+        const int status = send_odata_copy (sock, NULL, count, bytes_written);
+        pgm_mutex_unlock (&sock->source_mutex);
+        pgm_rwlock_reader_unlock (&sock->lock);
+        return status;
+    }
+    else if (1 == count)
+    {
+        const int status = send_odata (sock, vector[0], bytes_written);
+        pgm_mutex_unlock (&sock->source_mutex);
+        pgm_rwlock_reader_unlock (&sock->lock);
+        return status;
+    }
+
+    size_t bytes_sent = 0;
+    unsigned packets_sent = 0;
+    size_t data_bytes_sent = 0;
+    const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0;
+
+/* continue if blocked mid-apdu */
+    if (sock->is_apdu_eagain)
+        goto retry_send;
+
+    STATE(is_rate_limited) = FALSE;
+    if (sock->is_nonblocking && sock->is_controlled_odata)
+    {
+        size_t total_tpdu_length = 0;
+        for (unsigned i = 0; i < count; i++)
+            total_tpdu_length += sock->iphdr_len + pgm_pkt_offset (is_one_apdu, pgmcc_family) + vector[i]->len;
+
+/* calculation includes one iphdr length already */
+        if (!pgm_rate_check (&sock->rate_control,
+                             total_tpdu_length - sock->iphdr_len,
+                             sock->is_nonblocking))
+        {
+            sock->blocklen = total_tpdu_length;
+            pgm_mutex_unlock (&sock->source_mutex);
+            pgm_rwlock_reader_unlock (&sock->lock);
+            return PGM_IO_STATUS_RATE_LIMITED;
+        }
+        STATE(is_rate_limited) = TRUE;
+    }
+
+    if (is_one_apdu)
+    {
+        STATE(apdu_length) = 0;
+        STATE(first_sqn) = pgm_txw_next_lead(sock->window);
+        for (unsigned i = 0; i < count; i++)
+        {
+            if (PGM_UNLIKELY(vector[i]->len > sock->max_tsdu_fragment)) {
+                pgm_mutex_unlock (&sock->source_mutex);
+                pgm_rwlock_reader_unlock (&sock->lock);
+                return PGM_IO_STATUS_ERROR;
+            }
+            STATE(apdu_length) += vector[i]->len;
+        }
+        if (PGM_UNLIKELY(STATE(apdu_length) > sock->max_apdu)) {
+            pgm_mutex_unlock (&sock->source_mutex);
+            pgm_rwlock_reader_unlock (&sock->lock);
+            return PGM_IO_STATUS_ERROR;
+        }
+    }
+
+    for (STATE(vector_index) = 0; STATE(vector_index) < count; STATE(vector_index)++)
+    {
+        STATE(tsdu_length) = vector[STATE(vector_index)]->len;
+
+        STATE(skb) = pgm_skb_get(vector[STATE(vector_index)]);
+        STATE(skb)->sock = sock;
+        STATE(skb)->tstamp = pgm_time_update_now();
+
+        STATE(skb)->pgm_header = (struct pgm_header*)STATE(skb)->head;
+        STATE(skb)->pgm_data = (struct pgm_data*)(STATE(skb)->pgm_header + 1);
+        memcpy (STATE(skb)->pgm_header->pgm_gsi, &sock->tsi.gsi, sizeof(pgm_gsi_t));
+        STATE(skb)->pgm_header->pgm_sport = sock->tsi.sport;
+        STATE(skb)->pgm_header->pgm_dport = sock->dport;
+        STATE(skb)->pgm_header->pgm_type = PGM_ODATA;
+        STATE(skb)->pgm_header->pgm_options = is_one_apdu ? PGM_OPT_PRESENT : 0;
+        STATE(skb)->pgm_header->pgm_tsdu_length = htons (STATE(tsdu_length));
+
+/* ODATA */
+        STATE(skb)->pgm_data->data_sqn = htonl (pgm_txw_next_lead(sock->window));
+        STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window));
+
+        if (is_one_apdu)
+        {
+/* OPT_LENGTH */
+            struct pgm_opt_length* opt_len = (struct pgm_opt_length*)(STATE(skb)->pgm_data + 1);
+            opt_len->opt_type = PGM_OPT_LENGTH;
+            opt_len->opt_length = sizeof(struct pgm_opt_length);
+            opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+                                                sizeof(struct pgm_opt_header) +
+                                                sizeof(struct pgm_opt_fragment) );
+/* OPT_FRAGMENT */
+            struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+            opt_header->opt_type = PGM_OPT_FRAGMENT | PGM_OPT_END;
+            opt_header->opt_length = sizeof(struct pgm_opt_header) +
+                                     sizeof(struct pgm_opt_fragment);
+            STATE(skb)->pgm_opt_fragment = (struct pgm_opt_fragment*)(opt_header + 1);
+            STATE(skb)->pgm_opt_fragment->opt_reserved = 0;
+            STATE(skb)->pgm_opt_fragment->opt_sqn = htonl (STATE(first_sqn));
+            STATE(skb)->pgm_opt_fragment->opt_frag_off = htonl (STATE(data_bytes_offset));
+            STATE(skb)->pgm_opt_fragment->opt_frag_len = htonl (STATE(apdu_length));
+
+            pgm_assert (STATE(skb)->data == (STATE(skb)->pgm_opt_fragment + 1));
+        }
+        else
+        {
+            pgm_assert (STATE(skb)->data == (STATE(skb)->pgm_data + 1));
+        }
+
+/* TODO: the assembly checksum & copy routine is faster than memcpy & pgm_cksum on >= opteron hardware */
+        STATE(skb)->pgm_header->pgm_checksum = 0;
+        pgm_assert ((char*)STATE(skb)->data > (char*)STATE(skb)->pgm_header);
+        const size_t pgm_header_len = (char*)STATE(skb)->data - (char*)STATE(skb)->pgm_header;
+        const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0);
+        STATE(unfolded_odata) = pgm_csum_partial ((char*)STATE(skb)->data, STATE(tsdu_length), 0);
+        STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len));
+
+/* add to transmit window, skb::data set to payload */
+        pgm_spinlock_lock (&sock->txw_spinlock);
+        pgm_txw_add (sock->window, STATE(skb));
+        pgm_spinlock_unlock (&sock->txw_spinlock);
+        ssize_t sent;
+        size_t tpdu_length;
+retry_send:
+        pgm_assert ((char*)STATE(skb)->tail > (char*)STATE(skb)->head);
+        tpdu_length = (char*)STATE(skb)->tail - (char*)STATE(skb)->head;
+        sent = pgm_sendto (sock,
+                           !STATE(is_rate_limited),    /* rate limit on blocking */
+                           FALSE,                      /* regular socket */
+                           STATE(skb)->head,
+                           tpdu_length,
+                           (struct sockaddr*)&sock->send_gsr.gsr_group,
+                           pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+        if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno)) {
+            sock->is_apdu_eagain = TRUE;
+            sock->blocklen = tpdu_length;
+            goto blocked;
+        }
+
+/* save unfolded odata for retransmissions */
+        pgm_txw_set_unfolded_checksum (STATE(skb), STATE(unfolded_odata));
+
+        if (PGM_LIKELY((size_t)sent == tpdu_length)) {
+            bytes_sent += tpdu_length + sock->iphdr_len;    /* as counted at IP layer */
+            packets_sent++;                                 /* IP packets */
+            data_bytes_sent += STATE(tsdu_length);
+        }
+
+        pgm_free_skb (STATE(skb));
+        STATE(data_bytes_offset) += STATE(tsdu_length);
+
+/* check for end of transmission group */
+        if (sock->use_proactive_parity) {
+            const uint32_t odata_sqn = ntohl (STATE(skb)->pgm_data->data_sqn);
+            const uint32_t tg_sqn_mask = 0xffffffff << sock->tg_sqn_shift;
+            if (!((odata_sqn + 1) & ~tg_sqn_mask))
+                pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask);
+        }
+
+    }
+#ifdef TRANSPORT_DEBUG
+    if (is_one_apdu)
+    {
+        pgm_assert( STATE(data_bytes_offset) == STATE(apdu_length) );
+    }
+#endif
+
+    sock->is_apdu_eagain = FALSE;
+    reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+
+    pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], bytes_sent);
+    sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT] += packets_sent;
+    sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += data_bytes_sent;
+    if (bytes_written)
+        *bytes_written = data_bytes_sent;
+    pgm_mutex_unlock (&sock->source_mutex);
+    pgm_rwlock_reader_unlock (&sock->lock);
+    return PGM_IO_STATUS_NORMAL;
+
+blocked:
+    if (bytes_sent) {
+        reset_heartbeat_spm (sock, STATE(skb)->tstamp);
+        pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], bytes_sent);
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_MSGS_SENT] += packets_sent;
+        sock->cumulative_stats[PGM_PC_SOURCE_DATA_BYTES_SENT] += data_bytes_sent;
+    }
+    pgm_mutex_unlock (&sock->source_mutex);
+    pgm_rwlock_reader_unlock (&sock->lock);
+    if (EAGAIN == errno) {
+        if (sock->use_pgmcc)
+            pgm_notify_clear (&sock->ack_notify);
+        return PGM_IO_STATUS_WOULD_BLOCK;
+    }
+    return PGM_IO_STATUS_RATE_LIMITED;
+}
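+
+/* Example: zero-copy send of two transmit-window-owned buffers with
+ * pgm_send_skbv().  An illustrative sketch under stated assumptions:
+ * `sock` is a created and bound PGM socket, `max_tpdu` names its configured
+ * maximum TPDU size, and the payload fill is elided.
+ *
+ *    struct pgm_sk_buff_t* skbv[2];
+ *    size_t bytes_written;
+ *    skbv[0] = pgm_alloc_skb (max_tpdu);
+ *    skbv[1] = pgm_alloc_skb (max_tpdu);
+ *    // reserve PGM header room and append payload into each skb with
+ *    // pgm_skb_reserve()/pgm_skb_put() before handing them over; on a
+ *    // successful send the transmit window retains the buffers for
+ *    // retransmission, so the caller must not reuse them.
+ *    const int status = pgm_send_skbv (sock, skbv, 2, TRUE, &bytes_written);
+ */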
+
+/* clean up resumable-send state helper
+ */
+#undef STATE
+
+/* send repair packet.
+ *
+ * on success, TRUE is returned.  on error, FALSE is returned.
+ */
+
+static
+bool
+send_rdata (
+    pgm_sock_t*           restrict sock,
+    struct pgm_sk_buff_t* restrict skb
+    )
+{
+/* pre-conditions */
+    pgm_assert (NULL != sock);
+    pgm_assert (NULL != skb);
+    pgm_assert ((char*)skb->tail > (char*)skb->head);
+
+    const size_t tpdu_length = (char*)skb->tail - (char*)skb->head;
+
+/* update previous odata/rdata contents */
+    struct pgm_header* header = skb->pgm_header;
+    struct pgm_data* rdata = skb->pgm_data;
+    header->pgm_type = PGM_RDATA;
+/* RDATA */
+    rdata->data_trail = htonl (pgm_txw_trail(sock->window));
+
+    header->pgm_checksum = 0;
+    const size_t pgm_header_len = tpdu_length - ntohs(header->pgm_tsdu_length);
+    uint32_t unfolded_header = pgm_csum_partial (header, pgm_header_len, 0);
+    uint32_t unfolded_odata = pgm_csum_partial (skb->data, ntohs(header->pgm_tsdu_length), 0);
+    header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, unfolded_odata, pgm_header_len));
+
+/* congestion control */
+//    if (sock->use_pgmcc &&
+//        sock->tokens < pgm_fp8 (1))
+//    {
+//        pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("Token limit reached."));
+//        sock->blocklen = tpdu_length;
+//        return FALSE;
+//    }
+
+    const ssize_t sent = pgm_sendto (sock,
+                                     sock->is_controlled_rdata,    /* rate limited */
+                                     TRUE,                         /* with router alert */
+                                     header,
+                                     tpdu_length,
+                                     (struct sockaddr*)&sock->send_gsr.gsr_group,
+                                     pgm_sockaddr_len((struct sockaddr*)&sock->send_gsr.gsr_group));
+
+    if (sent < 0 && EAGAIN == errno) {
+        sock->blocklen = tpdu_length;
+        return FALSE;
+    }
+
+    const pgm_time_t now = pgm_time_update_now();
+
+    if (sock->use_pgmcc) {
+        if (sock->tokens >= pgm_fp8 (1))
+            sock->tokens -= pgm_fp8 (1);
+        sock->ack_expiry = now + sock->ack_expiry_ivl;
+    }
+
+/* re-set spm timer: we are already in the timer thread, no need to prod timers
+ */
+    pgm_mutex_lock (&sock->timer_mutex);
+    sock->spm_heartbeat_state = 1;
+    sock->next_heartbeat_spm = now + sock->spm_heartbeat_interval[sock->spm_heartbeat_state++];
+    pgm_mutex_unlock (&sock->timer_mutex);
+
+    pgm_txw_inc_retransmit_count (skb);
+    sock->cumulative_stats[PGM_PC_SOURCE_SELECTIVE_BYTES_RETRANSMITTED] += ntohs(header->pgm_tsdu_length);
+    sock->cumulative_stats[PGM_PC_SOURCE_SELECTIVE_MSGS_RETRANSMITTED]++;    /* impossible to determine APDU count */
+    pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length + sock->iphdr_len);
+    return TRUE;
+}
+
+/* eof */