--- source.c 2010-08-05 11:41:13.000000000 +0800 +++ source.c89 2010-08-05 11:41:04.000000000 +0800 @@ -124,11 +124,13 @@ ) { pgm_return_val_if_fail (NULL != sock, FALSE); + { const bool status = pgm_txw_retransmit_push (sock->window, nak_tg_sqn | sock->rs_proactive_h, TRUE /* is_parity */, sock->tg_sqn_shift); return status; + } } /* a deferred request for RDATA, now processing in the timer thread, we check the transmit @@ -159,6 +161,7 @@ * has been retransmitted. */ pgm_spinlock_lock (&sock->txw_spinlock); + { struct pgm_sk_buff_t* skb = pgm_txw_retransmit_try_peek (sock->window); if (skb) { skb = pgm_skb_get (skb); @@ -174,6 +177,7 @@ } else pgm_spinlock_unlock (&sock->txw_spinlock); return TRUE; + } } /* SPMR indicates if multicast to cancel own SPMR, or unicast to send SPM. @@ -233,10 +237,11 @@ pgm_assert (NULL != skb); pgm_assert (NULL != opt_pgmcc_feedback); + { const uint32_t opt_tstamp = ntohl (opt_pgmcc_feedback->opt_tstamp); const uint16_t opt_loss_rate = ntohs (opt_pgmcc_feedback->opt_loss_rate); - const uint32_t rtt = pgm_to_msecs (skb->tstamp) - opt_tstamp; + const uint32_t rtt = (uint32_t)(pgm_to_msecs (skb->tstamp) - opt_tstamp); const uint64_t peer_loss = rtt * rtt * opt_loss_rate; struct sockaddr_storage peer_nla; @@ -263,6 +268,7 @@ } return FALSE; + } } /* NAK requesting RDATA transmission for a sending sock, only valid if @@ -290,6 +296,7 @@ pgm_debug ("pgm_on_nak (sock:%p skb:%p)", (const void*)sock, (const void*)skb); + { const bool is_parity = skb->pgm_header->pgm_options & PGM_OPT_PARITY; if (is_parity) { sock->cumulative_stats[PGM_PC_SOURCE_PARITY_NAKS_RECEIVED]++; @@ -307,6 +314,7 @@ return FALSE; } + { const struct pgm_nak* nak = (struct pgm_nak*) skb->data; const struct pgm_nak6* nak6 = (struct pgm_nak6*)skb->data; @@ -323,6 +331,7 @@ } /* NAK_GRP_NLA containers our sock multicast group */ + { struct sockaddr_storage nak_grp_nla; pgm_nla_to_sockaddr ((AF_INET6 == nak_src_nla.ss_family) ? &nak6->nak6_grp_nla_afi : &nak->nak_grp_nla_afi, (struct sockaddr*)&nak_grp_nla); if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&nak_grp_nla, (struct sockaddr*)&sock->send_gsr.gsr_group) != 0)) @@ -335,6 +344,7 @@ } /* create queue object */ + { struct pgm_sqn_list_t sqn_list; sqn_list.sqn[0] = ntohl (nak->nak_sqn); sqn_list.len = 1; @@ -342,6 +352,7 @@ pgm_debug ("nak_sqn %" PRIu32, sqn_list.sqn[0]); /* check NAK list */ + { const uint32_t* nak_list = NULL; uint_fast8_t nak_list_len = 0; if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT) @@ -360,6 +371,7 @@ return FALSE; } /* TODO: check for > 16 options & past packet end */ + { const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len; do { opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length); @@ -369,6 +381,7 @@ break; } } while (!(opt_header->opt_type & PGM_OPT_END)); + } } /* nak list numbers */ @@ -376,12 +389,15 @@ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed NAK rejected on too long sequence list.")); return FALSE; } - - for (uint_fast8_t i = 0; i < nak_list_len; i++) + + { + uint_fast8_t i; + for (i = 0; i < nak_list_len; i++) { sqn_list.sqn[sqn_list.len++] = ntohl (*nak_list); nak_list++; } + } /* send NAK confirm packet immediately, then defer to timer thread for a.s.a.p * delivery of the actual RDATA packets. blocking send for NCF is ignored as RDATA @@ -393,13 +409,21 @@ send_ncf (sock, (struct sockaddr*)&nak_src_nla, (struct sockaddr*)&nak_grp_nla, sqn_list.sqn[0], is_parity); /* queue retransmit requests */ - for (uint_fast8_t i = 0; i < sqn_list.len; i++) { + { + uint_fast8_t i; + for (i = 0; i < sqn_list.len; i++) { const bool push_status = pgm_txw_retransmit_push (sock->window, sqn_list.sqn[i], is_parity, sock->tg_sqn_shift); if (PGM_UNLIKELY(!push_status)) { pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Failed to push retransmit request for #%" PRIu32), sqn_list.sqn[i]); } } + } return TRUE; + } + } + } + } + } } /* Null-NAK, or N-NAK propogated by a DLR for hand waving excitement @@ -427,6 +451,7 @@ return FALSE; } + { const struct pgm_nak* nnak = (struct pgm_nak*) skb->data; const struct pgm_nak6* nnak6 = (struct pgm_nak6*)skb->data; @@ -441,6 +466,7 @@ } /* NAK_GRP_NLA containers our sock multicast group */ + { struct sockaddr_storage nnak_grp_nla; pgm_nla_to_sockaddr ((AF_INET6 == nnak_src_nla.ss_family) ? &nnak6->nak6_grp_nla_afi : &nnak->nak_grp_nla_afi, (struct sockaddr*)&nnak_grp_nla); if (PGM_UNLIKELY(pgm_sockaddr_cmp ((struct sockaddr*)&nnak_grp_nla, (struct sockaddr*)&sock->send_gsr.gsr_group) != 0)) @@ -450,6 +476,7 @@ } /* check NNAK list */ + { uint_fast8_t nnak_list_len = 0; if (skb->pgm_header->pgm_options & PGM_OPT_PRESENT) { @@ -465,6 +492,7 @@ return FALSE; } /* TODO: check for > 16 options & past packet end */ + { const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len; do { opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length); @@ -473,10 +501,14 @@ break; } } while (!(opt_header->opt_type & PGM_OPT_END)); + } } sock->cumulative_stats[PGM_PC_SOURCE_SELECTIVE_NNAKS_RECEIVED] += 1 + nnak_list_len; return TRUE; + } + } + } } /* ACK, sent upstream by one selected ACKER for congestion control feedback. @@ -507,6 +539,7 @@ if (!sock->use_pgmcc) return FALSE; + { const struct pgm_ack* ack = (struct pgm_ack*)skb->data; bool is_acker = FALSE; @@ -522,6 +555,7 @@ pgm_trace (PGM_LOG_ROLE_NETWORK,_("Malformed ACK rejected.")); return FALSE; } + { const struct pgm_opt_header* opt_header = (const struct pgm_opt_header*)opt_len; do { opt_header = (const struct pgm_opt_header*)((const char*)opt_header + opt_header->opt_length); @@ -531,6 +565,7 @@ break; /* ignore other options */ } } while (!(opt_header->opt_type & PGM_OPT_END)); + } } /* ignore ACKs from other receivers or sessions */ @@ -541,22 +576,26 @@ sock->next_crqst = 0; /* count new ACK sequences */ + { const uint32_t ack_rx_max = ntohl (ack->ack_rx_max); const int32_t delta = ack_rx_max - sock->ack_rx_max; /* ignore older ACKs when multiple active ACKers */ if (pgm_uint32_gt (ack_rx_max, sock->ack_rx_max)) sock->ack_rx_max = ack_rx_max; + { uint32_t ack_bitmap = ntohl (ack->ack_bitmap); if (delta > 32) sock->ack_bitmap = 0; /* sequence jump ahead beyond past bitmap */ else if (delta > 0) sock->ack_bitmap <<= delta; /* immediate sequence */ else if (delta > -32) ack_bitmap <<= -delta; /* repair sequence scoped by bitmap */ else ack_bitmap = 0; /* old sequence */ + { unsigned new_acks = _pgm_popcount (ack_bitmap & ~sock->ack_bitmap); sock->ack_bitmap |= ack_bitmap; if (0 == new_acks) return TRUE; + { const bool is_congestion_limited = (sock->tokens < pgm_fp8 (1)); /* after loss detection cancel any further manipulation of the window @@ -568,14 +607,17 @@ { pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("PGMCC window token manipulation suspended due to congestion (T:%u W:%u)"), pgm_fp8tou (sock->tokens), pgm_fp8tou (sock->cwnd_size)); + { const uint_fast32_t token_inc = pgm_fp8mul (pgm_fp8 (new_acks), pgm_fp8 (1) + pgm_fp8div (pgm_fp8 (1), sock->cwnd_size)); sock->tokens = MIN( sock->tokens + token_inc, sock->cwnd_size ); + } goto notify_tx; } sock->is_congested = FALSE; } /* count outstanding lost sequences */ + { const unsigned total_lost = _pgm_popcount (~sock->ack_bitmap); /* no detected data loss at ACKer, increase congestion window size */ @@ -583,6 +625,7 @@ { new_acks += sock->acks_after_loss; sock->acks_after_loss = 0; + { uint_fast32_t n = pgm_fp8 (new_acks); uint_fast32_t token_inc = 0; @@ -594,6 +637,7 @@ sock->cwnd_size += d; } + { const uint_fast32_t iw = pgm_fp8div (pgm_fp8 (1), sock->cwnd_size); /* linear window increase */ @@ -602,6 +646,8 @@ sock->tokens = MIN( sock->tokens + token_inc, sock->cwnd_size ); // pgm_trace (PGM_LOG_ROLE_CONGESTION_CONTROL,_("PGMCC++ (T:%u W:%u)"), // pgm_fp8tou (sock->tokens), pgm_fp8tou (sock->cwnd_size)); + } + } } else { @@ -636,6 +682,12 @@ pgm_notify_send (&sock->ack_notify); } return TRUE; + } + } + } + } + } + } } /* ambient/heartbeat SPM's @@ -658,6 +710,7 @@ pgm_debug ("pgm_send_spm (sock:%p flags:%d)", (const void*)sock, flags); + { size_t tpdu_length = sizeof(struct pgm_header); if (AF_INET == sock->send_gsr.gsr_group.ss_family) tpdu_length += sizeof(struct pgm_spm); @@ -683,9 +736,11 @@ tpdu_length += sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_fin); } - char buf[ tpdu_length ]; + { + char* buf = pgm_newa (char, tpdu_length); if (PGM_UNLIKELY(pgm_mem_gc_friendly)) memset (buf, 0, tpdu_length); + { struct pgm_header* header = (struct pgm_header*)buf; struct pgm_spm* spm = (struct pgm_spm *)(header + 1); struct pgm_spm6* spm6 = (struct pgm_spm6*)(header + 1); @@ -734,12 +789,14 @@ sizeof(struct pgm_opt_parity_prm); opt_header->opt_type = PGM_OPT_PARITY_PRM; opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_parity_prm); + { struct pgm_opt_parity_prm* opt_parity_prm = (struct pgm_opt_parity_prm*)(opt_header + 1); opt_parity_prm->opt_reserved = (sock->use_proactive_parity ? PGM_PARITY_PRM_PRO : 0) | (sock->use_ondemand_parity ? PGM_PARITY_PRM_OND : 0); opt_parity_prm->parity_prm_tgs = htonl (sock->rs_k); last_opt_header = opt_header; opt_header = (struct pgm_opt_header*)(opt_parity_prm + 1); + } } /* OPT_CRQST */ @@ -750,12 +807,14 @@ sizeof(struct pgm_opt_crqst); opt_header->opt_type = PGM_OPT_CRQST; opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_crqst); + { struct pgm_opt_crqst* opt_crqst = (struct pgm_opt_crqst*)(opt_header + 1); /* request receiver worst path report, OPT_CR_RX_WP */ opt_crqst->opt_reserved = PGM_OPT_CRQST_RXP; sock->is_pending_crqst = FALSE; last_opt_header = opt_header; opt_header = (struct pgm_opt_header*)(opt_crqst + 1); + } } /* OPT_FIN */ @@ -765,10 +824,12 @@ sizeof(struct pgm_opt_fin); opt_header->opt_type = PGM_OPT_FIN; opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_fin); + { struct pgm_opt_fin* opt_fin = (struct pgm_opt_fin*)(opt_header + 1); opt_fin->opt_reserved = 0; last_opt_header = opt_header; opt_header = (struct pgm_opt_header*)(opt_fin + 1); + } } last_opt_header->opt_type |= PGM_OPT_END; @@ -779,6 +840,7 @@ header->pgm_checksum = 0; header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0)); + { const ssize_t sent = pgm_sendto (sock, flags != PGM_OPT_SYN && sock->is_controlled_spm, /* rate limited */ TRUE, /* with router alert */ @@ -794,6 +856,10 @@ sock->spm_sqn++; pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length); return TRUE; + } + } + } + } } /* send a NAK confirm (NCF) message with provided sequence number list. @@ -818,6 +884,7 @@ pgm_assert (nak_src_nla->sa_family == nak_grp_nla->sa_family); #ifdef SOURCE_DEBUG + { char saddr[INET6_ADDRSTRLEN], gaddr[INET6_ADDRSTRLEN]; pgm_sockaddr_ntop (nak_src_nla, saddr, sizeof(saddr)); pgm_sockaddr_ntop (nak_grp_nla, gaddr, sizeof(gaddr)); @@ -828,11 +895,14 @@ sequence, is_parity ? "TRUE": "FALSE" ); + } #endif + { size_t tpdu_length = sizeof(struct pgm_header); tpdu_length += (AF_INET == nak_src_nla->sa_family) ? sizeof(struct pgm_nak) : sizeof(struct pgm_nak6); - char buf[ tpdu_length ]; + { + char* buf = pgm_newa (char, tpdu_length); struct pgm_header* header = (struct pgm_header*)buf; struct pgm_nak* ncf = (struct pgm_nak *)(header + 1); struct pgm_nak6* ncf6 = (struct pgm_nak6*)(header + 1); @@ -854,6 +924,7 @@ header->pgm_checksum = 0; header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0)); + { const ssize_t sent = pgm_sendto (sock, FALSE, /* not rate limited */ TRUE, /* with router alert */ @@ -865,6 +936,9 @@ return FALSE; pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length); return TRUE; + } + } + } } /* A NCF packet with a OPT_NAK_LIST option extension @@ -891,16 +965,20 @@ pgm_assert (nak_src_nla->sa_family == nak_grp_nla->sa_family); #ifdef SOURCE_DEBUG + { char saddr[INET6_ADDRSTRLEN], gaddr[INET6_ADDRSTRLEN]; char list[1024]; pgm_sockaddr_ntop (nak_src_nla, saddr, sizeof(saddr)); pgm_sockaddr_ntop (nak_grp_nla, gaddr, sizeof(gaddr)); sprintf (list, "%" PRIu32, sqn_list->sqn[0]); - for (uint_fast8_t i = 1; i < sqn_list->len; i++) { + { + uint_fast8_t i; + for (i = 1; i < sqn_list->len; i++) { char sequence[ 2 + strlen("4294967295") ]; sprintf (sequence, " %" PRIu32, sqn_list->sqn[i]); strcat (list, sequence); } + } pgm_debug ("send_ncf_list (sock:%p nak-src-nla:%s nak-grp-nla:%s sqn-list:[%s] is-parity:%s)", (void*)sock, saddr, @@ -908,14 +986,17 @@ list, is_parity ? "TRUE": "FALSE" ); + } #endif + { size_t tpdu_length = sizeof(struct pgm_header) + sizeof(struct pgm_opt_length) + /* includes header */ sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_nak_list) + ( (sqn_list->len-1) * sizeof(uint32_t) ); tpdu_length += (AF_INET == nak_src_nla->sa_family) ? sizeof(struct pgm_nak) : sizeof(struct pgm_nak6); - char buf[ tpdu_length ]; + { + char* buf = pgm_newa (char, tpdu_length); struct pgm_header* header = (struct pgm_header*)buf; struct pgm_nak* ncf = (struct pgm_nak *)(header + 1); struct pgm_nak6* ncf6 = (struct pgm_nak6*)(header + 1); @@ -935,6 +1016,7 @@ pgm_sockaddr_to_nla (nak_grp_nla, (AF_INET6 == nak_src_nla->sa_family) ? (char*)&ncf6->nak6_grp_nla_afi : (char*)&ncf->nak_grp_nla_afi ); /* OPT_NAK_LIST */ + { struct pgm_opt_length* opt_len = (AF_INET6 == nak_src_nla->sa_family) ? (struct pgm_opt_length*)(ncf6 + 1) : (struct pgm_opt_length*)(ncf + 1); opt_len->opt_type = PGM_OPT_LENGTH; opt_len->opt_length = sizeof(struct pgm_opt_length); @@ -942,20 +1024,26 @@ sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_nak_list) + ( (sqn_list->len-1) * sizeof(uint32_t) ) ); + { struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1); opt_header->opt_type = PGM_OPT_NAK_LIST | PGM_OPT_END; opt_header->opt_length = sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_nak_list) + ( (sqn_list->len-1) * sizeof(uint32_t) ); + { struct pgm_opt_nak_list* opt_nak_list = (struct pgm_opt_nak_list*)(opt_header + 1); opt_nak_list->opt_reserved = 0; /* to network-order */ - for (uint_fast8_t i = 1; i < sqn_list->len; i++) + { + uint_fast8_t i; + for (i = 1; i < sqn_list->len; i++) opt_nak_list->opt_sqn[i-1] = htonl (sqn_list->sqn[i]); + } header->pgm_checksum = 0; header->pgm_checksum = pgm_csum_fold (pgm_csum_partial (buf, tpdu_length, 0)); + { const ssize_t sent = pgm_sendto (sock, FALSE, /* not rate limited */ TRUE, /* with router alert */ @@ -967,6 +1055,12 @@ return FALSE; pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length); return TRUE; + } + } + } + } + } + } } /* cancel any pending heartbeat SPM and schedule a new one @@ -980,6 +1074,7 @@ ) { pgm_mutex_lock (&sock->timer_mutex); + { const pgm_time_t next_poll = sock->next_poll; const pgm_time_t spm_heartbeat_interval = sock->spm_heartbeat_interval[ sock->spm_heartbeat_state = 1 ]; sock->next_heartbeat_spm = now + spm_heartbeat_interval; @@ -992,6 +1087,7 @@ } } pgm_mutex_unlock (&sock->timer_mutex); + } } /* state helper for resuming sends @@ -1027,6 +1123,7 @@ pgm_debug ("send_odata (sock:%p skb:%p bytes-written:%p)", (void*)sock, (void*)skb, (void*)bytes_written); + { const uint16_t tsdu_length = skb->len; const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0; const size_t tpdu_length = tsdu_length + pgm_pkt_offset (FALSE, pgmcc_family); @@ -1056,6 +1153,7 @@ STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window)); STATE(skb)->pgm_header->pgm_checksum = 0; + { void* data = STATE(skb)->pgm_data + 1; if (sock->use_pgmcc) { struct pgm_opt_length* opt_len = data; @@ -1066,23 +1164,28 @@ ((AF_INET6 == sock->acker_nla.ss_family) ? sizeof(struct pgm_opt6_pgmcc_data) : sizeof(struct pgm_opt_pgmcc_data)) ); + { struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1); opt_header->opt_type = PGM_OPT_PGMCC_DATA | PGM_OPT_END; opt_header->opt_length = sizeof(struct pgm_opt_header) + ((AF_INET6 == sock->acker_nla.ss_family) ? sizeof(struct pgm_opt6_pgmcc_data) : sizeof(struct pgm_opt_pgmcc_data)); + { struct pgm_opt_pgmcc_data* pgmcc_data = (struct pgm_opt_pgmcc_data*)(opt_header + 1); struct pgm_opt6_pgmcc_data* pgmcc_data6 = (struct pgm_opt6_pgmcc_data*)(opt_header + 1); - pgmcc_data->opt_tstamp = htonl (pgm_to_msecs (STATE(skb)->tstamp)); + pgmcc_data->opt_tstamp = htonl ((unsigned long)pgm_to_msecs (STATE(skb)->tstamp)); /* acker nla */ pgm_sockaddr_to_nla ((struct sockaddr*)&sock->acker_nla, (char*)&pgmcc_data->opt_nla_afi); if (AF_INET6 == sock->acker_nla.ss_family) data = (char*)pgmcc_data6 + sizeof(struct pgm_opt6_pgmcc_data); else data = (char*)pgmcc_data + sizeof(struct pgm_opt_pgmcc_data); + } + } } + { const size_t pgm_header_len = (char*)data - (char*)STATE(skb)->pgm_header; const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0); STATE(unfolded_odata) = pgm_csum_partial (data, tsdu_length, 0); @@ -1097,6 +1200,7 @@ * attempt to send a repair-data packet based on in transit original data. */ + { ssize_t sent; retry_send: @@ -1157,6 +1261,10 @@ if (bytes_written) *bytes_written = tsdu_length; return PGM_IO_STATUS_NORMAL; + } + } + } + } } /* send one PGM original data packet, callee owned memory. @@ -1183,6 +1291,7 @@ pgm_debug ("send_odata_copy (sock:%p tsdu:%p tsdu_length:%u bytes-written:%p)", (void*)sock, tsdu, tsdu_length, (void*)bytes_written); + { const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0; const size_t tpdu_length = tsdu_length + pgm_pkt_offset (FALSE, pgmcc_family); @@ -1212,6 +1321,7 @@ STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window)); STATE(skb)->pgm_header->pgm_checksum = 0; + { void* data = STATE(skb)->pgm_data + 1; if (sock->use_pgmcc) { struct pgm_opt_length* opt_len = data; @@ -1222,21 +1332,26 @@ ((AF_INET6 == sock->acker_nla.ss_family) ? sizeof(struct pgm_opt6_pgmcc_data) : sizeof(struct pgm_opt_pgmcc_data)) ); + { struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1); opt_header->opt_type = PGM_OPT_PGMCC_DATA | PGM_OPT_END; opt_header->opt_length = sizeof(struct pgm_opt_header) + ((AF_INET6 == sock->acker_nla.ss_family) ? sizeof(struct pgm_opt6_pgmcc_data) : sizeof(struct pgm_opt_pgmcc_data)); + { struct pgm_opt_pgmcc_data* pgmcc_data = (struct pgm_opt_pgmcc_data*)(opt_header + 1); struct pgm_opt6_pgmcc_data* pgmcc_data6 = (struct pgm_opt6_pgmcc_data*)(opt_header + 1); pgmcc_data->opt_reserved = 0; - pgmcc_data->opt_tstamp = htonl (pgm_to_msecs (STATE(skb)->tstamp)); + pgmcc_data->opt_tstamp = htonl ((unsigned long)pgm_to_msecs (STATE(skb)->tstamp)); /* acker nla */ pgm_sockaddr_to_nla ((struct sockaddr*)&sock->acker_nla, (char*)&pgmcc_data->opt_nla_afi); data = (char*)opt_header + opt_header->opt_length; + } + } } + { const size_t pgm_header_len = (char*)data - (char*)STATE(skb)->pgm_header; const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0); STATE(unfolded_odata) = pgm_csum_partial_copy (tsdu, data, tsdu_length, 0); @@ -1247,6 +1362,7 @@ pgm_txw_add (sock->window, STATE(skb)); pgm_spinlock_unlock (&sock->txw_spinlock); + { ssize_t sent; retry_send: @@ -1309,6 +1425,10 @@ if (bytes_written) *bytes_written = tsdu_length; return PGM_IO_STATUS_NORMAL; + } + } + } + } } /* send one PGM original data packet, callee owned scatter/gather io vector @@ -1347,7 +1467,9 @@ goto retry_send; STATE(tsdu_length) = 0; - for (unsigned i = 0; i < count; i++) + { + unsigned i; + for (i = 0; i < count; i++) { #ifdef TRANSPORT_DEBUG if (PGM_LIKELY(vector[i].iov_len)) { @@ -1356,11 +1478,13 @@ #endif STATE(tsdu_length) += vector[i].iov_len; } + } pgm_return_val_if_fail (STATE(tsdu_length) <= sock->max_tsdu, PGM_IO_STATUS_ERROR); STATE(skb) = pgm_alloc_skb (sock->max_tpdu); STATE(skb)->sock = sock; STATE(skb)->tstamp = pgm_time_update_now(); + { const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0; pgm_skb_reserve (STATE(skb), pgm_pkt_offset (FALSE, pgmcc_family)); pgm_skb_put (STATE(skb), STATE(tsdu_length)); @@ -1379,18 +1503,24 @@ STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window)); STATE(skb)->pgm_header->pgm_checksum = 0; + { const size_t pgm_header_len = (char*)(STATE(skb)->pgm_data + 1) - (char*)STATE(skb)->pgm_header; const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0); /* unroll first iteration to make friendly branch prediction */ char* dst = (char*)(STATE(skb)->pgm_data + 1); - STATE(unfolded_odata) = pgm_csum_partial_copy ((const char*)vector[0].iov_base, dst, vector[0].iov_len, 0); + STATE(unfolded_odata) = pgm_csum_partial_copy ((const char*)vector[0].iov_base, dst, (uint16_t)vector[0].iov_len, 0); /* iterate over one or more vector elements to perform scatter/gather checksum & copy */ - for (unsigned i = 1; i < count; i++) { + { + unsigned i; + for (i = 1; i < count; i++) { dst += vector[i-1].iov_len; - const uint32_t unfolded_element = pgm_csum_partial_copy ((const char*)vector[i].iov_base, dst, vector[i].iov_len, 0); - STATE(unfolded_odata) = pgm_csum_block_add (STATE(unfolded_odata), unfolded_element, vector[i-1].iov_len); + { + const uint32_t unfolded_element = pgm_csum_partial_copy ((const char*)vector[i].iov_base, dst, (uint16_t)vector[i].iov_len, 0); + STATE(unfolded_odata) = pgm_csum_block_add (STATE(unfolded_odata), unfolded_element, (uint16_t)vector[i-1].iov_len); + } + } } STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len)); @@ -1400,6 +1530,7 @@ pgm_txw_add (sock->window, STATE(skb)); pgm_spinlock_unlock (&sock->txw_spinlock); + { ssize_t sent; size_t tpdu_length; retry_send: @@ -1447,6 +1578,9 @@ if (bytes_written) *bytes_written = STATE(tsdu_length); return PGM_IO_STATUS_NORMAL; + } + } + } } /* send PGM original data, callee owned memory. if larger than maximum TPDU @@ -1530,6 +1664,7 @@ STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window)); /* OPT_LENGTH */ + { struct pgm_opt_length* opt_len = (struct pgm_opt_length*)(STATE(skb)->pgm_data + 1); opt_len->opt_type = PGM_OPT_LENGTH; opt_len->opt_length = sizeof(struct pgm_opt_length); @@ -1537,6 +1672,7 @@ sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_fragment) ); /* OPT_FRAGMENT */ + { struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1); opt_header->opt_type = PGM_OPT_FRAGMENT | PGM_OPT_END; opt_header->opt_length = sizeof(struct pgm_opt_header) + @@ -1549,6 +1685,7 @@ /* TODO: the assembly checksum & copy routine is faster than memcpy & pgm_cksum on >= opteron hardware */ STATE(skb)->pgm_header->pgm_checksum = 0; + { const size_t pgm_header_len = (char*)(STATE(skb)->pgm_opt_fragment + 1) - (char*)STATE(skb)->pgm_header; const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0); STATE(unfolded_odata) = pgm_csum_partial_copy ((const char*)apdu + STATE(data_bytes_offset), STATE(skb)->pgm_opt_fragment + 1, STATE(tsdu_length), 0); @@ -1559,6 +1696,7 @@ pgm_txw_add (sock->window, STATE(skb)); pgm_spinlock_unlock (&sock->txw_spinlock); + { ssize_t sent; size_t tpdu_length; retry_send: @@ -1596,6 +1734,10 @@ pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask); } + } + } + } + } } while ( STATE(data_bytes_offset) < apdu_length); pgm_assert( STATE(data_bytes_offset) == apdu_length ); @@ -1638,7 +1780,7 @@ size_t* restrict bytes_written ) { - pgm_debug ("pgm_send (sock:%p apdu:%p apdu-length:%zu bytes-written:%p)", + pgm_debug ("pgm_send (sock:%p apdu:%p apdu-length:%lu bytes-written:%p)", (void*)sock, apdu, apdu_length, (void*)bytes_written); /* parameters */ @@ -1670,10 +1812,12 @@ return status; } + { const int status = send_apdu (sock, apdu, apdu_length, bytes_written); pgm_mutex_unlock (&sock->source_mutex); pgm_rwlock_reader_unlock (&sock->lock); return status; + } } /* send PGM original data, callee owned scatter/gather IO vector. if larger than maximum TPDU @@ -1735,6 +1879,7 @@ return status; } + { size_t bytes_sent = 0; unsigned packets_sent = 0; size_t data_bytes_sent = 0; @@ -1759,7 +1904,9 @@ /* calculate (total) APDU length */ STATE(apdu_length) = 0; - for (unsigned i = 0; i < count; i++) + { + unsigned i; + for (i = 0; i < count; i++) { #ifdef TRANSPORT_DEBUG if (PGM_LIKELY(vector[i].iov_len)) { @@ -1775,6 +1922,7 @@ } STATE(apdu_length) += vector[i].iov_len; } + } /* pass on non-fragment calls */ if (is_one_apdu) { @@ -1885,6 +2033,7 @@ STATE(skb)->pgm_data->data_trail = htonl (pgm_txw_trail(sock->window)); /* OPT_LENGTH */ + { struct pgm_opt_length* opt_len = (struct pgm_opt_length*)(STATE(skb)->pgm_data + 1); opt_len->opt_type = PGM_OPT_LENGTH; opt_len->opt_length = sizeof(struct pgm_opt_length); @@ -1892,6 +2041,7 @@ sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_fragment) ); /* OPT_FRAGMENT */ + { struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1); opt_header->opt_type = PGM_OPT_FRAGMENT | PGM_OPT_END; opt_header->opt_length = sizeof(struct pgm_opt_header) + @@ -1904,6 +2054,7 @@ /* checksum & copy */ STATE(skb)->pgm_header->pgm_checksum = 0; + { const size_t pgm_header_len = (char*)(STATE(skb)->pgm_opt_fragment + 1) - (char*)STATE(skb)->pgm_header; const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0); @@ -1941,8 +2092,10 @@ dst += copy_length; src_length = vector[STATE(vector_index)].iov_len - STATE(vector_offset); copy_length = MIN( STATE(tsdu_length) - dst_length, src_length ); + { const uint32_t unfolded_element = pgm_csum_partial_copy (src, dst, copy_length, 0); STATE(unfolded_odata) = pgm_csum_block_add (STATE(unfolded_odata), unfolded_element, dst_length); + } } STATE(skb)->pgm_header->pgm_checksum = pgm_csum_fold (pgm_csum_block_add (unfolded_header, STATE(unfolded_odata), pgm_header_len)); @@ -1952,6 +2105,7 @@ pgm_txw_add (sock->window, STATE(skb)); pgm_spinlock_unlock (&sock->txw_spinlock); + { ssize_t sent; size_t tpdu_length; retry_one_apdu_send: @@ -1988,6 +2142,10 @@ pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask); } + } + } + } + } } while ( STATE(data_bytes_offset) < STATE(apdu_length) ); pgm_assert( STATE(data_bytes_offset) == STATE(apdu_length) ); @@ -2018,6 +2176,7 @@ return PGM_IO_STATUS_WOULD_BLOCK; } return PGM_IO_STATUS_RATE_LIMITED; + } } /* send PGM original data, transmit window owned scatter/gather IO vector. @@ -2077,6 +2236,7 @@ return status; } + { size_t bytes_sent = 0; unsigned packets_sent = 0; size_t data_bytes_sent = 0; @@ -2090,8 +2250,11 @@ if (sock->is_nonblocking && sock->is_controlled_odata) { size_t total_tpdu_length = 0; - for (unsigned i = 0; i < count; i++) + { + unsigned i; + for (i = 0; i < count; i++) total_tpdu_length += sock->iphdr_len + pgm_pkt_offset (is_one_apdu, pgmcc_family) + vector[i]->len; + } /* calculation includes one iphdr length already */ if (!pgm_rate_check (&sock->rate_control, @@ -2110,7 +2273,9 @@ { STATE(apdu_length) = 0; STATE(first_sqn) = pgm_txw_next_lead(sock->window); - for (unsigned i = 0; i < count; i++) + { + unsigned i; + for (i = 0; i < count; i++) { if (PGM_UNLIKELY(vector[i]->len > sock->max_tsdu_fragment)) { pgm_mutex_unlock (&sock->source_mutex); @@ -2119,6 +2284,7 @@ } STATE(apdu_length) += vector[i]->len; } + } if (PGM_UNLIKELY(STATE(apdu_length) > sock->max_apdu)) { pgm_mutex_unlock (&sock->source_mutex); pgm_rwlock_reader_unlock (&sock->lock); @@ -2157,6 +2323,7 @@ sizeof(struct pgm_opt_header) + sizeof(struct pgm_opt_fragment) ); /* OPT_FRAGMENT */ + { struct pgm_opt_header* opt_header = (struct pgm_opt_header*)(opt_len + 1); opt_header->opt_type = PGM_OPT_FRAGMENT | PGM_OPT_END; opt_header->opt_length = sizeof(struct pgm_opt_header) + @@ -2168,6 +2335,7 @@ STATE(skb)->pgm_opt_fragment->opt_frag_len = htonl (STATE(apdu_length)); pgm_assert (STATE(skb)->data == (STATE(skb)->pgm_opt_fragment + 1)); + } } else { @@ -2177,6 +2345,7 @@ /* TODO: the assembly checksum & copy routine is faster than memcpy & pgm_cksum on >= opteron hardware */ STATE(skb)->pgm_header->pgm_checksum = 0; pgm_assert ((char*)STATE(skb)->data > (char*)STATE(skb)->pgm_header); + { const size_t pgm_header_len = (char*)STATE(skb)->data - (char*)STATE(skb)->pgm_header; const uint32_t unfolded_header = pgm_csum_partial (STATE(skb)->pgm_header, pgm_header_len, 0); STATE(unfolded_odata) = pgm_csum_partial ((char*)STATE(skb)->data, STATE(tsdu_length), 0); @@ -2186,6 +2355,7 @@ pgm_spinlock_lock (&sock->txw_spinlock); pgm_txw_add (sock->window, STATE(skb)); pgm_spinlock_unlock (&sock->txw_spinlock); + { ssize_t sent; size_t tpdu_length; retry_send: @@ -2223,6 +2393,8 @@ if (!((odata_sqn + 1) & ~tg_sqn_mask)) pgm_schedule_proactive_nak (sock, odata_sqn & tg_sqn_mask); } + } + } } #ifdef TRANSPORT_DEBUG @@ -2259,6 +2431,7 @@ return PGM_IO_STATUS_WOULD_BLOCK; } return PGM_IO_STATUS_RATE_LIMITED; + } } /* cleanup resuming send state helper @@ -2282,6 +2455,7 @@ pgm_assert (NULL != skb); pgm_assert ((char*)skb->tail > (char*)skb->head); + { const size_t tpdu_length = (char*)skb->tail - (char*)skb->head; /* update previous odata/rdata contents */ @@ -2292,6 +2466,7 @@ rdata->data_trail = htonl (pgm_txw_trail(sock->window)); header->pgm_checksum = 0; + { const size_t pgm_header_len = tpdu_length - ntohs(header->pgm_tsdu_length); uint32_t unfolded_header = pgm_csum_partial (header, pgm_header_len, 0); uint32_t unfolded_odata = pgm_txw_get_unfolded_checksum (skb); @@ -2306,6 +2481,7 @@ return FALSE; } + { const ssize_t sent = pgm_sendto (sock, sock->is_controlled_rdata, /* rate limited */ TRUE, /* with router alert */ @@ -2319,6 +2495,7 @@ return FALSE; } + { const pgm_time_t now = pgm_time_update_now(); if (sock->use_pgmcc) { @@ -2338,6 +2515,10 @@ sock->cumulative_stats[PGM_PC_SOURCE_SELECTIVE_MSGS_RETRANSMITTED]++; /* impossible to determine APDU count */ pgm_atomic_add32 (&sock->cumulative_stats[PGM_PC_SOURCE_BYTES_SENT], tpdu_length + sock->iphdr_len); return TRUE; + } + } + } + } } /* eof */