diff options
Diffstat (limited to '3rdparty/openpgm-svn-r1085/pgm/socket.c')
-rw-r--r-- | 3rdparty/openpgm-svn-r1085/pgm/socket.c | 2046 |
1 files changed, 2046 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1085/pgm/socket.c b/3rdparty/openpgm-svn-r1085/pgm/socket.c new file mode 100644 index 0000000..1338085 --- /dev/null +++ b/3rdparty/openpgm-svn-r1085/pgm/socket.c @@ -0,0 +1,2046 @@ +/* vim:ts=8:sts=8:sw=4:noai:noexpandtab + * + * PGM socket: manage incoming & outgoing sockets with ambient SPMs, + * transmit & receive windows. + * + * Copyright (c) 2006-2010 Miru Limited. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include <errno.h> +#ifdef CONFIG_HAVE_POLL +# include <poll.h> +#endif +#ifdef CONFIG_HAVE_EPOLL +# include <sys/epoll.h> +#endif +#include <stdio.h> +#include <impl/i18n.h> +#include <impl/framework.h> +#include <impl/socket.h> +#include <impl/receiver.h> +#include <impl/source.h> +#include <impl/timer.h> + + +//#define SOCK_DEBUG +//#define SOCK_SPM_DEBUG + + +/* global locals */ +pgm_rwlock_t pgm_sock_list_lock; /* list of all sockets for admin interfaces */ +pgm_slist_t* pgm_sock_list = NULL; + + +static const char* pgm_family_string (const int) PGM_GNUC_CONST; +static const char* pgm_sock_type_string (const int) PGM_GNUC_CONST; +static const char* pgm_protocol_string (const int) PGM_GNUC_CONST; + + +size_t +pgm_pkt_offset ( + bool can_fragment, + sa_family_t pgmcc_family /* 0 = disable */ + ) +{ + static const 
size_t data_size = sizeof(struct pgm_header) + sizeof(struct pgm_data); + size_t pkt_size = data_size; + if (can_fragment || 0 != pgmcc_family) + pkt_size += sizeof(struct pgm_opt_length) + sizeof(struct pgm_opt_header); + if (can_fragment) + pkt_size += sizeof(struct pgm_opt_fragment); + if (AF_INET == pgmcc_family) + pkt_size += sizeof(struct pgm_opt_pgmcc_data); + else if (AF_INET6 == pgmcc_family) + pkt_size += sizeof(struct pgm_opt6_pgmcc_data); + return pkt_size; +} + +/* destroy a pgm_sock object and contents, if last sock also destroy + * associated event loop + * + * outstanding locks: + * 1) pgm_sock_t::lock + * 2) pgm_sock_t::receiver_mutex + * 3) pgm_sock_t::source_mutex + * 4) pgm_sock_t::txw_spinlock + * 5) pgm_sock_t::timer_mutex + * + * If application calls a function on the sock after destroy() it is a + * programmer error: segv likely to occur on unlock. + * + * on success, returns TRUE, on failure returns FALSE. + */ + +bool +pgm_close ( + pgm_sock_t* sock, + bool flush + ) +{ + pgm_return_val_if_fail (sock != NULL, FALSE); + if (!pgm_rwlock_reader_trylock (&sock->lock)) + pgm_return_val_if_reached (FALSE); + pgm_return_val_if_fail (!sock->is_destroyed, FALSE); + pgm_debug ("pgm_sock_destroy (sock:%p flush:%s)", + (const void*)sock, + flush ? 
"TRUE":"FALSE"); +/* flag existing calls */ + sock->is_destroyed = TRUE; +/* cancel running blocking operations */ + if (PGM_INVALID_SOCKET != sock->recv_sock) { + pgm_trace (PGM_LOG_ROLE_NETWORK,_("Closing receive socket.")); + pgm_closesocket (sock->recv_sock); + sock->recv_sock = PGM_INVALID_SOCKET; + } + if (PGM_INVALID_SOCKET != sock->send_sock) { + pgm_trace (PGM_LOG_ROLE_NETWORK,_("Closing send socket.")); + pgm_closesocket (sock->send_sock); + sock->send_sock = PGM_INVALID_SOCKET; + } + pgm_rwlock_reader_unlock (&sock->lock); + pgm_debug ("blocking on destroy lock ..."); + pgm_rwlock_writer_lock (&sock->lock); + + pgm_debug ("removing sock from inventory."); + pgm_rwlock_writer_lock (&pgm_sock_list_lock); + pgm_sock_list = pgm_slist_remove (pgm_sock_list, sock); + pgm_rwlock_writer_unlock (&pgm_sock_list_lock); + +/* flush source side by sending heartbeat SPMs */ + if (sock->can_send_data && + sock->is_connected && + flush) + { + pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Flushing PGM source with session finish option broadcast SPMs.")); + if (!pgm_send_spm (sock, PGM_OPT_FIN) || + !pgm_send_spm (sock, PGM_OPT_FIN) || + !pgm_send_spm (sock, PGM_OPT_FIN)) + { + pgm_trace (PGM_LOG_ROLE_NETWORK,_("Failed to send flushing SPMs.")); + } + } + + if (sock->peers_hashtable) { + pgm_debug ("destroying peer lookup table."); + pgm_hashtable_destroy (sock->peers_hashtable); + sock->peers_hashtable = NULL; + } + if (sock->peers_list) { + pgm_debug ("destroying peer list."); + do { + pgm_list_t* next = sock->peers_list->next; + pgm_peer_unref ((pgm_peer_t*)sock->peers_list->data); + + sock->peers_list = next; + } while (sock->peers_list); + } + + if (sock->window) { + pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Destroying transmit window.")); + pgm_txw_shutdown (sock->window); + sock->window = NULL; + } + pgm_trace (PGM_LOG_ROLE_RATE_CONTROL,_("Destroying rate control.")); + pgm_rate_destroy (&sock->rate_control); + if (PGM_INVALID_SOCKET != sock->send_with_router_alert_sock) { + 
pgm_trace (PGM_LOG_ROLE_NETWORK,_("Closing send with router alert socket.")); + pgm_closesocket (sock->send_with_router_alert_sock); + sock->send_with_router_alert_sock = PGM_INVALID_SOCKET; + } + if (sock->spm_heartbeat_interval) { + pgm_debug ("freeing SPM heartbeat interval data."); + pgm_free (sock->spm_heartbeat_interval); + sock->spm_heartbeat_interval = NULL; + } + if (sock->rx_buffer) { + pgm_debug ("freeing receive buffer."); + pgm_free_skb (sock->rx_buffer); + sock->rx_buffer = NULL; + } + pgm_debug ("destroying notification channels."); + if (sock->can_send_data) { + if (sock->use_pgmcc) { + pgm_notify_destroy (&sock->ack_notify); + } + pgm_notify_destroy (&sock->rdata_notify); + } + pgm_notify_destroy (&sock->pending_notify); + pgm_debug ("freeing sock locks."); + pgm_rwlock_free (&sock->peers_lock); + pgm_spinlock_free (&sock->txw_spinlock); + pgm_mutex_free (&sock->send_mutex); + pgm_mutex_free (&sock->timer_mutex); + pgm_mutex_free (&sock->source_mutex); + pgm_mutex_free (&sock->receiver_mutex); + pgm_rwlock_writer_unlock (&sock->lock); + pgm_rwlock_free (&sock->lock); + pgm_debug ("freeing sock data."); + pgm_free (sock); + pgm_debug ("finished."); + return TRUE; +} + +/* Create a pgm_sock object. Create sockets that require superuser + * priviledges. If interface ports are specified then UDP encapsulation will + * be used instead of raw protocol. + * + * If send == recv only two sockets need to be created iff ip headers are not + * required (IPv6). + * + * All receiver addresses must be the same family. + * interface and multiaddr must be the same family. + * family cannot be AF_UNSPEC! + * + * returns TRUE on success, or FALSE on error and sets error appropriately. + */ + +#if ( AF_INET != PF_INET ) || ( AF_INET6 != PF_INET6 ) +#error AF_INET and PF_INET are different values, the bananas are jumping in their pyjamas! 
+#endif + +bool +pgm_socket ( + pgm_sock_t** restrict sock, + const sa_family_t family, /* communications domain */ + const int pgm_sock_type, + const int protocol, + pgm_error_t** restrict error + ) +{ + pgm_sock_t* new_sock; + + pgm_return_val_if_fail (NULL != sock, FALSE); + pgm_return_val_if_fail (AF_INET == family || AF_INET6 == family, FALSE); + pgm_return_val_if_fail (SOCK_SEQPACKET == pgm_sock_type, FALSE); + pgm_return_val_if_fail (IPPROTO_UDP == protocol || IPPROTO_PGM == protocol, FALSE); + + pgm_debug ("socket (sock:%p family:%s sock-type:%s protocol:%s error:%p)", + (const void*)sock, pgm_family_string(family), pgm_sock_type_string(pgm_sock_type), pgm_protocol_string(protocol), (const void*)error); + + new_sock = pgm_new0 (pgm_sock_t, 1); + new_sock->family = family; + new_sock->socket_type = pgm_sock_type; + new_sock->protocol = protocol; + new_sock->can_send_data = TRUE; + new_sock->can_send_nak = TRUE; + new_sock->can_recv_data = TRUE; + new_sock->dport = DEFAULT_DATA_DESTINATION_PORT; + new_sock->tsi.sport = DEFAULT_DATA_SOURCE_PORT; + new_sock->adv_mode = 0; /* advance with time */ + +/* PGMCC */ + new_sock->acker_nla.ss_family = family; + +/* source-side */ + pgm_mutex_init (&new_sock->source_mutex); +/* transmit window */ + pgm_spinlock_init (&new_sock->txw_spinlock); +/* send socket */ + pgm_mutex_init (&new_sock->send_mutex); +/* next timer & spm expiration */ + pgm_mutex_init (&new_sock->timer_mutex); +/* receiver-side */ + pgm_mutex_init (&new_sock->receiver_mutex); +/* peer hash map & list lock */ + pgm_rwlock_init (&new_sock->peers_lock); +/* destroy lock */ + pgm_rwlock_init (&new_sock->lock); + +/* open sockets to implement PGM */ + int socket_type; + if (IPPROTO_UDP == new_sock->protocol) { + pgm_trace (PGM_LOG_ROLE_NETWORK,_("Opening UDP encapsulated sockets.")); + socket_type = SOCK_DGRAM; + new_sock->udp_encap_ucast_port = DEFAULT_UDP_ENCAP_UCAST_PORT; + new_sock->udp_encap_mcast_port = DEFAULT_UDP_ENCAP_MCAST_PORT; + } else { + 
pgm_trace (PGM_LOG_ROLE_NETWORK,_("Opening raw sockets.")); + socket_type = SOCK_RAW; + } + + if ((new_sock->recv_sock = socket (new_sock->family, + socket_type, + new_sock->protocol)) == PGM_INVALID_SOCKET) + { + const int save_errno = pgm_sock_errno(); + pgm_set_error (error, + PGM_ERROR_DOMAIN_SOCKET, + pgm_error_from_sock_errno (save_errno), + _("Creating receive socket: %s"), + pgm_sock_strerror (save_errno)); +#ifndef _WIN32 + if (EPERM == save_errno) { + pgm_error (_("PGM protocol requires CAP_NET_RAW capability, e.g. sudo execcap 'cap_net_raw=ep'")); + } +#endif + goto err_destroy; + } + + if ((new_sock->send_sock = socket (new_sock->family, + socket_type, + new_sock->protocol)) == PGM_INVALID_SOCKET) + { + const int save_errno = pgm_sock_errno(); + pgm_set_error (error, + PGM_ERROR_DOMAIN_SOCKET, + pgm_error_from_sock_errno (save_errno), + _("Creating send socket: %s"), + pgm_sock_strerror (save_errno)); + goto err_destroy; + } + + if ((new_sock->send_with_router_alert_sock = socket (new_sock->family, + socket_type, + new_sock->protocol)) == PGM_INVALID_SOCKET) + { + const int save_errno = pgm_sock_errno(); + pgm_set_error (error, + PGM_ERROR_DOMAIN_SOCKET, + pgm_error_from_sock_errno (save_errno), + _("Creating IP Router Alert (RFC 2113) send socket: %s"), + pgm_sock_strerror (save_errno)); + goto err_destroy; + } + + *sock = new_sock; + + pgm_rwlock_writer_lock (&pgm_sock_list_lock); + pgm_sock_list = pgm_slist_append (pgm_sock_list, *sock); + pgm_rwlock_writer_unlock (&pgm_sock_list_lock); + pgm_debug ("PGM socket successfully created."); + return TRUE; + +err_destroy: + if (PGM_INVALID_SOCKET != new_sock->recv_sock) { + if (PGM_SOCKET_ERROR == pgm_closesocket (new_sock->recv_sock)) { + const int save_errno = pgm_sock_errno(); + pgm_warn (_("Close on receive socket failed: %s"), + pgm_sock_strerror (save_errno)); + } + new_sock->recv_sock = PGM_INVALID_SOCKET; + } + if (PGM_INVALID_SOCKET != new_sock->send_sock) { + if (PGM_SOCKET_ERROR == 
pgm_closesocket (new_sock->send_sock)) { + const int save_errno = pgm_sock_errno(); + pgm_warn (_("Close on send socket failed: %s"), + pgm_sock_strerror (save_errno)); + } + new_sock->send_sock = PGM_INVALID_SOCKET; + } + if (PGM_INVALID_SOCKET != new_sock->send_with_router_alert_sock) { + if (PGM_SOCKET_ERROR == pgm_closesocket (new_sock->send_with_router_alert_sock)) { + const int save_errno = pgm_sock_errno(); + pgm_warn (_("Close on IP Router Alert (RFC 2113) send socket failed: %s"), + pgm_sock_strerror (save_errno)); + } + new_sock->send_with_router_alert_sock = PGM_INVALID_SOCKET; + } + pgm_free (new_sock); + return FALSE; +} + +bool +pgm_getsockopt ( + pgm_sock_t* const restrict sock, + const int optname, + void* restrict optval, + socklen_t* restrict optlen /* required */ + ) +{ + bool status = FALSE; + pgm_return_val_if_fail (sock != NULL, status); + pgm_return_val_if_fail (optval != NULL, status); + pgm_return_val_if_fail (optlen != NULL, status); + if (PGM_UNLIKELY(!pgm_rwlock_reader_trylock (&sock->lock))) + pgm_return_val_if_reached (status); + if (PGM_UNLIKELY(sock->is_destroyed)) { + pgm_rwlock_reader_unlock (&sock->lock); + return status; + } + switch (optname) { +/* maximum TPDU size */ + case PGM_MTU: + if (PGM_UNLIKELY(*optlen != sizeof (int))) + break; + *(int*restrict)optval = sock->max_tpdu; + status = TRUE; + break; + +/* receive socket */ + case PGM_RECV_SOCK: + if (PGM_UNLIKELY(!sock->is_connected)) + break; + if (PGM_UNLIKELY(*optlen != sizeof (int))) + break; + *(int*)optval = sock->recv_sock; + status = TRUE; + break; + +/* repair socket */ + case PGM_REPAIR_SOCK: + if (PGM_UNLIKELY(!sock->is_connected)) + break; + if (PGM_UNLIKELY(*optlen != sizeof (int))) + break; + *(int*)optval = pgm_notify_get_fd (&sock->rdata_notify); + status = TRUE; + break; + +/* pending socket */ + case PGM_PENDING_SOCK: + if (PGM_UNLIKELY(!sock->is_connected)) + break; + if (PGM_UNLIKELY(*optlen != sizeof (int))) + break; + *(int*)optval = pgm_notify_get_fd 
(&sock->pending_notify); + status = TRUE; + break; + +/* ACK or congestion socket */ + case PGM_ACK_SOCK: + if (PGM_UNLIKELY(!sock->is_connected)) + break; + if (PGM_UNLIKELY(*optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(!sock->use_pgmcc)) + break; + *(int*)optval = pgm_notify_get_fd (&sock->ack_notify); + status = TRUE; + break; + + +/* timeout for pending timer */ + case PGM_TIME_REMAIN: + if (PGM_UNLIKELY(!sock->is_connected)) + break; + if (PGM_UNLIKELY(*optlen != sizeof (struct timeval))) + break; + { + struct timeval* tv = optval; + const pgm_time_t usecs = pgm_timer_expiration (sock); + tv->tv_sec = usecs / 1000000UL; + tv->tv_usec = usecs % 1000000UL; + } + status = TRUE; + break; + +/* timeout for blocking sends */ + case PGM_RATE_REMAIN: + if (PGM_UNLIKELY(!sock->is_connected)) + break; + if (PGM_UNLIKELY(*optlen != sizeof (struct timeval))) + break; + { + struct timeval* tv = optval; + const pgm_time_t usecs = pgm_rate_remaining (&sock->rate_control, sock->blocklen); + tv->tv_sec = usecs / 1000000UL; + tv->tv_usec = usecs % 1000000UL; + } + status = TRUE; + break; + + + } + pgm_rwlock_reader_unlock (&sock->lock); + return status; +} + +bool +pgm_setsockopt ( + pgm_sock_t* const sock, + const int optname, + const void* optval, + const socklen_t optlen + ) +{ + bool status = FALSE; + pgm_return_val_if_fail (sock != NULL, status); + if (PGM_UNLIKELY(!pgm_rwlock_reader_trylock (&sock->lock))) + pgm_return_val_if_reached (status); + if (PGM_UNLIKELY(sock->is_connected || sock->is_destroyed)) { + pgm_rwlock_reader_unlock (&sock->lock); + return status; + } + switch (optname) { + +/* RFC2113 IP Router Alert + */ + case PGM_IP_ROUTER_ALERT: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + { + const bool v = (0 != *(const int*)optval); + if (PGM_SOCKET_ERROR == pgm_sockaddr_router_alert (sock->send_with_router_alert_sock, sock->family, v)) + break; + } + status = TRUE; + break; + +/* IPv4: 68 <= tpdu < 65536 (RFC 2765) + * IPv6: 1280 <= tpdu < 65536 
(RFC 2460) + */ + case PGM_MTU: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval < (sizeof(struct pgm_ip) + sizeof(struct pgm_header)))) + break; + if (PGM_UNLIKELY(*(const int*)optval > UINT16_MAX)) + break; + sock->max_tpdu = *(const int*)optval; + status = TRUE; + break; + +/* 1 = enable multicast loopback. + * 0 = default, to disable. + */ + case PGM_MULTICAST_LOOP: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + { + const bool v = (0 != *(const int*)optval); +#ifndef _WIN32 /* loop on send */ + if (PGM_SOCKET_ERROR == pgm_sockaddr_multicast_loop (sock->send_sock, sock->family, v) || + PGM_SOCKET_ERROR == pgm_sockaddr_multicast_loop (sock->send_with_router_alert_sock, sock->family, v)) + break; +#else /* loop on receive */ + if (PGM_SOCKET_ERROR == pgm_sockaddr_multicast_loop (sock->recv_sock, sock->family, v)) + break; +#endif + } + status = TRUE; + break; + +/* 0 < hops < 256, hops == -1 use kernel default (ignored). + */ + case PGM_MULTICAST_HOPS: +#ifndef CONFIG_TARGET_WINE + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + if (PGM_UNLIKELY(*(const int*)optval > UINT8_MAX)) + break; + { + const unsigned hops = *(const int*)optval; + if (PGM_SOCKET_ERROR == pgm_sockaddr_multicast_hops (sock->send_sock, sock->family, hops) || + PGM_SOCKET_ERROR == pgm_sockaddr_multicast_hops (sock->send_with_router_alert_sock, sock->family, hops)) + break; + } +#endif + status = TRUE; + break; + +/* IP Type of Service (ToS) or RFC 3246, differentiated services (DSCP) + */ + case PGM_TOS: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_tos (sock->send_sock, sock->family, *(const int*)optval) || + PGM_SOCKET_ERROR == pgm_sockaddr_tos (sock->send_with_router_alert_sock, sock->family, *(const int*)optval)) + { + pgm_warn (_("ToS/DSCP setting requires CAP_NET_ADMIN or ADMIN capability.")); + break; + } + status = TRUE; + break; + 
+/* 0 < wmem < wmem_max (user) + * + * operating system and sysctl dependent maximum, minimum on Linux 256 (doubled). + */ + case PGM_SNDBUF: + if (PGM_SOCKET_ERROR == setsockopt (sock->send_sock, SOL_SOCKET, SO_SNDBUF, (const char*)optval, optlen) || + PGM_SOCKET_ERROR == setsockopt (sock->send_with_router_alert_sock, SOL_SOCKET, SO_SNDBUF, (const char*)optval, optlen)) + break; + status = TRUE; + break; + +/* 0 < rmem < rmem_max (user) + * + * minimum on Linux is 2048 (doubled). + */ + case PGM_RCVBUF: + if (PGM_SOCKET_ERROR == setsockopt (sock->recv_sock, SOL_SOCKET, SO_RCVBUF, (const char*)optval, optlen)) + break; + status = TRUE; + break; + +/* periodic ambient broadcast SPM interval in milliseconds. + */ + case PGM_AMBIENT_SPM: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->spm_ambient_interval = *(const int*)optval; + status = TRUE; + break; + +/* sequence of heartbeat broadcast SPMS to flush out original + */ + case PGM_HEARTBEAT_SPM: + if (PGM_UNLIKELY(0 != optlen % sizeof (int))) + break; + { + sock->spm_heartbeat_len = optlen / sizeof (int); + sock->spm_heartbeat_interval = pgm_new (unsigned, sock->spm_heartbeat_len + 1); + sock->spm_heartbeat_interval[0] = 0; + for (unsigned i = 0; i < sock->spm_heartbeat_len; i++) + sock->spm_heartbeat_interval[i + 1] = ((const int*)optval)[i]; + } + status = TRUE; + break; + +/* size of transmit window in sequence numbers. + * 0 < txw_sqns < one less than half sequence space + */ + case PGM_TXW_SQNS: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + if (PGM_UNLIKELY(*(const int*)optval >= ((UINT32_MAX/2)-1))) + break; + sock->txw_sqns = *(const int*)optval; + status = TRUE; + break; + +/* size of transmit window in seconds. 
+ * 0 < secs < ( txw_sqns / txw_max_rte ) + */ + case PGM_TXW_SECS: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->txw_secs = *(const int*)optval; + status = TRUE; + break; + +/* maximum transmit rate. + * 0 < txw_max_rte < interface capacity + * 10mb : 1250000 + * 100mb : 12500000 + * 1gb : 125000000 + */ + case PGM_TXW_MAX_RTE: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->txw_max_rte = *(const int*)optval; + status = TRUE; + break; + +/* timeout for peers. + * 0 < 2 * spm_ambient_interval <= peer_expiry + */ + case PGM_PEER_EXPIRY: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->peer_expiry = *(const int*)optval; + status = TRUE; + break; + +/* maximum back off range for listening for multicast SPMR. + * 0 < spmr_expiry < spm_ambient_interval + */ + case PGM_SPMR_EXPIRY: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->spmr_expiry = *(const int*)optval; + status = TRUE; + break; + +/* size of receive window in sequence numbers. + * 0 < rxw_sqns < one less than half sequence space + */ + case PGM_RXW_SQNS: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + if (PGM_UNLIKELY(*(const int*)optval >= ((UINT32_MAX/2)-1))) + break; + sock->rxw_sqns = *(const int*)optval; + status = TRUE; + break; + +/* size of receive window in seconds. + * 0 < secs < ( rxw_sqns / rxw_max_rte ) + */ + case PGM_RXW_SECS: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->rxw_secs = *(const int*)optval; + status = TRUE; + break; + +/* maximum receive rate, for determining window size with txw_secs. 
+ * 0 < rxw_max_rte < interface capacity + */ + case PGM_RXW_MAX_RTE: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->rxw_max_rte = *(const int*)optval; + status = TRUE; + break; + +/* maximum NAK back-off value nak_rb_ivl in milliseconds. + * 0 < nak_rb_ivl <= nak_bo_ivl + */ + case PGM_NAK_BO_IVL: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->nak_bo_ivl = *(const int*)optval; + status = TRUE; + break; + +/* repeat interval prior to re-sending a NAK, in milliseconds. + */ + case PGM_NAK_RPT_IVL: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->nak_rpt_ivl = *(const int*)optval; + status = TRUE; + break; + +/* interval waiting for repair data, in milliseconds. + */ + case PGM_NAK_RDATA_IVL: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->nak_rdata_ivl = *(const int*)optval; + status = TRUE; + break; + +/* limit for data. + * 0 < nak_data_retries < 256 + */ + case PGM_NAK_DATA_RETRIES: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + if (PGM_UNLIKELY(*(const int*)optval > UINT8_MAX)) + break; + sock->nak_data_retries = *(const int*)optval; + status = TRUE; + break; + +/* limit for NAK confirms. + * 0 < nak_ncf_retries < 256 + */ + case PGM_NAK_NCF_RETRIES: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + if (PGM_UNLIKELY(*(const int*)optval > UINT8_MAX)) + break; + sock->nak_ncf_retries = *(const int*)optval; + status = TRUE; + break; + +/* Enable FEC for this sock, specifically Reed Solmon encoding RS(n,k), common + * setting is RS(255, 223). 
+ * + * inputs: + * + * n = FEC Block size = [k+1, 255] + * k = original data packets == transmission group size = [2, 4, 8, 16, 32, 64, 128] + * m = symbol size = 8 bits + * + * outputs: + * + * h = 2t = n - k = parity packets + * + * when h > k parity packets can be lost. + */ + case PGM_USE_FEC: + if (PGM_UNLIKELY(optlen != sizeof (struct pgm_fecinfo_t))) + break; + { + const struct pgm_fecinfo_t* fecinfo = optval; + if (PGM_UNLIKELY(0 != (fecinfo->group_size & (fecinfo->group_size - 1)))) + break; + if (PGM_UNLIKELY(fecinfo->group_size < 2 || fecinfo->group_size > 128)) + break; + if (PGM_UNLIKELY(fecinfo->group_size > fecinfo->block_size)) + break; + const uint8_t parity_packets = fecinfo->block_size - fecinfo->group_size; +/* technically could re-send previous packets */ + if (PGM_UNLIKELY(fecinfo->proactive_packets > parity_packets)) + break; +/* check validity of parameters */ + if (PGM_UNLIKELY(fecinfo->group_size > 223 && ((parity_packets * 223.0) / fecinfo->group_size) < 1.0)) + { + pgm_error (_("k/h ratio too low to generate parity data.")); + break; + } + sock->use_proactive_parity = (fecinfo->proactive_packets > 0); + sock->use_ondemand_parity = fecinfo->ondemand_parity_enabled; + sock->use_var_pktlen = fecinfo->var_pktlen_enabled; + sock->rs_n = fecinfo->block_size; + sock->rs_k = fecinfo->group_size; + sock->rs_proactive_h = fecinfo->proactive_packets; + } + status = TRUE; + break; + +/* congestion reporting */ + case PGM_USE_CR: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + if (PGM_UNLIKELY(*(const int*)optval <= 0)) + break; + sock->crqst_ivl = *(const int*)optval; + sock->use_cr = (sock->crqst_ivl > 0); + status = TRUE; + break; + +/* congestion control */ + case PGM_USE_PGMCC: + if (PGM_UNLIKELY(optlen != sizeof (struct pgm_pgmccinfo_t))) + break; + { + const struct pgm_pgmccinfo_t* pgmccinfo = optval; + sock->ack_bo_ivl = pgmccinfo->ack_bo_ivl; + sock->ack_c = pgmccinfo->ack_c; + sock->ack_c_p = pgmccinfo->ack_c_p; + sock->use_pgmcc = 
(sock->ack_c > 0); + } + status = TRUE; + break; + +/* declare socket only for sending, discard any incoming SPM, ODATA, + * RDATA, etc, packets. + */ + case PGM_SEND_ONLY: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + sock->can_recv_data = (0 == *(const int*)optval); + status = TRUE; + break; + +/* declare socket only for receiving, no transmit window will be created + * and no SPM broadcasts sent. + */ + case PGM_RECV_ONLY: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + sock->can_send_data = (0 == *(const int*)optval); + status = TRUE; + break; + +/* passive receiving socket, i.e. no back channel to source + */ + case PGM_PASSIVE: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + sock->can_send_nak = (0 == *(const int*)optval); + status = TRUE; + break; + +/* on unrecoverable data loss stop socket from further transmission and + * receiving. + */ + case PGM_ABORT_ON_RESET: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + sock->is_abort_on_reset = (0 != *(const int*)optval); + status = TRUE; + break; + +/* default non-blocking operation on send and receive sockets. + */ + case PGM_NOBLOCK: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + sock->is_nonblocking = (0 != *(const int*)optval); + pgm_sockaddr_nonblocking (sock->recv_sock, sock->is_nonblocking); + pgm_sockaddr_nonblocking (sock->send_sock, sock->is_nonblocking); + pgm_sockaddr_nonblocking (sock->send_with_router_alert_sock, sock->is_nonblocking); + status = TRUE; + break; + +/* sending group, singular. 
+ */ + case PGM_SEND_GROUP: + if (PGM_UNLIKELY(optlen != sizeof(struct group_req))) + break; + memcpy (&sock->send_gsr, optval, optlen); + ((struct sockaddr_in*)&sock->send_gsr.gsr_group)->sin_port = htons (sock->udp_encap_mcast_port); + if (PGM_UNLIKELY(sock->family != sock->send_gsr.gsr_group.ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_multicast_if (sock->send_sock, (const struct sockaddr*)&sock->send_gsr.gsr_group, sock->send_gsr.gsr_interface) || + PGM_SOCKET_ERROR == pgm_sockaddr_multicast_if (sock->send_with_router_alert_sock, (const struct sockaddr*)&sock->send_gsr.gsr_group, sock->send_gsr.gsr_interface)) + break; + status = TRUE; + break; + +/* for any-source applications (ASM), join a new group + */ + case PGM_JOIN_GROUP: + if (PGM_UNLIKELY(optlen != sizeof(struct group_req))) + break; + if (PGM_UNLIKELY(sock->recv_gsr_len >= IP_MAX_MEMBERSHIPS)) + break; + { + const struct group_req* gr = optval; +/* verify not duplicate group/interface pairing */ + for (unsigned i = 0; i < sock->recv_gsr_len; i++) + { + if (pgm_sockaddr_cmp ((const struct sockaddr*)&gr->gr_group, (struct sockaddr*)&sock->recv_gsr[i].gsr_group) == 0 && + pgm_sockaddr_cmp ((const struct sockaddr*)&gr->gr_group, (struct sockaddr*)&sock->recv_gsr[i].gsr_source) == 0 && + (gr->gr_interface == sock->recv_gsr[i].gsr_interface || + 0 == sock->recv_gsr[i].gsr_interface )) + { +#ifdef SOCKET_DEBUG + char s[INET6_ADDRSTRLEN]; + pgm_sockaddr_ntop ((const struct sockaddr*)&gr->gr_group, s, sizeof(s)); + if (sock->recv_gsr[i].gsr_interface) { + pgm_warn(_("Socket has already joined group %s on interface %u"), s, gr->gr_interface); + } else { + pgm_warn(_("Socket has already joined group %s on all interfaces."), s); + } +#endif + break; + } + } + if (PGM_UNLIKELY(sock->family != gr->gr_group.ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_join_group (sock->recv_sock, sock->family, gr)) + break; + sock->recv_gsr[sock->recv_gsr_len].gsr_interface = gr->gr_interface; + memcpy 
(&sock->recv_gsr[sock->recv_gsr_len].gsr_group, &gr->gr_group, pgm_sockaddr_len ((const struct sockaddr*)&gr->gr_group)); + memcpy (&sock->recv_gsr[sock->recv_gsr_len].gsr_source, &gr->gr_group, pgm_sockaddr_len ((const struct sockaddr*)&gr->gr_group)); + sock->recv_gsr_len++; + } + status = TRUE; + break; + +/* for any-source applications (ASM), leave a joined group. + */ + case PGM_LEAVE_GROUP: + if (PGM_UNLIKELY(optlen != sizeof(struct group_req))) + break; + if (PGM_UNLIKELY(0 == sock->recv_gsr_len)) + break; + { + const struct group_req* gr = optval; + for (unsigned i = 0; i < sock->recv_gsr_len;) + { + if ((pgm_sockaddr_cmp ((const struct sockaddr*)&gr->gr_group, (struct sockaddr*)&sock->recv_gsr[i].gsr_group) == 0) && +/* drop all matching receiver entries */ + (gr->gr_interface == 0 || +/* drop all sources with matching interface */ + gr->gr_interface == sock->recv_gsr[i].gsr_interface) ) + { + sock->recv_gsr_len--; + if (i < (IP_MAX_MEMBERSHIPS - 1)) + { + memmove (&sock->recv_gsr[i], &sock->recv_gsr[i+1], (sock->recv_gsr_len - i) * sizeof(struct group_source_req)); + continue; + } + } + i++; + } + if (PGM_UNLIKELY(sock->family != gr->gr_group.ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_leave_group (sock->recv_sock, sock->family, gr)) + break; + } + status = TRUE; + break; + +/* for any-source applications (ASM), turn off a given source + */ + case PGM_BLOCK_SOURCE: + if (PGM_UNLIKELY(optlen != sizeof(struct group_source_req))) + break; + { + const struct group_source_req* gsr = optval; + if (PGM_UNLIKELY(sock->family != gsr->gsr_group.ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_block_source (sock->recv_sock, sock->family, gsr)) + break; + } + status = TRUE; + break; + +/* for any-source applications (ASM), re-allow a blocked source + */ + case PGM_UNBLOCK_SOURCE: + if (PGM_UNLIKELY(optlen != sizeof(struct group_source_req))) + break; + { + const struct group_source_req* gsr = optval; + if (PGM_UNLIKELY(sock->family != 
gsr->gsr_group.ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_unblock_source (sock->recv_sock, sock->family, gsr)) + break; + } + status = TRUE; + break; + +/* for controlled-source applications (SSM), join each group/source pair. + * + * SSM joins are allowed on top of ASM in order to merge a remote source onto the local segment. + */ + case PGM_JOIN_SOURCE_GROUP: + if (PGM_UNLIKELY(optlen != sizeof(struct group_source_req))) + break; + if (PGM_UNLIKELY(sock->recv_gsr_len >= IP_MAX_MEMBERSHIPS)) + break; + { + const struct group_source_req* gsr = optval; +/* verify if existing group/interface pairing */ + for (unsigned i = 0; i < sock->recv_gsr_len; i++) + { + if (pgm_sockaddr_cmp ((const struct sockaddr*)&gsr->gsr_group, (struct sockaddr*)&sock->recv_gsr[i].gsr_group) == 0 && + (gsr->gsr_interface == sock->recv_gsr[i].gsr_interface || + 0 == sock->recv_gsr[i].gsr_interface )) + { + if (pgm_sockaddr_cmp ((const struct sockaddr*)&gsr->gsr_source, (struct sockaddr*)&sock->recv_gsr[i].gsr_source) == 0) + { +#ifdef SOCKET_DEBUG + char s1[INET6_ADDRSTRLEN], s2[INET6_ADDRSTRLEN]; + pgm_sockaddr_ntop ((const struct sockaddr*)&gsr->gsr_group, s1, sizeof(s1)); + pgm_sockaddr_ntop ((const struct sockaddr*)&gsr->gsr_source, s2, sizeof(s2)); + if (sock->recv_gsr[i].gsr_interface) { + pgm_warn(_("Socket has already joined group %s from source %s on interface %d"), + s1, s2, (unsigned)gsr->gsr_interface); + } else { + pgm_warn(_("Socket has already joined group %s from source %s on all interfaces"), + s1, s2); + } +#endif + break; + } + break; + } + } + if (PGM_UNLIKELY(sock->family != gsr->gsr_group.ss_family)) + break; + if (PGM_UNLIKELY(sock->family != gsr->gsr_source.ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_join_source_group (sock->recv_sock, sock->family, gsr)) + break; + memcpy (&sock->recv_gsr[sock->recv_gsr_len], gsr, sizeof(struct group_source_req)); + sock->recv_gsr_len++; + } + status = TRUE; + break; + +/* for controlled-source 
applications (SSM), leave each group/source pair + */ + case PGM_LEAVE_SOURCE_GROUP: + if (PGM_UNLIKELY(optlen != sizeof(struct group_source_req))) + break; + if (PGM_UNLIKELY(0 == sock->recv_gsr_len)) + break; + { + const struct group_source_req* gsr = optval; +/* verify if existing group/interface pairing */ + for (unsigned i = 0; i < sock->recv_gsr_len; i++) + { + if (pgm_sockaddr_cmp ((const struct sockaddr*)&gsr->gsr_group, (struct sockaddr*)&sock->recv_gsr[i].gsr_group) == 0 && + pgm_sockaddr_cmp ((const struct sockaddr*)&gsr->gsr_source, (struct sockaddr*)&sock->recv_gsr[i].gsr_source) == 0 && + gsr->gsr_interface == sock->recv_gsr[i].gsr_interface) + { + sock->recv_gsr_len--; + if (i < (IP_MAX_MEMBERSHIPS - 1)) + { + memmove (&sock->recv_gsr[i], &sock->recv_gsr[i+1], (sock->recv_gsr_len - i) * sizeof(struct group_source_req)); + break; + } + } + } + if (PGM_UNLIKELY(sock->family != gsr->gsr_group.ss_family)) + break; + if (PGM_UNLIKELY(sock->family != gsr->gsr_source.ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_leave_source_group (sock->recv_sock, sock->family, gsr)) + break; + } + status = TRUE; + break; + +/* batch block and unblock sources */ + case PGM_MSFILTER: +#if defined(MCAST_MSFILTER) || defined(SIOCSMSFILTER) + if (PGM_UNLIKELY(optlen < sizeof(struct group_filter))) + break; + { + const struct group_filter* gf_list = optval; + if (GROUP_FILTER_SIZE( gf_list->gf_numsrc ) != optlen) + break; + if (PGM_UNLIKELY(sock->family != gf_list->gf_group.ss_family)) + break; +/* check only first */ + if (PGM_UNLIKELY(sock->family != gf_list->gf_slist[0].ss_family)) + break; + if (PGM_SOCKET_ERROR == pgm_sockaddr_msfilter (sock->recv_sock, sock->family, gf_list)) + break; + } + status = TRUE; +#endif + break; + +/* UDP encapsulation ports */ + case PGM_UDP_ENCAP_UCAST_PORT: + if (PGM_UNLIKELY(optlen != sizeof (int))) + break; + sock->udp_encap_ucast_port = *(const int*)optval; + status = TRUE; + break; + + case PGM_UDP_ENCAP_MCAST_PORT: + if 
(PGM_UNLIKELY(optlen != sizeof (int)))
			break;
		sock->udp_encap_mcast_port = *(const int*)optval;
		status = TRUE;
		break;

	}

	pgm_rwlock_reader_unlock (&sock->lock);
	return status;
}

/* Bind the PGM socket using default (zeroed) interface requests for both
 * the send and receive paths; thin convenience wrapper around pgm_bind3().
 *
 * returns TRUE on success, or FALSE on error and sets error appropriately.
 */

bool
pgm_bind (
	pgm_sock_t*			  restrict sock,
	const struct pgm_sockaddr_t*const restrict sockaddr,
	const socklen_t				   sockaddrlen,
	pgm_error_t**			  restrict error
	)
{
	struct pgm_interface_req_t null_req;
	memset (&null_req, 0, sizeof(null_req));
	return pgm_bind3 (sock, sockaddr, sockaddrlen, &null_req, sizeof(null_req), &null_req, sizeof(null_req), error);
}

/* bind the sockets to the link layer to start receiving data.
 *
 * sockaddr carries the GSI, source port and data-destination port;
 * send_req/recv_req select the transmit and receive interfaces (only
 * ir_interface and ir_scope_id are read from each request).
 *
 * returns TRUE on success, or FALSE on error and sets error appropriately,
 */

bool
pgm_bind3 (
	pgm_sock_t*		      restrict sock,
	const struct pgm_sockaddr_t*  restrict sockaddr,
	const socklen_t			       sockaddrlen,
	const struct pgm_interface_req_t*      send_req,		/* only use gr_interface and gr_group::sin6_scope */
	const socklen_t			       send_req_len,
	const struct pgm_interface_req_t*      recv_req,
	const socklen_t			       recv_req_len,
	pgm_error_t**		      restrict error			/* maybe NULL */
	)
{
	pgm_return_val_if_fail (NULL != sock, FALSE);
	pgm_return_val_if_fail (NULL != sockaddr, FALSE);
	pgm_return_val_if_fail (0 != sockaddrlen, FALSE);
/* a non-zero source port must differ from the data-destination port */
	if (sockaddr->sa_addr.sport) pgm_return_val_if_fail (sockaddr->sa_addr.sport != sockaddr->sa_port, FALSE);
	pgm_return_val_if_fail (NULL != send_req, FALSE);
	pgm_return_val_if_fail (sizeof(struct pgm_interface_req_t) == send_req_len, FALSE);
	pgm_return_val_if_fail (NULL != recv_req, FALSE);
	pgm_return_val_if_fail (sizeof(struct pgm_interface_req_t) == recv_req_len, FALSE);

/* take the socket write lock; a held lock or a bound/destroyed socket is a
 * programming error, reported via pgm_return_val_if_reached.
 */
	if (!pgm_rwlock_writer_trylock (&sock->lock))
		pgm_return_val_if_reached (FALSE);
	if (sock->is_bound ||
	    sock->is_destroyed)
	{
		pgm_rwlock_writer_unlock (&sock->lock);
		pgm_return_val_if_reached (FALSE);
	}

/* sanity checks on state: refuse to bind with incomplete configuration.
 * every failing check releases the lock and reports a distinct error.
 */
	if (sock->max_tpdu < (sizeof(struct pgm_ip) + sizeof(struct pgm_header))) {
		pgm_set_error (error,
			       PGM_ERROR_DOMAIN_SOCKET,
			       PGM_ERROR_FAILED,
			       _("Invalid maximum TPDU size."));
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}
	if (sock->can_send_data) {
		if (PGM_UNLIKELY(0 == sock->spm_ambient_interval)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("SPM ambient interval not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->spm_heartbeat_len)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("SPM heartbeat interval not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
/* the transmit window must be sized by either sequence count or seconds+rate */
		if (PGM_UNLIKELY(0 == sock->txw_sqns && 0 == sock->txw_secs)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("TXW_SQNS not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->txw_sqns && 0 == sock->txw_max_rte)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("TXW_MAX_RTE not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
	}
	if (sock->can_recv_data) {
/* mirror of the transmit-window sizing rule for the receive window */
		if (PGM_UNLIKELY(0 == sock->rxw_sqns && 0 == sock->rxw_secs)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("RXW_SQNS not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->rxw_sqns && 0 == sock->rxw_max_rte)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("RXW_MAX_RTE not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->peer_expiry)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("Peer timeout not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->spmr_expiry)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("SPM-Request timeout not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->nak_bo_ivl)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("NAK_BO_IVL not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->nak_rpt_ivl)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("NAK_RPT_IVL not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->nak_rdata_ivl)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("NAK_RDATA_IVL not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->nak_data_retries)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("NAK_DATA_RETRIES not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (PGM_UNLIKELY(0 == sock->nak_ncf_retries)) {
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       PGM_ERROR_FAILED,
				       _("NAK_NCF_RETRIES not configured."));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
	}

	pgm_debug ("bind3 (sock:%p sockaddr:%p sockaddrlen:%u send-req:%p send-req-len:%u recv-req:%p recv-req-len:%u error:%p)",
		(const void*)sock, (const void*)sockaddr, (unsigned)sockaddrlen, (const void*)send_req, (unsigned)send_req_len, (const void*)recv_req, (unsigned)recv_req_len, (const void*)error);

/* adopt the caller supplied TSI; a zero source port is replaced by a random
 * port distinct from the data-destination port.
 */
	memcpy (&sock->tsi, &sockaddr->sa_addr, sizeof(pgm_tsi_t));
	sock->dport = htons (sockaddr->sa_port);
	if (sock->tsi.sport) {
		sock->tsi.sport = htons (sock->tsi.sport);
	} else {
		do {
			sock->tsi.sport = htons (pgm_random_int_range (0, UINT16_MAX));
		} while (sock->tsi.sport == sock->dport);
	}

/* UDP encapsulation port.
 * NOTE(review): cast to sockaddr_in is applied for both address families;
 * assumes sin_port and sin6_port share the same offset — confirm per platform.
 */
	if (sock->udp_encap_mcast_port) {
		((struct sockaddr_in*)&sock->send_gsr.gsr_group)->sin_port = htons (sock->udp_encap_mcast_port);
	}

/* pseudo-random number generator for back-off intervals */
	pgm_rand_create (&sock->rand_);

/* PGM Children support of POLLs requires 32-bit random node identifier RAND_NODE_ID */
	if (sock->can_recv_data) {
		sock->rand_node_id = pgm_rand_int (&sock->rand_);
	}

	if (sock->can_send_data)
	{
/* Windows notify call will raise an assertion on error, only Unix versions will return
 * a valid error.
 */
		if (sock->use_pgmcc &&
		    0 != pgm_notify_init (&sock->ack_notify))
		{
			const int save_errno = errno;
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       pgm_error_from_errno (save_errno),
				       _("Creating ACK notification channel: %s"),
				       strerror (save_errno));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
		if (0 != pgm_notify_init (&sock->rdata_notify))
		{
			const int save_errno = errno;
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       pgm_error_from_errno (save_errno),
				       _("Creating RDATA notification channel: %s"),
				       strerror (save_errno));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
	}
	if (0 != pgm_notify_init (&sock->pending_notify))
	{
		const int save_errno = errno;
		pgm_set_error (error,
			       PGM_ERROR_DOMAIN_SOCKET,
			       pgm_error_from_errno (save_errno),
			       _("Creating waiting peer notification channel: %s"),
			       strerror (save_errno));
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}

/* determine IP header size for rate regulation engine & stats */
	sock->iphdr_len = (AF_INET == sock->family) ? sizeof(struct pgm_ip) : sizeof(struct pgm_ip6_hdr);
	pgm_trace (PGM_LOG_ROLE_NETWORK,"Assuming IP header size of %zu bytes", sock->iphdr_len);

	if (sock->udp_encap_ucast_port) {
		const size_t udphdr_len = sizeof(struct pgm_udphdr);
		pgm_trace (PGM_LOG_ROLE_NETWORK,"Assuming UDP header size of %zu bytes", udphdr_len);
		sock->iphdr_len += udphdr_len;
	}

/* derive maximum TSDU/APDU payload sizes from the TPDU minus header overhead */
	const sa_family_t pgmcc_family = sock->use_pgmcc ? sock->family : 0;
	sock->max_tsdu = sock->max_tpdu - sock->iphdr_len - pgm_pkt_offset (FALSE, pgmcc_family);
	sock->max_tsdu_fragment = sock->max_tpdu - sock->iphdr_len - pgm_pkt_offset (TRUE, pgmcc_family);
	const unsigned max_fragments = sock->txw_sqns ? MIN( PGM_MAX_FRAGMENTS, sock->txw_sqns ) : PGM_MAX_FRAGMENTS;
	sock->max_apdu = MIN( PGM_MAX_APDU, max_fragments * sock->max_tsdu_fragment );

	if (sock->can_send_data)
	{
		pgm_trace (PGM_LOG_ROLE_TX_WINDOW,_("Create transmit window."));
/* window sized either by explicit sequence count or by seconds + max rate */
		sock->window = sock->txw_sqns ?
				pgm_txw_create (&sock->tsi,
						0,			/* MAX_TPDU */
						sock->txw_sqns,		/* TXW_SQNS */
						0,			/* TXW_SECS */
						0,			/* TXW_MAX_RTE */
						sock->use_ondemand_parity || sock->use_proactive_parity,
						sock->rs_n,
						sock->rs_k) :
				pgm_txw_create (&sock->tsi,
						sock->max_tpdu,		/* MAX_TPDU */
						0,			/* TXW_SQNS */
						sock->txw_secs,		/* TXW_SECS */
						sock->txw_max_rte,	/* TXW_MAX_RTE */
						sock->use_ondemand_parity || sock->use_proactive_parity,
						sock->rs_n,
						sock->rs_k);
		pgm_assert (NULL != sock->window);
	}

/* create peer list */
	if (sock->can_recv_data) {
		sock->peers_hashtable = pgm_hashtable_new (pgm_tsi_hash, pgm_tsi_equal);
		pgm_assert (NULL != sock->peers_hashtable);
	}

	if (IPPROTO_UDP == sock->protocol)
	{
/* Stevens: "SO_REUSEADDR has datatype int."
 */
		pgm_trace (PGM_LOG_ROLE_NETWORK,_("Set socket sharing."));
		const int v = 1;
		if (PGM_SOCKET_ERROR == setsockopt (sock->recv_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&v, sizeof(v)) ||
		    PGM_SOCKET_ERROR == setsockopt (sock->send_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&v, sizeof(v)) ||
		    PGM_SOCKET_ERROR == setsockopt (sock->send_with_router_alert_sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&v, sizeof(v)))
		{
			const int save_errno = pgm_sock_errno();
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       pgm_error_from_sock_errno (save_errno),
				       _("Enabling reuse of socket local address: %s"),
				       pgm_sock_strerror (save_errno));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}

/* request extra packet information to determine destination address on each packet */
#ifndef CONFIG_TARGET_WINE
		pgm_trace (PGM_LOG_ROLE_NETWORK,_("Request socket packet-info."));
		const sa_family_t recv_family = sock->family;
		if (PGM_SOCKET_ERROR == pgm_sockaddr_pktinfo (sock->recv_sock, recv_family, TRUE))
		{
			const int save_errno = pgm_sock_errno();
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       pgm_error_from_sock_errno (save_errno),
				       _("Enabling receipt of ancillary information per incoming packet: %s"),
				       pgm_sock_strerror (save_errno));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
#endif
	}
	else
	{
		const sa_family_t recv_family = sock->family;
		if (AF_INET == recv_family)
		{
/* include IP header only for incoming data, only works for IPv4 */
			pgm_trace (PGM_LOG_ROLE_NETWORK,_("Request IP headers."));
			if (PGM_SOCKET_ERROR == pgm_sockaddr_hdrincl (sock->recv_sock, recv_family, TRUE))
			{
				const int save_errno = pgm_sock_errno();
				pgm_set_error (error,
					       PGM_ERROR_DOMAIN_SOCKET,
					       pgm_error_from_sock_errno (save_errno),
					       _("Enabling IP header in front of user data: %s"),
					       pgm_sock_strerror (save_errno));
				pgm_rwlock_writer_unlock (&sock->lock);
				return FALSE;
			}
		}
		else
		{
			pgm_assert (AF_INET6 == recv_family);
			pgm_trace (PGM_LOG_ROLE_NETWORK,_("Request socket packet-info."));
			if (PGM_SOCKET_ERROR == pgm_sockaddr_pktinfo (sock->recv_sock, recv_family, TRUE))
			{
				const int save_errno = pgm_sock_errno();
				pgm_set_error (error,
					       PGM_ERROR_DOMAIN_SOCKET,
					       pgm_error_from_sock_errno (save_errno),
					       _("Enabling receipt of control message per incoming datagram: %s"),
					       pgm_sock_strerror (save_errno));
				pgm_rwlock_writer_unlock (&sock->lock);
				return FALSE;
			}
		}
	}

/* Bind UDP sockets to interfaces, note multicast on a bound interface is
 * fruity on some platforms.  Roughly, binding to INADDR_ANY provides all
 * data, binding to the multicast group provides only multicast traffic,
 * and binding to the interface address provides only unicast traffic.
 *
 * Multicast routing, IGMP & MLD require a link local address, for IPv4
 * this is provided through MULTICAST_IF and IPv6 through bind, and these
 * may be overridden by per packet scopes.
 *
 * After binding, default interfaces (0.0.0.0) are resolved.
 */
/* TODO: different ports requires a new bound socket */

	union {
		struct sockaddr		sa;
		struct sockaddr_in	s4;
		struct sockaddr_in6	s6;
		struct sockaddr_storage	ss;
	} recv_addr, recv_addr2, send_addr, send_with_router_alert_addr;

#ifdef CONFIG_BIND_INADDR_ANY
/* force default interface for bind-only, source address is still valid for multicast membership.
 * effectively same as running getaddrinfo(hints = {ai_flags = AI_PASSIVE})
 */
	if (AF_INET == sock->family) {
		memset (&recv_addr.s4, 0, sizeof(struct sockaddr_in));
		recv_addr.s4.sin_family = AF_INET;
		recv_addr.s4.sin_addr.s_addr = INADDR_ANY;
	} else {
		memset (&recv_addr.s6, 0, sizeof(struct sockaddr_in6));
		recv_addr.s6.sin6_family = AF_INET6;
		recv_addr.s6.sin6_addr = in6addr_any;
	}
	pgm_trace (PGM_LOG_ROLE_NETWORK,_("Binding receive socket to INADDR_ANY."));
#else
	if (!pgm_if_indextoaddr (recv_req->ir_interface,
				 sock->family,
				 recv_req->ir_scope_id,
				 &recv_addr.sa,
				 error))
	{
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}
	pgm_trace (PGM_LOG_ROLE_NETWORK,_("Binding receive socket to interface index %u scope %u"),
		   recv_req->ir_interface,
		   recv_req->ir_scope_id);

#endif /* CONFIG_BIND_INADDR_ANY */

/* NOTE(review): recv_addr2 is copied here but not referenced again in this
 * function — appears to be a vestigial snapshot; confirm before removing.
 */
	memcpy (&recv_addr2.sa, &recv_addr.sa, pgm_sockaddr_len (&recv_addr.sa));
/* NOTE(review): sockaddr_in cast used for both families, as above */
	((struct sockaddr_in*)&recv_addr)->sin_port = htons (sock->udp_encap_mcast_port);
	if (PGM_SOCKET_ERROR == bind (sock->recv_sock,
				      &recv_addr.sa,
				      pgm_sockaddr_len (&recv_addr.sa)))
	{
		char addr[INET6_ADDRSTRLEN];
		pgm_sockaddr_ntop ((struct sockaddr*)&recv_addr, addr, sizeof(addr));
		const int save_errno = pgm_sock_errno();
		pgm_set_error (error,
			       PGM_ERROR_DOMAIN_SOCKET,
			       pgm_error_from_sock_errno (save_errno),
			       _("Binding receive socket to address %s: %s"),
			       addr,
			       pgm_sock_strerror (save_errno));
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}

	if (PGM_UNLIKELY(pgm_log_mask & PGM_LOG_ROLE_NETWORK))
	{
		char s[INET6_ADDRSTRLEN];
		pgm_sockaddr_ntop ((struct sockaddr*)&recv_addr, s, sizeof(s));
		pgm_debug ("bind succeeded on recv_gsr[0] interface %s", s);
	}

/* keep a copy of the original address source to re-use for router alert bind */
	memset (&send_addr, 0, sizeof(send_addr));

	if (!pgm_if_indextoaddr (send_req->ir_interface,
				 sock->family,
				 send_req->ir_scope_id,
				 (struct sockaddr*)&send_addr,
				 error))
	{
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}
	else if (PGM_UNLIKELY(pgm_log_mask & PGM_LOG_ROLE_NETWORK))
	{
		pgm_trace (PGM_LOG_ROLE_NETWORK,_("Binding send socket to interface index %u scope %u"),
			   send_req->ir_interface,
			   send_req->ir_scope_id);
	}

	memcpy (&send_with_router_alert_addr, &send_addr, pgm_sockaddr_len ((struct sockaddr*)&send_addr));
	if (PGM_SOCKET_ERROR == bind (sock->send_sock,
				      (struct sockaddr*)&send_addr,
				      pgm_sockaddr_len ((struct sockaddr*)&send_addr)))
	{
		char addr[INET6_ADDRSTRLEN];
		pgm_sockaddr_ntop ((struct sockaddr*)&send_addr, addr, sizeof(addr));
		const int save_errno = pgm_sock_errno();
		pgm_set_error (error,
			       PGM_ERROR_DOMAIN_SOCKET,
			       pgm_error_from_sock_errno (save_errno),
			       _("Binding send socket to address %s: %s"),
			       addr,
			       pgm_sock_strerror (save_errno));
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}

/* resolve bound address if 0.0.0.0 */
	if (AF_INET == send_addr.ss.ss_family)
	{
		if ((INADDR_ANY == ((struct sockaddr_in*)&send_addr)->sin_addr.s_addr) &&
		    !pgm_if_getnodeaddr (AF_INET, (struct sockaddr*)&send_addr, sizeof(send_addr), error))
		{
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}
	}
	else if ((memcmp (&in6addr_any, &((struct sockaddr_in6*)&send_addr)->sin6_addr, sizeof(in6addr_any)) == 0) &&
		 !pgm_if_getnodeaddr (AF_INET6, (struct sockaddr*)&send_addr, sizeof(send_addr), error))
	{
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}

	if (PGM_UNLIKELY(pgm_log_mask & PGM_LOG_ROLE_NETWORK))
	{
		char s[INET6_ADDRSTRLEN];
		pgm_sockaddr_ntop ((struct sockaddr*)&send_addr, s, sizeof(s));
		pgm_debug ("bind succeeded on send_gsr interface %s", s);
	}

	if (PGM_SOCKET_ERROR == bind (sock->send_with_router_alert_sock,
				      (struct sockaddr*)&send_with_router_alert_addr,
				      pgm_sockaddr_len((struct sockaddr*)&send_with_router_alert_addr)))
	{
		char addr[INET6_ADDRSTRLEN];
		pgm_sockaddr_ntop ((struct sockaddr*)&send_with_router_alert_addr, addr, sizeof(addr));
		const int save_errno = pgm_sock_errno();
		pgm_set_error (error,
			       PGM_ERROR_DOMAIN_SOCKET,
			       pgm_error_from_sock_errno (save_errno),
			       _("Binding IP Router Alert (RFC 2113) send socket to address %s: %s"),
			       addr,
			       pgm_sock_strerror (save_errno));
		pgm_rwlock_writer_unlock (&sock->lock);
		return FALSE;
	}

	if (PGM_UNLIKELY(pgm_log_mask & PGM_LOG_ROLE_NETWORK))
	{
		char s[INET6_ADDRSTRLEN];
		pgm_sockaddr_ntop ((struct sockaddr*)&send_with_router_alert_addr, s, sizeof(s));
		pgm_debug ("bind (router alert) succeeded on send_gsr interface %s", s);
	}

/* save send side address for broadcasting as source nla */
	memcpy (&sock->send_addr, &send_addr, pgm_sockaddr_len ((struct sockaddr*)&send_addr));

/* rx to nak processor notify channel */
	if (sock->can_send_data)
	{
/* setup rate control */
		if (sock->txw_max_rte)
		{
			pgm_trace (PGM_LOG_ROLE_RATE_CONTROL,_("Setting rate regulation to %zd bytes per second."),
				   sock->txw_max_rte);

			pgm_rate_create (&sock->rate_control, sock->txw_max_rte, sock->iphdr_len, sock->max_tpdu);
			sock->is_controlled_spm   = TRUE;	/* must always be set */
			sock->is_controlled_odata = TRUE;
			sock->is_controlled_rdata = TRUE;
		}
		else
		{
			sock->is_controlled_spm   = FALSE;
			sock->is_controlled_odata = FALSE;
			sock->is_controlled_rdata = FALSE;
		}
	}

/* allocate first incoming packet buffer */
	sock->rx_buffer = pgm_alloc_skb (sock->max_tpdu);

/* bind complete */
	sock->is_bound = TRUE;

/* cleanup */
	pgm_rwlock_writer_unlock (&sock->lock);
	pgm_debug ("PGM socket successfully bound.");
	return TRUE;
}

/* Mark a bound socket as connected: a sending socket announces itself with
 * three SYN SPMs and initialises PGMCC state; a receive-only socket simply
 * schedules its first timer poll.
 *
 * returns TRUE on success, or FALSE on error and sets error appropriately.
 */

bool
pgm_connect (
	pgm_sock_t*   restrict sock,
	pgm_error_t** restrict error	/* maybe NULL */
	)
{
	pgm_return_val_if_fail (sock != NULL, FALSE);
	pgm_return_val_if_fail (sock->recv_gsr_len > 0, FALSE);
#ifdef CONFIG_TARGET_WINE
	pgm_return_val_if_fail (sock->recv_gsr_len == 1, FALSE);
#endif
/* all receive groups and the send group must share one address family */
	for (unsigned i = 0; i < sock->recv_gsr_len; i++)
	{
		pgm_return_val_if_fail (sock->recv_gsr[i].gsr_group.ss_family == sock->recv_gsr[0].gsr_group.ss_family, FALSE);
		pgm_return_val_if_fail (sock->recv_gsr[i].gsr_group.ss_family == sock->recv_gsr[i].gsr_source.ss_family, FALSE);
	}
	pgm_return_val_if_fail (sock->send_gsr.gsr_group.ss_family == sock->recv_gsr[0].gsr_group.ss_family, FALSE);
/* shutdown */
	if (PGM_UNLIKELY(!pgm_rwlock_writer_trylock (&sock->lock)))
		pgm_return_val_if_reached (FALSE);
/* state */
	if (PGM_UNLIKELY(sock->is_connected || !sock->is_bound || sock->is_destroyed)) {
		pgm_rwlock_writer_unlock (&sock->lock);
		pgm_return_val_if_reached (FALSE);
	}

	pgm_debug ("connect (sock:%p error:%p)",
		   (const void*)sock, (const void*)error);

/* rx to nak processor notify channel */
	if (sock->can_send_data)
	{
/* announce new sock by sending out SPMs */
		if (!pgm_send_spm (sock, PGM_OPT_SYN) ||
		    !pgm_send_spm (sock, PGM_OPT_SYN) ||
		    !pgm_send_spm (sock, PGM_OPT_SYN))
		{
			const int save_errno = pgm_sock_errno();
			pgm_set_error (error,
				       PGM_ERROR_DOMAIN_SOCKET,
				       pgm_error_from_sock_errno (save_errno),
				       _("Sending SPM broadcast: %s"),
				       pgm_sock_strerror (save_errno));
			pgm_rwlock_writer_unlock (&sock->lock);
			return FALSE;
		}

		sock->next_poll = sock->next_ambient_spm = pgm_time_update_now() + sock->spm_ambient_interval;

/* start PGMCC with one token */
		sock->tokens = sock->cwnd_size = pgm_fp8 (1);

/* slow start threshold */
		sock->ssthresh = pgm_fp8 (4);

/* ACK timeout, should be greater than first SPM heartbeat interval in order to be scheduled correctly */
		sock->ack_expiry_ivl = pgm_secs (3);

/* start full history */
		sock->ack_bitmap = 0xffffffff;
	}
	else
	{
		pgm_assert (sock->can_recv_data);
		sock->next_poll = pgm_time_update_now() + pgm_secs( 30 );
	}

	sock->is_connected = TRUE;

/* cleanup */
	pgm_rwlock_writer_unlock (&sock->lock);
	pgm_debug ("PGM socket successfully connected.");
	return TRUE;
}

/* return local endpoint address
 *
 * copies the socket TSI and data-destination port into addr; fails with
 * errno = EBADF when the socket has not been bound.
 */

bool
pgm_getsockname (
	pgm_sock_t*      const restrict sock,
	struct pgm_sockaddr_t* restrict	addr,
	socklen_t*	       restrict	addrlen
	)
{
	pgm_assert (NULL != sock);
	pgm_assert (NULL != addr);
	pgm_assert (NULL != addrlen);
	pgm_assert (sizeof(struct pgm_sockaddr_t) == *addrlen);

	if (!sock->is_bound) {
		errno = EBADF;
		return FALSE;
	}

	addr->sa_port = sock->dport;
	memcpy (&addr->sa_addr, &sock->tsi, sizeof(pgm_tsi_t));
	return TRUE;
}

/* add select parameters for the receive socket(s)
 *
 * registers the receive socket plus the internal notification channels in
 * readfds, and the send socket in writefds unless PGMCC congestion is
 * stalling transmission (in which case the ACK channel is polled instead).
 *
 * returns highest file descriptor used plus one.
 */

int
pgm_select_info (
	pgm_sock_t* const restrict	sock,
	fd_set*	    const restrict	readfds,	/* blocking recv fds */
	fd_set*	    const restrict	writefds,	/* blocking send fds */
	int*	    const restrict	n_fds		/* in: max fds, out: max (in:fds, sock:fds) */
	)
{
	int fds = 0;

	pgm_assert (NULL != sock);
	pgm_assert (NULL != n_fds);

	if (!sock->is_bound || sock->is_destroyed)
	{
		errno = EBADF;
		return -1;
	}

/* congested when PGMCC is active and fewer than one whole token remains */
	const bool is_congested = (sock->use_pgmcc && sock->tokens < pgm_fp8 (1)) ? TRUE : FALSE;

	if (readfds)
	{
		FD_SET(sock->recv_sock, readfds);
		fds = sock->recv_sock + 1;
		if (sock->can_send_data) {
			const int rdata_fd = pgm_notify_get_fd (&sock->rdata_notify);
			FD_SET(rdata_fd, readfds);
			fds = MAX(fds, rdata_fd + 1);
			if (is_congested) {
				const int ack_fd = pgm_notify_get_fd (&sock->ack_notify);
				FD_SET(ack_fd, readfds);
				fds = MAX(fds, ack_fd + 1);
			}
		}
		const int pending_fd = pgm_notify_get_fd (&sock->pending_notify);
		FD_SET(pending_fd, readfds);
		fds = MAX(fds, pending_fd + 1);
	}

	if (sock->can_send_data && writefds && !is_congested)
	{
		FD_SET(sock->send_sock, writefds);
		fds = MAX(sock->send_sock + 1, fds);
	}

	return *n_fds = MAX(fds, *n_fds);
}

#ifdef CONFIG_HAVE_POLL
/* add poll parameters for the receive socket(s)
 *
 * caller must size fds for up to three POLLIN entries plus one POLLOUT
 * entry; an undersized array trips an assertion.
 *
 * returns number of pollfd structures filled.
 */

int
pgm_poll_info (
	pgm_sock_t*	 const restrict	sock,
	struct pollfd* const restrict	fds,
	int*		 const restrict	n_fds,		/* in: #fds, out: used #fds */
	const int			events		/* POLLIN, POLLOUT */
	)
{
	pgm_assert (NULL != sock);
	pgm_assert (NULL != fds);
	pgm_assert (NULL != n_fds);

	if (!sock->is_bound || sock->is_destroyed)
	{
		errno = EBADF;
		return -1;
	}

	int moo = 0;	/* count of pollfd entries filled so far */

/* we currently only support one incoming socket */
	if (events & POLLIN)
	{
		pgm_assert ( (1 + moo) <= *n_fds );
		fds[moo].fd = sock->recv_sock;
		fds[moo].events = POLLIN;
		moo++;
		if (sock->can_send_data) {
			pgm_assert ( (1 + moo) <= *n_fds );
			fds[moo].fd = pgm_notify_get_fd (&sock->rdata_notify);
			fds[moo].events = POLLIN;
			moo++;
		}
		pgm_assert ( (1 + moo) <= *n_fds );
		fds[moo].fd = pgm_notify_get_fd (&sock->pending_notify);
		fds[moo].events = POLLIN;
		moo++;
	}

/* ODATA only published on regular socket, no need to poll router-alert sock */
	if (sock->can_send_data && events & POLLOUT)
	{
		pgm_assert ( (1 + moo) <= *n_fds );
		if (sock->use_pgmcc && sock->tokens < pgm_fp8 (1)) {
/* rx thread poll for ACK */
			fds[moo].fd = pgm_notify_get_fd (&sock->ack_notify);
			fds[moo].events = POLLIN;
		} else {
/* kernel resource poll */
			fds[moo].fd = sock->send_sock;
			fds[moo].events = POLLOUT;
		}
		moo++;
	}

	return *n_fds = moo;
}
#endif /* CONFIG_HAVE_POLL */

/* add epoll parameters for the receive socket(s), events should
 * be set to EPOLLIN to wait for incoming events (data), and EPOLLOUT to wait
 * for non-blocking write.
 *
 * only EPOLL_CTL_ADD and EPOLL_CTL_MOD are supported; every registered fd
 * carries the socket pointer in event.data.ptr.
 *
 * returns 0 on success, -1 on failure and sets errno appropriately.
 */
#ifdef CONFIG_HAVE_EPOLL
int
pgm_epoll_ctl (
	pgm_sock_t* const	sock,
	const int		epfd,
	const int		op,		/* EPOLL_CTL_ADD, ... */
	const int		events		/* EPOLLIN, EPOLLOUT */
	)
{
	if (!(op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD))
	{
		errno = EINVAL;
		return -1;
	}
	else if (!sock->is_bound || sock->is_destroyed)
	{
		errno = EBADF;
		return -1;
	}

	struct epoll_event event;
	int retval = 0;

	if (events & EPOLLIN)
	{
/* pass through only the trigger/oneshot modifiers alongside EPOLLIN */
		event.events = events & (EPOLLIN | EPOLLET | EPOLLONESHOT);
		event.data.ptr = sock;
		retval = epoll_ctl (epfd, op, sock->recv_sock, &event);
		if (retval)
			goto out;
		if (sock->can_send_data) {
			retval = epoll_ctl (epfd, op, pgm_notify_get_fd (&sock->rdata_notify), &event);
			if (retval)
				goto out;
		}
		retval = epoll_ctl (epfd, op, pgm_notify_get_fd (&sock->pending_notify), &event);
		if (retval)
			goto out;

		if (events & EPOLLET)
			sock->is_edge_triggered_recv = TRUE;
	}

	if (sock->can_send_data && events & EPOLLOUT)
	{
		bool enable_ack_socket = FALSE;
		bool enable_send_socket = FALSE;

/* both sockets need to be added when PGMCC is enabled */
		if (sock->use_pgmcc && EPOLL_CTL_ADD == op) {
			enable_ack_socket = enable_send_socket = TRUE;
		} else {
/* automagically switch socket when congestion stall occurs */
			if (sock->use_pgmcc && sock->tokens < pgm_fp8 (1))
				enable_ack_socket = TRUE;
			else
				enable_send_socket = TRUE;
		}

		if (enable_ack_socket)
		{
/* rx thread poll for ACK */
			event.events = EPOLLIN | (events & (EPOLLONESHOT));
			event.data.ptr = sock;
			retval = epoll_ctl (epfd, op, pgm_notify_get_fd (&sock->ack_notify), &event);
		}

		if (enable_send_socket)
		{
/* kernel resource poll */
			event.events = events & (EPOLLOUT | EPOLLET | EPOLLONESHOT);
			event.data.ptr = sock;
			retval = epoll_ctl (epfd, op, sock->send_sock, &event);
		}
	}
out:
	return retval;
}
#endif

/* debug helper: printable name for a socket address family */
static
const char*
pgm_family_string (
	const int	family
	)
{
	const char* c;

	switch (family) {
	case AF_UNSPEC:		c = "AF_UNSPEC"; break;
	case AF_INET:		c = "AF_INET"; break;
	case AF_INET6:		c = "AF_INET6"; break;
	default: c = "(unknown)"; break;
	}

	return c;
}

/* debug helper: printable name for a socket type */
static
const char*
pgm_sock_type_string (
	const int	sock_type
	)
{
	const char* c;

	switch (sock_type) {
	case SOCK_SEQPACKET:	c = "SOCK_SEQPACKET"; break;
	default: c = "(unknown)"; break;
	}

	return c;
}

/* debug helper: printable name for an IP protocol */
static
const char*
pgm_protocol_string (
	const int	protocol
	)
{
	const char* c;

	switch (protocol) {
	case IPPROTO_UDP:	c = "IPPROTO_UDP"; break;
	case IPPROTO_PGM:	c = "IPPROTO_PGM"; break;
	default: c = "(unknown)"; break;
	}

	return c;
}

/* eof */