/* # Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg # # This program is free software distributed under the GPL version 2. # See http://openslx.org/COPYING # # If you have any feedback please consult http://openslx.org/feedback and # send your suggestions, praise, or complaints to feedback@openslx.org # # General information about OpenSLX can be found at http://openslx.org/ # ----------------------------------------------------------------------------- # src/net/mcast/McastPGMSocket.cpp # - wrap OpenPGM Sockets in a nicer interface -- implementation # ----------------------------------------------------------------------------- */ #include #include #include #include #include #include #include #include #include #include #include "McastPGMSocket.h" using namespace std; class McastPGMSocket_priv { public: McastPGMSocket_priv() : socket(0), send_notif(0) { } ~McastPGMSocket_priv() { if (socket) pgm_close(socket, 0); Q_FOREACH(QSocketNotifier* notif, _notifs) { delete notif; } if (send_notif) delete send_notif; } pgm_sock_t* socket; McastPGMSocket::Direction direction; QSocketNotifier* send_notif; QList _notifs; QSocketNotifier* notifier_for(int fd) { Q_FOREACH(QSocketNotifier* notif, _notifs) { if(notif->socket() == fd) { return notif; } } return 0; } }; static void _ensurePGMInited() { if (!pgm_supported()) { pgm_error_t* err; int good = pgm_init(&err); if (!good) { qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)"); std::exit(1); } } } McastPGMSocket::McastPGMSocket(QObject* parent) : QObject(parent), _priv(new McastPGMSocket_priv), _finished(false), _nakTimeout(new QTimer()), _dataTimeout(new QTimer()), _sendTimeout(new QTimer()), _shutdownTimer(0), _shutdown_timeout(0) { _ensurePGMInited(); _nakTimeout->setSingleShot(true); _dataTimeout->setSingleShot(true); _sendTimeout->setSingleShot(true); connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout())); connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout())); connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend())); } McastPGMSocket::~McastPGMSocket() { delete _priv; delete _nakTimeout; delete _dataTimeout; delete _sendTimeout; } bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) { _priv->direction = direction; pgm_error_t* err = 0; int good; pgm_addrinfo_t* addrinfo; // parse the address string good = pgm_getaddrinfo((config->multicastInterface() + ";" + config->multicastAddress()).toLatin1().constData(), 0, &addrinfo, &err); if (!good) { qCritical() << "Could not parse address info: PGM Error: " << err->message; } sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family; if(config->multicastUseUDP()) { good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_UDP, &err); } else { good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err); } if (!good) { qCritical() << "Could not open socket: PGM Error: " << err->message; pgm_error_free(err); return false; } unsigned const ambient_spm = 4096 * 1000; // every four seconds (approx.) // set parameters if (direction == PSOCK_WRITE) { // write-only socket const int send_only = 1, spm_heartbeat[] = { 256 * 1000, 512 * 1000, 1024 * 1000, 2048 * 1000, 4096 * 1000 }, max_rate = config->multicastRate(), max_window = config->multicastWinSize(); // const int max_window_sqns = 3000; pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, sizeof(send_only)); // SPM messages pgm_setsockopt(_priv->socket, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); pgm_setsockopt(_priv->socket, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); // Transmit window pgm_setsockopt(_priv->socket, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); // pgm_setsockopt(_priv->socket, PGM_TXW_SQNS, &max_window, sizeof(max_window)); } else { // readonly const int recv_only = 1, passive = 0, max_window = config->multicastWinSize(), max_rate = config->multicastRate(), peer_expiry = ambient_spm * 5, spmr_expiry = 250 * 1000, nak_bo_ivl = 100 * 1000, nak_rpt_ivl = 400 * 1000, nak_rdata_ivl = 400 * 1000, nak_data_retries = 50, nak_ncf_retries = 50, no_rxw_sqns = 0; pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); pgm_setsockopt(_priv->socket, PGM_PASSIVE, &passive, sizeof(passive)); pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate)); pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); pgm_setsockopt(_priv->socket, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); pgm_setsockopt(_priv->socket, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); pgm_setsockopt(_priv->socket, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); pgm_setsockopt(_priv->socket, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); pgm_setsockopt(_priv->socket, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); pgm_setsockopt(_priv->socket, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); pgm_setsockopt(_priv->socket, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); pgm_setsockopt(_priv->socket, PGM_RXW_SQNS, &no_rxw_sqns, sizeof(no_rxw_sqns)); } // MTU int const mtu = config->multicastMTU(); pgm_setsockopt(_priv->socket, PGM_MTU, &mtu, sizeof(mtu)); pgm_sockaddr_t addr; addr.sa_addr.sport = config->multicastSPort(); addr.sa_port = config->multicastDPort(); good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err); if (!good) { qCritical() << "Could not generate a GSI: PGM Error: " << err->message; pgm_error_free(err); return false; } struct pgm_interface_req_t ifreq; ifreq.ir_interface = addrinfo->ai_send_addrs[0].gsr_interface; ifreq.ir_scope_id = 0; if (AF_INET6 == family) { ifreq.ir_scope_id = ((struct sockaddr_in6*)&addrinfo->ai_send_addrs[0])->sin6_scope_id; } // UDP Encapsulation if(config->multicastUseUDP()) { const int uport = config->multicastUDPUPort(); const int mport = config->multicastUDPMPort(); pgm_setsockopt(_priv->socket, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport)); pgm_setsockopt(_priv->socket, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport)); } good = pgm_bind3(_priv->socket, &addr, sizeof(addr), &ifreq , sizeof(ifreq), &ifreq, sizeof(ifreq), &err); if (!good) { qCritical() << "Could not bind socket: PGM Error: " << err->message; pgm_error_free(err); return false; } // qDebug() << "Max APDU is " << _priv->socket->max_apdu; // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu; // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment; // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns; // join the group for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++) { pgm_setsockopt(_priv->socket, PGM_JOIN_GROUP, &addrinfo->ai_recv_addrs[i], sizeof(struct group_req)); } // set send address pgm_setsockopt(_priv->socket, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], sizeof(struct group_req)); // IP parameters const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16; pgm_setsockopt(_priv->socket, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop)); pgm_setsockopt(_priv->socket, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops)); pgm_setsockopt(_priv->socket, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking)); good = pgm_connect(_priv->socket, &err); if (!good) { qCritical() << "Could not connect socket: PGM Error: " << err->message; pgm_error_free(err); return false; } setupNotifiers(); pgm_freeaddrinfo(addrinfo); return true; } void McastPGMSocket::setupNotifiers() { int recv_sock, repair_sock, pending_sock; char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int)); struct pollfd pollin[10]; int in_nfds = 10; pgm_poll_info(_priv->socket, pollin, &in_nfds, POLLIN); for(int i = 0; i < in_nfds; i++) { QSocketNotifier* notif = new QSocketNotifier(pollin[i].fd, QSocketNotifier::Read, this); _priv->_notifs.append(notif); connect(notif, SIGNAL(activated(int)), this, slotname); } if(_priv->direction == PSOCK_WRITE) { struct pollfd pfd; int nfds = 1; pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT); _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this); connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend())); } } void McastPGMSocket::handleNak(int fd) { qDebug() << "handleNak(" << fd << ")"; QSocketNotifier* notif = _priv->notifier_for(fd); notif->setEnabled(false); if (_shutdownTimer) { _shutdownTimer->start(_shutdown_timeout); qDebug() << "Started shutdown timer"; } handleNak(); notif->setEnabled(true); } void McastPGMSocket::handleNak() { if (_finished) return; qDebug() << "handleNak()"; // QTimer::singleShot(1000, this, SLOT(handleNakTimeout())); // to handle NAKs in OpenPGM, we need to pgm_recv: char buf[4096]; pgm_error_t* err = 0; int status; // while we don't block: do { status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err); if(status == PGM_IO_STATUS_TIMER_PENDING) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, &size); const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); qDebug() << " timer pending: " << msecs << "ms"; _nakTimeout->start(msecs); break; } else if(status == PGM_IO_STATUS_RATE_LIMITED) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size); const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); qDebug() << " rate limited: " << msecs << "ms"; _nakTimeout->start(msecs); break; } else if(status == PGM_IO_STATUS_WOULD_BLOCK) { qDebug() << " wouldblock"; break; } else { if(err) { qCritical() << "Could not handle NAKs: PGM Error: " << err->message; pgm_error_free(err); err = 0; } } } while (true); } void McastPGMSocket::handleNakTimeout() { qDebug() << "handleNakTimeout()"; handleNak(); } void McastPGMSocket::handleData(int fd) { // need to guard against destruction in finish() via signals/slots QPointer notif(_priv->notifier_for(fd)); notif->setEnabled(false); handleData(); if (notif) notif->setEnabled(true); } void McastPGMSocket::handleData() { qDebug() << "handleData()"; if (_finished) { qDebug() << " finished!"; return; } int status; do { char buf[4096]; size_t size; pgm_error_t* err; status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err); if (status == PGM_IO_STATUS_NORMAL) { qDebug() << " normally received"; if(size > 0) { QByteArray bytes(buf, size); emit receivedPacket(bytes); } } else if (status == PGM_IO_STATUS_WOULD_BLOCK) { qDebug() << " would block"; // nothing more to do this time break; } else if (status == PGM_IO_STATUS_TIMER_PENDING) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, &size); const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); qDebug() << " timer pending: " << msecs << "ms"; _dataTimeout->start(msecs); break; } else if (status == PGM_IO_STATUS_RATE_LIMITED) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size); const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); qDebug() << " rate limit pending: " << msecs << "ms"; _dataTimeout->start(msecs); break; } else if (status == PGM_IO_STATUS_RESET) { qDebug() << " connection reset"; emit connectionReset(); qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)"); break; } else if (status == PGM_IO_STATUS_FIN) { qDebug() << " connection finished"; emit connectionFinished(); break; } else { if(err) { qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)"); break; } } // the socket might have been closed from under us if (!_priv->socket) break; } while (true); } void McastPGMSocket::handleDataTimeout() { qDebug() << "handleDataTimeout()"; handleData(); } void McastPGMSocket::canSend() { if (_finished) return; // qDebug() << "canSend()"; if (_priv->send_notif) { _priv->send_notif->setEnabled(false); } if(_q.isEmpty()) { emit readyToSend(); } else { QByteArray const packet(_q.head()); int status; status = pgm_send(_priv->socket, packet.constData(), packet.size(), 0); if(status == PGM_IO_STATUS_NORMAL) { _q.dequeue(); if(!_q.isEmpty()) { _priv->send_notif->setEnabled(true); } else { emit readyToSend(); } } else if(status == PGM_IO_STATUS_WOULD_BLOCK) { _priv->send_notif->setEnabled(true); } else if(status == PGM_IO_STATUS_RATE_LIMITED) { struct timeval tv; socklen_t size = sizeof(tv); pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size); int msecs = (tv.tv_sec * 1000) + ((tv.tv_sec + 999) / 1000); _sendTimeout->start(msecs); } else { qCritical() << "Unhandled status in canSend()"; } } if (_shutdownTimer) _shutdownTimer->start(_shutdown_timeout); } void McastPGMSocket::sendPacket(QByteArray const& bytes) { if(_shutdownTimer) { qCritical() << "Logic error: sendPacket() after shutdown()"; } _q.enqueue(bytes); _priv->send_notif->setEnabled(true); } void McastPGMSocket::finish() { qDebug() << "finish()"; Q_FOREACH(QSocketNotifier* notif, _priv->_notifs) { notif->setEnabled(false); delete notif; } _priv->_notifs.clear(); if(_priv->send_notif) { delete _priv->send_notif; _priv->send_notif = 0; } pgm_close(_priv->socket, 1); _priv->socket = 0; _finished = true; emit connectionFinished(); qDebug() << "Socket finished"; } bool McastPGMSocket::finished() const { return _finished; } void McastPGMSocket::shutdown(int interval) { if(_priv->direction == PSOCK_READ) return; _shutdown_timeout = interval; _shutdownTimer = new QTimer(this); connect(_shutdownTimer, SIGNAL(timeout()), this, SLOT(finish())); if (_q.isEmpty()) { _shutdownTimer->start(_shutdown_timeout); qDebug() << "Started shutdown timer"; } }