/* # Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg # # This program is free software distributed under the GPL version 2. # See http://openslx.org/COPYING # # If you have any feedback please consult http://openslx.org/feedback and # send your suggestions, praise, or complaints to feedback@openslx.org # # General information about OpenSLX can be found at http://openslx.org/ # ----------------------------------------------------------------------------- # src/net/mcast/McastPGMSocket.cpp # - wrap OpenPGM Sockets in a nicer interface -- implementation # ----------------------------------------------------------------------------- */ #include #include #include #include #include #include #include #include #include #include #include "McastPGMSocket.h" using namespace std; class McastPGMSocket_priv { public: McastPGMSocket_priv() : socket(0), send_notif(0) { } ~McastPGMSocket_priv() { if (socket) pgm_close(socket, 0); Q_FOREACH(QSocketNotifier* notif, _notifs) { delete notif; } if (send_notif) delete send_notif; } pgm_sock_t* socket; McastPGMSocket::Direction direction; QSocketNotifier* send_notif; QList _notifs; QSocketNotifier* notifier_for(int fd) { Q_FOREACH(QSocketNotifier* notif, _notifs) { if(notif->socket() == fd) { return notif; } } return 0; } }; static void _ensurePGMInited() { if (!pgm_supported()) { pgm_error_t* err; int good = pgm_init(&err); if (!good) { qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)"); std::exit(1); } } } McastPGMSocket::McastPGMSocket(QObject* parent) : QObject(parent), _priv(new McastPGMSocket_priv), _opened(false), _finished(false), _nakTimeout(new QTimer()), _dataTimeout(new QTimer()), _sendTimeout(new QTimer()), _shutdownTimer(0), _shutdown_timeout(0) { _ensurePGMInited(); _nakTimeout->setSingleShot(true); _dataTimeout->setSingleShot(true); _sendTimeout->setSingleShot(true); connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout())); connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout())); connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend())); } McastPGMSocket::~McastPGMSocket() { delete _priv; delete _nakTimeout; delete _dataTimeout; delete _sendTimeout; } bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) { _priv->direction = direction; pgm_error_t* err = 0; int good; pgm_addrinfo_t* addrinfo; // parse the address string good = pgm_getaddrinfo((config->multicastInterface() + ";" + config->multicastAddress()).toLatin1().constData(), 0, &addrinfo, &err); if (!good) { qCritical() << "Could not parse address info: PGM Error: " << err->message; } sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family; if(config->multicastUseUDP()) { good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_UDP, &err); } else { good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err); } if (!good) { qCritical() << "Could not open socket: PGM Error: " << err->message; pgm_error_free(err); return false; } unsigned const ambient_spm = 2000 * 1000; // every one hundred milliseconds (approx.) /* Options for sending data */ const int spm_heartbeat[] = { 512 * 1000, 1024 * 1000, 2048 * 1000, 4096 * 1000 }, max_rate = 0, max_window = config->multicastWinSize() * config->multicastRate() / config->multicastMTU(); // const int max_window_sqns = 3000; qDebug() << "Computed window size " << max_window << " packets"; // pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, // sizeof(send_only)); // SPM messages pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); // Transmit window pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); // pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_SQNS, &max_window, sizeof(max_window)); /* Options for receiving data */ const int passive = 0, spmr_expiry = 500 * 1000, nak_bo_ivl = 200 * 1000, nak_rpt_ivl = 500 * 1000, nak_rdata_ivl = 500 * 1000, nak_data_retries = 50, nak_ncf_retries = 50; qDebug() << "Computed window size " << max_window << " packets"; // pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive)); // pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate)); // pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_RXW_SQNS, &max_window, sizeof(max_window)); /* Try using PGMCC */ const struct pgm_pgmccinfo_t pgmccinfo = { 100 /* usecs */ * 1000 /* msecs */, 75 /* from OpenPGM examples */, 500 /* from PGMCC internet-draft */ }; good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo)); if(!good) { qCritical() << "Could not enable PGMCC"; return false; } // /* Forward Error Correction */ // const struct pgm_fecinfo_t pgmfecinfo = { // 255 /* from OpenPGM examples */, // 2 /* send two proactive packets */, // 8 /* from OpenPGM examples */, // 1 /* enable on-demand parity */, // 1 /* enable variable packet length */ // }; // good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_FEC, &pgmfecinfo, sizeof(pgmfecinfo)); // if(!good) // { // qCritical() << "Could not enable FEC"; // return false; // } // Peer Expiry: We will give 1 minute. int const peer_expiry = 60 /* seconds */ * 1000000 /* microseconds */; pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); // MTU int const mtu = config->multicastMTU(); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MTU, &mtu, sizeof(mtu)); pgm_sockaddr_t addr; addr.sa_addr.sport = config->multicastSPort(); addr.sa_port = config->multicastDPort(); good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err); if (!good) { qCritical() << "Could not generate a GSI: PGM Error: " << err->message; pgm_error_free(err); return false; } struct pgm_interface_req_t ifreq; ifreq.ir_interface = addrinfo->ai_send_addrs[0].gsr_interface; ifreq.ir_scope_id = 0; if (AF_INET6 == family) { ifreq.ir_scope_id = ((struct sockaddr_in6*)&addrinfo->ai_send_addrs[0])->sin6_scope_id; } // UDP Encapsulation if(config->multicastUseUDP()) { const int uport = config->multicastUDPUPort(); const int mport = config->multicastUDPMPort(); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport)); } good = pgm_bind3(_priv->socket, &addr, sizeof(addr), &ifreq , sizeof(ifreq), &ifreq, sizeof(ifreq), &err); if (!good) { qCritical() << "Could not bind socket: PGM Error: " << err->message; pgm_error_free(err); return false; } // qDebug() << "Max APDU is " << _priv->socket->max_apdu; // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu; // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment; // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns; // join the group for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++) { pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_JOIN_GROUP, &addrinfo->ai_recv_addrs[i], sizeof(struct group_req)); } // set send address pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], sizeof(struct group_req)); // IP parameters const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16; pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops)); pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking)); good = pgm_connect(_priv->socket, &err); if (!good) { qCritical() << "Could not connect socket: PGM Error: " << err->message; pgm_error_free(err); return false; } _opened = true; setupNotifiers(); pgm_freeaddrinfo(addrinfo); /* Prime the generation of SPM packets during the waiting period */ if(_priv->direction == PSOCK_WRITE) QTimer::singleShot(0, this, SLOT(handleNak())); return true; } void McastPGMSocket::setupNotifiers() { int recv_sock, repair_sock, pending_sock; char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int)); struct pollfd pollin[10]; int in_nfds = 10; pgm_poll_info(_priv->socket, pollin, &in_nfds, POLLIN); for(int i = 0; i < in_nfds; i++) { QSocketNotifier* notif = new QSocketNotifier(pollin[i].fd, QSocketNotifier::Read, this); _priv->_notifs.append(notif); connect(notif, SIGNAL(activated(int)), this, slotname); } if(_priv->direction == PSOCK_WRITE) { struct pollfd pfd; int nfds = 1; pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT); _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this); connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend())); } } void McastPGMSocket::handleNak(int fd) { qDebug() << "handleNak(" << fd << ")"; QSocketNotifier* notif = _priv->notifier_for(fd); notif->setEnabled(false); if (_shutdownTimer) { _shutdownTimer->start(_shutdown_timeout); qDebug() << "Started shutdown timer"; } handleNak(); notif->setEnabled(true); } void McastPGMSocket::handleNak() { if (_finished) return; // QTimer::singleShot(1000, this, SLOT(handleNakTimeout())); // to handle NAKs in OpenPGM, we need to pgm_recv: char buf[4096]; pgm_error_t* err = 0; int status; // while we don't block: do { status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err); if(status == PGM_IO_STATUS_TIMER_PENDING) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); if(msecs == 0) msecs = 1; qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; _nakTimeout->start(msecs); break; } else if(status == PGM_IO_STATUS_RATE_LIMITED) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); if(msecs == 0) msecs = 1; qDebug() << " rate limited: " << msecs << "ms"; _nakTimeout->start(msecs); break; } else if(status == PGM_IO_STATUS_WOULD_BLOCK) { qDebug() << " wouldblock"; break; } else { if(err) { qCritical() << "Could not handle NAKs: PGM Error: " << err->message; pgm_error_free(err); err = 0; } } } while (true); } void McastPGMSocket::handleNakTimeout() { qDebug() << "handleNakTimeout()"; handleNak(); } void McastPGMSocket::handleData(int fd) { // need to guard against destruction in finish() via signals/slots QPointer notif(_priv->notifier_for(fd)); notif->setEnabled(false); handleData(); if (notif) notif->setEnabled(true); } void McastPGMSocket::handleData() { if (_finished) { return; } int status; do { char buf[4096]; size_t size; pgm_error_t* err = 0; status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err); if (status == PGM_IO_STATUS_NORMAL) { qDebug() << " normally received"; if(size > 0) { QByteArray bytes(buf, size); emit receivedPacket(bytes); } } else if (status == PGM_IO_STATUS_WOULD_BLOCK) { qDebug() << " would block"; // nothing more to do this time break; } else if (status == PGM_IO_STATUS_TIMER_PENDING) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); if(msecs == 0) msecs = 1; qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; _dataTimeout->start(msecs); break; } else if (status == PGM_IO_STATUS_RATE_LIMITED) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); if(msecs == 0) msecs = 1; qDebug() << " rate limit pending: " << msecs << "ms"; _dataTimeout->start(msecs); break; } else if (status == PGM_IO_STATUS_RESET) { qDebug() << " connection reset"; emit connectionReset(); qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)"); pgm_error_free(err); break; } else if (status == PGM_IO_STATUS_FIN) { qDebug() << " connection finished"; emit connectionFinished(); break; } else { if(err) { qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)"); break; } } // the socket might have been closed from under us if (!_priv->socket) break; } while (true); } void McastPGMSocket::handleDataTimeout() { qDebug() << "handleDataTimeout()"; handleData(); } void McastPGMSocket::canSend() { if (_finished) return; if (_priv->send_notif) { _priv->send_notif->setEnabled(false); } if(_q.isEmpty()) { emit readyToSend(); } else { QByteArray const packet(_q.head()); int status; status = pgm_send(_priv->socket, packet.constData(), packet.size(), 0); if(status == PGM_IO_STATUS_NORMAL) { _q.dequeue(); if(!_q.isEmpty()) { _priv->send_notif->setEnabled(true); } else { emit readyToSend(); } } else if(status == PGM_IO_STATUS_WOULD_BLOCK) { _priv->send_notif->setEnabled(true); } else if(status == PGM_IO_STATUS_CONGESTION) { qDebug() << " congested..."; // wait a short time (10ms?) _sendTimeout->start(10); } else if(status == PGM_IO_STATUS_RATE_LIMITED) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); int msecs = (tv.tv_sec * 1000) + ((tv.tv_usec + 999) / 1000); if(msecs == 0) msecs = 1; qDebug() << " rate_limited, waiting" << msecs << "ms"; _sendTimeout->start(msecs); } else { qCritical() << "Unhandled status in canSend():" << status; } } if (_shutdownTimer) _shutdownTimer->start(_shutdown_timeout); } void McastPGMSocket::sendPacket(QByteArray const& bytes) { if(_shutdownTimer) { qCritical() << "Logic error: sendPacket() after shutdown()"; } _q.enqueue(bytes); _priv->send_notif->setEnabled(true); } void McastPGMSocket::finish() { if (_finished) { return; } qDebug() << "finish()"; Q_FOREACH(QSocketNotifier* notif, _priv->_notifs) { notif->setEnabled(false); delete notif; } _priv->_notifs.clear(); if(_priv->send_notif) { delete _priv->send_notif; _priv->send_notif = 0; } if (_priv->socket) { pgm_close(_priv->socket, 1); _priv->socket = 0; } _finished = true; emit connectionFinished(); qDebug() << "Socket finished"; } bool McastPGMSocket::finished() const { return _finished; } bool McastPGMSocket::isOpen() const { return _opened && !_finished; } void McastPGMSocket::shutdown(int interval) { if(_priv->direction == PSOCK_READ) return; _shutdown_timeout = interval; _shutdownTimer = new QTimer(this); connect(_shutdownTimer, SIGNAL(timeout()), this, SLOT(finish())); if (_q.isEmpty()) { _shutdownTimer->start(_shutdown_timeout); qDebug() << "Started shutdown timer"; } }