diff options
Diffstat (limited to 'src/net/mcast/McastPGMSocket.cpp')
| -rw-r--r-- | src/net/mcast/McastPGMSocket.cpp | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp new file mode 100644 index 0000000..0d6b694 --- /dev/null +++ b/src/net/mcast/McastPGMSocket.cpp @@ -0,0 +1,601 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastPGMSocket.cpp +# - wrap OpenPGM Sockets in a nicer interface -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include <cstdlib> + +#include <QByteArray> +#include <QtDebug> +#include <QPointer> +#include <QSocketNotifier> +#include <QTimer> + +#include <pgm/pgm.h> +// #include <stdint.h> +// #define SIZE_MAX UINT64_MAX +// #include <impl/socket.h> +// pgm redefined bool to int. Undo that. +#undef bool + +#include <sys/poll.h> + +#include "McastPGMSocket.h" + +class McastPGMSocket_priv +{ +public: + McastPGMSocket_priv() : + socket(0), + recv_notif(0), + repair_notif(0), + pending_notif(0), + send_notif(0) + { + } + ~McastPGMSocket_priv() + { + if (socket) + pgm_close(socket, 0); + if (recv_notif) + delete recv_notif; + if (repair_notif) + delete repair_notif; + if (pending_notif) + delete pending_notif; + if (send_notif) + delete send_notif; + } + + pgm_sock_t* socket; + McastPGMSocket::Direction direction; + QSocketNotifier* recv_notif; + QSocketNotifier* repair_notif; + QSocketNotifier* pending_notif; + QSocketNotifier* send_notif; + + QSocketNotifier* notifier_for(int fd) { + if (recv_notif && (fd == recv_notif->socket())) + { + return recv_notif; + } + else if (repair_notif && (fd == repair_notif->socket())) + { + return repair_notif; + } + else if (pending_notif && (fd == pending_notif->socket())) + { + return pending_notif; + } + return 0; + } +}; + +static void _ensurePGMInited() +{ + if (!pgm_supported()) + { + pgm_error_t* err; + int good = pgm_init(&err); + if (!good) + { + qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)"); + std::exit(1); + } + } +} + +McastPGMSocket::McastPGMSocket(QObject* parent) : + QObject(parent), + _priv(new McastPGMSocket_priv), + _finished(false), + _nakTimeout(new QTimer()), + _dataTimeout(new QTimer()), + _sendTimeout(new QTimer()) +{ + _ensurePGMInited(); + + _nakTimeout->setSingleShot(true); + _dataTimeout->setSingleShot(true); + _sendTimeout->setSingleShot(true); + + connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout())); + connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout())); + connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend())); +} + +McastPGMSocket::~McastPGMSocket() +{ + delete _priv; + delete _nakTimeout; + delete _dataTimeout; + delete _sendTimeout; +} + +bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) +{ + _priv->direction = direction; + + pgm_error_t* err = 0; + int good; + + pgm_addrinfo_t* addrinfo; + // parse the address string + good = pgm_getaddrinfo(config->multicastAddress().toLatin1().constData(), + 0, &addrinfo, &err); + if (!good) + { + qCritical() << "Could not parse address info: PGM Error: " + << err->message; + } + + sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family; + + good + = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err); + if (!good) + { + qCritical() << "Could not open socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + unsigned const ambient_spm = 4096 * 1000; // every four seconds (approx.) + + // set parameters + if (direction == PSOCK_WRITE) + { + // write-only socket + const int send_only = 1, + spm_heartbeat[] = + { 16 * 1000, 16 * 1000, 16 * 1000, 16 * 1000, 32 * 1000, 64 * 1000, 128 + * 1000, 256 * 1000, 512 * 1000, 1024 * 1000, 2048 * 1000, 4096 + * 1000 }, + max_rate = config->multicastRate(), + max_window = config->multicastWinSize(); + // const int max_window_sqns = 3000; + + pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, + sizeof(send_only)); + + // SPM messages + pgm_setsockopt(_priv->socket, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); + pgm_setsockopt(_priv->socket, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); + + // Transmit window + pgm_setsockopt(_priv->socket, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); + // pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, PGM_TXW_SQNS, &max_window, sizeof(max_window)); + } + else + { + // readonly + const int recv_only = 1, + passive = 0, + max_window = config->multicastWinSize(), + max_winsqns = 0, + peer_expiry = ambient_spm * 5, + spmr_expiry = 250 * 1000, + nak_bo_ivl = 500 * 1000, + nak_rpt_ivl = 500 * 1000, + nak_rdata_ivl = 2000 * 1000, + nak_data_retries = 50, + nak_ncf_retries = 50; + pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); + pgm_setsockopt(_priv->socket, PGM_PASSIVE, &passive, sizeof(passive)); + // pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, PGM_RXW_SQNS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); + pgm_setsockopt(_priv->socket, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); + pgm_setsockopt(_priv->socket, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); + pgm_setsockopt(_priv->socket, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); + pgm_setsockopt(_priv->socket, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); + pgm_setsockopt(_priv->socket, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); + pgm_setsockopt(_priv->socket, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); + } + + // MTU + int const mtu = config->multicastMTU(); + pgm_setsockopt(_priv->socket, PGM_MTU, &mtu, sizeof(mtu)); + + pgm_sockaddr_t addr; + addr.sa_addr.sport = config->multicastSPort(); + addr.sa_port = config->multicastDPort(); + good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err); + if (!good) + { + qCritical() << "Could not generate a GSI: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + good = pgm_bind3(_priv->socket, &addr, sizeof(addr), (struct group_req*)&addrinfo->ai_send_addrs[0], sizeof(struct group_req), (struct group_req*)&addrinfo->ai_recv_addrs[0], sizeof(struct group_req), &err); + if (!good) + { + qCritical() << "Could not bind socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + // qDebug() << "Max APDU is " << _priv->socket->max_apdu; + // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu; + // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment; + // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns; + + // join the group + for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++) + { + pgm_setsockopt(_priv->socket, PGM_JOIN_GROUP, + &addrinfo->ai_recv_addrs[i], sizeof(struct group_req)); + } + + // set send address + pgm_setsockopt(_priv->socket, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], + sizeof(struct group_req)); + + // IP parameters + const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16; + pgm_setsockopt(_priv->socket, PGM_MULTICAST_LOOP, &multicast_loop, + sizeof(multicast_loop)); + pgm_setsockopt(_priv->socket, PGM_MULTICAST_HOPS, &multicast_hops, + sizeof(multicast_hops)); + pgm_setsockopt(_priv->socket, PGM_NOBLOCK, &nonblocking, + sizeof(nonblocking)); + + good = pgm_connect(_priv->socket, &err); + if (!good) + { + qCritical() << "Could not connect socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + setupNotifiers(); + + pgm_freeaddrinfo(addrinfo); + + return true; +} + +void McastPGMSocket::setupNotifiers() +{ + int recv_sock, repair_sock, pending_sock; + char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int)); + + pgm_getsockopt(_priv->socket, PGM_RECV_SOCK, &recv_sock, sizeof(recv_sock)); + _priv->recv_notif = new QSocketNotifier(recv_sock, QSocketNotifier::Read, + this); + connect(_priv->recv_notif, SIGNAL(activated(int)), this, slotname); + + pgm_getsockopt(_priv->socket, PGM_REPAIR_SOCK, &repair_sock, sizeof(repair_sock)); + _priv->repair_notif = new QSocketNotifier(repair_sock, + QSocketNotifier::Read, this); + connect(_priv->repair_notif, SIGNAL(activated(int)), this, slotname); + + pgm_getsockopt(_priv->socket, PGM_PENDING_SOCK, &pending_sock, sizeof(pending_sock)); + _priv->pending_notif = new QSocketNotifier(pending_sock, + QSocketNotifier::Read, this); + connect(_priv->pending_notif, SIGNAL(activated(int)), this, slotname); + + if(_priv->direction == PSOCK_WRITE) + { + struct pollfd pfd; + int nfds = 1; + pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT); + _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this); + connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend())); + } +} + +void McastPGMSocket::handleNak(int fd) +{ + qDebug() << "handleNak(int)"; + + QSocketNotifier* notif = _priv->notifier_for(fd); + notif->setEnabled(false); + + handleNak(); + + notif->setEnabled(true); +} + +void McastPGMSocket::handleNak() +{ + if (_finished) + return; + + qDebug() << "handleNak()"; + + QTimer::singleShot(1000, this, SLOT(handleNakTimeout())); + + // to handle NAKs in OpenPGM, we need to pgm_recv: + char buf[4096]; + pgm_error_t* err = 0; + + int status; + // while we don't block: + do + { + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err); + + if(status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " timer pending: " << msecs << "ms"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " rate limited: " << msecs << "ms"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " wouldblock"; + break; + } + else + { + if(err) + { + qCritical() << "Could not handle NAKs: PGM Error: " << err->message; + pgm_error_free(err); + err = 0; + } + } + } + while (true); +} + +void McastPGMSocket::handleNakTimeout() +{ + qDebug() << "handleNakTimeout()"; + + handleNak(); +} + +void McastPGMSocket::handleData(int fd) +{ + // need to guard against destruction in finish() via signals/slots + QPointer<QSocketNotifier> notif(_priv->notifier_for(fd)); + notif->setEnabled(false); + + handleData(); + + if (notif) + notif->setEnabled(true); +} + +void McastPGMSocket::handleData() +{ + qDebug() << "handleData()"; + + if (_finished) { + qDebug() << " finished!"; + return; + } + + int status; + do + { + char buf[4096]; + size_t size; + pgm_error_t* err; + + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err); + + if (status == PGM_IO_STATUS_NORMAL) + { + qDebug() << " normally received"; + if(size > 0) + { + QByteArray bytes(buf, size); + emit receivedPacket(bytes); + } + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " would block"; + // nothing more to do this time + break; + } + else if (status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " timer pending: " << msecs << "ms"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " rate limit pending: " << msecs << "ms"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RESET) + { + qDebug() << " connection reset"; + emit connectionReset(); + qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)"); + break; + } + else if (status == PGM_IO_STATUS_FIN) + { + qDebug() << " connection finished"; + emit connectionFinished(); + break; + } + else + { + if(err) + { + qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)"); + break; + } + } + + // the socket might have been closed from under us + if (!_priv->socket) + break; + } + while (true); +} + +void McastPGMSocket::handleDataTimeout() +{ + qDebug() << "handleDataTimeout()"; + + handleData(); +} + +void McastPGMSocket::canSend() +{ + if (_finished) + return; + + qDebug() << "canSend()"; + + if (_priv->send_notif) + { + _priv->send_notif->setEnabled(false); + } + + bool reenable = true; + + while (!_q.isEmpty()) + { + int status; + QByteArray const nextPacket(_q.head()); + status = pgm_send(_priv->socket, nextPacket.constData(), nextPacket.size(), 0); + if (status == PGM_IO_STATUS_ERROR || status == PGM_IO_STATUS_RESET) + { + qCritical() << "Could not send packet: PGM Error."; + continue; + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " would block"; + break; + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " rate limited:" << msecs << "ms"; + _sendTimeout->start((msecs > 0) ? msecs : 1); + reenable = false; + break; + } + else if (status == PGM_IO_STATUS_NORMAL) + { + qDebug() << " sent"; + _q.dequeue(); + continue; + } + else + { + qCritical() << "Unhandled condition in McastPGMSocket::canSend()"; + } + } + + if (_priv->send_notif && reenable) + { + emit readyToSend(); + + qDebug() << " reenable notifier"; + _priv->send_notif->setEnabled(true); + } +} + +void McastPGMSocket::sendPacket(QByteArray const& bytes) +{ + if(_q.isEmpty()) + { + int status = pgm_send(_priv->socket, bytes.constData(), bytes.size(), 0); + + if (status == PGM_IO_STATUS_ERROR || status == PGM_IO_STATUS_RESET) + { + qCritical() << "Could not send packet: PGM Error."; + return; + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + _q.enqueue(bytes); + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + _q.enqueue(bytes); + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + _dataTimeout->start((tv.tv_sec * 1000) + (tv.tv_usec / 1000)); + } + else if (status == PGM_IO_STATUS_NORMAL) + { + return; + } + else + { + qCritical() << "Unhandled condition in McastPGMSocket::sendPacket()"; + } + } else { + _q.enqueue(bytes); + } +} + +void McastPGMSocket::finish() +{ + if(_priv->pending_notif) + { + delete _priv->pending_notif; + _priv->pending_notif = 0; + } + if(_priv->recv_notif) + { + delete _priv->recv_notif; + _priv->recv_notif = 0; + } + if(_priv->repair_notif) + { + delete _priv->repair_notif; + _priv->repair_notif = 0; + } + if(_priv->send_notif) + { + delete _priv->send_notif; + _priv->send_notif = 0; + } + + pgm_close(_priv->socket, 1); + _priv->socket = 0; + + _finished = true; +} + +bool McastPGMSocket::finished() const +{ + return _finished; +} |
