summaryrefslogtreecommitdiffstats
path: root/3rdparty/openpgm-svn-r1135/pgm/rxw.c
diff options
context:
space:
mode:
Diffstat (limited to '3rdparty/openpgm-svn-r1135/pgm/rxw.c')
-rw-r--r--3rdparty/openpgm-svn-r1135/pgm/rxw.c2233
1 files changed, 2233 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1135/pgm/rxw.c b/3rdparty/openpgm-svn-r1135/pgm/rxw.c
new file mode 100644
index 0000000..a6891d9
--- /dev/null
+++ b/3rdparty/openpgm-svn-r1135/pgm/rxw.c
@@ -0,0 +1,2233 @@
+/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
+ *
+ * A basic receive window: pointer array implementation.
+ *
+ * Copyright (c) 2006-2010 Miru Limited.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#define __STDC_FORMAT_MACROS
+#ifdef _MSC_VER
+# include <pgm/wininttypes.h>
+#else
+# include <inttypes.h>
+#endif
+#include <impl/i18n.h>
+#include <impl/framework.h>
+#include <impl/rxw.h>
+
+
+//#define RXW_DEBUG
+
+#ifndef RXW_DEBUG
+# define PGM_DISABLE_ASSERT
+#endif
+
+
+/* testing function: is TSI null
+ *
+ * returns TRUE if null, returns FALSE if not null.
+ */
+
+static inline
+bool
+_pgm_tsi_is_null (
+ const void*const tsi
+ )
+{
+ const union {
+ pgm_tsi_t tsi;
+ uint32_t l[2];
+ } *u = tsi;
+
+/* pre-conditions */
+ pgm_assert (NULL != tsi);
+
+ return (0 == u->l[0] && 0 == u->l[1]);
+}
+
+/* sequence state must be smaller than PGM skbuff control buffer */
+PGM_STATIC_ASSERT(sizeof(struct pgm_rxw_state_t) <= sizeof(((struct pgm_sk_buff_t*)0)->cb));
+
+static void _pgm_rxw_define (pgm_rxw_t*const, const uint32_t);
+static void _pgm_rxw_update_trail (pgm_rxw_t*const, const uint32_t);
+static inline uint32_t _pgm_rxw_update_lead (pgm_rxw_t*const, const uint32_t, const pgm_time_t, const pgm_time_t);
+static inline uint32_t _pgm_rxw_tg_sqn (pgm_rxw_t*const, const uint32_t);
+static inline uint32_t _pgm_rxw_pkt_sqn (pgm_rxw_t*const, const uint32_t);
+static inline bool _pgm_rxw_is_first_of_tg_sqn (pgm_rxw_t*const, const uint32_t);
+static inline bool _pgm_rxw_is_last_of_tg_sqn (pgm_rxw_t*const, const uint32_t);
+static int _pgm_rxw_insert (pgm_rxw_t*const restrict, struct pgm_sk_buff_t*const restrict);
+static int _pgm_rxw_append (pgm_rxw_t*const restrict, struct pgm_sk_buff_t*const restrict, const pgm_time_t);
+static int _pgm_rxw_add_placeholder_range (pgm_rxw_t*const, const uint32_t, const pgm_time_t, const pgm_time_t);
+static void _pgm_rxw_unlink (pgm_rxw_t*const restrict, struct pgm_sk_buff_t*const restrict);
+static uint32_t _pgm_rxw_remove_trail (pgm_rxw_t*const);
+static void _pgm_rxw_state (pgm_rxw_t*const restrict, struct pgm_sk_buff_t*const restrict, const int);
+static inline void _pgm_rxw_shuffle_parity (pgm_rxw_t*const restrict, struct pgm_sk_buff_t*const restrict);
+static inline ssize_t _pgm_rxw_incoming_read (pgm_rxw_t*const restrict, struct pgm_msgv_t**restrict, uint32_t);
+static bool _pgm_rxw_is_apdu_complete (pgm_rxw_t*const restrict, const uint32_t);
+static inline ssize_t _pgm_rxw_incoming_read_apdu (pgm_rxw_t*const restrict, struct pgm_msgv_t**restrict);
+static inline int _pgm_rxw_recovery_update (pgm_rxw_t*const, const uint32_t, const pgm_time_t);
+static inline int _pgm_rxw_recovery_append (pgm_rxw_t*const, const pgm_time_t, const pgm_time_t);
+
+
+/* returns the pointer at the given index of the window.
+ */
+
+static
+struct pgm_sk_buff_t*
+_pgm_rxw_peek (
+ const pgm_rxw_t* const window,
+ const uint32_t sequence
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ if (pgm_rxw_is_empty (window))
+ return NULL;
+
+ if (pgm_uint32_gte (sequence, window->trail) && pgm_uint32_lte (sequence, window->lead))
+ {
+ const uint_fast32_t index_ = sequence % pgm_rxw_max_length (window);
+ struct pgm_sk_buff_t* skb = window->pdata[index_];
+/* availability only guaranteed inside commit window */
+ if (pgm_uint32_lt (sequence, window->commit_lead)) {
+ pgm_assert (NULL != skb);
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (!_pgm_tsi_is_null (&skb->tsi));
+ }
+ return skb;
+ }
+
+ return NULL;
+}
+
+/* sections of the receive window:
+ *
+ * | Commit | Incoming |
+ * |<---------------->|<------------>|
+ * | | |
+ * trail commit-lead lead
+ *
+ * commit buffers are currently held by the application, the window trail
+ * cannot be advanced if packets remain in the commit buffer.
+ *
+ * incoming buffers are waiting to be passed to the application.
+ */
+
+static inline
+uint32_t
+_pgm_rxw_commit_length (
+ const pgm_rxw_t* const window
+ )
+{
+ pgm_assert (NULL != window);
+ return window->commit_lead - window->trail;
+}
+
+static inline
+bool
+_pgm_rxw_commit_is_empty (
+ const pgm_rxw_t* const window
+ )
+{
+ pgm_assert (NULL != window);
+ return (_pgm_rxw_commit_length (window) == 0);
+}
+
+static inline
+uint32_t
+_pgm_rxw_incoming_length (
+ const pgm_rxw_t* const window
+ )
+{
+ pgm_assert (NULL != window);
+ return ( 1 + window->lead ) - window->commit_lead;
+}
+
+static inline
+bool
+_pgm_rxw_incoming_is_empty (
+ const pgm_rxw_t* const window
+ )
+{
+ pgm_assert (NULL != window);
+ return (_pgm_rxw_incoming_length (window) == 0);
+}
+
+/* constructor for receive window. zero-length windows are not permitted.
+ *
+ * returns pointer to window.
+ */
+
+pgm_rxw_t*
+pgm_rxw_create (
+ const pgm_tsi_t*const tsi,
+ const uint16_t tpdu_size,
+ const unsigned sqns, /* transmit window size in sequence numbers */
+ const unsigned secs, /* size in seconds */
+ const ssize_t max_rte, /* max bandwidth */
+ const uint32_t ack_c_p
+ )
+{
+ pgm_rxw_t* window;
+
+/* pre-conditions */
+ pgm_assert (NULL != tsi);
+ pgm_assert_cmpuint (tpdu_size, >, 0);
+ if (sqns) {
+ pgm_assert_cmpuint (sqns, >, 0);
+ pgm_assert_cmpuint (sqns & PGM_UINT32_SIGN_BIT, ==, 0);
+ pgm_assert_cmpuint (secs, ==, 0);
+ pgm_assert_cmpuint (max_rte, ==, 0);
+ } else {
+ pgm_assert_cmpuint (secs, >, 0);
+ pgm_assert_cmpuint (max_rte, >, 0);
+ }
+
+ pgm_debug ("create (tsi:%s max-tpdu:%" PRIu16 " sqns:%" PRIu32 " secs %u max-rte %zd ack-c_p %" PRIu32 ")",
+ pgm_tsi_print (tsi), tpdu_size, sqns, secs, max_rte, ack_c_p);
+
+/* calculate receive window parameters */
+ pgm_assert (sqns || (secs && max_rte));
+ const unsigned alloc_sqns = sqns ? sqns : ( (secs * max_rte) / tpdu_size );
+ window = pgm_malloc0 (sizeof(pgm_rxw_t) + ( alloc_sqns * sizeof(struct pgm_sk_buff_t*) ));
+
+ window->tsi = tsi;
+ window->max_tpdu = tpdu_size;
+
+/* empty state:
+ *
+ * trail = 0, lead = -1
+ * commit_trail = commit_lead = rxw_trail = rxw_trail_init = 0
+ */
+ window->lead = -1;
+ window->trail = window->lead + 1;
+
+/* limit retransmit requests on late session joining */
+ window->is_constrained = TRUE;
+
+/* minimum value of RS::k = 1 */
+ window->tg_size = 1;
+
+/* PGMCC filter weight */
+ window->ack_c_p = pgm_fp16 (ack_c_p);
+ window->bitmap = 0xffffffff;
+
+/* pointer array */
+ window->alloc = alloc_sqns;
+
+/* post-conditions */
+ pgm_assert_cmpuint (pgm_rxw_max_length (window), ==, alloc_sqns);
+ pgm_assert_cmpuint (pgm_rxw_length (window), ==, 0);
+ pgm_assert_cmpuint (pgm_rxw_size (window), ==, 0);
+ pgm_assert (pgm_rxw_is_empty (window));
+ pgm_assert (!pgm_rxw_is_full (window));
+
+ return window;
+}
+
+/* destructor for receive window. must not be called more than once for same window.
+ */
+
+void
+pgm_rxw_destroy (
+ pgm_rxw_t* const window
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert_cmpuint (window->alloc, >, 0);
+
+ pgm_debug ("destroy (window:%p)", (const void*)window);
+
+/* contents of window */
+ while (!pgm_rxw_is_empty (window)) {
+ _pgm_rxw_remove_trail (window);
+ }
+
+/* window must now be empty */
+ pgm_assert_cmpuint (pgm_rxw_length (window), ==, 0);
+ pgm_assert_cmpuint (pgm_rxw_size (window), ==, 0);
+ pgm_assert (pgm_rxw_is_empty (window));
+ pgm_assert (!pgm_rxw_is_full (window));
+
+/* window */
+ pgm_free (window);
+}
+
+/* add skb to receive window. window has fixed size and will not grow.
+ * PGM skbuff data/tail pointers must point to the PGM payload, and hence skb->len
+ * is allowed to be zero.
+ *
+ * if the skb sequence number indicates lost packets placeholders will be defined
+ * for each missing entry in the window.
+ *
+ * side effects:
+ *
+ * 1) sequence number is set in skb from PGM header value.
+ * 2) window may be updated with new skb.
+ * 3) placeholders may be created for detected lost packets.
+ * 4) parity skbs may be shuffled to accomodate original data.
+ *
+ * returns:
+ * PGM_RXW_INSERTED - packet filled a waiting placeholder, skb consumed.
+ * PGM_RXW_APPENDED - packet advanced window lead, skb consumed.
+ * PGM_RXW_MISSING - missing packets detected whilst window lead was adanced, skb consumed.
+ * PGM_RXW_DUPLICATE - re-transmission of previously seen packet.
+ * PGM_RXW_MALFORMED - corrupted or invalid packet.
+ * PGM_RXW_BOUNDS - packet out of window.
+ *
+ * it is an error to try to free the skb after adding to the window.
+ */
+
+int
+pgm_rxw_add (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb,
+ const pgm_time_t now,
+ const pgm_time_t nak_rb_expiry /* calculated expiry time for this skb */
+ )
+{
+ pgm_rxw_state_t* const state = (pgm_rxw_state_t*)&skb->cb;
+ int status;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+ pgm_assert_cmpuint (nak_rb_expiry, >, 0);
+ pgm_assert_cmpuint (pgm_rxw_max_length (window), >, 0);
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (((const pgm_list_t*)skb)->next == NULL);
+ pgm_assert (((const pgm_list_t*)skb)->prev == NULL);
+ pgm_assert (!_pgm_tsi_is_null (&skb->tsi));
+ pgm_assert ((char*)skb->data > (char*)skb->head);
+ pgm_assert (sizeof(struct pgm_header) + sizeof(struct pgm_data) <= (size_t)((char*)skb->data - (char*)skb->head));
+ pgm_assert (skb->len == ((char*)skb->tail - (char*)skb->data));
+
+ pgm_debug ("add (window:%p skb:%p nak_rb_expiry:%" PGM_TIME_FORMAT ")",
+ (const void*)window, (const void*)skb, nak_rb_expiry);
+
+ skb->sequence = ntohl (skb->pgm_data->data_sqn);
+
+/* protocol sanity check: tsdu size */
+ if (PGM_UNLIKELY(skb->len != ntohs (skb->pgm_header->pgm_tsdu_length)))
+ return PGM_RXW_MALFORMED;
+
+/* protocol sanity check: valid trail pointer wrt. sequence */
+ if (PGM_UNLIKELY(skb->sequence - ntohl (skb->pgm_data->data_trail) >= ((UINT32_MAX/2)-1)))
+ return PGM_RXW_MALFORMED;
+
+/* verify fragment header for original data, parity packets include a
+ * parity fragment header
+ */
+ if (!(skb->pgm_header->pgm_options & PGM_OPT_PARITY) &&
+ skb->pgm_opt_fragment)
+ {
+/* protocol sanity check: single fragment APDU */
+ if (PGM_UNLIKELY(ntohl (skb->of_apdu_len) == skb->len))
+ skb->pgm_opt_fragment = NULL;
+
+/* protocol sanity check: minimum APDU length */
+ if (PGM_UNLIKELY(ntohl (skb->of_apdu_len) < skb->len))
+ return PGM_RXW_MALFORMED;
+
+/* protocol sanity check: sequential ordering */
+ if (PGM_UNLIKELY(pgm_uint32_gt (ntohl (skb->of_apdu_first_sqn), skb->sequence)))
+ return PGM_RXW_MALFORMED;
+
+/* protocol sanity check: maximum APDU length */
+ if (PGM_UNLIKELY(ntohl (skb->of_apdu_len) > PGM_MAX_APDU))
+ return PGM_RXW_MALFORMED;
+ }
+
+/* first packet of a session defines the window */
+ if (PGM_UNLIKELY(!window->is_defined))
+ _pgm_rxw_define (window, skb->sequence - 1); /* previous_lead needed for append to occur */
+ else
+ _pgm_rxw_update_trail (window, ntohl (skb->pgm_data->data_trail));
+
+/* bounds checking for parity data occurs at the transmission group sequence number */
+ if (skb->pgm_header->pgm_options & PGM_OPT_PARITY)
+ {
+ if (pgm_uint32_lt (_pgm_rxw_tg_sqn (window, skb->sequence), _pgm_rxw_tg_sqn (window, window->commit_lead)))
+ return PGM_RXW_DUPLICATE;
+
+ if (pgm_uint32_lt (_pgm_rxw_tg_sqn (window, skb->sequence), _pgm_rxw_tg_sqn (window, window->lead))) {
+ window->has_event = 1;
+ return _pgm_rxw_insert (window, skb);
+ }
+
+ const struct pgm_sk_buff_t* const first_skb = _pgm_rxw_peek (window, _pgm_rxw_tg_sqn (window, skb->sequence));
+ const pgm_rxw_state_t* const first_state = (pgm_rxw_state_t*)&first_skb->cb;
+
+ if (_pgm_rxw_tg_sqn (window, skb->sequence) == _pgm_rxw_tg_sqn (window, window->lead)) {
+ window->has_event = 1;
+ if (NULL == first_state || first_state->is_contiguous) {
+ state->is_contiguous = 1;
+ return _pgm_rxw_append (window, skb, now);
+ } else
+ return _pgm_rxw_insert (window, skb);
+ }
+
+ pgm_assert (NULL != first_state);
+ status = _pgm_rxw_add_placeholder_range (window, _pgm_rxw_tg_sqn (window, skb->sequence), now, nak_rb_expiry);
+ }
+ else
+ {
+ if (pgm_uint32_lt (skb->sequence, window->commit_lead)) {
+ if (pgm_uint32_gte (skb->sequence, window->trail))
+ return PGM_RXW_DUPLICATE;
+ else
+ return PGM_RXW_BOUNDS;
+ }
+
+ if (pgm_uint32_lte (skb->sequence, window->lead)) {
+ window->has_event = 1;
+ return _pgm_rxw_insert (window, skb);
+ }
+
+ if (skb->sequence == pgm_rxw_next_lead (window)) {
+ window->has_event = 1;
+ if (_pgm_rxw_is_first_of_tg_sqn (window, skb->sequence))
+ state->is_contiguous = 1;
+ return _pgm_rxw_append (window, skb, now);
+ }
+
+ status = _pgm_rxw_add_placeholder_range (window, skb->sequence, now, nak_rb_expiry);
+ }
+
+ if (PGM_RXW_APPENDED == status) {
+ status = _pgm_rxw_append (window, skb, now);
+ if (PGM_RXW_APPENDED == status)
+ status = PGM_RXW_MISSING;
+ }
+ return status;
+}
+
+/* trail is the next packet to commit upstream, lead is the leading edge
+ * of the receive window with possible gaps inside, rxw_trail is the transmit
+ * window trail for retransmit requests.
+ */
+
+/* define window by parameters of first data packet.
+ */
+
+static
+void
+_pgm_rxw_define (
+ pgm_rxw_t* const window,
+ const uint32_t lead
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (pgm_rxw_is_empty (window));
+ pgm_assert (_pgm_rxw_commit_is_empty (window));
+ pgm_assert (_pgm_rxw_incoming_is_empty (window));
+ pgm_assert (!window->is_defined);
+
+ window->lead = lead;
+ window->commit_lead = window->rxw_trail = window->rxw_trail_init = window->trail = window->lead + 1;
+ window->is_constrained = window->is_defined = TRUE;
+
+/* post-conditions */
+ pgm_assert (pgm_rxw_is_empty (window));
+ pgm_assert (_pgm_rxw_commit_is_empty (window));
+ pgm_assert (_pgm_rxw_incoming_is_empty (window));
+ pgm_assert (window->is_defined);
+ pgm_assert (window->is_constrained);
+}
+
+/* update window with latest transmitted parameters.
+ *
+ * returns count of placeholders added into window, used to start sending naks.
+ */
+
+unsigned
+pgm_rxw_update (
+ pgm_rxw_t* const window,
+ const uint32_t txw_lead,
+ const uint32_t txw_trail,
+ const pgm_time_t now,
+ const pgm_time_t nak_rb_expiry /* packet expiration time */
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert_cmpuint (nak_rb_expiry, >, 0);
+
+ pgm_debug ("pgm_rxw_update (window:%p txw-lead:%" PRIu32 " txw-trail:%" PRIu32 " nak-rb-expiry:%" PGM_TIME_FORMAT ")",
+ (void*)window, txw_lead, txw_trail, nak_rb_expiry);
+
+ if (PGM_UNLIKELY(!window->is_defined)) {
+ _pgm_rxw_define (window, txw_lead);
+ return 0;
+ }
+
+ _pgm_rxw_update_trail (window, txw_trail);
+ return _pgm_rxw_update_lead (window, txw_lead, now, nak_rb_expiry);
+}
+
+/* update trailing edge of receive window
+ */
+
+static
+void
+_pgm_rxw_update_trail (
+ pgm_rxw_t* const window,
+ const uint32_t txw_trail
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+/* advertised trail is less than the current value */
+ if (PGM_UNLIKELY(pgm_uint32_lte (txw_trail, window->rxw_trail)))
+ return;
+
+/* protocol sanity check: advertised trail jumps too far ahead */
+ if (PGM_UNLIKELY(txw_trail - window->rxw_trail > ((UINT32_MAX/2)-1)))
+ return;
+
+/* retransmissions requests are constrained on startup until the advertised trail advances
+ * beyond the first data sequence number.
+ */
+ if (PGM_UNLIKELY(window->is_constrained))
+ {
+ if (pgm_uint32_gt (txw_trail, window->rxw_trail_init))
+ window->is_constrained = FALSE;
+ else
+ return;
+ }
+
+ window->rxw_trail = txw_trail;
+
+/* new value doesn't affect window */
+ if (PGM_UNLIKELY(pgm_uint32_lte (window->rxw_trail, window->trail)))
+ return;
+
+/* jump remaining sequence numbers if window is empty */
+ if (pgm_rxw_is_empty (window))
+ {
+ const uint32_t distance = (int32_t)(window->rxw_trail) - (int32_t)(window->trail);
+ window->commit_lead = window->trail += distance;
+ window->lead += distance;
+
+/* add loss to bitmap */
+ if (distance > 32) window->bitmap = 0;
+ else window->bitmap <<= distance;
+
+/* update the Exponential Moving Average (EMA) data loss with long jump:
+ * s_t = α × (p₁ + (1 - α) × p₂ + (1 - α)² × p₃ + ⋯)
+ * omitting the weight by stopping after k terms,
+ * = α × ((1 - α)^^k + (1 - α)^^{k+1} +(1 - α)^^{k+1} + ⋯)
+ * = α × (1 - α)^^k × (1 + (1 - α) + (1 - α)² + ⋯)
+ * = (1 - α)^^k
+ */
+ window->data_loss = pgm_fp16mul (window->data_loss, pgm_fp16pow (pgm_fp16 (1) - window->ack_c_p, distance));
+
+ window->cumulative_losses += distance;
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Data loss due to trailing edge update, fragment count %" PRIu32 "."),window->fragment_count);
+ pgm_assert (pgm_rxw_is_empty (window));
+ pgm_assert (_pgm_rxw_commit_is_empty (window));
+ pgm_assert (_pgm_rxw_incoming_is_empty (window));
+ return;
+ }
+
+/* remove all buffers between commit lead and advertised rxw_trail */
+ for (uint32_t sequence = window->commit_lead;
+ pgm_uint32_gt (window->rxw_trail, sequence) && pgm_uint32_gte (window->lead, sequence);
+ sequence++)
+ {
+ struct pgm_sk_buff_t* skb;
+ pgm_rxw_state_t* state;
+
+ skb = _pgm_rxw_peek (window, sequence);
+ pgm_assert (NULL != skb);
+ state = (pgm_rxw_state_t*)&skb->cb;
+
+ switch (state->pkt_state) {
+ case PGM_PKT_STATE_HAVE_DATA:
+ case PGM_PKT_STATE_HAVE_PARITY:
+ case PGM_PKT_STATE_LOST_DATA:
+ break;
+
+ case PGM_PKT_STATE_ERROR:
+ pgm_assert_not_reached();
+
+ default:
+ pgm_rxw_lost (window, sequence);
+ break;
+ }
+ }
+
+/* post-conditions: only after flush */
+// pgm_assert (!pgm_rxw_is_full (window));
+}
+
+/* update FEC parameters
+ */
+
+void
+pgm_rxw_update_fec (
+ pgm_rxw_t* const window,
+ const uint8_t rs_k
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert_cmpuint (rs_k, >, 1);
+
+ pgm_debug ("pgm_rxw_update_fec (window:%p rs(k):%u)",
+ (void*)window, rs_k);
+
+ if (window->is_fec_available) {
+ if (rs_k == window->rs.k) return;
+ pgm_rs_destroy (&window->rs);
+ } else
+ window->is_fec_available = 1;
+ pgm_rs_create (&window->rs, PGM_RS_DEFAULT_N, rs_k);
+ window->tg_sqn_shift = pgm_power2_log2 (rs_k);
+ window->tg_size = window->rs.k;
+}
+
+/* add one placeholder to leading edge due to detected lost packet.
+ */
+
+static
+void
+_pgm_rxw_add_placeholder (
+ pgm_rxw_t* const window,
+ const pgm_time_t now,
+ const pgm_time_t nak_rb_expiry
+ )
+{
+ struct pgm_sk_buff_t* skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (!pgm_rxw_is_full (window));
+
+/* advance lead */
+ window->lead++;
+
+/* add loss to bitmap */
+ window->bitmap <<= 1;
+
+/* update the Exponential Moving Average (EMA) data loss with loss:
+ * s_t = α × x_{t-1} + (1 - α) × s_{t-1}
+ * x_{t-1} = 1
+ * ∴ s_t = α + (1 - α) × s_{t-1}
+ */
+ window->data_loss = window->ack_c_p + pgm_fp16mul ((pgm_fp16 (1) - window->ack_c_p), window->data_loss);
+
+ skb = pgm_alloc_skb (window->max_tpdu);
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+ skb->tstamp = now;
+ skb->sequence = window->lead;
+ state->timer_expiry = nak_rb_expiry;
+
+ if (!_pgm_rxw_is_first_of_tg_sqn (window, skb->sequence))
+ {
+ struct pgm_sk_buff_t* first_skb = _pgm_rxw_peek (window, _pgm_rxw_tg_sqn (window, skb->sequence));
+ if (first_skb) {
+ pgm_rxw_state_t* first_state = (pgm_rxw_state_t*)&first_skb->cb;
+ first_state->is_contiguous = 0;
+ }
+ }
+
+/* add skb to window */
+ const uint_fast32_t index_ = skb->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = skb;
+
+ pgm_rxw_state (window, skb, PGM_PKT_STATE_BACK_OFF);
+
+/* post-conditions */
+ pgm_assert_cmpuint (pgm_rxw_length (window), >, 0);
+ pgm_assert_cmpuint (pgm_rxw_length (window), <=, pgm_rxw_max_length (window));
+ pgm_assert_cmpuint (_pgm_rxw_incoming_length (window), >, 0);
+}
+
+/* add a range of placeholders to the window.
+ */
+
+static
+int
+_pgm_rxw_add_placeholder_range (
+ pgm_rxw_t* const window,
+ const uint32_t sequence,
+ const pgm_time_t now,
+ const pgm_time_t nak_rb_expiry
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (pgm_uint32_gt (sequence, pgm_rxw_lead (window)));
+
+/* check bounds of commit window */
+ const uint32_t new_commit_sqns = ( 1 + sequence ) - window->trail;
+ if ( !_pgm_rxw_commit_is_empty (window) &&
+ (new_commit_sqns >= pgm_rxw_max_length (window)) )
+ {
+ _pgm_rxw_update_lead (window, sequence, now, nak_rb_expiry);
+ return PGM_RXW_BOUNDS; /* effectively a slow consumer */
+ }
+
+ if (pgm_rxw_is_full (window)) {
+ pgm_assert (_pgm_rxw_commit_is_empty (window));
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Receive window full on placeholder sequence."));
+ _pgm_rxw_remove_trail (window);
+ }
+
+/* if packet is non-contiguous to current leading edge add place holders
+ * TODO: can be rather inefficient on packet loss looping through dropped sequence numbers
+ */
+ while (pgm_rxw_next_lead (window) != sequence)
+ {
+ _pgm_rxw_add_placeholder (window, now, nak_rb_expiry);
+ if (pgm_rxw_is_full (window)) {
+ pgm_assert (_pgm_rxw_commit_is_empty (window));
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Receive window full on placeholder sequence."));
+ _pgm_rxw_remove_trail (window);
+ }
+ }
+
+/* post-conditions */
+ pgm_assert (!pgm_rxw_is_full (window));
+
+ return PGM_RXW_APPENDED;
+}
+
+/* update leading edge of receive window.
+ *
+ * returns number of place holders added.
+ */
+
+static
+unsigned
+_pgm_rxw_update_lead (
+ pgm_rxw_t* const window,
+ const uint32_t txw_lead,
+ const pgm_time_t now,
+ const pgm_time_t nak_rb_expiry
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+/* advertised lead is less than the current value */
+ if (PGM_UNLIKELY(pgm_uint32_lte (txw_lead, window->lead)))
+ return 0;
+
+ uint32_t lead;
+
+/* committed packets limit constrain the lead until they are released */
+ if (!_pgm_rxw_commit_is_empty (window) &&
+ (txw_lead - window->trail) >= pgm_rxw_max_length (window))
+ {
+ lead = window->trail + pgm_rxw_max_length (window) - 1;
+ if (lead == window->lead)
+ return 0;
+ }
+ else
+ lead = txw_lead;
+
+ unsigned lost = 0;
+
+ while (window->lead != lead)
+ {
+/* slow consumer or fast producer */
+ if (pgm_rxw_is_full (window)) {
+ pgm_assert (_pgm_rxw_commit_is_empty (window));
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Receive window full on window lead advancement."));
+ _pgm_rxw_remove_trail (window);
+ }
+ _pgm_rxw_add_placeholder (window, now, nak_rb_expiry);
+ lost++;
+ }
+
+ return lost;
+}
+
+/* checks whether an APDU is unrecoverable due to lost TPDUs.
+ */
+static inline
+bool
+_pgm_rxw_is_apdu_lost (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+ const pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+
+/* lost is lost */
+ if (PGM_PKT_STATE_LOST_DATA == state->pkt_state)
+ return TRUE;
+
+/* by definition, a single-TPDU APDU is complete */
+ if (!skb->pgm_opt_fragment)
+ return FALSE;
+
+ const uint32_t apdu_first_sqn = ntohl (skb->of_apdu_first_sqn);
+
+/* by definition, first fragment indicates APDU is available */
+ if (apdu_first_sqn == skb->sequence)
+ return FALSE;
+
+ const struct pgm_sk_buff_t* const first_skb = _pgm_rxw_peek (window, apdu_first_sqn);
+/* first fragment out-of-bounds */
+ if (NULL == first_skb)
+ return TRUE;
+
+ const pgm_rxw_state_t* first_state = (pgm_rxw_state_t*)&first_skb->cb;
+ if (PGM_PKT_STATE_LOST_DATA == first_state->pkt_state)
+ return TRUE;
+
+ return FALSE;
+}
+
+/* return the first missing packet sequence in the specified transmission
+ * group or NULL if not required.
+ */
+
+static inline
+struct pgm_sk_buff_t*
+_pgm_rxw_find_missing (
+ pgm_rxw_t* const window,
+ const uint32_t tg_sqn /* tg_sqn | pkt_sqn */
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_rxw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ for (uint32_t i = tg_sqn, j = 0; j < window->tg_size; i++, j++)
+ {
+ skb = _pgm_rxw_peek (window, i);
+ pgm_assert (NULL != skb);
+ state = (pgm_rxw_state_t*)&skb->cb;
+ switch (state->pkt_state) {
+ case PGM_PKT_STATE_BACK_OFF:
+ case PGM_PKT_STATE_WAIT_NCF:
+ case PGM_PKT_STATE_WAIT_DATA:
+ case PGM_PKT_STATE_LOST_DATA:
+ return skb;
+
+ case PGM_PKT_STATE_HAVE_DATA:
+ case PGM_PKT_STATE_HAVE_PARITY:
+ break;
+
+ default: pgm_assert_not_reached(); break;
+ }
+ }
+
+ return NULL;
+}
+
+/* returns TRUE if skb is a parity packet with packet length not
+ * matching the transmission group length without the variable-packet-length
+ * flag set.
+ */
+
+static inline
+bool
+_pgm_rxw_is_invalid_var_pktlen (
+ pgm_rxw_t* const restrict window,
+ const struct pgm_sk_buff_t* const restrict skb
+ )
+{
+ const struct pgm_sk_buff_t* first_skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ if (!window->is_fec_available)
+ return FALSE;
+
+ if (skb->pgm_header->pgm_options & PGM_OPT_VAR_PKTLEN)
+ return FALSE;
+
+ const uint32_t tg_sqn = _pgm_rxw_tg_sqn (window, skb->sequence);
+ if (tg_sqn == skb->sequence)
+ return FALSE;
+
+ first_skb = _pgm_rxw_peek (window, tg_sqn);
+ if (NULL == first_skb)
+ return TRUE; /* transmission group unrecoverable */
+
+ if (first_skb->len == skb->len)
+ return FALSE;
+
+ return TRUE;
+}
+
+static inline
+bool
+_pgm_rxw_has_payload_op (
+ const struct pgm_sk_buff_t* const skb
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != skb);
+ pgm_assert (NULL != skb->pgm_header);
+
+ return skb->pgm_opt_fragment || skb->pgm_header->pgm_options & PGM_OP_ENCODED;
+}
+
+/* returns TRUE is skb options are invalid when compared to the transmission group
+ */
+
+static inline
+bool
+_pgm_rxw_is_invalid_payload_op (
+ pgm_rxw_t* const restrict window,
+ const struct pgm_sk_buff_t* const restrict skb
+ )
+{
+ const struct pgm_sk_buff_t* first_skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+
+ if (!window->is_fec_available)
+ return FALSE;
+
+ const uint32_t tg_sqn = _pgm_rxw_tg_sqn (window, skb->sequence);
+ if (tg_sqn == skb->sequence)
+ return FALSE;
+
+ first_skb = _pgm_rxw_peek (window, tg_sqn);
+ if (NULL == first_skb)
+ return TRUE; /* transmission group unrecoverable */
+
+ if (_pgm_rxw_has_payload_op (first_skb) == _pgm_rxw_has_payload_op (skb))
+ return FALSE;
+
+ return TRUE;
+}
+
+/* insert skb into window range, discard if duplicate. window will have placeholder,
+ * parity, or data packet already matching sequence.
+ *
+ * returns:
+ * PGM_RXW_INSERTED - packet filled a waiting placeholder, skb consumed.
+ * PGM_RXW_DUPLICATE - re-transmission of previously seen packet.
+ * PGM_RXW_MALFORMED - corrupted or invalid packet.
+ * PGM_RXW_BOUNDS - packet out of window.
+ */
+
+static
+int
+_pgm_rxw_insert (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict new_skb
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_rxw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != new_skb);
+ pgm_assert (!_pgm_rxw_incoming_is_empty (window));
+
+ if (PGM_UNLIKELY(_pgm_rxw_is_invalid_var_pktlen (window, new_skb) ||
+ _pgm_rxw_is_invalid_payload_op (window, new_skb)))
+ return PGM_RXW_MALFORMED;
+
+ if (new_skb->pgm_header->pgm_options & PGM_OPT_PARITY)
+ {
+ skb = _pgm_rxw_find_missing (window, new_skb->sequence);
+ if (NULL == skb)
+ return PGM_RXW_DUPLICATE;
+ state = (pgm_rxw_state_t*)&skb->cb;
+ }
+ else
+ {
+ skb = _pgm_rxw_peek (window, new_skb->sequence);
+ pgm_assert (NULL != skb);
+ state = (pgm_rxw_state_t*)&skb->cb;
+
+ if (state->pkt_state == PGM_PKT_STATE_HAVE_DATA)
+ return PGM_RXW_DUPLICATE;
+ }
+
+/* APDU fragments are already declared lost */
+ if (new_skb->pgm_opt_fragment &&
+ _pgm_rxw_is_apdu_lost (window, new_skb))
+ {
+ pgm_rxw_lost (window, skb->sequence);
+ return PGM_RXW_BOUNDS;
+ }
+
+/* verify placeholder state */
+ switch (state->pkt_state) {
+ case PGM_PKT_STATE_BACK_OFF:
+ case PGM_PKT_STATE_WAIT_NCF:
+ case PGM_PKT_STATE_WAIT_DATA:
+ case PGM_PKT_STATE_LOST_DATA:
+ break;
+
+ case PGM_PKT_STATE_HAVE_PARITY:
+ _pgm_rxw_shuffle_parity (window, skb);
+ break;
+
+ default: pgm_assert_not_reached(); break;
+ }
+
+/* statistics */
+ const pgm_time_t fill_time = new_skb->tstamp - skb->tstamp;
+ PGM_HISTOGRAM_TIMES("Rx.RepairTime", fill_time);
+ PGM_HISTOGRAM_COUNTS("Rx.NakTransmits", state->nak_transmit_count);
+ PGM_HISTOGRAM_COUNTS("Rx.NcfRetries", state->ncf_retry_count);
+ PGM_HISTOGRAM_COUNTS("Rx.DataRetries", state->data_retry_count);
+ if (!window->max_fill_time) {
+ window->max_fill_time = window->min_fill_time = fill_time;
+ }
+ else
+ {
+ if (fill_time > window->max_fill_time)
+ window->max_fill_time = fill_time;
+ else if (fill_time < window->min_fill_time)
+ window->min_fill_time = fill_time;
+
+ if (!window->max_nak_transmit_count) {
+ window->max_nak_transmit_count = window->min_nak_transmit_count = state->nak_transmit_count;
+ } else {
+ if (state->nak_transmit_count > window->max_nak_transmit_count)
+ window->max_nak_transmit_count = state->nak_transmit_count;
+ else if (state->nak_transmit_count < window->min_nak_transmit_count)
+ window->min_nak_transmit_count = state->nak_transmit_count;
+ }
+ }
+
+/* add packet to bitmap */
+ const uint_fast32_t pos = window->lead - new_skb->sequence;
+ if (pos < 32) {
+ window->bitmap |= 1 << pos;
+ }
+
+/* update the Exponential Moving Average (EMA) data loss with repair data.
+ * s_t = α × x_{t-1} + (1 - α) × s_{t-1}
+ * x_{t-1} = 0
+ * ∴ s_t = (1 - α) × s_{t-1}
+ */
+ const uint_fast32_t s = pgm_fp16pow (pgm_fp16 (1) - window->ack_c_p, pos);
+ if (s > window->data_loss) window->data_loss = 0;
+ else window->data_loss -= s;
+
+/* replace place holder skb with incoming skb */
+ memcpy (new_skb->cb, skb->cb, sizeof(skb->cb));
+ pgm_rxw_state_t* rxw_state = (void*)new_skb->cb;
+ rxw_state->pkt_state = PGM_PKT_STATE_ERROR;
+ _pgm_rxw_unlink (window, skb);
+ pgm_free_skb (skb);
+ const uint_fast32_t index_ = new_skb->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = new_skb;
+ if (new_skb->pgm_header->pgm_options & PGM_OPT_PARITY)
+ _pgm_rxw_state (window, new_skb, PGM_PKT_STATE_HAVE_PARITY);
+ else
+ _pgm_rxw_state (window, new_skb, PGM_PKT_STATE_HAVE_DATA);
+ window->size += new_skb->len;
+
+ return PGM_RXW_INSERTED;
+}
+
+/* shuffle parity packet at skb->sequence to any other needed spot.
+ */
+
+static inline
+void
+_pgm_rxw_shuffle_parity (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+ uint_fast32_t index_;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+
+ struct pgm_sk_buff_t* restrict missing = _pgm_rxw_find_missing (window, skb->sequence);
+ if (NULL == missing)
+ return;
+
+/* replace place holder skb with parity skb */
+ char cb[48];
+ _pgm_rxw_unlink (window, missing);
+ memcpy (cb, skb->cb, sizeof(skb->cb));
+ memcpy (skb->cb, missing->cb, sizeof(skb->cb));
+ memcpy (missing->cb, cb, sizeof(skb->cb));
+ index_ = skb->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = skb;
+ index_ = missing->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = missing;
+}
+
+/* skb advances the window lead.
+ *
+ * returns:
+ * PGM_RXW_APPENDED - packet advanced window lead, skb consumed.
+ * PGM_RXW_MALFORMED - corrupted or invalid packet.
+ * PGM_RXW_BOUNDS - packet out of window.
+ */
+
+static
+int
+_pgm_rxw_append (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb,
+ const pgm_time_t now
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+ if (skb->pgm_header->pgm_options & PGM_OPT_PARITY) {
+ pgm_assert (_pgm_rxw_tg_sqn (window, skb->sequence) == _pgm_rxw_tg_sqn (window, pgm_rxw_lead (window)));
+ } else {
+ pgm_assert (skb->sequence == pgm_rxw_next_lead (window));
+ }
+
+ if (PGM_UNLIKELY(_pgm_rxw_is_invalid_var_pktlen (window, skb) ||
+ _pgm_rxw_is_invalid_payload_op (window, skb)))
+ return PGM_RXW_MALFORMED;
+
+ if (pgm_rxw_is_full (window)) {
+ if (_pgm_rxw_commit_is_empty (window)) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Receive window full on new data."));
+ _pgm_rxw_remove_trail (window);
+ } else {
+ return PGM_RXW_BOUNDS; /* constrained by commit window */
+ }
+ }
+
+/* advance leading edge */
+ window->lead++;
+
+/* add packet to bitmap */
+ window->bitmap = (window->bitmap << 1) | 1;
+
+/* update the Exponential Moving Average (EMA) data loss with data:
+ * s_t = α × x_{t-1} + (1 - α) × s_{t-1}
+ * x_{t-1} = 0
+ * ∴ s_t = (1 - α) × s_{t-1}
+ */
+ window->data_loss = pgm_fp16mul (window->data_loss, pgm_fp16 (1) - window->ack_c_p);
+
+/* APDU fragments are already declared lost */
+ if (PGM_UNLIKELY(skb->pgm_opt_fragment &&
+ _pgm_rxw_is_apdu_lost (window, skb)))
+ {
+ struct pgm_sk_buff_t* lost_skb = pgm_alloc_skb (window->max_tpdu);
+ lost_skb->tstamp = now;
+ lost_skb->sequence = skb->sequence;
+
+/* add lost-placeholder skb to window */
+ const uint_fast32_t index_ = lost_skb->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = lost_skb;
+
+ _pgm_rxw_state (window, lost_skb, PGM_PKT_STATE_LOST_DATA);
+ return PGM_RXW_BOUNDS;
+ }
+
+/* add skb to window */
+ if (skb->pgm_header->pgm_options & PGM_OPT_PARITY)
+ {
+ const uint_fast32_t index_ = skb->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = skb;
+ _pgm_rxw_state (window, skb, PGM_PKT_STATE_HAVE_PARITY);
+ }
+ else
+ {
+ const uint_fast32_t index_ = skb->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = skb;
+ _pgm_rxw_state (window, skb, PGM_PKT_STATE_HAVE_DATA);
+ }
+
+/* statistics */
+ window->size += skb->len;
+
+ return PGM_RXW_APPENDED;
+}
+
+/* remove references to all commit packets not in the same transmission group
+ * as the commit-lead
+ */
+
+void
+pgm_rxw_remove_commit (
+ pgm_rxw_t* const window
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ const uint32_t tg_sqn_of_commit_lead = _pgm_rxw_tg_sqn (window, window->commit_lead);
+
+ while (!_pgm_rxw_commit_is_empty (window) &&
+ tg_sqn_of_commit_lead != _pgm_rxw_tg_sqn (window, window->trail))
+ {
+ _pgm_rxw_remove_trail (window);
+ }
+}
+
+/* flush packets but instead of calling on_data append the contiguous data packets
+ * to the provided scatter/gather vector.
+ *
+ * when transmission groups are enabled, packets remain in the windows tagged committed
+ * until the transmission group has been completely committed. this allows the packet
+ * data to be used in parity calculations to recover the missing packets.
+ *
+ * returns -1 on nothing read, returns length of bytes read, 0 is a valid read length.
+ *
+ * PGM skbuffs will have an increased reference count and must be unreferenced by the
+ * calling application.
+ */
+
+ssize_t
+pgm_rxw_readv (
+ pgm_rxw_t* const restrict window,
+ struct pgm_msgv_t** restrict pmsg, /* message array, updated as messages appended */
+ const unsigned pmsglen /* number of items in pmsg */
+ )
+{
+ const struct pgm_msgv_t* msg_end;
+ struct pgm_sk_buff_t* skb;
+ pgm_rxw_state_t* state;
+ ssize_t bytes_read;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != pmsg);
+ pgm_assert_cmpuint (pmsglen, >, 0);
+
+ pgm_debug ("readv (window:%p pmsg:%p pmsglen:%u)",
+ (void*)window, (void*)pmsg, pmsglen);
+
+ msg_end = *pmsg + pmsglen - 1;
+
+ if (_pgm_rxw_incoming_is_empty (window))
+ return -1;
+
+ skb = _pgm_rxw_peek (window, window->commit_lead);
+ pgm_assert (NULL != skb);
+
+ state = (pgm_rxw_state_t*)&skb->cb;
+ switch (state->pkt_state) {
+ case PGM_PKT_STATE_HAVE_DATA:
+ bytes_read = _pgm_rxw_incoming_read (window, pmsg, msg_end - *pmsg + 1);
+ break;
+
+ case PGM_PKT_STATE_LOST_DATA:
+/* do not purge in situ sequence */
+ if (_pgm_rxw_commit_is_empty (window)) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Removing lost trail from window"));
+ _pgm_rxw_remove_trail (window);
+ } else {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Locking trail at commit window"));
+ }
+/* fall through */
+ case PGM_PKT_STATE_BACK_OFF:
+ case PGM_PKT_STATE_WAIT_NCF:
+ case PGM_PKT_STATE_WAIT_DATA:
+ case PGM_PKT_STATE_HAVE_PARITY:
+ bytes_read = -1;
+ break;
+
+ case PGM_PKT_STATE_COMMIT_DATA:
+ case PGM_PKT_STATE_ERROR:
+ default:
+ pgm_assert_not_reached();
+ break;
+ }
+
+ return bytes_read;
+}
+
+/* remove lost sequences from the trailing edge of the window. lost sequence
+ * at lead of commit window invalidates all parity-data packets as any
+ * transmission group is now unrecoverable.
+ *
+ * returns number of sequences purged.
+ */
+
+static
+unsigned
+_pgm_rxw_remove_trail (
+ pgm_rxw_t* const window
+ )
+{
+ struct pgm_sk_buff_t* skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (!pgm_rxw_is_empty (window));
+
+ skb = _pgm_rxw_peek (window, window->trail);
+ pgm_assert (NULL != skb);
+ _pgm_rxw_unlink (window, skb);
+ window->size -= skb->len;
+/* remove reference to skb */
+ if (PGM_UNLIKELY(pgm_mem_gc_friendly)) {
+ const uint_fast32_t index_ = skb->sequence % pgm_rxw_max_length (window);
+ window->pdata[index_] = NULL;
+ }
+ pgm_free_skb (skb);
+ if (window->trail++ == window->commit_lead) {
+/* data-loss */
+ window->commit_lead++;
+ window->cumulative_losses++;
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Data loss due to pulled trailing edge, fragment count %" PRIu32 "."),window->fragment_count);
+ return 1;
+ }
+ return 0;
+}
+
+unsigned
+pgm_rxw_remove_trail (
+ pgm_rxw_t* const window
+ )
+{
+ pgm_debug ("remove_trail (window:%p)", (const void*)window);
+ return _pgm_rxw_remove_trail (window);
+}
+
+/* read contiguous APDU-grouped sequences from the incoming window.
+ *
+ * side effects:
+ *
+ * 1) increments statics for window messages and bytes read.
+ *
+ * returns count of bytes read.
+ */
+
+static inline
+ssize_t
+_pgm_rxw_incoming_read (
+ pgm_rxw_t* const restrict window,
+ struct pgm_msgv_t** restrict pmsg, /* message array, updated as messages appended */
+ unsigned pmsglen /* number of items in pmsg */
+ )
+{
+ const struct pgm_msgv_t* msg_end;
+ struct pgm_sk_buff_t* skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != pmsg);
+ pgm_assert_cmpuint (pmsglen, >, 0);
+ pgm_assert (!_pgm_rxw_incoming_is_empty (window));
+
+ pgm_debug ("_pgm_rxw_incoming_read (window:%p pmsg:%p pmsglen:%u)",
+ (void*)window, (void*)pmsg, pmsglen);
+
+ msg_end = *pmsg + pmsglen - 1;
+ ssize_t bytes_read = 0;
+ size_t data_read = 0;
+
+ do {
+ skb = _pgm_rxw_peek (window, window->commit_lead);
+ pgm_assert (NULL != skb);
+ if (_pgm_rxw_is_apdu_complete (window,
+ skb->pgm_opt_fragment ? ntohl (skb->of_apdu_first_sqn) : skb->sequence))
+ {
+ bytes_read += _pgm_rxw_incoming_read_apdu (window, pmsg);
+ data_read ++;
+ }
+ else break;
+ } while (*pmsg <= msg_end && !_pgm_rxw_incoming_is_empty (window));
+
+ window->bytes_delivered += bytes_read;
+ window->msgs_delivered += data_read;
+ return data_read > 0 ? bytes_read : -1;
+}
+
+/* returns TRUE if transmission group is lost.
+ *
+ * checking is lightly limited to bounds.
+ */
+
+static inline
+bool
+_pgm_rxw_is_tg_sqn_lost (
+ pgm_rxw_t* const window,
+ const uint32_t tg_sqn /* transmission group sequence */
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert_cmpuint (_pgm_rxw_pkt_sqn (window, tg_sqn), ==, 0);
+
+ if (pgm_rxw_is_empty (window))
+ return TRUE;
+
+ if (pgm_uint32_lt (tg_sqn, window->trail))
+ return TRUE;
+
+ return FALSE;
+}
+
+/* reconstruct missing sequences in a transmission group using embedded parity data.
+ */
+
+static
+void
+_pgm_rxw_reconstruct (
+ pgm_rxw_t* const window,
+ const uint32_t tg_sqn /* transmission group sequence */
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_rxw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (1 == window->is_fec_available);
+ pgm_assert_cmpuint (_pgm_rxw_pkt_sqn (window, tg_sqn), ==, 0);
+
+ skb = _pgm_rxw_peek (window, tg_sqn);
+ pgm_assert (NULL != skb);
+
+ const bool is_var_pktlen = skb->pgm_header->pgm_options & PGM_OPT_VAR_PKTLEN;
+ const bool is_op_encoded = skb->pgm_header->pgm_options & PGM_OPT_PRESENT;
+ const uint16_t parity_length = ntohs (skb->pgm_header->pgm_tsdu_length);
+ struct pgm_sk_buff_t* tg_skbs[ window->rs.n ];
+ pgm_gf8_t* tg_data[ window->rs.n ];
+ pgm_gf8_t* tg_opts[ window->rs.n ];
+ uint8_t offsets[ window->rs.k ];
+ uint8_t rs_h = 0;
+
+ for (uint32_t i = tg_sqn, j = 0; i != (tg_sqn + window->rs.k); i++, j++)
+ {
+ skb = _pgm_rxw_peek (window, i);
+ pgm_assert (NULL != skb);
+ state = (pgm_rxw_state_t*)&skb->cb;
+ switch (state->pkt_state) {
+ case PGM_PKT_STATE_HAVE_DATA:
+ tg_skbs[ j ] = skb;
+ tg_data[ j ] = skb->data;
+ tg_opts[ j ] = (pgm_gf8_t*)skb->pgm_opt_fragment;
+ offsets[ j ] = j;
+ break;
+
+ case PGM_PKT_STATE_HAVE_PARITY:
+ tg_skbs[ window->rs.k + rs_h ] = skb;
+ tg_data[ window->rs.k + rs_h ] = skb->data;
+ tg_opts[ window->rs.k + rs_h ] = (pgm_gf8_t*)skb->pgm_opt_fragment;
+ offsets[ j ] = window->rs.k + rs_h;
+ ++rs_h;
+/* fall through and alloc new skb for reconstructed data */
+ case PGM_PKT_STATE_BACK_OFF:
+ case PGM_PKT_STATE_WAIT_NCF:
+ case PGM_PKT_STATE_WAIT_DATA:
+ case PGM_PKT_STATE_LOST_DATA:
+ skb = pgm_alloc_skb (window->max_tpdu);
+ pgm_skb_reserve (skb, sizeof(struct pgm_header) + sizeof(struct pgm_data));
+ skb->pgm_header = skb->head;
+ skb->pgm_data = (void*)( skb->pgm_header + 1 );
+ if (is_op_encoded) {
+ const uint16_t opt_total_length = sizeof(struct pgm_opt_length) +
+ sizeof(struct pgm_opt_header) +
+ sizeof(struct pgm_opt_fragment);
+ pgm_skb_reserve (skb, opt_total_length);
+ skb->pgm_opt_fragment = (void*)( skb->pgm_data + 1 );
+ pgm_skb_put (skb, parity_length);
+ memset (skb->pgm_opt_fragment, 0, opt_total_length + parity_length);
+ } else {
+ pgm_skb_put (skb, parity_length);
+ memset (skb->data, 0, parity_length);
+ }
+ tg_skbs[ j ] = skb;
+ tg_data[ j ] = skb->data;
+ tg_opts[ j ] = (void*)skb->pgm_opt_fragment;
+ break;
+
+ default: pgm_assert_not_reached(); break;
+ }
+
+ if (!skb->zero_padded) {
+ memset (skb->tail, 0, parity_length - skb->len);
+ skb->zero_padded = 1;
+ }
+
+ }
+
+/* reconstruct payload */
+ pgm_rs_decode_parity_appended (&window->rs,
+ tg_data,
+ offsets,
+ parity_length);
+
+/* reconstruct opt_fragment option */
+ if (is_op_encoded)
+ pgm_rs_decode_parity_appended (&window->rs,
+ tg_opts,
+ offsets,
+ sizeof(struct pgm_opt_fragment));
+
+/* swap parity skbs with reconstructed skbs */
+ for (uint_fast8_t i = 0; i < window->rs.k; i++)
+ {
+ if (offsets[i] < window->rs.k)
+ continue;
+
+ struct pgm_sk_buff_t* repair_skb = tg_skbs[i];
+
+ if (is_var_pktlen)
+ {
+ const uint16_t pktlen = *(uint16_t*)( (char*)repair_skb->tail - sizeof(uint16_t));
+ if (pktlen > parity_length) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Invalid encoded variable packet length in reconstructed packet, dropping entire transmission group."));
+ pgm_free_skb (repair_skb);
+ for (uint_fast8_t j = i; j < window->rs.k; j++)
+ {
+ if (offsets[j] < window->rs.k)
+ continue;
+ pgm_rxw_lost (window, tg_skbs[offsets[j]]->sequence);
+ }
+ break;
+ }
+ const uint16_t padding = parity_length - pktlen;
+ repair_skb->len -= padding;
+ repair_skb->tail = (char*)repair_skb->tail - padding;
+ }
+
+#ifdef PGM_DISABLE_ASSERT
+ _pgm_rxw_insert (window, repair_skb);
+#else
+ pgm_assert_cmpint (_pgm_rxw_insert (window, repair_skb), ==, PGM_RXW_INSERTED);
+#endif
+ }
+}
+
+/* check every TPDU in an APDU and verify that the data has arrived
+ * and is available to commit to the application.
+ *
+ * if APDU sits in a transmission group that can be reconstructed use parity
+ * data then the entire group will be decoded and any missing data packets
+ * replaced by the recovery calculation.
+ *
+ * packets with single fragment fragment headers must be normalised as regular
+ * packets before calling.
+ *
+ * APDUs exceeding PGM_MAX_FRAGMENTS or PGM_MAX_APDU length will be discarded.
+ *
+ * returns FALSE if APDU is incomplete or longer than max_len sequences.
+ */
+
+static
+bool
+_pgm_rxw_is_apdu_complete (
+ pgm_rxw_t* const window,
+ const uint32_t first_sequence
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_rxw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ pgm_debug ("_pgm_rxw_is_apdu_complete (window:%p first-sequence:%" PRIu32 ")",
+ (const void*)window, first_sequence);
+
+ skb = _pgm_rxw_peek (window, first_sequence);
+ if (PGM_UNLIKELY(NULL == skb)) {
+ return FALSE;
+ }
+
+ const size_t apdu_size = skb->pgm_opt_fragment ? ntohl (skb->of_apdu_len) : skb->len;
+ const uint32_t tg_sqn = _pgm_rxw_tg_sqn (window, first_sequence);
+ uint32_t sequence = first_sequence;
+ unsigned contiguous_tpdus = 0;
+ size_t contiguous_size = 0;
+ bool check_parity = FALSE;
+
+ pgm_assert_cmpuint (apdu_size, >=, skb->len);
+
+/* protocol sanity check: maximum length */
+ if (PGM_UNLIKELY(apdu_size > PGM_MAX_APDU)) {
+ pgm_rxw_lost (window, first_sequence);
+ return FALSE;
+ }
+
+ do {
+ state = (pgm_rxw_state_t*)&skb->cb;
+
+ if (!check_parity &&
+ PGM_PKT_STATE_HAVE_DATA != state->pkt_state)
+ {
+ if (window->is_fec_available &&
+ !_pgm_rxw_is_tg_sqn_lost (window, tg_sqn) )
+ {
+ check_parity = TRUE;
+/* pre-seed committed sequence count */
+ if (pgm_uint32_lte (tg_sqn, window->commit_lead))
+ contiguous_tpdus += window->commit_lead - tg_sqn;
+ }
+ else
+ return FALSE;
+ }
+
+ if (check_parity)
+ {
+ if (PGM_PKT_STATE_HAVE_DATA == state->pkt_state ||
+ PGM_PKT_STATE_HAVE_PARITY == state->pkt_state)
+ ++contiguous_tpdus;
+
+/* have sufficient been received for reconstruction */
+ if (contiguous_tpdus >= window->tg_size) {
+ _pgm_rxw_reconstruct (window, tg_sqn);
+ return _pgm_rxw_is_apdu_complete (window, first_sequence);
+ }
+ }
+ else
+ {
+/* single packet APDU, already complete */
+ if (PGM_PKT_STATE_HAVE_DATA == state->pkt_state &&
+ !skb->pgm_opt_fragment)
+ return TRUE;
+
+/* protocol sanity check: matching first sequence reference */
+ if (PGM_UNLIKELY(ntohl (skb->of_apdu_first_sqn) != first_sequence)) {
+ pgm_rxw_lost (window, first_sequence);
+ return FALSE;
+ }
+
+/* protocol sanity check: matching apdu length */
+ if (PGM_UNLIKELY(ntohl (skb->of_apdu_len) != apdu_size)) {
+ pgm_rxw_lost (window, first_sequence);
+ return FALSE;
+ }
+
+/* protocol sanity check: maximum number of fragments per apdu */
+ if (PGM_UNLIKELY(++contiguous_tpdus > PGM_MAX_FRAGMENTS)) {
+ pgm_rxw_lost (window, first_sequence);
+ return FALSE;
+ }
+
+ contiguous_size += skb->len;
+ if (apdu_size == contiguous_size)
+ return TRUE;
+ else if (PGM_UNLIKELY(apdu_size < contiguous_size)) {
+ pgm_rxw_lost (window, first_sequence);
+ return FALSE;
+ }
+ }
+
+ skb = _pgm_rxw_peek (window, ++sequence);
+ } while (skb);
+
+/* pending */
+ return FALSE;
+}
+
+/* read one APDU consisting of one or more TPDUs. target array is guaranteed
+ * to be big enough to store complete APDU.
+ */
+
+static inline
+ssize_t
+_pgm_rxw_incoming_read_apdu (
+ pgm_rxw_t* const restrict window,
+ struct pgm_msgv_t** restrict pmsg /* message array, updated as messages appended */
+ )
+{
+ struct pgm_sk_buff_t *skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != pmsg);
+
+ pgm_debug ("_pgm_rxw_incoming_read_apdu (window:%p pmsg:%p)",
+ (const void*)window, (const void*)pmsg);
+
+ skb = _pgm_rxw_peek (window, window->commit_lead);
+ size_t contiguous_len = 0;
+ const size_t apdu_len = skb->pgm_opt_fragment ? ntohl (skb->of_apdu_len) : skb->len;
+ unsigned i = 0;
+ pgm_assert_cmpuint (apdu_len, >=, skb->len);
+ (*pmsg)->msgv_len = 0;
+ do {
+ _pgm_rxw_state (window, skb, PGM_PKT_STATE_COMMIT_DATA);
+ (*pmsg)->msgv_skb[i++] = skb;
+ (*pmsg)->msgv_len++;
+ contiguous_len += skb->len;
+ window->commit_lead++;
+ if (apdu_len == contiguous_len)
+ break;
+ skb = _pgm_rxw_peek (window, window->commit_lead);
+ } while (apdu_len > contiguous_len);
+
+ (*pmsg)++;
+
+/* post-conditions */
+ pgm_assert (!_pgm_rxw_commit_is_empty (window));
+
+return contiguous_len;
+}
+
+/* returns transmission group sequence (TG_SQN) from sequence (SQN).
+ */
+
+static inline
+uint32_t
+_pgm_rxw_tg_sqn (
+ pgm_rxw_t* const window,
+ const uint32_t sequence
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ const uint32_t tg_sqn_mask = 0xffffffff << window->tg_sqn_shift;
+ return sequence & tg_sqn_mask;
+}
+
+/* returns packet number (PKT_SQN) from sequence (SQN).
+ */
+
+static inline
+uint32_t
+_pgm_rxw_pkt_sqn (
+ pgm_rxw_t* const window,
+ const uint32_t sequence
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ const uint32_t tg_sqn_mask = 0xffffffff << window->tg_sqn_shift;
+ return sequence & ~tg_sqn_mask;
+}
+
+/* returns TRUE when the sequence is the first of a transmission group.
+ */
+
+static inline
+bool
+_pgm_rxw_is_first_of_tg_sqn (
+ pgm_rxw_t* const window,
+ const uint32_t sequence
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ return _pgm_rxw_pkt_sqn (window, sequence) == 0;
+}
+
+/* returns TRUE when the sequence is the last of a transmission group
+ */
+
+static inline
+bool
+_pgm_rxw_is_last_of_tg_sqn (
+ pgm_rxw_t* const window,
+ const uint32_t sequence
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ return _pgm_rxw_pkt_sqn (window, sequence) == window->tg_size - 1;
+}
+
+/* set PGM skbuff to new FSM state.
+ */
+
+static
+void
+_pgm_rxw_state (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb,
+ const int new_pkt_state
+ )
+{
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+
+/* remove current state */
+ if (PGM_PKT_STATE_ERROR != state->pkt_state)
+ _pgm_rxw_unlink (window, skb);
+
+ switch (new_pkt_state) {
+ case PGM_PKT_STATE_BACK_OFF:
+ pgm_queue_push_head_link (&window->nak_backoff_queue, (pgm_list_t*)skb);
+ break;
+
+ case PGM_PKT_STATE_WAIT_NCF:
+ pgm_queue_push_head_link (&window->wait_ncf_queue, (pgm_list_t*)skb);
+ break;
+
+ case PGM_PKT_STATE_WAIT_DATA:
+ pgm_queue_push_head_link (&window->wait_data_queue, (pgm_list_t*)skb);
+ break;
+
+ case PGM_PKT_STATE_HAVE_DATA:
+ window->fragment_count++;
+ pgm_assert_cmpuint (window->fragment_count, <=, pgm_rxw_length (window));
+ break;
+
+ case PGM_PKT_STATE_HAVE_PARITY:
+ window->parity_count++;
+ pgm_assert_cmpuint (window->parity_count, <=, pgm_rxw_length (window));
+ break;
+
+ case PGM_PKT_STATE_COMMIT_DATA:
+ window->committed_count++;
+ pgm_assert_cmpuint (window->committed_count, <=, pgm_rxw_length (window));
+ break;
+
+ case PGM_PKT_STATE_LOST_DATA:
+ window->lost_count++;
+ window->cumulative_losses++;
+ window->has_event = 1;
+ pgm_assert_cmpuint (window->lost_count, <=, pgm_rxw_length (window));
+ break;
+
+ case PGM_PKT_STATE_ERROR:
+ break;
+
+ default: pgm_assert_not_reached(); break;
+ }
+
+ state->pkt_state = new_pkt_state;
+}
+
+void
+pgm_rxw_state (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb,
+ const int new_pkt_state
+ )
+{
+ pgm_debug ("state (window:%p skb:%p new_pkt_state:%s)",
+ (const void*)window, (const void*)skb, pgm_pkt_state_string (new_pkt_state));
+ _pgm_rxw_state (window, skb, new_pkt_state);
+}
+
+/* remove current state from sequence.
+ */
+
+static
+void
+_pgm_rxw_unlink (
+ pgm_rxw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+ pgm_queue_t* queue;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+
+ switch (state->pkt_state) {
+ case PGM_PKT_STATE_BACK_OFF:
+ pgm_assert (!pgm_queue_is_empty (&window->nak_backoff_queue));
+ queue = &window->nak_backoff_queue;
+ goto unlink_queue;
+
+ case PGM_PKT_STATE_WAIT_NCF:
+ pgm_assert (!pgm_queue_is_empty (&window->wait_ncf_queue));
+ queue = &window->wait_ncf_queue;
+ goto unlink_queue;
+
+ case PGM_PKT_STATE_WAIT_DATA:
+ pgm_assert (!pgm_queue_is_empty (&window->wait_data_queue));
+ queue = &window->wait_data_queue;
+unlink_queue:
+ pgm_queue_unlink (queue, (pgm_list_t*)skb);
+ break;
+
+ case PGM_PKT_STATE_HAVE_DATA:
+ pgm_assert_cmpuint (window->fragment_count, >, 0);
+ window->fragment_count--;
+ break;
+
+ case PGM_PKT_STATE_HAVE_PARITY:
+ pgm_assert_cmpuint (window->parity_count, >, 0);
+ window->parity_count--;
+ break;
+
+ case PGM_PKT_STATE_COMMIT_DATA:
+ pgm_assert_cmpuint (window->committed_count, >, 0);
+ window->committed_count--;
+ break;
+
+ case PGM_PKT_STATE_LOST_DATA:
+ pgm_assert_cmpuint (window->lost_count, >, 0);
+ window->lost_count--;
+ break;
+
+ case PGM_PKT_STATE_ERROR:
+ break;
+
+ default: pgm_assert_not_reached(); break;
+ }
+
+ state->pkt_state = PGM_PKT_STATE_ERROR;
+ pgm_assert (((pgm_list_t*)skb)->next == NULL);
+ pgm_assert (((pgm_list_t*)skb)->prev == NULL);
+}
+
+/* returns the pointer at the given index of the window.
+ */
+
+struct pgm_sk_buff_t*
+pgm_rxw_peek (
+ pgm_rxw_t* const window,
+ const uint32_t sequence
+ )
+{
+ pgm_debug ("peek (window:%p sequence:%" PRIu32 ")", (void*)window, sequence);
+ return _pgm_rxw_peek (window, sequence);
+}
+
+/* mark an existing sequence lost due to failed recovery.
+ */
+
+void
+pgm_rxw_lost (
+ pgm_rxw_t* const window,
+ const uint32_t sequence
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_rxw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (!pgm_rxw_is_empty (window));
+
+ pgm_debug ("lost (window:%p sequence:%" PRIu32 ")",
+ (const void*)window, sequence);
+
+ skb = _pgm_rxw_peek (window, sequence);
+ pgm_assert (NULL != skb);
+
+ state = (pgm_rxw_state_t*)&skb->cb;
+
+ if (PGM_UNLIKELY(!(state->pkt_state == PGM_PKT_STATE_BACK_OFF ||
+ state->pkt_state == PGM_PKT_STATE_WAIT_NCF ||
+ state->pkt_state == PGM_PKT_STATE_WAIT_DATA ||
+ state->pkt_state == PGM_PKT_STATE_HAVE_DATA || /* fragments */
+ state->pkt_state == PGM_PKT_STATE_HAVE_PARITY)))
+ {
+ pgm_fatal (_("Unexpected state %s(%u)"), pgm_pkt_state_string (state->pkt_state), state->pkt_state);
+ pgm_assert_not_reached();
+ }
+
+ _pgm_rxw_state (window, skb, PGM_PKT_STATE_LOST_DATA);
+}
+
+/* received a uni/multicast ncf, search for a matching nak & tag or extend window if
+ * beyond lead
+ *
+ * returns:
+ * PGM_RXW_BOUNDS - sequence is outside of window, or window is undefined.
+ * PGM_RXW_UPDATED - receiver state updated, waiting for data.
+ * PGM_RXW_DUPLICATE - data already exists at sequence.
+ * PGM_RXW_APPENDED - lead is extended with state set waiting for data.
+ */
+
+int
+pgm_rxw_confirm (
+ pgm_rxw_t* const window,
+ const uint32_t sequence,
+ const pgm_time_t now,
+ const pgm_time_t nak_rdata_expiry, /* pre-calculated expiry times */
+ const pgm_time_t nak_rb_expiry
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ pgm_debug ("confirm (window:%p sequence:%" PRIu32 " nak_rdata_expiry:%" PGM_TIME_FORMAT " nak_rb_expiry:%" PGM_TIME_FORMAT ")",
+ (void*)window, sequence, nak_rdata_expiry, nak_rb_expiry);
+
+/* NCFs do not define the transmit window */
+ if (PGM_UNLIKELY(!window->is_defined))
+ return PGM_RXW_BOUNDS;
+
+/* sequence already committed */
+ if (pgm_uint32_lt (sequence, window->commit_lead)) {
+ if (pgm_uint32_gte (sequence, window->trail))
+ return PGM_RXW_DUPLICATE;
+ else
+ return PGM_RXW_BOUNDS;
+ }
+
+ if (pgm_uint32_lte (sequence, window->lead))
+ return _pgm_rxw_recovery_update (window, sequence, nak_rdata_expiry);
+
+ if (sequence == window->lead)
+ return _pgm_rxw_recovery_append (window, now, nak_rdata_expiry);
+ else {
+ _pgm_rxw_add_placeholder_range (window, sequence, now, nak_rb_expiry);
+ return _pgm_rxw_recovery_append (window, now, nak_rdata_expiry);
+ }
+}
+
+/* update an incoming sequence with state transition to WAIT-DATA.
+ *
+ * returns:
+ * PGM_RXW_UPDATED - receiver state updated, waiting for data.
+ * PGM_RXW_DUPLICATE - data already exists at sequence.
+ */
+
+static inline
+int
+_pgm_rxw_recovery_update (
+ pgm_rxw_t* const window,
+ const uint32_t sequence,
+ const pgm_time_t nak_rdata_expiry /* pre-calculated expiry times */
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+/* fetch skb from window and bump expiration times */
+ struct pgm_sk_buff_t* skb = _pgm_rxw_peek (window, sequence);
+ pgm_assert (NULL != skb);
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+ switch (state->pkt_state) {
+ case PGM_PKT_STATE_BACK_OFF:
+ case PGM_PKT_STATE_WAIT_NCF:
+ pgm_rxw_state (window, skb, PGM_PKT_STATE_WAIT_DATA);
+
+/* fall through */
+ case PGM_PKT_STATE_WAIT_DATA:
+ state->timer_expiry = nak_rdata_expiry;
+ return PGM_RXW_UPDATED;
+
+ case PGM_PKT_STATE_HAVE_DATA:
+ case PGM_PKT_STATE_HAVE_PARITY:
+ case PGM_PKT_STATE_COMMIT_DATA:
+ case PGM_PKT_STATE_LOST_DATA:
+ break;
+
+ default: pgm_assert_not_reached(); break;
+ }
+
+ return PGM_RXW_DUPLICATE;
+}
+
+/* append an skb to the incoming window with WAIT-DATA state.
+ *
+ * returns:
+ * PGM_RXW_APPENDED - lead is extended with state set waiting for data.
+ * PGM_RXW_BOUNDS - constrained by commit window
+ */
+
+static inline
+int
+_pgm_rxw_recovery_append (
+ pgm_rxw_t* const window,
+ const pgm_time_t now,
+ const pgm_time_t nak_rdata_expiry /* pre-calculated expiry times */
+ )
+{
+ struct pgm_sk_buff_t* skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ if (pgm_rxw_is_full (window)) {
+ if (_pgm_rxw_commit_is_empty (window)) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Receive window full on confirmed sequence."));
+ _pgm_rxw_remove_trail (window);
+ } else {
+ return PGM_RXW_BOUNDS; /* constrained by commit window */
+ }
+ }
+
+/* advance leading edge */
+ window->lead++;
+
+/* add loss to bitmap */
+ window->bitmap <<= 1;
+
+/* update the Exponential Moving Average (EMA) data loss with loss:
+ * s_t = α × x_{t-1} + (1 - α) × s_{t-1}
+ * x_{t-1} = 1
+ * ∴ s_t = α + (1 - α) × s_{t-1}
+ */
+ window->data_loss = window->ack_c_p + pgm_fp16mul (pgm_fp16 (1) - window->ack_c_p, window->data_loss);
+
+ skb = pgm_alloc_skb (window->max_tpdu);
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+ skb->tstamp = now;
+ skb->sequence = window->lead;
+ state->timer_expiry = nak_rdata_expiry;
+
+ const uint_fast32_t index_ = pgm_rxw_lead (window) % pgm_rxw_max_length (window);
+ window->pdata[index_] = skb;
+ _pgm_rxw_state (window, skb, PGM_PKT_STATE_WAIT_DATA);
+
+ return PGM_RXW_APPENDED;
+}
+
+/* dumps window state to stdout
+ */
+
+void
+pgm_rxw_dump (
+ const pgm_rxw_t* const window
+ )
+{
+ pgm_info ("window = {"
+ "tsi = {gsi = {identifier = %i.%i.%i.%i.%i.%i}, sport = %" PRIu16 "}, "
+ "nak_backoff_queue = {head = %p, tail = %p, length = %u}, "
+ "wait_ncf_queue = {head = %p, tail = %p, length = %u}, "
+ "wait_data_queue = {head = %p, tail = %p, length = %u}, "
+ "lost_count = %" PRIu32 ", "
+ "fragment_count = %" PRIu32 ", "
+ "parity_count = %" PRIu32 ", "
+ "committed_count = %" PRIu32 ", "
+ "max_tpdu = %" PRIu16 ", "
+ "tg_size = %" PRIu32 ", "
+ "tg_sqn_shift = %u, "
+ "lead = %" PRIu32 ", "
+ "trail = %" PRIu32 ", "
+ "rxw_trail = %" PRIu32 ", "
+ "rxw_trail_init = %" PRIu32 ", "
+ "commit_lead = %" PRIu32 ", "
+ "is_constrained = %u, "
+ "is_defined = %u, "
+ "has_event = %u, "
+ "is_fec_available = %u, "
+ "min_fill_time = %" PRIu32 ", "
+ "max_fill_time = %" PRIu32 ", "
+ "min_nak_transmit_count = %" PRIu32 ", "
+ "max_nak_transmit_count = %" PRIu32 ", "
+ "cumulative_losses = %" PRIu32 ", "
+ "bytes_delivered = %" PRIu32 ", "
+ "msgs_delivered = %" PRIu32 ", "
+ "size = %zu, "
+ "alloc = %" PRIu32 ", "
+ "pdata = []"
+ "}",
+ window->tsi->gsi.identifier[0],
+ window->tsi->gsi.identifier[1],
+ window->tsi->gsi.identifier[2],
+ window->tsi->gsi.identifier[3],
+ window->tsi->gsi.identifier[4],
+ window->tsi->gsi.identifier[5],
+ ntohs (window->tsi->sport),
+ (void*)window->nak_backoff_queue.head,
+ (void*)window->nak_backoff_queue.tail,
+ window->nak_backoff_queue.length,
+ (void*)window->wait_ncf_queue.head,
+ (void*)window->wait_ncf_queue.tail,
+ window->wait_ncf_queue.length,
+ (void*)window->wait_data_queue.head,
+ (void*)window->wait_data_queue.tail,
+ window->wait_data_queue.length,
+ window->lost_count,
+ window->fragment_count,
+ window->parity_count,
+ window->committed_count,
+ window->max_tpdu,
+ window->tg_size,
+ window->tg_sqn_shift,
+ window->lead,
+ window->trail,
+ window->rxw_trail,
+ window->rxw_trail_init,
+ window->commit_lead,
+ window->is_constrained,
+ window->is_defined,
+ window->has_event,
+ window->is_fec_available,
+ window->min_fill_time,
+ window->max_fill_time,
+ window->min_nak_transmit_count,
+ window->max_nak_transmit_count,
+ window->cumulative_losses,
+ window->bytes_delivered,
+ window->msgs_delivered,
+ window->size,
+ window->alloc
+ );
+}
+
+/* state string helper
+ */
+
+const char*
+pgm_pkt_state_string (
+ const int pkt_state
+ )
+{
+ const char* c;
+
+ switch (pkt_state) {
+ case PGM_PKT_STATE_BACK_OFF: c = "PGM_PKT_STATE_BACK_OFF"; break;
+ case PGM_PKT_STATE_WAIT_NCF: c = "PGM_PKT_STATE_WAIT_NCF"; break;
+ case PGM_PKT_STATE_WAIT_DATA: c = "PGM_PKT_STATE_WAIT_DATA"; break;
+ case PGM_PKT_STATE_HAVE_DATA: c = "PGM_PKT_STATE_HAVE_DATA"; break;
+ case PGM_PKT_STATE_HAVE_PARITY: c = "PGM_PKT_STATE_HAVE_PARITY"; break;
+ case PGM_PKT_STATE_COMMIT_DATA: c = "PGM_PKT_STATE_COMMIT_DATA"; break;
+ case PGM_PKT_STATE_LOST_DATA: c = "PGM_PKT_STATE_LOST_DATA"; break;
+ case PGM_PKT_STATE_ERROR: c = "PGM_PKT_STATE_ERROR"; break;
+ default: c = "(unknown)"; break;
+ }
+
+ return c;
+}
+
+const char*
+pgm_rxw_returns_string (
+ const int rxw_returns
+ )
+{
+ const char* c;
+
+ switch (rxw_returns) {
+ case PGM_RXW_OK: c = "PGM_RXW_OK"; break;
+ case PGM_RXW_INSERTED: c = "PGM_RXW_INSERTED"; break;
+ case PGM_RXW_APPENDED: c = "PGM_RXW_APPENDED"; break;
+ case PGM_RXW_UPDATED: c = "PGM_RXW_UPDATED"; break;
+ case PGM_RXW_MISSING: c = "PGM_RXW_MISSING"; break;
+ case PGM_RXW_DUPLICATE: c = "PGM_RXW_DUPLICATE"; break;
+ case PGM_RXW_MALFORMED: c = "PGM_RXW_MALFORMED"; break;
+ case PGM_RXW_BOUNDS: c = "PGM_RXW_BOUNDS"; break;
+ case PGM_RXW_SLOW_CONSUMER: c = "PGM_RXW_SLOW_CONSUMER"; break;
+ case PGM_RXW_UNKNOWN: c = "PGM_RXW_UNKNOWN"; break;
+ default: c = "(unknown)"; break;
+ }
+
+ return c;
+}
+
+/* eof */