Diffstat (limited to '3rdparty/openpgm-svn-r1085/pgm/receiver.c')
-rw-r--r--	3rdparty/openpgm-svn-r1085/pgm/receiver.c	2268
1 file changed, 2268 insertions(+), 0 deletions(-)
diff --git a/3rdparty/openpgm-svn-r1085/pgm/receiver.c b/3rdparty/openpgm-svn-r1085/pgm/receiver.c
new file mode 100644
index 0000000..8f26353
--- /dev/null
+++ b/3rdparty/openpgm-svn-r1085/pgm/receiver.c
@@ -0,0 +1,2268 @@
+/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
+ *
+ * PGM receiver socket.
+ *
+ * Copyright (c) 2006-2010 Miru Limited.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+#include <errno.h>
+#include <impl/i18n.h>
+#include <impl/framework.h>
+#include <impl/receiver.h>
+#include <impl/sqn_list.h>
+#include <impl/timer.h>
+#include <impl/packet_parse.h>
+#include <impl/net.h>
+
+
+//#define RECEIVER_DEBUG
+//#define SPM_DEBUG
+
+#ifndef RECEIVER_DEBUG
+# define PGM_DISABLE_ASSERT
+#endif
+
+#if !defined(ENOBUFS) && defined(WSAENOBUFS)
+# define ENOBUFS WSAENOBUFS
+#endif
+#if !defined(ECONNRESET) && defined(WSAECONNRESET)
+# define ECONNRESET WSAECONNRESET
+#endif
+
+static bool send_spmr (pgm_sock_t*const restrict, pgm_peer_t*const restrict);
+static bool send_nak (pgm_sock_t*const restrict, pgm_peer_t*const restrict, const uint32_t);
+static bool send_parity_nak (pgm_sock_t*const restrict, pgm_peer_t*const restrict, const unsigned, const unsigned);
+static bool send_nak_list (pgm_sock_t*const restrict, pgm_peer_t*const restrict, const struct pgm_sqn_list_t*const restrict);
+static bool nak_rb_state (pgm_peer_t*, const pgm_time_t);
+static void nak_rpt_state (pgm_peer_t*, const pgm_time_t);
+static void nak_rdata_state (pgm_peer_t*, const pgm_time_t);
+static inline pgm_peer_t* _pgm_peer_ref (pgm_peer_t*);
+static bool on_general_poll (pgm_sock_t*const restrict, pgm_peer_t*const restrict, struct pgm_sk_buff_t*const restrict);
+static bool on_dlr_poll (pgm_sock_t*const restrict, pgm_peer_t*const restrict, struct pgm_sk_buff_t*const restrict);
+
+
+/* helpers for pgm_peer_t */
+static inline
+pgm_time_t
+next_ack_rb_expiry (
+ const pgm_rxw_t* window
+ )
+{
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != window->ack_backoff_queue.tail);
+
+ const struct pgm_peer_t* peer = (const struct pgm_peer_t*)window->ack_backoff_queue.tail;
+ return peer->ack_rb_expiry;
+}
+
+static inline
+pgm_time_t
+next_nak_rb_expiry (
+ const pgm_rxw_t* window
+ )
+{
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != window->nak_backoff_queue.tail);
+
+ const struct pgm_sk_buff_t* skb = (const struct pgm_sk_buff_t*)window->nak_backoff_queue.tail;
+ const pgm_rxw_state_t* state = (const pgm_rxw_state_t*)&skb->cb;
+ return state->timer_expiry;
+}
+
+static inline
+pgm_time_t
+next_nak_rpt_expiry (
+ const pgm_rxw_t* window
+ )
+{
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != window->wait_ncf_queue.tail);
+
+ const struct pgm_sk_buff_t* skb = (const struct pgm_sk_buff_t*)window->wait_ncf_queue.tail;
+ const pgm_rxw_state_t* state = (const pgm_rxw_state_t*)&skb->cb;
+ return state->timer_expiry;
+}
+
+static inline
+pgm_time_t
+next_nak_rdata_expiry (
+ const pgm_rxw_t* window
+ )
+{
+ pgm_assert (NULL != window);
+ pgm_assert (NULL != window->wait_data_queue.tail);
+
+ const struct pgm_sk_buff_t* skb = (const struct pgm_sk_buff_t*)window->wait_data_queue.tail;
+ const pgm_rxw_state_t* state = (const pgm_rxw_state_t*)&skb->cb;
+ return state->timer_expiry;
+}
+
+/* calculate ACK_RB_IVL as random time interval 1 - ACK_BO_IVL.
+ */
+static inline
+uint32_t
+ack_rb_ivl (
+ pgm_sock_t* sock
+ ) /* not const as rand() updates the seed */
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert_cmpuint (sock->ack_bo_ivl, >, 1);
+
+ return pgm_rand_int_range (&sock->rand_, 1 /* us */, sock->ack_bo_ivl);
+}
+
+/* calculate NAK_RB_IVL as random time interval 1 - NAK_BO_IVL.
+ */
+static inline
+uint32_t
+nak_rb_ivl (
+ pgm_sock_t* sock
+ ) /* not const as rand() updates the seed */
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert_cmpuint (sock->nak_bo_ivl, >, 1);
+
+ return pgm_rand_int_range (&sock->rand_, 1 /* us */, sock->nak_bo_ivl);
+}
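+
+/* Illustrative sketch, not from the original source: assuming NAK_BO_IVL is
+ * set to 50 ms, nak_rb_ivl() above draws a uniform delay in [1us, 50000us),
+ * de-synchronising this receiver's NAKs from other receivers that detected
+ * the same loss.  pgm_on_spm() and pgm_on_peer_nak() below seed their window
+ * updates with exactly this randomised expiry:
+ *
+ *	const pgm_time_t nak_rb_expiry = skb->tstamp + nak_rb_ivl (sock);
+ */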
+
+/* mark sequence as recovery failed.
+ */
+
+static
+void
+cancel_skb (
+ pgm_sock_t* restrict sock,
+ pgm_peer_t* restrict peer,
+ const struct pgm_sk_buff_t* restrict skb,
+ const pgm_time_t now
+ )
+{
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != peer);
+ pgm_assert (NULL != skb);
+ pgm_assert_cmpuint (now, >=, skb->tstamp);
+
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW, _("Lost data #%u due to cancellation."), skb->sequence);
+
+ const uint32_t fail_time = now - skb->tstamp;
+ if (!peer->max_fail_time)
+ peer->max_fail_time = peer->min_fail_time = fail_time;
+ else if (fail_time > peer->max_fail_time)
+ peer->max_fail_time = fail_time;
+ else if (fail_time < peer->min_fail_time)
+ peer->min_fail_time = fail_time;
+
+ pgm_rxw_lost (peer->window, skb->sequence);
+ PGM_HISTOGRAM_TIMES("Rx.FailTime", fail_time);
+
+/* mark receiver window for flushing on next recv() */
+ pgm_peer_set_pending (sock, peer);
+}
+
+/* check whether this receiver is the designated acker for the source
+ */
+
+static inline
+bool
+_pgm_is_acker (
+ const pgm_peer_t* restrict peer,
+ const struct pgm_sk_buff_t* restrict skb
+ )
+{
+ struct sockaddr_storage acker_nla;
+
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+ pgm_assert (NULL != skb);
+ pgm_assert (NULL != skb->pgm_opt_pgmcc_data);
+
+ pgm_nla_to_sockaddr (&skb->pgm_opt_pgmcc_data->opt_nla_afi, (struct sockaddr*)&acker_nla);
+ return (0 == pgm_sockaddr_cmp ((struct sockaddr*)&acker_nla, (struct sockaddr*)&peer->sock->send_addr));
+}
+
+/* is the source holding an acker election
+ */
+
+static inline
+bool
+_pgm_is_acker_election (
+ const struct pgm_sk_buff_t* restrict skb
+ )
+{
+ pgm_assert (NULL != skb);
+ pgm_assert (NULL != skb->pgm_opt_pgmcc_data);
+
+ const unsigned acker_afi = ntohs (skb->pgm_opt_pgmcc_data->opt_nla_afi);
+ switch (acker_afi) {
+ case AFI_IP:
+ if (INADDR_ANY == skb->pgm_opt_pgmcc_data->opt_nla.s_addr)
+ return TRUE;
+ break;
+
+ case AFI_IP6:
+ if (0 == memcmp (&skb->pgm_opt_pgmcc_data->opt_nla, &in6addr_any, sizeof(in6addr_any)))
+ return TRUE;
+ break;
+
+ default: break;
+ }
+
+ return FALSE;
+}
+
+/* add state for an ACK on a data packet.
+ */
+
+static inline
+void
+_pgm_add_ack (
+ pgm_peer_t* const restrict peer,
+ const pgm_time_t ack_rb_expiry
+ )
+{
+ peer->ack_rb_expiry = ack_rb_expiry;
+ pgm_queue_push_head_link (&peer->window->ack_backoff_queue, &peer->ack_link);
+}
+
+/* remove outstanding ACK
+ */
+
+static inline
+void
+_pgm_remove_ack (
+ pgm_peer_t* const restrict peer
+ )
+{
+ pgm_assert (!pgm_queue_is_empty (&peer->window->ack_backoff_queue));
+ pgm_queue_unlink (&peer->window->ack_backoff_queue, &peer->ack_link);
+ peer->ack_rb_expiry = 0;
+}
+
+/* increase reference count for peer object
+ *
+ * on success, returns peer object.
+ */
+
+static inline
+pgm_peer_t*
+_pgm_peer_ref (
+ pgm_peer_t* peer
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+
+ pgm_atomic_inc32 (&peer->ref_count);
+ return peer;
+}
+
+/* decrease reference count of peer object, destroying on last reference.
+ */
+
+void
+pgm_peer_unref (
+ pgm_peer_t* peer
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+
+ if (pgm_atomic_exchange_and_add32 (&peer->ref_count, (uint32_t)-1) != 1)
+ return;
+
+/* receive window */
+ pgm_rxw_destroy (peer->window);
+ peer->window = NULL;
+
+/* object */
+ pgm_free (peer);
+ peer = NULL;
+}
+
+/* find PGM options in received SKB.
+ *
+ * returns TRUE if OPT_FRAGMENT or OPT_PGMCC_DATA is found, otherwise FALSE
+ * is returned.
+ */
+
+static
+bool
+get_pgm_options (
+ struct pgm_sk_buff_t* const skb
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != skb);
+ pgm_assert (NULL != skb->pgm_data);
+
+ struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(skb->pgm_data + 1);
+ bool found_opt = FALSE;
+
+ pgm_assert (opt_header->opt_type == PGM_OPT_LENGTH);
+ pgm_assert (opt_header->opt_length == sizeof(struct pgm_opt_length));
+
+ pgm_debug ("get_pgm_options (skb:%p)",
+ (const void*)skb);
+
+ skb->pgm_opt_fragment = NULL;
+ skb->pgm_opt_pgmcc_data = NULL;
+
+/* always at least two options, first is always opt_length */
+ do {
+ opt_header = (struct pgm_opt_header*)((char*)opt_header + opt_header->opt_length);
+/* option overflow */
+ if (PGM_UNLIKELY((char*)opt_header > (char*)skb->data))
+ break;
+
+ switch (opt_header->opt_type & PGM_OPT_MASK) {
+ case PGM_OPT_FRAGMENT:
+ skb->pgm_opt_fragment = (struct pgm_opt_fragment*)(opt_header + 1);
+ found_opt = TRUE;
+ break;
+
+ case PGM_OPT_PGMCC_DATA:
+ skb->pgm_opt_pgmcc_data = (struct pgm_opt_pgmcc_data*)(opt_header + 1);
+ found_opt = TRUE;
+ break;
+
+ default: break;
+ }
+
+ } while (!(opt_header->opt_type & PGM_OPT_END));
+ return found_opt;
+}
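+
+/* On-wire layout walked above, per the PGM wire format: a single OPT_LENGTH
+ * leads, then options follow head-to-tail, each advanced by
+ * opt_header->opt_length, until one is flagged OPT_END.  A hypothetical ODATA
+ * packet carrying both recognised options would look like:
+ *
+ *	| pgm_header | pgm_data | OPT_LENGTH | OPT_FRAGMENT | OPT_PGMCC_DATA+OPT_END | payload |
+ *
+ * a corrupt opt_length that walks past skb->data terminates the loop early.
+ */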
+
+/* a peer in the context of the sock is another party on the network sending PGM
+ * packets. for each peer we need a receive window and a network layer address (nla)
+ * to which nak requests can be forwarded.
+ *
+ * on success, returns new peer object.
+ */
+
+pgm_peer_t*
+pgm_new_peer (
+ pgm_sock_t* const restrict sock,
+ const pgm_tsi_t* const restrict tsi,
+ const struct sockaddr* const restrict src_addr,
+ const socklen_t src_addrlen,
+ const struct sockaddr* const restrict dst_addr,
+ const socklen_t dst_addrlen,
+ const pgm_time_t now
+ )
+{
+ pgm_peer_t* peer;
+
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != src_addr);
+ pgm_assert (src_addrlen > 0);
+ pgm_assert (NULL != dst_addr);
+ pgm_assert (dst_addrlen > 0);
+
+#ifdef PGM_DEBUG
+ char saddr[INET6_ADDRSTRLEN], daddr[INET6_ADDRSTRLEN];
+ pgm_sockaddr_ntop (src_addr, saddr, sizeof(saddr));
+ pgm_sockaddr_ntop (dst_addr, daddr, sizeof(daddr));
+ pgm_debug ("pgm_new_peer (sock:%p tsi:%s src-addr:%s src-addrlen:%u dst-addr:%s dst-addrlen:%u)",
+ (void*)sock, pgm_tsi_print (tsi), saddr, (unsigned)src_addrlen, daddr, (unsigned)dst_addrlen);
+#endif
+
+ peer = pgm_new0 (pgm_peer_t, 1);
+ peer->expiry = now + sock->peer_expiry;
+ peer->sock = sock;
+ memcpy (&peer->tsi, tsi, sizeof(pgm_tsi_t));
+ memcpy (&peer->group_nla, dst_addr, dst_addrlen);
+ memcpy (&peer->local_nla, src_addr, src_addrlen);
+/* port at same location for sin/sin6 */
+ ((struct sockaddr_in*)&peer->local_nla)->sin_port = htons (sock->udp_encap_ucast_port);
+ ((struct sockaddr_in*)&peer->nla)->sin_port = htons (sock->udp_encap_ucast_port);
+
+/* lock on rx window */
+ peer->window = pgm_rxw_create (&peer->tsi,
+ sock->max_tpdu,
+ sock->rxw_sqns,
+ sock->rxw_secs,
+ sock->rxw_max_rte,
+ sock->ack_c_p);
+ peer->spmr_expiry = now + sock->spmr_expiry;
+
+/* add peer to hash table and linked list */
+ pgm_rwlock_writer_lock (&sock->peers_lock);
+ pgm_peer_t* entry = _pgm_peer_ref (peer);
+ pgm_hashtable_insert (sock->peers_hashtable, &peer->tsi, entry);
+ peer->peers_link.data = peer;
+ sock->peers_list = pgm_list_prepend_link (sock->peers_list, &peer->peers_link);
+ pgm_rwlock_writer_unlock (&sock->peers_lock);
+
+ pgm_timer_lock (sock);
+ if (pgm_time_after( sock->next_poll, peer->spmr_expiry ))
+ sock->next_poll = peer->spmr_expiry;
+ pgm_timer_unlock (sock);
+ return peer;
+}
+
+/* copy any contiguous buffers in the peer list to the provided
+ * message vector.
+ * returns -ENOBUFS if the vector is full, returns -ECONNRESET if
+ * data loss is detected, returns 0 when all peers flushed.
+ */
+
+int
+pgm_flush_peers_pending (
+ pgm_sock_t* const restrict sock,
+ struct pgm_msgv_t** restrict pmsg,
+ const struct pgm_msgv_t* const msg_end, /* at least pmsg + 1, same object */
+ size_t* const restrict bytes_read, /* added to, not set */
+ unsigned* const restrict data_read
+ )
+{
+ int retval = 0;
+
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != pmsg);
+ pgm_assert (NULL != *pmsg);
+ pgm_assert (NULL != msg_end);
+ pgm_assert (NULL != bytes_read);
+ pgm_assert (NULL != data_read);
+
+ pgm_debug ("pgm_flush_peers_pending (sock:%p pmsg:%p msg-end:%p bytes-read:%p data-read:%p)",
+ (const void*)sock, (const void*)pmsg, (const void*)msg_end, (const void*)bytes_read, (const void*)data_read);
+
+ while (sock->peers_pending)
+ {
+ pgm_peer_t* peer = sock->peers_pending->data;
+ if (peer->last_commit && peer->last_commit < sock->last_commit)
+ pgm_rxw_remove_commit (peer->window);
+ const ssize_t peer_bytes = pgm_rxw_readv (peer->window, pmsg, msg_end - *pmsg + 1);
+
+ if (peer->last_cumulative_losses != ((pgm_rxw_t*)peer->window)->cumulative_losses)
+ {
+ sock->is_reset = TRUE;
+ peer->lost_count = ((pgm_rxw_t*)peer->window)->cumulative_losses - peer->last_cumulative_losses;
+ peer->last_cumulative_losses = ((pgm_rxw_t*)peer->window)->cumulative_losses;
+ }
+
+ if (peer_bytes >= 0)
+ {
+ (*bytes_read) += peer_bytes;
+ (*data_read) ++;
+ peer->last_commit = sock->last_commit;
+ if (*pmsg > msg_end) { /* commit full */
+ retval = -ENOBUFS;
+ break;
+ }
+ } else
+ peer->last_commit = 0;
+ if (PGM_UNLIKELY(sock->is_reset)) {
+ retval = -ECONNRESET;
+ break;
+ }
+/* clear this reference and move to next */
+ sock->peers_pending = pgm_slist_remove_first (sock->peers_pending);
+ }
+
+ return retval;
+}
+
+/* edge triggered check for receiver pending events
+ */
+
+bool
+pgm_peer_has_pending (
+ pgm_peer_t* const peer
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+
+ if (NULL == peer->pending_link.data && ((pgm_rxw_t*)peer->window)->has_event) {
+ ((pgm_rxw_t*)peer->window)->has_event = 0;
+ return TRUE;
+ }
+ return FALSE;
+}
+
+/* set receiver in pending event queue
+ */
+
+void
+pgm_peer_set_pending (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict peer
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != peer);
+
+ if (peer->pending_link.data) return;
+ peer->pending_link.data = peer;
+ sock->peers_pending = pgm_slist_prepend_link (sock->peers_pending, &peer->pending_link);
+}
+
+/* Create a new error SKB detailing data loss.
+ */
+
+void
+pgm_set_reset_error (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ struct pgm_msgv_t* const restrict msgv
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+ pgm_assert (NULL != msgv);
+
+ struct pgm_sk_buff_t* error_skb = pgm_alloc_skb (0);
+ error_skb->sock = sock;
+ error_skb->tstamp = pgm_time_update_now ();
+ memcpy (&error_skb->tsi, &source->tsi, sizeof(pgm_tsi_t));
+ error_skb->sequence = source->lost_count;
+ msgv->msgv_skb[0] = error_skb;
+ msgv->msgv_len = 1;
+}
+
+/* SPMs indicate the start of a session, the continued presence of a session, or the flushing of final packets
+ * of a session.
+ *
+ * returns TRUE on valid packet, FALSE on invalid packet or duplicate SPM sequence number.
+ */
+
+bool
+pgm_on_spm (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+ pgm_assert (NULL != skb);
+
+ pgm_debug("pgm_on_spm (sock:%p source:%p skb:%p)",
+ (const void*)sock, (const void*)source, (const void*)skb);
+
+ if (PGM_UNLIKELY(!pgm_verify_spm (skb))) {
+ pgm_trace(PGM_LOG_ROLE_NETWORK,_("Discarded invalid SPM."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_SPMS]++;
+ return FALSE;
+ }
+
+ const struct pgm_spm* spm = (struct pgm_spm*) skb->data;
+ const struct pgm_spm6* spm6 = (struct pgm_spm6*)skb->data;
+ const uint32_t spm_sqn = ntohl (spm->spm_sqn);
+
+/* check for advancing sequence number, or first SPM */
+ if (PGM_LIKELY(pgm_uint32_gte (spm_sqn, source->spm_sqn)))
+ {
+/* copy NLA for replies */
+ pgm_nla_to_sockaddr (&spm->spm_nla_afi, (struct sockaddr*)&source->nla);
+
+/* save sequence number */
+ source->spm_sqn = spm_sqn;
+
+/* update receive window */
+ const pgm_time_t nak_rb_expiry = skb->tstamp + nak_rb_ivl (sock);
+ const unsigned naks = pgm_rxw_update (source->window,
+ ntohl (spm->spm_lead),
+ ntohl (spm->spm_trail),
+ skb->tstamp,
+ nak_rb_expiry);
+ if (naks) {
+ pgm_timer_lock (sock);
+ if (pgm_time_after (sock->next_poll, nak_rb_expiry))
+ sock->next_poll = nak_rb_expiry;
+ pgm_timer_unlock (sock);
+ }
+
+/* mark receiver window for flushing on next recv() */
+ const pgm_rxw_t* window = source->window;
+ if (window->cumulative_losses != source->last_cumulative_losses &&
+ !source->pending_link.data)
+ {
+ sock->is_reset = TRUE;
+ source->lost_count = window->cumulative_losses - source->last_cumulative_losses;
+ source->last_cumulative_losses = window->cumulative_losses;
+ pgm_peer_set_pending (sock, source);
+ }
+ }
+ else
+ { /* does not advance SPM sequence number */
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded duplicate SPM."));
+ source->cumulative_stats[PGM_PC_RECEIVER_DUP_SPMS]++;
+ return FALSE;
+ }
+
+/* check whether peer can generate parity packets */
+ if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT)
+ {
+ const struct pgm_opt_length* opt_len = (AF_INET6 == source->nla.ss_family) ?
+ (const struct pgm_opt_length*)(spm6 + 1) :
+ (const struct pgm_opt_length*)(spm + 1);
+ if (PGM_UNLIKELY(opt_len->opt_type != PGM_OPT_LENGTH))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed SPM."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_SPMS]++;
+ return FALSE;
+ }
+ if (PGM_UNLIKELY(opt_len->opt_length != sizeof(struct pgm_opt_length)))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed SPM."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_SPMS]++;
+ return FALSE;
+ }
+/* TODO: check for > 16 options & past packet end */
+ const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len;
+ do {
+ opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length);
+ if ((opt_header->opt_type & PGM_OPT_MASK) == PGM_OPT_PARITY_PRM)
+ {
+ const struct pgm_opt_parity_prm* opt_parity_prm = (const struct pgm_opt_parity_prm*)(opt_header + 1);
+ if (PGM_UNLIKELY((opt_parity_prm->opt_reserved & PGM_PARITY_PRM_MASK) == 0))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed SPM."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_SPMS]++;
+ return FALSE;
+ }
+
+ const uint32_t parity_prm_tgs = ntohl (opt_parity_prm->parity_prm_tgs);
+ if (PGM_UNLIKELY(parity_prm_tgs < 2 || parity_prm_tgs > 128))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed SPM."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_SPMS]++;
+ return FALSE;
+ }
+
+ source->has_proactive_parity = opt_parity_prm->opt_reserved & PGM_PARITY_PRM_PRO;
+ source->has_ondemand_parity = opt_parity_prm->opt_reserved & PGM_PARITY_PRM_OND;
+ if (source->has_proactive_parity || source->has_ondemand_parity) {
+ source->is_fec_enabled = 1;
+ pgm_rxw_update_fec (source->window, parity_prm_tgs);
+ }
+ }
+ } while (!(opt_header->opt_type & PGM_OPT_END));
+ }
+
+/* either way bump expiration timer */
+ source->expiry = skb->tstamp + sock->peer_expiry;
+ source->spmr_expiry = 0;
+ if (source->spmr_tstamp > 0) {
+ PGM_HISTOGRAM_TIMES("Rx.SpmRequestResponseTime", skb->tstamp - source->spmr_tstamp);
+ source->spmr_tstamp = 0;
+ }
+ return TRUE;
+}
+
+/* Multicast peer-to-peer NAK handling, much the same as an NCF but travelling in the opposite direction.
+ *
+ * if NAK is valid, returns TRUE. on error, FALSE is returned.
+ */
+
+bool
+pgm_on_peer_nak (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict peer,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != peer);
+ pgm_assert (NULL != skb);
+
+ pgm_debug ("pgm_on_peer_nak (sock:%p peer:%p skb:%p)",
+ (const void*)sock, (const void*)peer, (const void*)skb);
+
+ if (PGM_UNLIKELY(!pgm_verify_nak (skb)))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded invalid multicast NAK."));
+ peer->cumulative_stats[PGM_PC_RECEIVER_NAK_ERRORS]++;
+ return FALSE;
+ }
+
+ const struct pgm_nak* nak = (struct pgm_nak*) skb->data;
+ const struct pgm_nak6* nak6 = (struct pgm_nak6*)skb->data;
+
+/* NAK_SRC_NLA must not contain our sock unicast NLA */
+ struct sockaddr_storage nak_src_nla;
+ pgm_nla_to_sockaddr (&nak->nak_src_nla_afi, (struct sockaddr*)&nak_src_nla);
+ if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&nak_src_nla, (struct sockaddr*)&sock->send_addr) == 0))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded multicast NAK on NLA mismatch."));
+ return FALSE;
+ }
+
+/* NAK_GRP_NLA contains one of our sock receive multicast groups: the source's send multicast group */
+ struct sockaddr_storage nak_grp_nla;
+ pgm_nla_to_sockaddr ((AF_INET6 == nak_src_nla.ss_family) ? &nak6->nak6_grp_nla_afi : &nak->nak_grp_nla_afi, (struct sockaddr*)&nak_grp_nla);
+ bool found = FALSE;
+ for (unsigned i = 0; i < sock->recv_gsr_len; i++)
+ {
+ if (pgm_sockaddr_cmp ((struct sockaddr*)&nak_grp_nla, (struct sockaddr*)&sock->recv_gsr[i].gsr_group) == 0)
+ {
+ found = TRUE;
+ break;
+ }
+ }
+
+ if (PGM_UNLIKELY(!found)) {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded multicast NAK on multicast group mismatch."));
+ return FALSE;
+ }
+
+/* handle as NCF */
+ int status = pgm_rxw_confirm (peer->window,
+ ntohl (nak->nak_sqn),
+ skb->tstamp,
+ skb->tstamp + sock->nak_rdata_ivl,
+ skb->tstamp + nak_rb_ivl(sock));
+ if (PGM_RXW_UPDATED == status || PGM_RXW_APPENDED == status)
+ peer->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAKS_SUPPRESSED]++;
+
+/* check NAK list */
+ const uint32_t* nak_list = NULL;
+ unsigned nak_list_len = 0;
+ if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT)
+ {
+ const struct pgm_opt_length* opt_len = (AF_INET6 == nak_src_nla.ss_family) ?
+ (const struct pgm_opt_length*)(nak6 + 1) :
+ (const struct pgm_opt_length*)(nak + 1);
+ if (PGM_UNLIKELY(opt_len->opt_type != PGM_OPT_LENGTH))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed multicast NAK."));
+ peer->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_NCFS]++;
+ return FALSE;
+ }
+ if (PGM_UNLIKELY(opt_len->opt_length != sizeof(struct pgm_opt_length)))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed multicast NAK."));
+ peer->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_NCFS]++;
+ return FALSE;
+ }
+/* TODO: check for > 16 options & past packet end */
+ const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len;
+ do {
+ opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length);
+ if ((opt_header->opt_type & PGM_OPT_MASK) == PGM_OPT_NAK_LIST)
+ {
+ nak_list = ((const struct pgm_opt_nak_list*)(opt_header + 1))->opt_sqn;
+ nak_list_len = ( opt_header->opt_length - sizeof(struct pgm_opt_header) - sizeof(uint8_t) ) / sizeof(uint32_t);
+ break;
+ }
+ } while (!(opt_header->opt_type & PGM_OPT_END));
+ }
+
+ while (nak_list_len) {
+ status = pgm_rxw_confirm (peer->window,
+ ntohl (*nak_list),
+ skb->tstamp,
+ skb->tstamp + sock->nak_rdata_ivl,
+ skb->tstamp + nak_rb_ivl(sock));
+ if (PGM_RXW_UPDATED == status || PGM_RXW_APPENDED == status)
+ peer->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAKS_SUPPRESSED]++;
+ nak_list++;
+ nak_list_len--;
+ }
+
+/* mark receiver window for flushing on next recv() */
+ const pgm_rxw_t* window = peer->window;
+ if (window->cumulative_losses != peer->last_cumulative_losses &&
+ !peer->pending_link.data)
+ {
+ sock->is_reset = TRUE;
+ peer->lost_count = window->cumulative_losses - peer->last_cumulative_losses;
+ peer->last_cumulative_losses = window->cumulative_losses;
+ pgm_peer_set_pending (sock, peer);
+ }
+ return TRUE;
+}
+
+/* NCF confirming receipt of a NAK from this sock or another on the LAN segment.
+ *
+ * Packet contents will exactly match the sent NAK, although that is not particularly helpful.
+ *
+ * if NCF is valid, returns TRUE. on error, FALSE is returned.
+ */
+
+bool
+pgm_on_ncf (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+ pgm_assert (NULL != skb);
+
+ pgm_debug ("pgm_on_ncf (sock:%p source:%p skb:%p)",
+ (const void*)sock, (const void*)source, (const void*)skb);
+
+ if (PGM_UNLIKELY(!pgm_verify_ncf (skb)))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded invalid NCF."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_NCFS]++;
+ return FALSE;
+ }
+
+ const struct pgm_nak* ncf = (struct pgm_nak*) skb->data;
+ const struct pgm_nak6* ncf6 = (struct pgm_nak6*)skb->data;
+
+/* NCF_SRC_NLA may contain our sock unicast NLA, we don't really care */
+ struct sockaddr_storage ncf_src_nla;
+ pgm_nla_to_sockaddr (&ncf->nak_src_nla_afi, (struct sockaddr*)&ncf_src_nla);
+
+#if 0
+	if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&ncf_src_nla, (struct sockaddr*)&sock->send_addr) != 0)) {
+		pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded NCF on NLA mismatch."));
+		source->cumulative_stats[PGM_PC_RECEIVER_PACKETS_DISCARDED]++;
+ return FALSE;
+ }
+#endif
+
+/* NCF_GRP_NLA contains our sock multicast group */
+ struct sockaddr_storage ncf_grp_nla;
+ pgm_nla_to_sockaddr ((AF_INET6 == ncf_src_nla.ss_family) ? &ncf6->nak6_grp_nla_afi : &ncf->nak_grp_nla_afi, (struct sockaddr*)&ncf_grp_nla);
+ if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&ncf_grp_nla, (struct sockaddr*)&sock->send_gsr.gsr_group) != 0))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded NCF on multicast group mismatch."));
+ return FALSE;
+ }
+
+ const pgm_time_t ncf_rdata_ivl = skb->tstamp + sock->nak_rdata_ivl;
+ const pgm_time_t ncf_rb_ivl = skb->tstamp + nak_rb_ivl(sock);
+ int status = pgm_rxw_confirm (source->window,
+ ntohl (ncf->nak_sqn),
+ skb->tstamp,
+ ncf_rdata_ivl,
+ ncf_rb_ivl);
+ if (PGM_RXW_UPDATED == status || PGM_RXW_APPENDED == status)
+ {
+ const pgm_time_t ncf_ivl = (PGM_RXW_APPENDED == status) ? ncf_rb_ivl : ncf_rdata_ivl;
+ pgm_timer_lock (sock);
+ if (pgm_time_after (sock->next_poll, ncf_ivl)) {
+ sock->next_poll = ncf_ivl;
+ }
+ pgm_timer_unlock (sock);
+ source->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAKS_SUPPRESSED]++;
+ }
+
+/* check NCF list */
+ const uint32_t* ncf_list = NULL;
+ unsigned ncf_list_len = 0;
+ if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT)
+ {
+ const struct pgm_opt_length* opt_len = (AF_INET6 == ncf_src_nla.ss_family) ?
+ (const struct pgm_opt_length*)(ncf6 + 1) :
+ (const struct pgm_opt_length*)(ncf + 1);
+ if (PGM_UNLIKELY(opt_len->opt_type != PGM_OPT_LENGTH))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed NCF."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_NCFS]++;
+ return FALSE;
+ }
+ if (PGM_UNLIKELY(opt_len->opt_length != sizeof(struct pgm_opt_length)))
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Discarded malformed NCF."));
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_NCFS]++;
+ return FALSE;
+ }
+/* TODO: check for > 16 options & past packet end */
+ const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len;
+ do {
+ opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length);
+ if ((opt_header->opt_type & PGM_OPT_MASK) == PGM_OPT_NAK_LIST)
+ {
+ ncf_list = ((const struct pgm_opt_nak_list*)(opt_header + 1))->opt_sqn;
+ ncf_list_len = ( opt_header->opt_length - sizeof(struct pgm_opt_header) - sizeof(uint8_t) ) / sizeof(uint32_t);
+ break;
+ }
+ } while (!(opt_header->opt_type & PGM_OPT_END));
+ }
+
+	pgm_debug ("NCF contains 1+%u sequence numbers.", ncf_list_len);
+ while (ncf_list_len)
+ {
+ status = pgm_rxw_confirm (source->window,
+ ntohl (*ncf_list),
+ skb->tstamp,
+ ncf_rdata_ivl,
+ ncf_rb_ivl);
+ if (PGM_RXW_UPDATED == status || PGM_RXW_APPENDED == status)
+ source->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAKS_SUPPRESSED]++;
+ ncf_list++;
+ ncf_list_len--;
+ }
+
+/* mark receiver window for flushing on next recv() */
+ const pgm_rxw_t* window = source->window;
+ if (window->cumulative_losses != source->last_cumulative_losses &&
+ !source->pending_link.data)
+ {
+ sock->is_reset = TRUE;
+ source->lost_count = window->cumulative_losses - source->last_cumulative_losses;
+ source->last_cumulative_losses = window->cumulative_losses;
+ pgm_peer_set_pending (sock, source);
+ }
+ return TRUE;
+}
+
+/* send SPM-request to a new peer, this packet type has no contents
+ *
+ * on success, TRUE is returned; if the operation would block, FALSE is
+ * returned.
+ */
+
+static
+bool
+send_spmr (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+
+ pgm_debug ("send_spmr (sock:%p source:%p)",
+ (const void*)sock, (const void*)source);
+
+ const size_t tpdu_length = sizeof(struct pgm_header);
+ char buf[ tpdu_length ];
+ struct pgm_header* header = (struct pgm_header*)buf;
+ memcpy (header->pgm_gsi, &source->tsi.gsi, sizeof(pgm_gsi_t));
+/* dport & sport reversed communicating upstream */
+ header->pgm_sport = sock->dport;
+ header->pgm_dport = source->tsi.sport;
+ header->pgm_type = PGM_SPMR;
+ header->pgm_options = 0;
+ header->pgm_tsdu_length = 0;
+ header->pgm_checksum = 0;
+ header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+/* send multicast SPMR TTL 1 */
+ pgm_sockaddr_multicast_hops (sock->send_sock, sock->send_gsr.gsr_group.ss_family, 1);
+ ssize_t sent = pgm_sendto (sock,
+ FALSE, /* not rate limited */
+ FALSE, /* regular socket */
+ header,
+ tpdu_length,
+ (struct sockaddr*)&sock->send_gsr.gsr_group,
+ pgm_sockaddr_len ((struct sockaddr*)&sock->send_gsr.gsr_group));
+ if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno))
+ return FALSE;
+
+/* send unicast SPMR with regular TTL */
+ pgm_sockaddr_multicast_hops (sock->send_sock, sock->send_gsr.gsr_group.ss_family, sock->hops);
+ sent = pgm_sendto (sock,
+ FALSE,
+ FALSE,
+ header,
+ tpdu_length,
+ (struct sockaddr*)&source->local_nla,
+ pgm_sockaddr_len ((struct sockaddr*)&source->local_nla));
+ if (sent < 0 && EAGAIN == errno)
+ return FALSE;
+
+ sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT] += tpdu_length * 2;
+ return TRUE;
+}
+
+/* send selective NAK for one sequence number.
+ *
+ * on success, TRUE is returned; returns FALSE if the operation would block.
+ */
+
+static
+bool
+send_nak (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ const uint32_t sequence
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+
+ pgm_debug ("send_nak (sock:%p peer:%p sequence:%" PRIu32 ")",
+ (void*)sock, (void*)source, sequence);
+
+ size_t tpdu_length = sizeof(struct pgm_header) + sizeof(struct pgm_nak);
+ if (AF_INET6 == source->nla.ss_family)
+ tpdu_length += sizeof(struct pgm_nak6) - sizeof(struct pgm_nak);
+ char buf[ tpdu_length ];
+ struct pgm_header* header = (struct pgm_header*)buf;
+ struct pgm_nak* nak = (struct pgm_nak* )(header + 1);
+ struct pgm_nak6* nak6 = (struct pgm_nak6*)(header + 1);
+ memcpy (header->pgm_gsi, &source->tsi.gsi, sizeof(pgm_gsi_t));
+
+/* dport & sport swap over for a nak */
+ header->pgm_sport = sock->dport;
+ header->pgm_dport = source->tsi.sport;
+ header->pgm_type = PGM_NAK;
+ header->pgm_options = 0;
+ header->pgm_tsdu_length = 0;
+
+/* NAK */
+ nak->nak_sqn = htonl (sequence);
+
+/* source nla */
+ pgm_sockaddr_to_nla ((struct sockaddr*)&source->nla, (char*)&nak->nak_src_nla_afi);
+
+/* group nla: we match the NAK NLA to that advertised by the source, as we might
+ * be listening to multiple multicast groups
+ */
+ pgm_sockaddr_to_nla ((struct sockaddr*)&source->group_nla,
+ (AF_INET6 == source->nla.ss_family) ? (char*)&nak6->nak6_grp_nla_afi : (char*)&nak->nak_grp_nla_afi);
+
+ header->pgm_checksum = 0;
+ header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+ const ssize_t sent = pgm_sendto (sock,
+ FALSE, /* not rate limited */
+ TRUE, /* with router alert */
+ header,
+ tpdu_length,
+ (struct sockaddr*)&source->nla,
+ pgm_sockaddr_len((struct sockaddr*)&source->nla));
+ if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno))
+ return FALSE;
+
+ source->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAK_PACKETS_SENT]++;
+ source->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAKS_SENT]++;
+ return TRUE;
+}
+
+/* Send a parity NAK requesting on-demand parity packet generation.
+ *
+ * on success, TRUE is returned; returns FALSE if the operation would block.
+ */
+
+static
+bool
+send_parity_nak (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ const uint32_t nak_tg_sqn, /* transmission group (shifted) */
+ const uint32_t nak_pkt_cnt /* count of parity packets to request */
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+ pgm_assert (nak_pkt_cnt > 0);
+
+ pgm_debug ("send_parity_nak (sock:%p source:%p nak-tg-sqn:%" PRIu32 " nak-pkt-cnt:%" PRIu32 ")",
+ (void*)sock, (void*)source, nak_tg_sqn, nak_pkt_cnt);
+
+ size_t tpdu_length = sizeof(struct pgm_header) + sizeof(struct pgm_nak);
+ if (AF_INET6 == source->nla.ss_family)
+ tpdu_length += sizeof(struct pgm_nak6) - sizeof(struct pgm_nak);
+ char buf[ tpdu_length ];
+ struct pgm_header* header = (struct pgm_header*)buf;
+ struct pgm_nak* nak = (struct pgm_nak* )(header + 1);
+ struct pgm_nak6* nak6 = (struct pgm_nak6*)(header + 1);
+ memcpy (header->pgm_gsi, &source->tsi.gsi, sizeof(pgm_gsi_t));
+
+/* dport & sport swap over for a nak */
+ header->pgm_sport = sock->dport;
+ header->pgm_dport = source->tsi.sport;
+ header->pgm_type = PGM_NAK;
+ header->pgm_options = PGM_OPT_PARITY; /* this is a parity packet */
+ header->pgm_tsdu_length = 0;
+
+/* NAK */
+ nak->nak_sqn = htonl (nak_tg_sqn | (nak_pkt_cnt - 1) );
+
+/* source nla */
+ pgm_sockaddr_to_nla ((struct sockaddr*)&source->nla, (char*)&nak->nak_src_nla_afi);
+
+/* group nla: we match the NAK NLA to that advertised by the source, as we might
+ * be listening to multiple multicast groups
+ */
+ pgm_sockaddr_to_nla ((struct sockaddr*)&source->group_nla,
+ (AF_INET6 == source->nla.ss_family) ? (char*)&nak6->nak6_grp_nla_afi : (char*)&nak->nak_grp_nla_afi );
+ header->pgm_checksum = 0;
+ header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+ const ssize_t sent = pgm_sendto (sock,
+ FALSE, /* not rate limited */
+ TRUE, /* with router alert */
+ header,
+ tpdu_length,
+ (struct sockaddr*)&source->nla,
+ pgm_sockaddr_len((struct sockaddr*)&source->nla));
+ if (sent < 0 && (EAGAIN == errno || ENOBUFS == errno))
+ return FALSE;
+
+ source->cumulative_stats[PGM_PC_RECEIVER_PARITY_NAK_PACKETS_SENT]++;
+ source->cumulative_stats[PGM_PC_RECEIVER_PARITY_NAKS_SENT]++;
+ return TRUE;
+}
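+
+/* Encoding sketch for the parity NAK above: nak_sqn packs the (pre-shifted)
+ * transmission group number in the high bits and count-1 in the low bits.
+ * Assuming a transmission group size of 8 packets (tg_sqn_shift = 3), a
+ * request for 3 parity packets of group sequence 0x100 would encode as:
+ *
+ *	nak->nak_sqn = htonl (0x100 | (3 - 1));	/* 0x00000102 */
+ */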
+
+/* A NAK packet with an OPT_NAK_LIST option extension
+ *
+ * on success, TRUE is returned. on error, FALSE is returned.
+ */
+
+static
+bool
+send_nak_list (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ const struct pgm_sqn_list_t* const restrict sqn_list
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+ pgm_assert (NULL != sqn_list);
+ pgm_assert_cmpuint (sqn_list->len, >, 1);
+ pgm_assert_cmpuint (sqn_list->len, <=, 63);
+
+#ifdef RECEIVER_DEBUG
+ char list[1024];
+ sprintf (list, "%" PRIu32, sqn_list->sqn[0]);
+ for (unsigned i = 1; i < sqn_list->len; i++) {
+ char sequence[2 + strlen("4294967295")];
+ sprintf (sequence, " %" PRIu32, sqn_list->sqn[i]);
+ strcat (list, sequence);
+ }
+ pgm_debug("send_nak_list (sock:%p source:%p sqn-list:[%s])",
+ (const void*)sock, (const void*)source, list);
+#endif
+
+ size_t tpdu_length = sizeof(struct pgm_header) +
+ sizeof(struct pgm_nak) +
+ sizeof(struct pgm_opt_length) + /* includes header */
+ sizeof(struct pgm_opt_header) +
+ sizeof(struct pgm_opt_nak_list) +
+ ( (sqn_list->len-1) * sizeof(uint32_t) );
+ if (AF_INET6 == source->nla.ss_family)
+ tpdu_length += sizeof(struct pgm_nak6) - sizeof(struct pgm_nak);
+ char buf[ tpdu_length ];
+ if (PGM_UNLIKELY(pgm_mem_gc_friendly))
+ memset (buf, 0, tpdu_length);
+ struct pgm_header* header = (struct pgm_header*)buf;
+ struct pgm_nak* nak = (struct pgm_nak* )(header + 1);
+ struct pgm_nak6* nak6 = (struct pgm_nak6*)(header + 1);
+ memcpy (header->pgm_gsi, &source->tsi.gsi, sizeof(pgm_gsi_t));
+
+/* dport & sport swap over for a nak */
+ header->pgm_sport = sock->dport;
+ header->pgm_dport = source->tsi.sport;
+ header->pgm_type = PGM_NAK;
+ header->pgm_options = PGM_OPT_PRESENT | PGM_OPT_NETWORK;
+ header->pgm_tsdu_length = 0;
+
+/* NAK */
+ nak->nak_sqn = htonl (sqn_list->sqn[0]);
+
+/* source nla */
+ pgm_sockaddr_to_nla ((struct sockaddr*)&source->nla, (char*)&nak->nak_src_nla_afi);
+
+/* group nla */
+ pgm_sockaddr_to_nla ((struct sockaddr*)&source->group_nla,
+ (AF_INET6 == source->nla.ss_family) ? (char*)&nak6->nak6_grp_nla_afi : (char*)&nak->nak_grp_nla_afi);
+
+/* OPT_NAK_LIST */
+ struct pgm_opt_length* opt_len = (AF_INET6 == source->nla.ss_family) ? (struct pgm_opt_length*)(nak6 + 1) : (struct pgm_opt_length*)(nak + 1);
+ opt_len->opt_type = PGM_OPT_LENGTH;
+ opt_len->opt_length = sizeof(struct pgm_opt_length);
+ opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+ sizeof(struct pgm_opt_header) +
+ sizeof(struct pgm_opt_nak_list) +
+ ( (sqn_list->len-1) * sizeof(uint32_t) ) );
+ struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+ opt_header->opt_type = PGM_OPT_NAK_LIST | PGM_OPT_END;
+ opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_nak_list)
+ + ( (sqn_list->len-1) * sizeof(uint32_t) );
+ struct pgm_opt_nak_list* opt_nak_list = (struct pgm_opt_nak_list*)(opt_header + 1);
+ opt_nak_list->opt_reserved = 0;
+
+ for (unsigned i = 1; i < sqn_list->len; i++)
+ opt_nak_list->opt_sqn[i-1] = htonl (sqn_list->sqn[i]);
+
+ header->pgm_checksum = 0;
+ header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+ const ssize_t sent = pgm_sendto (sock,
+ FALSE, /* not rate limited */
+ FALSE, /* regular socket */
+ header,
+ tpdu_length,
+ (struct sockaddr*)&source->nla,
+ pgm_sockaddr_len((struct sockaddr*)&source->nla));
+ if ( sent != (ssize_t)tpdu_length )
+ return FALSE;
+
+ source->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAK_PACKETS_SENT]++;
+ source->cumulative_stats[PGM_PC_RECEIVER_SELECTIVE_NAKS_SENT] += 1 + sqn_list->len;
+ return TRUE;
+}
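+
+/* TPDU sizing sketch for the list NAK above: the first sequence number rides
+ * in the NAK header itself and the remaining len-1 ride in OPT_NAK_LIST,
+ * hence the (sqn_list->len - 1) scaling.  For a hypothetical IPv4 NAK with
+ * sqn_list->len = 4:
+ *
+ *	tpdu_length = sizeof(struct pgm_header)
+ *	            + sizeof(struct pgm_nak)
+ *	            + sizeof(struct pgm_opt_length)
+ *	            + sizeof(struct pgm_opt_header)
+ *	            + sizeof(struct pgm_opt_nak_list)
+ *	            + 3 * sizeof(uint32_t);		/* sqn[1..3] */
+ */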
+
+/* send ACK upstream to source
+ *
+ * on success, TRUE is returned. on error, FALSE is returned.
+ */
+
+static
+bool
+send_ack (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ const pgm_time_t now
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+
+ pgm_debug ("send_ack (sock:%p source:%p now:%" PGM_TIME_FORMAT ")",
+ (const void*)sock, (const void*)source, now);
+
+ size_t tpdu_length = sizeof(struct pgm_header) +
+ sizeof(struct pgm_ack) +
+ sizeof(struct pgm_opt_length) + /* includes header */
+ sizeof(struct pgm_opt_header) +
+ sizeof(struct pgm_opt_pgmcc_feedback);
+ if (AF_INET6 == sock->send_addr.ss_family)
+ tpdu_length += sizeof(struct pgm_opt6_pgmcc_feedback) - sizeof(struct pgm_opt_pgmcc_feedback);
+ char buf[ tpdu_length ];
+ if (PGM_UNLIKELY(pgm_mem_gc_friendly))
+ memset (buf, 0, tpdu_length);
+ struct pgm_header* header = (struct pgm_header*)buf;
+ struct pgm_ack* ack = (struct pgm_ack*)(header + 1);
+ memcpy (header->pgm_gsi, &source->tsi.gsi, sizeof(pgm_gsi_t));
+
+/* dport & sport swap over for an ack */
+ header->pgm_sport = sock->dport;
+ header->pgm_dport = source->tsi.sport;
+ header->pgm_type = PGM_ACK;
+ header->pgm_options = PGM_OPT_PRESENT;
+ header->pgm_tsdu_length = 0;
+
+/* ACK */
+ ack->ack_rx_max = htonl (pgm_rxw_lead (source->window));
+ ack->ack_bitmap = htonl (source->window->bitmap);
+
+/* OPT_PGMCC_FEEDBACK */
+ struct pgm_opt_length* opt_len = (struct pgm_opt_length*)(ack + 1);
+ opt_len->opt_type = PGM_OPT_LENGTH;
+ opt_len->opt_length = sizeof(struct pgm_opt_length);
+	opt_len->opt_total_length = htons ( sizeof(struct pgm_opt_length) +
+							sizeof(struct pgm_opt_header) +
+							((AF_INET6 == sock->send_addr.ss_family) ?
+							sizeof(struct pgm_opt6_pgmcc_feedback) :
+							sizeof(struct pgm_opt_pgmcc_feedback)) );
+ struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1);
+ opt_header->opt_type = PGM_OPT_PGMCC_FEEDBACK | PGM_OPT_END;
+ opt_header->opt_length = sizeof(struct pgm_opt_header) +
+ ( (AF_INET6 == sock->send_addr.ss_family) ?
+ sizeof(struct pgm_opt6_pgmcc_feedback) :
+ sizeof(struct pgm_opt_pgmcc_feedback) );
+ struct pgm_opt_pgmcc_feedback* opt_pgmcc_feedback = (struct pgm_opt_pgmcc_feedback*)(opt_header + 1);
+ opt_pgmcc_feedback->opt_reserved = 0;
+
+ const uint32_t t = source->ack_last_tstamp + pgm_to_msecs( now - source->last_data_tstamp );
+ opt_pgmcc_feedback->opt_tstamp = htonl (t);
+ pgm_sockaddr_to_nla ((struct sockaddr*)&sock->send_addr, (char*)&opt_pgmcc_feedback->opt_nla_afi);
+ opt_pgmcc_feedback->opt_loss_rate = htonl (source->window->data_loss);
+
+ header->pgm_checksum = 0;
+ header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0));
+
+ const ssize_t sent = pgm_sendto (sock,
+ FALSE, /* not rate limited */
+ FALSE, /* regular socket */
+ header,
+ tpdu_length,
+ (struct sockaddr*)&source->nla,
+ pgm_sockaddr_len((struct sockaddr*)&source->nla));
+ if ( sent != (ssize_t)tpdu_length )
+ return FALSE;
+
+ source->cumulative_stats[PGM_PC_RECEIVER_ACKS_SENT]++;
+ return TRUE;
+}
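+
+/* Feedback timestamp sketch: opt_tstamp echoes the source's timestamp
+ * advanced by the local hold time.  For example, if the last ODATA carried
+ * source timestamp 1000 ms and arrived roughly 4 ms before now, the ACK
+ * reports t = 1000 + 4 = 1004 ms, letting the source estimate RTT for PGMCC.
+ */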
+
+/* check all receiver windows for ACKer elections, on expiration send an ACK.
+ *
+ * returns TRUE on success, returns FALSE if operation would block.
+ */
+
+static
+bool
+ack_rb_state (
+ pgm_peer_t* peer,
+ const pgm_time_t now
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+
+ pgm_debug ("ack_rb_state (peer:%p now:%" PGM_TIME_FORMAT ")",
+ (const void*)peer, now);
+
+ pgm_rxw_t* window = peer->window;
+ pgm_sock_t* sock = peer->sock;
+ pgm_list_t* list;
+
+ list = window->ack_backoff_queue.tail;
+ if (!list) {
+ pgm_assert (window->ack_backoff_queue.head == NULL);
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Backoff queue is empty in ack_rb_state."));
+ return TRUE;
+ } else {
+ pgm_assert (window->ack_backoff_queue.head != NULL);
+ }
+
+/* have not learned this peer's NLA */
+ const bool is_valid_nla = (0 != peer->nla.ss_family);
+
+ while (list)
+ {
+ pgm_list_t* next_list_el = list->prev;
+
+/* check for ACK backoff expiration */
+ if (pgm_time_after_eq(now, peer->ack_rb_expiry))
+ {
+/* unreliable delivery */
+ _pgm_remove_ack (peer);
+
+ if (PGM_UNLIKELY(!is_valid_nla)) {
+ pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("Unable to send ACK due to unknown NLA."));
+ list = next_list_el;
+ continue;
+ }
+
+ pgm_assert (!pgm_sockaddr_is_addr_unspecified ((struct sockaddr*)&peer->nla));
+
+ if (!send_ack (sock, peer, now))
+ return FALSE;
+ }
+ else
+ { /* packet expires some time later */
+ break;
+ }
+
+ list = next_list_el;
+ }
+
+ if (window->ack_backoff_queue.length == 0)
+ {
+ pgm_assert ((struct rxw_packet*)window->ack_backoff_queue.head == NULL);
+ pgm_assert ((struct rxw_packet*)window->ack_backoff_queue.tail == NULL);
+ }
+ else
+ {
+ pgm_assert ((struct rxw_packet*)window->ack_backoff_queue.head != NULL);
+ pgm_assert ((struct rxw_packet*)window->ack_backoff_queue.tail != NULL);
+ }
+
+ if (window->ack_backoff_queue.tail)
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Next expiry set in %f seconds."),
+ pgm_to_secsf(next_ack_rb_expiry(window) - now));
+ }
+ else
+ {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("ACK backoff queue empty."));
+ }
+ return TRUE;
+}
+
+/* check all receiver windows for packets in BACK-OFF_STATE, on expiration send a NAK.
+ * update sock::next_nak_rb_timestamp for next expiration time.
+ *
+ * peer object is locked before entry.
+ *
+ * returns TRUE on success, returns FALSE if operation would block.
+ */
+
+static
+bool
+nak_rb_state (
+ pgm_peer_t* peer,
+ const pgm_time_t now
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+
+ pgm_debug ("nak_rb_state (peer:%p now:%" PGM_TIME_FORMAT ")",
+ (const void*)peer, now);
+
+ pgm_rxw_t* window = peer->window;
+ pgm_sock_t* sock = peer->sock;
+ pgm_list_t* list;
+ struct pgm_sqn_list_t nak_list = { .len = 0 };
+
+/* send all NAKs first, lack of data is blocking contiguous processing and it's
+ * better to get the notification out a.s.a.p. even though it might be waiting
+ * in a kernel queue.
+ *
+ * alternative: after each packet check for incoming data and return to the
+ * event loop. bias for shorter loops as retry count increases.
+ */
+ list = window->nak_backoff_queue.tail;
+ if (!list) {
+ pgm_assert (window->nak_backoff_queue.head == NULL);
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Backoff queue is empty in nak_rb_state."));
+ return TRUE;
+ } else {
+ pgm_assert (window->nak_backoff_queue.head != NULL);
+ }
+
+ unsigned dropped_invalid = 0;
+
+/* have not learned this peer's NLA */
+ const bool is_valid_nla = 0 != peer->nla.ss_family;
+
+/* TODO: process BOTH selective and parity NAKs? */
+
+/* calculate current transmission group for parity enabled peers */
+ if (peer->has_ondemand_parity)
+ {
+ const uint32_t tg_sqn_mask = 0xffffffff << window->tg_sqn_shift;
+
+/* NAKs only generated previous to current transmission group */
+ const uint32_t current_tg_sqn = window->lead & tg_sqn_mask;
+
+ uint32_t nak_tg_sqn = 0;
+ uint32_t nak_pkt_cnt = 0;
+
+/* parity NAK generation */
+
+ while (list)
+ {
+ pgm_list_t* next_list_el = list->prev;
+ struct pgm_sk_buff_t* skb = (struct pgm_sk_buff_t*)list;
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+
+/* check this packet for state expiration */
+ if (pgm_time_after_eq (now, state->timer_expiry))
+ {
+ if (PGM_UNLIKELY(!is_valid_nla)) {
+ dropped_invalid++;
+ pgm_rxw_lost (window, skb->sequence);
+/* mark receiver window for flushing on next recv() */
+ pgm_peer_set_pending (sock, peer);
+ list = next_list_el;
+ continue;
+ }
+
+/* TODO: parity nak lists */
+ const uint32_t tg_sqn = skb->sequence & tg_sqn_mask;
+ if ( ( nak_pkt_cnt && tg_sqn == nak_tg_sqn ) ||
+ ( !nak_pkt_cnt && tg_sqn != current_tg_sqn ) )
+ {
+ pgm_rxw_state (window, skb, PGM_PKT_STATE_WAIT_NCF);
+
+ if (!nak_pkt_cnt++)
+ nak_tg_sqn = tg_sqn;
+ state->nak_transmit_count++;
+
+#ifdef PGM_ABSOLUTE_EXPIRY
+ state->timer_expiry += sock->nak_rpt_ivl;
+ while (pgm_time_after_eq (now, state->timer_expiry)) {
+ state->timer_expiry += sock->nak_rpt_ivl;
+ state->ncf_retry_count++;
+ }
+#else
+ state->timer_expiry = now + sock->nak_rpt_ivl;
+#endif
+ pgm_timer_lock (sock);
+ if (pgm_time_after (sock->next_poll, state->timer_expiry))
+ sock->next_poll = state->timer_expiry;
+ pgm_timer_unlock (sock);
+ }
+ else
+ { /* different transmission group */
+ break;
+ }
+ }
+ else
+ { /* packet expires some time later */
+ break;
+ }
+
+ list = next_list_el;
+ }
+
+ if (nak_pkt_cnt && !send_parity_nak (sock, peer, nak_tg_sqn, nak_pkt_cnt))
+ return FALSE;
+ }
+ else
+ {
+
+/* selective NAK generation */
+
+ while (list)
+ {
+ pgm_list_t* next_list_el = list->prev;
+ struct pgm_sk_buff_t* skb = (struct pgm_sk_buff_t*)list;
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+
+/* check this packet for state expiration */
+ if (pgm_time_after_eq(now, state->timer_expiry))
+ {
+ if (PGM_UNLIKELY(!is_valid_nla)) {
+ dropped_invalid++;
+ pgm_rxw_lost (window, skb->sequence);
+/* mark receiver window for flushing on next recv() */
+ pgm_peer_set_pending (sock, peer);
+ list = next_list_el;
+ continue;
+ }
+
+ pgm_rxw_state (window, skb, PGM_PKT_STATE_WAIT_NCF);
+ nak_list.sqn[nak_list.len++] = skb->sequence;
+ state->nak_transmit_count++;
+
+/* we have two options here, calculate the expiry time in the new state relative to the current
+ * state execution time, skipping missed expirations due to delay in state processing, or base
+ * from the actual current time.
+ */
+#ifdef PGM_ABSOLUTE_EXPIRY
+ state->timer_expiry += sock->nak_rpt_ivl;
+ while (pgm_time_after_eq(now, state->timer_expiry)){
+ state->timer_expiry += sock->nak_rpt_ivl;
+ state->ncf_retry_count++;
+ }
+#else
+ state->timer_expiry = now + sock->nak_rpt_ivl;
+					pgm_trace (PGM_LOG_ROLE_NETWORK,_("nak_rpt_expiry in %f seconds."),
+						pgm_to_secsf( state->timer_expiry - now ) );
+#endif
+ pgm_timer_lock (sock);
+ if (pgm_time_after (sock->next_poll, state->timer_expiry))
+ sock->next_poll = state->timer_expiry;
+ pgm_timer_unlock (sock);
+
+ if (nak_list.len == PGM_N_ELEMENTS(nak_list.sqn)) {
+ if (sock->can_send_nak && !send_nak_list (sock, peer, &nak_list))
+ return FALSE;
+ nak_list.len = 0;
+ }
+ }
+ else
+ { /* packet expires some time later */
+ break;
+ }
+
+ list = next_list_el;
+ }
+
+	if (sock->can_send_nak && nak_list.len)
+	{
+		if (nak_list.len > 1) {
+			if (!send_nak_list (sock, peer, &nak_list))
+				return FALSE;
+		} else if (!send_nak (sock, peer, nak_list.sqn[0]))
+			return FALSE;
+	}
+
+ }
+
+ if (PGM_UNLIKELY(dropped_invalid))
+ {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Dropped %u messages due to invalid NLA."), dropped_invalid);
+
+/* mark receiver window for flushing on next recv() */
+ if (window->cumulative_losses != peer->last_cumulative_losses &&
+ !peer->pending_link.data)
+ {
+ sock->is_reset = TRUE;
+ peer->lost_count = window->cumulative_losses - peer->last_cumulative_losses;
+ peer->last_cumulative_losses = window->cumulative_losses;
+ pgm_peer_set_pending (sock, peer);
+ }
+ }
+
+ if (window->nak_backoff_queue.length == 0)
+ {
+ pgm_assert ((struct rxw_packet*)window->nak_backoff_queue.head == NULL);
+ pgm_assert ((struct rxw_packet*)window->nak_backoff_queue.tail == NULL);
+ }
+ else
+ {
+ pgm_assert ((struct rxw_packet*)window->nak_backoff_queue.head != NULL);
+ pgm_assert ((struct rxw_packet*)window->nak_backoff_queue.tail != NULL);
+ }
+
+ if (window->nak_backoff_queue.tail)
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Next expiry set in %f seconds."),
+ pgm_to_secsf(next_nak_rb_expiry(window) - now));
+ }
+ else
+ {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("NAK backoff queue empty."));
+ }
+ return TRUE;
+}
+
+/* check this peer for NAK state timers, uses the tail of each queue for the nearest
+ * timer execution.
+ *
+ * returns TRUE on complete sweep, returns FALSE if operation would block.
+ */
+
+bool
+pgm_check_peer_state (
+ pgm_sock_t* sock,
+ const pgm_time_t now
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+
+ pgm_debug ("pgm_check_peer_state (sock:%p now:%" PGM_TIME_FORMAT ")",
+ (const void*)sock, now);
+
+ if (!sock->peers_list)
+ return TRUE;
+
+ pgm_list_t* list = sock->peers_list;
+ do {
+ pgm_list_t* next = list->next;
+ pgm_peer_t* peer = list->data;
+ pgm_rxw_t* window = peer->window;
+
+ if (peer->spmr_expiry)
+ {
+ if (pgm_time_after_eq (now, peer->spmr_expiry))
+ {
+ if (sock->can_send_nak) {
+ if (!send_spmr (sock, peer)) {
+ return FALSE;
+ }
+ peer->spmr_tstamp = now;
+ }
+ peer->spmr_expiry = 0;
+ }
+ }
+
+ if (window->ack_backoff_queue.tail)
+ {
+ if (pgm_time_after_eq (now, next_ack_rb_expiry (window)))
+ if (!ack_rb_state (peer, now)) {
+ return FALSE;
+ }
+ }
+
+ if (window->nak_backoff_queue.tail)
+ {
+ if (pgm_time_after_eq (now, next_nak_rb_expiry (window)))
+ if (!nak_rb_state (peer, now)) {
+ return FALSE;
+ }
+ }
+
+ if (window->wait_ncf_queue.tail)
+ {
+ if (pgm_time_after_eq (now, next_nak_rpt_expiry (window)))
+ nak_rpt_state (peer, now);
+ }
+
+ if (window->wait_data_queue.tail)
+ {
+ if (pgm_time_after_eq (now, next_nak_rdata_expiry (window)))
+ nak_rdata_state (peer, now);
+ }
+
+/* expired, remove from hash table and linked list */
+ if (pgm_time_after_eq (now, peer->expiry))
+ {
+ if (peer->pending_link.data)
+ {
+ pgm_trace (PGM_LOG_ROLE_SESSION,_("Peer expiration postponed due to committing data, tsi %s"), pgm_tsi_print (&peer->tsi));
+ peer->expiry += sock->peer_expiry;
+ }
+ else if (window->committed_count)
+ {
+ pgm_trace (PGM_LOG_ROLE_SESSION,_("Peer expiration postponed due to committed data, tsi %s"), pgm_tsi_print (&peer->tsi));
+ peer->expiry += sock->peer_expiry;
+ }
+ else
+ {
+ pgm_trace (PGM_LOG_ROLE_SESSION,_("Peer expired, tsi %s"), pgm_tsi_print (&peer->tsi));
+ pgm_hashtable_remove (sock->peers_hashtable, &peer->tsi);
+ sock->peers_list = pgm_list_remove_link (sock->peers_list, &peer->peers_link);
+ if (sock->last_hash_value == peer)
+ sock->last_hash_value = NULL;
+ pgm_peer_unref (peer);
+ }
+ }
+
+ list = next;
+ } while (list);
+
+/* check for waiting contiguous packets */
+ if (sock->peers_pending && !sock->is_pending_read)
+ {
+ pgm_debug ("prod rx thread");
+ pgm_notify_send (&sock->pending_notify);
+ sock->is_pending_read = TRUE;
+ }
+ return TRUE;
+}
+
+/* find the next state expiration time among the sock's peers.
+ *
+ * on success, returns the earliest of the expiration parameter or next
+ * peer expiration time.
+ */
+
+pgm_time_t
+pgm_min_receiver_expiry (
+ pgm_time_t expiration, /* absolute time */
+ pgm_sock_t* sock
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+
+ pgm_debug ("pgm_min_receiver_expiry (expiration:%" PGM_TIME_FORMAT " sock:%p)",
+ expiration, (const void*)sock);
+
+ if (!sock->peers_list)
+ return expiration;
+
+ pgm_list_t* list = sock->peers_list;
+ do {
+ pgm_list_t* next = list->next;
+ pgm_peer_t* peer = (pgm_peer_t*)list->data;
+ pgm_rxw_t* window = peer->window;
+
+ if (peer->spmr_expiry)
+ {
+ if (pgm_time_after_eq (expiration, peer->spmr_expiry))
+ expiration = peer->spmr_expiry;
+ }
+
+ if (window->ack_backoff_queue.tail)
+ {
+ if (pgm_time_after_eq (expiration, next_ack_rb_expiry (window)))
+ expiration = next_ack_rb_expiry (window);
+ }
+
+ if (window->nak_backoff_queue.tail)
+ {
+ if (pgm_time_after_eq (expiration, next_nak_rb_expiry (window)))
+ expiration = next_nak_rb_expiry (window);
+ }
+
+ if (window->wait_ncf_queue.tail)
+ {
+ if (pgm_time_after_eq (expiration, next_nak_rpt_expiry (window)))
+ expiration = next_nak_rpt_expiry (window);
+ }
+
+ if (window->wait_data_queue.tail)
+ {
+ if (pgm_time_after_eq (expiration, next_nak_rdata_expiry (window)))
+ expiration = next_nak_rdata_expiry (window);
+ }
+
+ list = next;
+ } while (list);
+
+ return expiration;
+}
+
+/* check WAIT_NCF_STATE, on expiration move back to BACK-OFF_STATE, on exceeding NAK_NCF_RETRIES
+ * cancel the sequence number.
+ */
+static
+void
+nak_rpt_state (
+ pgm_peer_t* peer,
+ const pgm_time_t now
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+
+ pgm_debug ("nak_rpt_state (peer:%p now:%" PGM_TIME_FORMAT ")",
+ (void*)peer, now);
+
+ pgm_rxw_t* window = peer->window;
+ pgm_sock_t* sock = peer->sock;
+ pgm_list_t* list = window->wait_ncf_queue.tail;
+
+ unsigned dropped_invalid = 0;
+ unsigned dropped = 0;
+
+/* have not learned this peer's NLA */
+ const bool is_valid_nla = 0 != peer->nla.ss_family;
+
+ while (list)
+ {
+ pgm_list_t* next_list_el = list->prev;
+ struct pgm_sk_buff_t* skb = (struct pgm_sk_buff_t*)list;
+ pgm_rxw_state_t* state = (pgm_rxw_state_t*)&skb->cb;
+
+/* check this packet for state expiration */
+ if (pgm_time_after_eq (now, state->timer_expiry))
+ {
+ if (PGM_UNLIKELY(!is_valid_nla)) {
+ dropped_invalid++;
+ pgm_rxw_lost (window, skb->sequence);
+/* mark receiver window for flushing on next recv() */
+ pgm_peer_set_pending (sock, peer);
+ list = next_list_el;
+ continue;
+ }
+
+ if (++state->ncf_retry_count >= sock->nak_ncf_retries)
+ {
+ dropped++;
+ cancel_skb (sock, peer, skb, now);
+ peer->cumulative_stats[PGM_PC_RECEIVER_NAKS_FAILED_NCF_RETRIES_EXCEEDED]++;
+ }
+ else
+ {
+/* retry */
+// state->timer_expiry += nak_rb_ivl(sock);
+ state->timer_expiry = now + nak_rb_ivl (sock);
+ pgm_rxw_state (window, skb, PGM_PKT_STATE_BACK_OFF);
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("NCF retry #%u attempt %u/%u."), skb->sequence, state->ncf_retry_count, sock->nak_ncf_retries);
+ }
+ }
+ else
+ {
+/* packet expires some time later */
+ pgm_trace(PGM_LOG_ROLE_RX_WINDOW,_("NCF retry #%u is delayed %f seconds."),
+ skb->sequence, pgm_to_secsf (state->timer_expiry - now));
+ break;
+ }
+
+ list = next_list_el;
+ }
+
+ if (window->wait_ncf_queue.length == 0)
+ {
+ pgm_assert ((pgm_rxw_state_t*)window->wait_ncf_queue.head == NULL);
+ pgm_assert ((pgm_rxw_state_t*)window->wait_ncf_queue.tail == NULL);
+ }
+ else
+ {
+ pgm_assert ((pgm_rxw_state_t*)window->wait_ncf_queue.head != NULL);
+ pgm_assert ((pgm_rxw_state_t*)window->wait_ncf_queue.tail != NULL);
+ }
+
+ if (PGM_UNLIKELY(dropped_invalid)) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Dropped %u messages due to invalid NLA."), dropped_invalid);
+ }
+
+ if (PGM_UNLIKELY(dropped)) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Dropped %u messages due to ncf cancellation, "
+ "rxw_sqns %" PRIu32
+ " bo %" PRIu32
+ " ncf %" PRIu32
+ " wd %" PRIu32
+ " lost %" PRIu32
+ " frag %" PRIu32),
+ dropped,
+ pgm_rxw_length (window),
+ window->nak_backoff_queue.length,
+ window->wait_ncf_queue.length,
+ window->wait_data_queue.length,
+ window->lost_count,
+ window->fragment_count);
+ }
+
+/* mark receiver window for flushing on next recv() */
+ if (PGM_UNLIKELY(window->cumulative_losses != peer->last_cumulative_losses &&
+ !peer->pending_link.data))
+ {
+ sock->is_reset = TRUE;
+ peer->lost_count = window->cumulative_losses - peer->last_cumulative_losses;
+ peer->last_cumulative_losses = window->cumulative_losses;
+ pgm_peer_set_pending (sock, peer);
+ }
+
+ if (window->wait_ncf_queue.tail)
+ {
+ if (next_nak_rpt_expiry (window) > now)
+ {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Next expiry set in %f seconds."), pgm_to_secsf (next_nak_rpt_expiry (window) - now));
+ } else {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Next expiry set in -%f seconds."), pgm_to_secsf (now - next_nak_rpt_expiry (window)));
+ }
+ }
+ else
+ {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Wait ncf queue empty."));
+ }
+}
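+
+/* Informational summary of the receiver NAK state transitions applied above:
+ *
+ *	BACK-OFF_STATE --(NAK sent)--> WAIT_NCF_STATE
+ *	WAIT_NCF_STATE --(expiry, retries left)--> BACK-OFF_STATE
+ *	WAIT_NCF_STATE --(NAK_NCF_RETRIES exceeded)--> sequence cancelled
+ *	WAIT_NCF_STATE --(NCF received)--> WAIT_DATA_STATE (handled on NCF receipt)
+ */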
+
+/* check WAIT_DATA_STATE, on expiration move back to BACK-OFF_STATE, on exceeding NAK_DATA_RETRIES
+ * cancel the sequence number.
+ */
+static
+void
+nak_rdata_state (
+ pgm_peer_t* peer,
+ const pgm_time_t now
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != peer);
+
+ pgm_debug ("nak_rdata_state (peer:%p now:%" PGM_TIME_FORMAT ")",
+ (const void*)peer, now);
+
+ pgm_rxw_t* window = peer->window;
+ pgm_sock_t* sock = peer->sock;
+ pgm_list_t* list = window->wait_data_queue.tail;
+
+ unsigned dropped_invalid = 0;
+ unsigned dropped = 0;
+
+/* have not yet learned this peer's NLA */
+ const bool is_valid_nla = 0 != peer->nla.ss_family;
+
+ while (list)
+ {
+ pgm_list_t* next_list_el = list->prev;
+ struct pgm_sk_buff_t* rdata_skb = (struct pgm_sk_buff_t*)list;
+ pgm_assert (NULL != rdata_skb);
+ pgm_rxw_state_t* rdata_state = (pgm_rxw_state_t*)&rdata_skb->cb;
+
+/* check this packet for state expiration */
+ if (pgm_time_after_eq (now, rdata_state->timer_expiry))
+ {
+ if (PGM_UNLIKELY(!is_valid_nla)) {
+ dropped_invalid++;
+ pgm_rxw_lost (window, rdata_skb->sequence);
+/* mark receiver window for flushing on next recv() */
+ pgm_peer_set_pending (sock, peer);
+ list = next_list_el;
+ continue;
+ }
+
+ if (++rdata_state->data_retry_count >= sock->nak_data_retries)
+ {
+ dropped++;
+ cancel_skb (sock, peer, rdata_skb, now);
+ peer->cumulative_stats[PGM_PC_RECEIVER_NAKS_FAILED_DATA_RETRIES_EXCEEDED]++;
+ list = next_list_el;
+ continue;
+ }
+
+// rdata_state->timer_expiry += nak_rb_ivl(sock);
+ rdata_state->timer_expiry = now + nak_rb_ivl (sock);
+ pgm_rxw_state (window, rdata_skb, PGM_PKT_STATE_BACK_OFF);
+
+/* retry back to back-off state */
+ pgm_trace(PGM_LOG_ROLE_RX_WINDOW,_("Data retry #%u attempt %u/%u."), rdata_skb->sequence, rdata_state->data_retry_count, sock->nak_data_retries);
+ }
+ else
+ { /* packet expires some time later */
+ break;
+ }
+
+ list = next_list_el;
+ }
+
+ if (window->wait_data_queue.length == 0)
+ {
+ pgm_assert (NULL == (pgm_rxw_state_t*)window->wait_data_queue.head);
+ pgm_assert (NULL == (pgm_rxw_state_t*)window->wait_data_queue.tail);
+ }
+ else
+ {
+ pgm_assert (NULL != (pgm_rxw_state_t*)window->wait_data_queue.head);
+ pgm_assert (NULL != (pgm_rxw_state_t*)window->wait_data_queue.tail);
+ }
+
+ if (PGM_UNLIKELY(dropped_invalid)) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Dropped %u messages due to invalid NLA."), dropped_invalid);
+ }
+
+ if (PGM_UNLIKELY(dropped)) {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Dropped %u messages due to data cancellation."), dropped);
+ }
+
+/* mark receiver window for flushing on next recv() */
+ if (PGM_UNLIKELY(window->cumulative_losses != peer->last_cumulative_losses &&
+ !peer->pending_link.data))
+ {
+ sock->is_reset = TRUE;
+ peer->lost_count = window->cumulative_losses - peer->last_cumulative_losses;
+ peer->last_cumulative_losses = window->cumulative_losses;
+ pgm_peer_set_pending (sock, peer);
+ }
+
+ if (window->wait_data_queue.tail) {
+ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Next expiry set in %f seconds."), pgm_to_secsf (next_nak_rdata_expiry (window) - now));
+ } else {
+ pgm_trace (PGM_LOG_ROLE_RX_WINDOW,_("Wait data queue empty."));
+ }
+}
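+
+/* Worked example of the loss accounting above (informational): with
+ * window->cumulative_losses = 12 and peer->last_cumulative_losses = 9,
+ * the peer reports lost_count = 3 new losses, the socket is marked
+ * reset, and the window is queued for flushing on the next recv().
+ */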
+
+/* ODATA or RDATA packet with any of the following options:
+ *
+ * OPT_FRAGMENT - this TPDU is part of a larger APDU.
+ *
+ * Ownership of the skb is taken and must be passed to the receive window or destroyed.
+ *
+ * returns TRUE if the skb has been replaced, FALSE if it remains unchanged and can be recycled.
+ */
+
+bool
+pgm_on_data (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+ pgm_assert (NULL != skb);
+
+ pgm_debug ("pgm_on_data (sock:%p source:%p skb:%p)",
+ (void*)sock, (void*)source, (void*)skb);
+
+ unsigned msg_count = 0;
+ const pgm_time_t nak_rb_expiry = skb->tstamp + nak_rb_ivl (sock);
+ pgm_time_t ack_rb_expiry = 0;
+ const unsigned tsdu_length = ntohs (skb->pgm_header->pgm_tsdu_length);
+
+ skb->pgm_data = skb->data;
+
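+/* when PGM_OPT_PRESENT is set the first option is OPT_LENGTH (RFC 3208):
+ * one octet option type, one octet option length, then a 16-bit total
+ * length of all options, hence the sizeof(uint16_t) offset past the data
+ * header in the expression below.
+ */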
+	const unsigned opt_total_length = (skb->pgm_header->pgm_options & PGM_OPT_PRESENT) ?
+						ntohs (*(uint16_t*)((char*)(skb->pgm_data + 1) + sizeof(uint16_t))) : 0;
+
+/* advance data pointer to payload */
+ pgm_skb_pull (skb, sizeof(struct pgm_data) + opt_total_length);
+
+ if (opt_total_length > 0 && /* there are options */
+ get_pgm_options (skb) && /* valid options */
+ sock->use_pgmcc && /* PGMCC is enabled */
+ NULL != skb->pgm_opt_pgmcc_data && /* PGMCC options */
+ 0 == source->ack_rb_expiry) /* not partaking in a current election */
+ {
+ ack_rb_expiry = skb->tstamp + ack_rb_ivl (sock);
+ }
+
+ const int add_status = pgm_rxw_add (source->window, skb, skb->tstamp, nak_rb_expiry);
+
+/* skb reference is now invalid */
+ bool flush_naks = FALSE;
+
+ switch (add_status) {
+ case PGM_RXW_MISSING:
+ flush_naks = TRUE;
+/* fall through */
+ case PGM_RXW_INSERTED:
+ case PGM_RXW_APPENDED:
+ msg_count++;
+ break;
+
+ case PGM_RXW_DUPLICATE:
+ source->cumulative_stats[PGM_PC_RECEIVER_DUP_DATAS]++;
+ goto discarded;
+
+ case PGM_RXW_MALFORMED:
+ source->cumulative_stats[PGM_PC_RECEIVER_MALFORMED_ODATA]++;
+/* fall through */
+ case PGM_RXW_BOUNDS:
+discarded:
+ return FALSE;
+
+ default: pgm_assert_not_reached(); break;
+ }
+
+/* valid data */
+ PGM_HISTOGRAM_COUNTS("Rx.DataBytesReceived", tsdu_length);
+ source->cumulative_stats[PGM_PC_RECEIVER_DATA_BYTES_RECEIVED] += tsdu_length;
+ source->cumulative_stats[PGM_PC_RECEIVER_DATA_MSGS_RECEIVED] += msg_count;
+
+/* congestion control */
+ if (0 != ack_rb_expiry)
+ {
+/* save source timestamp and local timestamp for RTT calculation */
+ source->ack_last_tstamp = ntohl (skb->pgm_opt_pgmcc_data->opt_tstamp);
+ source->last_data_tstamp = skb->tstamp;
+ if (_pgm_is_acker (source, skb))
+ {
+ if (PGM_UNLIKELY(pgm_sockaddr_is_addr_unspecified ((struct sockaddr*)&source->nla)))
+ {
+ pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("Unable to send ACK due to unknown NLA."));
+ }
+ else if (PGM_UNLIKELY(!send_ack (sock, source, skb->tstamp)))
+ {
+ pgm_debug ("send_ack failed");
+ }
+ ack_rb_expiry = 0;
+ }
+ else if (_pgm_is_acker_election (skb))
+ {
+ pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("ACKer election."));
+ _pgm_add_ack (source, ack_rb_expiry);
+ }
+ else if (0 != source->window->ack_backoff_queue.length)
+ {
+/* purge ACK backoff queue as host is not elected ACKer */
+ _pgm_remove_ack (source);
+ ack_rb_expiry = 0;
+ }
+ else
+ {
+/* no election, not the elected ACKer, no outstanding ACKs */
+ ack_rb_expiry = 0;
+ }
+ }
+
+ if (flush_naks || 0 != ack_rb_expiry) {
+/* flush out 1st time nak packets */
+ pgm_timer_lock (sock);
+ if (flush_naks && pgm_time_after (sock->next_poll, nak_rb_expiry))
+ sock->next_poll = nak_rb_expiry;
+ if (0 != ack_rb_expiry && pgm_time_after (sock->next_poll, ack_rb_expiry))
+ sock->next_poll = ack_rb_expiry;
+ pgm_timer_unlock (sock);
+ }
+ return TRUE;
+}
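+
+/* Illustrative sketch (hypothetical caller, not code from this library):
+ * per the contract above, the skb may be recycled only when pgm_on_data()
+ * returns FALSE; on TRUE the receive window has consumed it.
+ */
+#if 0
+	if (!pgm_on_data (sock, source, skb))
+		pgm_free_skb (skb);		/* rejected: caller may recycle or free */
+#endif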
+
+/* POLLs are generated by PGM Parents (Sources or Network Elements).
+ *
+ * returns TRUE on valid packet, FALSE on invalid packet.
+ */
+
+bool
+pgm_on_poll (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+/* pre-conditions */
+ pgm_assert (NULL != sock);
+ pgm_assert (NULL != source);
+ pgm_assert (NULL != skb);
+
+ pgm_debug ("pgm_on_poll (sock:%p source:%p skb:%p)",
+ (void*)sock, (void*)source, (void*)skb);
+
+ if (PGM_UNLIKELY(!pgm_verify_poll (skb))) {
+ pgm_trace(PGM_LOG_ROLE_NETWORK,_("Discarded invalid POLL."));
+ return FALSE;
+ }
+
+ struct pgm_poll* poll4 = (struct pgm_poll*) skb->data;
+ struct pgm_poll6* poll6 = (struct pgm_poll6*)skb->data;
+ uint32_t poll_rand;
+ memcpy (&poll_rand, (AFI_IP6 == ntohs (poll4->poll_nla_afi)) ? poll6->poll6_rand : poll4->poll_rand, sizeof(poll_rand));
+ const uint32_t poll_mask = (AFI_IP6 == ntohs (poll4->poll_nla_afi)) ? ntohl (poll6->poll6_mask) : ntohl (poll4->poll_mask);
+
+/* Check for probability match */
+ if (poll_mask &&
+ (sock->rand_node_id & poll_mask) != poll_rand)
+ {
+/* discard early */
+ return FALSE;
+ }
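+
+/* Worked example (informational): with poll_mask = 0x0000000f only hosts
+ * whose low four rand_node_id bits equal poll_rand respond, an expected
+ * 1 in 16 of the receiver population; a zero mask matches every host.
+ */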
+
+/* scoped per path nla
+ * TODO: manage list of pollers per peer
+ */
+ const uint32_t poll_sqn = ntohl (poll4->poll_sqn);
+ const uint16_t poll_round = ntohs (poll4->poll_round);
+
+/* non-initial poll rounds must continue the poll in progress, otherwise discard */
+ if (poll_round &&
+ poll_sqn != source->last_poll_sqn)
+ {
+ return FALSE;
+ }
+
+/* save sequence and round of valid poll */
+ source->last_poll_sqn = poll_sqn;
+ source->last_poll_round = poll_round;
+
+ const uint16_t poll_s_type = ntohs (poll4->poll_s_type);
+
+/* Check poll type */
+ switch (poll_s_type) {
+ case PGM_POLL_GENERAL:
+ return on_general_poll (sock, source, skb);
+
+ case PGM_POLL_DLR:
+ return on_dlr_poll (sock, source, skb);
+
+ default:
+/* unknown sub-type, discard */
+ break;
+ }
+
+ return FALSE;
+}
+
+/* Used to count PGM children */
+
+static
+bool
+on_general_poll (
+ pgm_sock_t* const restrict sock,
+ pgm_peer_t* const restrict source,
+ struct pgm_sk_buff_t* const restrict skb
+ )
+{
+ struct pgm_poll* poll4 = (struct pgm_poll*) skb->data;
+ struct pgm_poll6* poll6 = (struct pgm_poll6*)skb->data;
+
+/* TODO: cancel any pending poll-response */
+
+/* defer response based on provided back-off interval */
+ const uint32_t poll_bo_ivl = (AFI_IP6 == ntohs (poll4->poll_nla_afi)) ? ntohl (poll6->poll6_bo_ivl) : ntohl (poll4->poll_bo_ivl);
+ source->polr_expiry = skb->tstamp + pgm_rand_int_range (&sock->rand_, 0, poll_bo_ivl);
+ pgm_nla_to_sockaddr (&poll4->poll_nla_afi, (struct sockaddr*)&source->poll_nla);
+/* TODO: schedule poll-response */
+
+ return TRUE;
+}
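+
+/* Informational note: drawing the POLR delay uniformly from the advertised
+ * back-off interval spreads the responses of a large receiver population
+ * over time so the poll parent is not hit by a synchronized burst.
+ */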
+
+/* Used to count off-tree DLRs */
+
+static
+bool
+on_dlr_poll (
+ PGM_GNUC_UNUSED pgm_sock_t* const restrict sock,
+ PGM_GNUC_UNUSED pgm_peer_t* const restrict source,
+ PGM_GNUC_UNUSED struct pgm_sk_buff_t* const restrict skb
+ )
+{
+/* we are not a DLR */
+ return FALSE;
+}
+
+/* eof */