summaryrefslogtreecommitdiffstats
path: root/3rdparty/openpgm-svn-r1085/pgm/txw.c
diff options
context:
space:
mode:
Diffstat (limited to '3rdparty/openpgm-svn-r1085/pgm/txw.c')
-rw-r--r--3rdparty/openpgm-svn-r1085/pgm/txw.c763
1 files changed, 763 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1085/pgm/txw.c b/3rdparty/openpgm-svn-r1085/pgm/txw.c
new file mode 100644
index 0000000..e2487ae
--- /dev/null
+++ b/3rdparty/openpgm-svn-r1085/pgm/txw.c
@@ -0,0 +1,763 @@
+/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
+ *
+ * A basic transmit window: pointer array implementation.
+ *
+ * Copyright (c) 2006-2010 Miru Limited.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <impl/i18n.h>
+#include <impl/framework.h>
+#include <impl/txw.h>
+
+
+//#define TXW_DEBUG
+
+#ifndef TXW_DEBUG
+# define PGM_DISABLE_ASSERT
+#endif
+
+
+/* testing function: is TSI null
+ *
+ * returns TRUE if null, returns FALSE if not null.
+ */
+
+static inline
+bool
+pgm_tsi_is_null (
+ const void*const tsi
+ )
+{
+ const union {
+ pgm_tsi_t tsi;
+ uint32_t l[2];
+ } *u = tsi;
+
+/* pre-conditions */
+ pgm_assert (NULL != tsi);
+
+ return (0 == u->l[0] && 0 == u->l[1]);
+}
+
+/* returns the pointer at the given index of the window. responsibility
+ * is with the caller to verify a single user ownership.
+ */
+
+static inline
+struct pgm_sk_buff_t*
+_pgm_txw_peek (
+ const pgm_txw_t*const window,
+ const uint32_t sequence
+ )
+{
+ struct pgm_sk_buff_t* skb;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ if (pgm_txw_is_empty (window))
+ return NULL;
+
+ if (pgm_uint32_gte (sequence, window->trail) && pgm_uint32_lte (sequence, window->lead))
+ {
+ const uint_fast32_t index_ = sequence % pgm_txw_max_length (window);
+ skb = window->pdata[index_];
+ pgm_assert (NULL != skb);
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (pgm_tsi_is_null (&skb->tsi));
+ }
+ else
+ skb = NULL;
+
+ return skb;
+}
+
+/* testing function: can a request be peeked from the retransmit queue.
+ *
+ * returns TRUE if request is available, returns FALSE if not available.
+ */
+
+static inline
+bool
+pgm_txw_retransmit_can_peek (
+ pgm_txw_t*const window
+ )
+{
+ pgm_return_val_if_fail (NULL != window, FALSE);
+ return (NULL != pgm_txw_retransmit_try_peek (window));
+}
+
+/* sequence state must be smaller than PGM skbuff control buffer */
+PGM_STATIC_ASSERT(sizeof(struct pgm_txw_state_t) <= sizeof(((struct pgm_sk_buff_t*)0)->cb));
+
+uint32_t
+pgm_txw_get_unfolded_checksum (
+ const struct pgm_sk_buff_t*const skb
+ )
+{
+ const pgm_txw_state_t*const state = (const pgm_txw_state_t*const)&skb->cb;
+ return state->unfolded_checksum;
+}
+
+void
+pgm_txw_set_unfolded_checksum (
+ struct pgm_sk_buff_t*const skb,
+ const uint32_t csum
+ )
+{
+ pgm_txw_state_t* state = (pgm_txw_state_t*)&skb->cb;
+ state->unfolded_checksum = csum;
+}
+
+void
+pgm_txw_inc_retransmit_count (
+ struct pgm_sk_buff_t*const skb
+ )
+{
+ pgm_txw_state_t*const state = (pgm_txw_state_t*const)&skb->cb;
+ state->retransmit_count++;
+}
+
+bool
+pgm_txw_retransmit_is_empty (
+ const pgm_txw_t*const window
+ )
+{
+ pgm_assert (NULL != window);
+ return pgm_queue_is_empty (&window->retransmit_queue);
+}
+
+
+/* globals */
+
+static void pgm_txw_remove_tail (pgm_txw_t*const);
+static bool pgm_txw_retransmit_push_parity (pgm_txw_t*const, const uint32_t, const uint8_t);
+static bool pgm_txw_retransmit_push_selective (pgm_txw_t*const, const uint32_t);
+
+
+/* constructor for transmit window. zero-length windows are not permitted.
+ *
+ * returns pointer to window.
+ */
+
+pgm_txw_t*
+pgm_txw_create (
+ const pgm_tsi_t*const tsi,
+ const uint16_t tpdu_size,
+ const uint32_t sqns, /* transmit window size in sequence numbers */
+ const unsigned secs, /* size in seconds */
+ const ssize_t max_rte, /* max bandwidth */
+ const bool use_fec,
+ const uint8_t rs_n,
+ const uint8_t rs_k
+ )
+{
+ pgm_txw_t* window;
+
+/* pre-conditions */
+ pgm_assert (NULL != tsi);
+ if (sqns) {
+ pgm_assert_cmpuint (tpdu_size, ==, 0);
+ pgm_assert_cmpuint (sqns, >, 0);
+ pgm_assert_cmpuint (sqns & PGM_UINT32_SIGN_BIT, ==, 0);
+ pgm_assert_cmpuint (secs, ==, 0);
+ pgm_assert_cmpuint (max_rte, ==, 0);
+ } else {
+ pgm_assert_cmpuint (tpdu_size, >, 0);
+ pgm_assert_cmpuint (secs, >, 0);
+ pgm_assert_cmpuint (max_rte, >, 0);
+ }
+ if (use_fec) {
+ pgm_assert_cmpuint (rs_n, >, 0);
+ pgm_assert_cmpuint (rs_k, >, 0);
+ }
+
+ pgm_debug ("create (tsi:%s max-tpdu:%" PRIu16 " sqns:%" PRIu32 " secs %u max-rte %zd use-fec:%s rs(n):%u rs(k):%u)",
+ pgm_tsi_print (tsi),
+ tpdu_size, sqns, secs, max_rte,
+ use_fec ? "YES" : "NO",
+ rs_n, rs_k);
+
+/* calculate transmit window parameters */
+ pgm_assert (sqns || (tpdu_size && secs && max_rte));
+ const unsigned alloc_sqns = sqns ? sqns : ( (secs * max_rte) / tpdu_size );
+ window = pgm_malloc0 (sizeof(pgm_txw_t) + ( alloc_sqns * sizeof(struct pgm_sk_buff_t*) ));
+ window->tsi = tsi;
+
+/* empty state for transmission group boundaries to align.
+ *
+ * trail = 0, lead = -1
+ */
+ window->lead = -1;
+ window->trail = window->lead + 1;
+
+/* reed-solomon forward error correction */
+ if (use_fec) {
+ window->parity_buffer = pgm_alloc_skb (tpdu_size);
+ window->tg_sqn_shift = pgm_power2_log2 (rs_k);
+ pgm_rs_create (&window->rs, rs_n, rs_k);
+ window->is_fec_enabled = 1;
+ }
+
+/* pointer array */
+ window->alloc = alloc_sqns;
+
+/* post-conditions */
+ pgm_assert_cmpuint (pgm_txw_max_length (window), ==, alloc_sqns);
+ pgm_assert_cmpuint (pgm_txw_length (window), ==, 0);
+ pgm_assert_cmpuint (pgm_txw_size (window), ==, 0);
+ pgm_assert (pgm_txw_is_empty (window));
+ pgm_assert (!pgm_txw_is_full (window));
+ pgm_assert (!pgm_txw_retransmit_can_peek (window));
+
+ return window;
+}
+
+/* destructor for transmit window. must not be called more than once for same window.
+ */
+
+void
+pgm_txw_shutdown (
+ pgm_txw_t*const window
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert_cmpuint (window->alloc, >, 0);
+
+ pgm_debug ("shutdown (window:%p)", (const void*)window);
+
+/* contents of window */
+ while (!pgm_txw_is_empty (window)) {
+ pgm_txw_remove_tail (window);
+ }
+
+/* window must now be empty */
+ pgm_assert_cmpuint (pgm_txw_length (window), ==, 0);
+ pgm_assert_cmpuint (pgm_txw_size (window), ==, 0);
+ pgm_assert (pgm_txw_is_empty (window));
+ pgm_assert (!pgm_txw_is_full (window));
+
+/* retransmit queue must be empty */
+ pgm_assert (!pgm_txw_retransmit_can_peek (window));
+
+/* free reed-solomon state */
+ if (window->is_fec_enabled) {
+ pgm_free_skb (window->parity_buffer);
+ pgm_rs_destroy (&window->rs);
+ }
+
+/* window */
+ pgm_free (window);
+}
+
+/* add skb to transmit window, taking ownership. window does not grow.
+ * PGM skbuff data/tail pointers must point to the PGM payload, and hence skb->len
+ * is allowed to be zero.
+ *
+ * side effects:
+ *
+ * 1) sequence number is set in skb.
+ * 2) window is updated with new skb.
+ *
+ * no return value. fatal error raised on invalid parameters. if window is full then
+ * an entry is dropped to fulfil the request.
+ *
+ * it is an error to try to free the skb after adding to the window.
+ */
+
+void
+pgm_txw_add (
+ pgm_txw_t* const restrict window,
+ struct pgm_sk_buff_t* const restrict skb /* cannot be NULL */
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != skb);
+ pgm_assert_cmpuint (pgm_txw_max_length (window), >, 0);
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (((const pgm_list_t*)skb)->next == NULL);
+ pgm_assert (((const pgm_list_t*)skb)->prev == NULL);
+ pgm_assert (pgm_tsi_is_null (&skb->tsi));
+ pgm_assert ((char*)skb->data > (char*)skb->head);
+ pgm_assert ((sizeof(struct pgm_header) + sizeof(struct pgm_data)) <= (size_t)((char*)skb->data - (char*)skb->head));
+
+ pgm_debug ("add (window:%p skb:%p)", (const char*)window, (const char*)skb);
+
+ if (pgm_txw_is_full (window))
+ {
+/* transmit window advancement scheme dependent action here */
+ pgm_txw_remove_tail (window);
+ }
+
+/* generate new sequence number */
+ pgm_atomic_inc32 (&window->lead);
+ skb->sequence = window->lead;
+
+/* add skb to window */
+ const uint_fast32_t index_ = skb->sequence % pgm_txw_max_length (window);
+ window->pdata[index_] = skb;
+
+/* statistics */
+ window->size += skb->len;
+
+/* post-conditions */
+ pgm_assert_cmpuint (pgm_txw_length (window), >, 0);
+ pgm_assert_cmpuint (pgm_txw_length (window), <=, pgm_txw_max_length (window));
+}
+
+/* peek an entry from the window for retransmission.
+ *
+ * returns pointer to skbuff on success, returns NULL on invalid parameters.
+ */
+
+struct pgm_sk_buff_t*
+pgm_txw_peek (
+ const pgm_txw_t*const window,
+ const uint32_t sequence
+ )
+{
+ pgm_debug ("peek (window:%p sequence:%" PRIu32 ")",
+ (const void*)window, sequence);
+ return _pgm_txw_peek (window, sequence);
+}
+
+/* remove an entry from the trailing edge of the transmit window.
+ */
+
+static
+void
+pgm_txw_remove_tail (
+ pgm_txw_t* const window
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_txw_state_t* state;
+
+ pgm_debug ("pgm_txw_remove_tail (window:%p)", (const void*)window);
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert (!pgm_txw_is_empty (window));
+
+ skb = _pgm_txw_peek (window, pgm_txw_trail (window));
+ pgm_assert (NULL != skb);
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (pgm_tsi_is_null (&skb->tsi));
+
+ state = (pgm_txw_state_t*)&skb->cb;
+ if (state->waiting_retransmit) {
+ pgm_queue_unlink (&window->retransmit_queue, (pgm_list_t*)skb);
+ state->waiting_retransmit = 0;
+ }
+
+/* statistics */
+ window->size -= skb->len;
+ if (state->retransmit_count > 0) {
+ PGM_HISTOGRAM_COUNTS("Tx.RetransmitCount", state->retransmit_count);
+ }
+ if (state->nak_elimination_count > 0) {
+ PGM_HISTOGRAM_COUNTS("Tx.NakEliminationCount", state->nak_elimination_count);
+ }
+
+/* remove reference to skb */
+ if (PGM_UNLIKELY(pgm_mem_gc_friendly)) {
+ const uint_fast32_t index_ = skb->sequence % pgm_txw_max_length (window);
+ window->pdata[index_] = NULL;
+ }
+ pgm_free_skb (skb);
+
+/* advance trailing pointer */
+ pgm_atomic_inc32 (&window->trail);
+
+/* post-conditions */
+ pgm_assert (!pgm_txw_is_full (window));
+}
+
+/* Try to add a sequence number to the retransmit queue, ignore if
+ * already there or no longer in the transmit window.
+ *
+ * For parity NAKs, we deal on the transmission group sequence number
+ * rather than the packet sequence number. To simplify managment we
+ * use the leading window packet to store the details of the entire
+ * transmisison group. Parity NAKs are ignored if the packet count is
+ * less than or equal to the count already queued for retransmission.
+ *
+ * returns FALSE if request was eliminated, returns TRUE if request was
+ * added to queue.
+ */
+
+bool
+pgm_txw_retransmit_push (
+ pgm_txw_t* const window,
+ const uint32_t sequence,
+ const bool is_parity, /* parity NAK ⇒ sequence_number = transmission group | packet count */
+ const uint8_t tg_sqn_shift
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert_cmpuint (tg_sqn_shift, <, 8 * sizeof(uint32_t));
+
+ pgm_debug ("retransmit_push (window:%p sequence:%" PRIu32 " is_parity:%s tg_sqn_shift:%u)",
+ (const void*)window, sequence, is_parity ? "TRUE" : "FALSE", tg_sqn_shift);
+
+/* early elimination */
+ if (pgm_txw_is_empty (window))
+ return FALSE;
+
+ if (is_parity)
+ {
+ return pgm_txw_retransmit_push_parity (window, sequence, tg_sqn_shift);
+ }
+ else
+ {
+ return pgm_txw_retransmit_push_selective (window, sequence);
+ }
+}
+
+static
+bool
+pgm_txw_retransmit_push_parity (
+ pgm_txw_t* const window,
+ const uint32_t sequence,
+ const uint8_t tg_sqn_shift
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_txw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+ pgm_assert_cmpuint (tg_sqn_shift, <, 8 * sizeof(uint32_t));
+
+ const uint32_t tg_sqn_mask = 0xffffffff << tg_sqn_shift;
+ const uint32_t nak_tg_sqn = sequence & tg_sqn_mask; /* left unshifted */
+ const uint32_t nak_pkt_cnt = sequence & ~tg_sqn_mask;
+ skb = _pgm_txw_peek (window, nak_tg_sqn);
+
+ if (NULL == skb) {
+ pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Transmission group lead #%" PRIu32 " not in window."), nak_tg_sqn);
+ return FALSE;
+ }
+
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (pgm_tsi_is_null (&skb->tsi));
+ state = (pgm_txw_state_t*)&skb->cb;
+
+/* check if request can be eliminated */
+ if (state->waiting_retransmit)
+ {
+ pgm_assert (NULL != ((const pgm_list_t*)skb)->next);
+ pgm_assert (NULL != ((const pgm_list_t*)skb)->prev);
+ if (state->pkt_cnt_requested < nak_pkt_cnt) {
+/* more parity packets requested than currently scheduled, simply bump up the count */
+ state->pkt_cnt_requested = nak_pkt_cnt;
+ }
+ state->nak_elimination_count++;
+ return FALSE;
+ }
+ else
+ {
+ pgm_assert (((const pgm_list_t*)skb)->next == NULL);
+ pgm_assert (((const pgm_list_t*)skb)->prev == NULL);
+ }
+
+/* new request */
+ state->pkt_cnt_requested++;
+ pgm_queue_push_head_link (&window->retransmit_queue, (pgm_list_t*)skb);
+ pgm_assert (!pgm_queue_is_empty (&window->retransmit_queue));
+ state->waiting_retransmit = 1;
+ return TRUE;
+}
+
+static
+bool
+pgm_txw_retransmit_push_selective (
+ pgm_txw_t* const window,
+ const uint32_t sequence
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_txw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ skb = _pgm_txw_peek (window, sequence);
+ if (NULL == skb) {
+ pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Requested packet #%" PRIu32 " not in window."), sequence);
+ return FALSE;
+ }
+
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (pgm_tsi_is_null (&skb->tsi));
+ state = (pgm_txw_state_t*)&skb->cb;
+
+/* check if request can be eliminated */
+ if (state->waiting_retransmit) {
+ pgm_assert (!pgm_queue_is_empty (&window->retransmit_queue));
+ state->nak_elimination_count++;
+ return FALSE;
+ }
+
+ pgm_assert (((const pgm_list_t*)skb)->next == NULL);
+ pgm_assert (((const pgm_list_t*)skb)->prev == NULL);
+
+/* new request */
+ pgm_queue_push_head_link (&window->retransmit_queue, (pgm_list_t*)skb);
+ pgm_assert (!pgm_queue_is_empty (&window->retransmit_queue));
+ state->waiting_retransmit = 1;
+ return TRUE;
+}
+
+/* try to peek a request from the retransmit queue
+ *
+ * return pointer of first skb in queue, or return NULL if the queue is empty.
+ */
+
+struct pgm_sk_buff_t*
+pgm_txw_retransmit_try_peek (
+ pgm_txw_t* const window
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ pgm_debug ("retransmit_try_peek (window:%p)", (const void*)window);
+
+/* no lock required to detect presence of a request */
+ pgm_list_t* tail_link = pgm_queue_peek_tail_link (&window->retransmit_queue);
+ if (PGM_UNLIKELY(NULL == tail_link)) {
+ pgm_debug ("retransmit queue empty on peek.");
+ return NULL;
+ }
+
+ struct pgm_sk_buff_t* skb = (struct pgm_sk_buff_t*)tail_link;
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_txw_state_t* state = (pgm_txw_state_t*)&skb->cb;
+
+ if (!state->waiting_retransmit) {
+ pgm_assert (((const pgm_list_t*)skb)->next == NULL);
+ pgm_assert (((const pgm_list_t*)skb)->prev == NULL);
+ }
+/* packet payload still in transit */
+ if (PGM_UNLIKELY(1 != pgm_atomic_read32 (&skb->users))) {
+ pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Retransmit sqn #%" PRIu32 " is still in transit in transmit thread."), skb->sequence);
+ return NULL;
+ }
+ if (!state->pkt_cnt_requested) {
+ return skb;
+ }
+
+/* generate parity packet to satisify request */
+ const uint8_t rs_h = state->pkt_cnt_sent % (window->rs.n - window->rs.k);
+ const uint32_t tg_sqn_mask = 0xffffffff << window->tg_sqn_shift;
+ const uint32_t tg_sqn = skb->sequence & tg_sqn_mask;
+ bool is_var_pktlen = FALSE;
+ bool is_op_encoded = FALSE;
+ uint16_t parity_length = 0;
+ const pgm_gf8_t* src[ window->rs.k ];
+ for (uint_fast8_t i = 0; i < window->rs.k; i++)
+ {
+ const struct pgm_sk_buff_t* odata_skb = pgm_txw_peek (window, tg_sqn + i);
+ const uint16_t odata_tsdu_length = ntohs (odata_skb->pgm_header->pgm_tsdu_length);
+ if (!parity_length)
+ {
+ parity_length = odata_tsdu_length;
+ }
+ else if (odata_tsdu_length != parity_length)
+ {
+ is_var_pktlen = TRUE;
+ if (odata_tsdu_length > parity_length)
+ parity_length = odata_tsdu_length;
+ }
+
+ src[i] = odata_skb->data;
+ if (odata_skb->pgm_header->pgm_options & PGM_OPT_PRESENT) {
+ is_op_encoded = TRUE;
+ }
+ }
+
+/* construct basic PGM header to be completed by send_rdata() */
+ skb = window->parity_buffer;
+ skb->data = skb->tail = skb->head = skb + 1;
+
+/* space for PGM header */
+ pgm_skb_put (skb, sizeof(struct pgm_header));
+
+ skb->pgm_header = skb->data;
+ skb->pgm_data = (void*)( skb->pgm_header + 1 );
+ memcpy (skb->pgm_header->pgm_gsi, &window->tsi->gsi, sizeof(pgm_gsi_t));
+ skb->pgm_header->pgm_options = PGM_OPT_PARITY;
+
+/* append actual TSDU length if variable length packets, zero pad as necessary.
+ */
+ if (is_var_pktlen)
+ {
+ skb->pgm_header->pgm_options |= PGM_OPT_VAR_PKTLEN;
+
+ for (uint_fast8_t i = 0; i < window->rs.k; i++)
+ {
+ struct pgm_sk_buff_t* odata_skb = pgm_txw_peek (window, tg_sqn + i);
+ const uint16_t odata_tsdu_length = ntohs (odata_skb->pgm_header->pgm_tsdu_length);
+
+ pgm_assert (odata_tsdu_length == odata_skb->len);
+ pgm_assert (parity_length >= odata_tsdu_length);
+
+ if (!odata_skb->zero_padded) {
+ memset (odata_skb->tail, 0, parity_length - odata_tsdu_length);
+ *(uint16_t*)((char*)odata_skb->data + parity_length) = odata_tsdu_length;
+ odata_skb->zero_padded = 1;
+ }
+ }
+ parity_length += 2;
+ }
+
+ skb->pgm_header->pgm_tsdu_length = htons (parity_length);
+
+/* space for DATA */
+ pgm_skb_put (skb, sizeof(struct pgm_data) + parity_length);
+
+ skb->pgm_data->data_sqn = htonl ( tg_sqn | rs_h );
+
+ void* data_bytes = skb->pgm_data + 1;
+
+/* encode every option separately, currently only one applies: opt_fragment
+ */
+ if (is_op_encoded)
+ {
+ skb->pgm_header->pgm_options |= PGM_OPT_PRESENT;
+
+ struct pgm_opt_fragment null_opt_fragment;
+ const pgm_gf8_t* opt_src[ window->rs.k ];
+ memset (&null_opt_fragment, 0, sizeof(null_opt_fragment));
+ *(uint8_t*)&null_opt_fragment |= PGM_OP_ENCODED_NULL;
+ for (uint_fast8_t i = 0; i < window->rs.k; i++)
+ {
+ const struct pgm_sk_buff_t* odata_skb = pgm_txw_peek (window, tg_sqn + i);
+
+ if (odata_skb->pgm_opt_fragment)
+ {
+ pgm_assert (odata_skb->pgm_header->pgm_options & PGM_OPT_PRESENT);
+/* skip three bytes of header */
+ opt_src[i] = (pgm_gf8_t*)((char*)odata_skb->pgm_opt_fragment + sizeof (struct pgm_opt_header));
+ }
+ else
+ {
+ opt_src[i] = (pgm_gf8_t*)&null_opt_fragment;
+ }
+ }
+
+/* add options to this rdata packet */
+ const uint16_t opt_total_length = sizeof(struct pgm_opt_length) +
+ sizeof(struct pgm_opt_header) +
+ sizeof(struct pgm_opt_fragment);
+
+/* add space for PGM options */
+ pgm_skb_put (skb, opt_total_length);
+
+ struct pgm_opt_length* opt_len = data_bytes;
+ opt_len->opt_type = PGM_OPT_LENGTH;
+ opt_len->opt_length = sizeof(struct pgm_opt_length);
+ opt_len->opt_total_length = htons ( opt_total_length );
+ struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+ opt_header->opt_type = PGM_OPT_FRAGMENT | PGM_OPT_END;
+ opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_fragment);
+ opt_header->opt_reserved = PGM_OP_ENCODED;
+ struct pgm_opt_fragment* opt_fragment = (struct pgm_opt_fragment*)(opt_header + 1);
+
+/* The cast below is the correct way to handle the problem.
+ * The (void *) cast is to avoid a GCC warning like:
+ *
+ * "warning: dereferencing type-punned pointer will break strict-aliasing rules"
+ */
+ pgm_rs_encode (&window->rs,
+ opt_src,
+ window->rs.k + rs_h,
+ (pgm_gf8_t*)((char*)opt_fragment + sizeof(struct pgm_opt_header)),
+ sizeof(struct pgm_opt_fragment) - sizeof(struct pgm_opt_header));
+
+ data_bytes = opt_fragment + 1;
+ }
+
+/* encode payload */
+ pgm_rs_encode (&window->rs,
+ src,
+ window->rs.k + rs_h,
+ data_bytes,
+ parity_length);
+
+/* calculate partial checksum */
+ const uint16_t tsdu_length = ntohs (skb->pgm_header->pgm_tsdu_length);
+ state->unfolded_checksum = pgm_csum_partial ((char*)skb->tail - tsdu_length, tsdu_length, 0);
+ return skb;
+}
+
+/* remove head entry from retransmit queue, will fail on assertion if queue is empty.
+ */
+
+void
+pgm_txw_retransmit_remove_head (
+ pgm_txw_t* const window
+ )
+{
+ struct pgm_sk_buff_t* skb;
+ pgm_txw_state_t* state;
+
+/* pre-conditions */
+ pgm_assert (NULL != window);
+
+ pgm_debug ("retransmit_remove_head (window:%p)",
+ (const void*)window);
+
+/* tail link is valid without lock */
+ pgm_list_t* tail_link = pgm_queue_peek_tail_link (&window->retransmit_queue);
+
+/* link must be valid for pop */
+ pgm_assert (NULL != tail_link);
+
+ skb = (struct pgm_sk_buff_t*)tail_link;
+ pgm_assert (pgm_skb_is_valid (skb));
+ pgm_assert (pgm_tsi_is_null (&skb->tsi));
+ state = (pgm_txw_state_t*)&skb->cb;
+ if (!state->waiting_retransmit)
+ {
+ pgm_assert (((const pgm_list_t*)skb)->next == NULL);
+ pgm_assert (((const pgm_list_t*)skb)->prev == NULL);
+ }
+ if (state->pkt_cnt_requested)
+ {
+ state->pkt_cnt_sent++;
+
+/* remove if all requested parity packets have been sent */
+ if (state->pkt_cnt_sent == state->pkt_cnt_requested) {
+ pgm_queue_pop_tail_link (&window->retransmit_queue);
+ state->waiting_retransmit = 0;
+ }
+ }
+ else /* selective request */
+ {
+ pgm_queue_pop_tail_link (&window->retransmit_queue);
+ state->waiting_retransmit = 0;
+ }
+}
+
+/* eof */