diff options
author | Sebastien Braun | 2010-10-03 16:14:44 +0200 |
---|---|---|
committer | Sebastien Braun | 2010-10-03 16:14:44 +0200 |
commit | ffee0868ef1341cfb7622821431cb73c52590962 (patch) | |
tree | bc96be65e0346ea25a8effb2118de59b08d54466 /src/net | |
parent | Add patch for OpenPGM to fix switch() fallthrough (diff) | |
download | pvs-ffee0868ef1341cfb7622821431cb73c52590962.tar.gz pvs-ffee0868ef1341cfb7622821431cb73c52590962.tar.xz pvs-ffee0868ef1341cfb7622821431cb73c52590962.zip |
Assorted Multicast Fixes:
- Upgrade bundled OpenPGM to SVN r1135
- Timing fixes: Make all rate-limited and timer-pending operation wait
for at least 1ms to avoid busy-waiting
- No distinction between sending and receiving sockets when setting
up socket options (Receivers need to be able to send anyway when
using PGMCC).
- Switch from fixed-rate transmission to using PGMCC for congestion
control.
- Remove some obnoxious debugging outputs
- Some white space fixes
- Introduce a short waiting time before actually starting file transmission
in order to allow enough SPM messages to be sent so that receivers
can initialize properly.
- Fix MCASTFTANNOUNCE message to include full file name instead of basename.
- Fix generateMcastTransferID in order to gather more random IDs. PVSGUI
may become confused if transfer IDs are reused.
- Properly dispose of clientFileReceiveDialog when multicast transfer is
finished.
- Properly display transfer size in clientFileReceiveDialog
Diffstat (limited to 'src/net')
-rw-r--r-- | src/net/mcast/CMakeLists.txt | 8 | ||||
-rw-r--r-- | src/net/mcast/McastPGMSocket.cpp | 206 | ||||
-rw-r--r-- | src/net/mcast/McastSender.cpp | 17 | ||||
-rw-r--r-- | src/net/mcast/McastSender.h | 1 | ||||
-rw-r--r-- | src/net/pvsOutgoingMulticastTransfer.cpp | 2 |
5 files changed, 142 insertions, 92 deletions
diff --git a/src/net/mcast/CMakeLists.txt b/src/net/mcast/CMakeLists.txt index 914c9d3..e92b090 100644 --- a/src/net/mcast/CMakeLists.txt +++ b/src/net/mcast/CMakeLists.txt @@ -1,15 +1,15 @@ INCLUDE(../../../OpenPGMConfig.cmake) ADD_DEFINITIONS( - ${LIBPGM_CFLAGS} + ${LIBPGM_CXXFLAGS} -D__STDC_CONSTANT_MACROS -D__STDC_LIMIT_MACROS ) # OpenPGM uses the C99 restrict keyword which g++ does not recognize: -IF(CMAKE_COMPILER_IS_GNUCXX) - ADD_DEFINITIONS(-Drestrict=__restrict__) -ENDIF(CMAKE_COMPILER_IS_GNUCXX) +#IF(CMAKE_COMPILER_IS_GNUCXX) +# ADD_DEFINITIONS(${LIBPGM_CXXFLAGS}) +#ENDIF(CMAKE_COMPILER_IS_GNUCXX) INCLUDE(${QT_USE_FILE}) diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp index 7952f00..bf52bd7 100644 --- a/src/net/mcast/McastPGMSocket.cpp +++ b/src/net/mcast/McastPGMSocket.cpp @@ -148,67 +148,88 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) return false; } - unsigned const ambient_spm = 4096 * 1000; // every four seconds (approx.) - - // set parameters - if (direction == PSOCK_WRITE) - { - // write-only socket - const int send_only = 1, - spm_heartbeat[] = - { 256 * 1000, - 512 * 1000, - 1024 * 1000, - 2048 * 1000, - 4096 * 1000 }, - max_rate = config->multicastRate(), - max_window = config->multicastWinSize(); - // const int max_window_sqns = 3000; - - pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, - sizeof(send_only)); - - // SPM messages - pgm_setsockopt(_priv->socket, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); - pgm_setsockopt(_priv->socket, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); - - // Transmit window - pgm_setsockopt(_priv->socket, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); - pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); - // pgm_setsockopt(_priv->socket, PGM_TXW_SQNS, &max_window, sizeof(max_window)); - } - else - { - // readonly - const int recv_only = 1, - passive = 0, - max_window = config->multicastWinSize(), - max_rate = config->multicastRate(), - peer_expiry = ambient_spm * 5, - spmr_expiry = 250 * 1000, - nak_bo_ivl = 100 * 1000, - nak_rpt_ivl = 400 * 1000, - nak_rdata_ivl = 400 * 1000, + unsigned const ambient_spm = 2000 * 1000; // every one hundred milliseconds (approx.) + + /* Options for sending data */ + const int spm_heartbeat[] = + { 512 * 1000, + 1024 * 1000, + 2048 * 1000, + 4096 * 1000 }, + max_rate = 0, + max_window = config->multicastWinSize() * config->multicastRate() / config->multicastMTU(); + // const int max_window_sqns = 3000; + qDebug() << "Computed window size " << max_window << " packets"; + +// pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, +// sizeof(send_only)); + + // SPM messages + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); + + // Transmit window + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); +// pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_SQNS, &max_window, sizeof(max_window)); + + /* Options for receiving data */ + const int passive = 0, + spmr_expiry = 500 * 1000, + nak_bo_ivl = 200 * 1000, + nak_rpt_ivl = 500 * 1000, + nak_rdata_ivl = 500 * 1000, nak_data_retries = 50, - nak_ncf_retries = 50, - no_rxw_sqns = 0; - pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); - pgm_setsockopt(_priv->socket, PGM_PASSIVE, &passive, sizeof(passive)); - pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate)); - pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); - pgm_setsockopt(_priv->socket, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); - pgm_setsockopt(_priv->socket, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); - pgm_setsockopt(_priv->socket, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); - pgm_setsockopt(_priv->socket, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); - pgm_setsockopt(_priv->socket, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); - pgm_setsockopt(_priv->socket, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); - pgm_setsockopt(_priv->socket, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); - pgm_setsockopt(_priv->socket, PGM_RXW_SQNS, &no_rxw_sqns, sizeof(no_rxw_sqns)); + nak_ncf_retries = 50; + qDebug() << "Computed window size " << max_window << " packets"; + +// pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive)); +// pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate)); +// pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_RXW_SQNS, &max_window, sizeof(max_window)); + + /* Try using PGMCC */ + const struct pgm_pgmccinfo_t pgmccinfo = { + 100 /* usecs */ * 1000 /* msecs */, + 75 /* from OpenPGM examples */, + 500 /* from PGMCC internet-draft */ + }; + good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo)); + if(!good) + { + qCritical() << "Could not enable PGMCC"; + return false; } +// /* Forward Error Correction */ +// const struct pgm_fecinfo_t pgmfecinfo = { +// 255 /* from OpenPGM examples */, +// 2 /* send two proactive packets */, +// 8 /* from OpenPGM examples */, +// 1 /* enable on-demand parity */, +// 1 /* enable variable packet length */ +// }; +// good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_FEC, &pgmfecinfo, sizeof(pgmfecinfo)); +// if(!good) +// { +// qCritical() << "Could not enable FEC"; +// return false; +// } + + // Peer Expiry: We will give 1 minute. + int const peer_expiry = 60 /* seconds */ * 1000000 /* microseconds */; + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); + // MTU int const mtu = config->multicastMTU(); - pgm_setsockopt(_priv->socket, PGM_MTU, &mtu, sizeof(mtu)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MTU, &mtu, sizeof(mtu)); pgm_sockaddr_t addr; addr.sa_addr.sport = config->multicastSPort(); @@ -235,8 +256,8 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) const int uport = config->multicastUDPUPort(); const int mport = config->multicastUDPMPort(); - pgm_setsockopt(_priv->socket, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport)); - pgm_setsockopt(_priv->socket, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport)); } good = pgm_bind3(_priv->socket, &addr, sizeof(addr), &ifreq , sizeof(ifreq), &ifreq, sizeof(ifreq), &err); @@ -255,21 +276,21 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) // join the group for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++) { - pgm_setsockopt(_priv->socket, PGM_JOIN_GROUP, + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_JOIN_GROUP, &addrinfo->ai_recv_addrs[i], sizeof(struct group_req)); } // set send address - pgm_setsockopt(_priv->socket, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], sizeof(struct group_req)); // IP parameters const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16; - pgm_setsockopt(_priv->socket, PGM_MULTICAST_LOOP, &multicast_loop, + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop)); - pgm_setsockopt(_priv->socket, PGM_MULTICAST_HOPS, &multicast_hops, + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops)); - pgm_setsockopt(_priv->socket, PGM_NOBLOCK, &nonblocking, + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking)); good = pgm_connect(_priv->socket, &err); @@ -286,6 +307,10 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) pgm_freeaddrinfo(addrinfo); + /* Prime the generation of SPM packets during the waiting period */ + if(_priv->direction == PSOCK_WRITE) + QTimer::singleShot(0, this, SLOT(handleNak())); + return true; } @@ -337,8 +362,6 @@ void McastPGMSocket::handleNak() if (_finished) return; - qDebug() << "handleNak()"; - // QTimer::singleShot(1000, this, SLOT(handleNakTimeout())); // to handle NAKs in OpenPGM, we need to pgm_recv: @@ -355,9 +378,12 @@ void McastPGMSocket::handleNak() { struct timeval tv; socklen_t size = sizeof(tv); - pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, &size); - const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); - qDebug() << " timer pending: " << msecs << "ms"; + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); + const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; _nakTimeout->start(msecs); break; } @@ -365,9 +391,11 @@ void McastPGMSocket::handleNak() { struct timeval tv; socklen_t size = sizeof(tv); - pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size); - const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); - qDebug() << " rate limited: " << msecs << "ms"; + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate limited: " << msecs << "ms"; _nakTimeout->start(msecs); break; } @@ -410,10 +438,7 @@ void McastPGMSocket::handleData(int fd) void McastPGMSocket::handleData() { - qDebug() << "handleData()"; - if (_finished) { - qDebug() << " finished!"; return; } @@ -445,9 +470,12 @@ void McastPGMSocket::handleData() { struct timeval tv; socklen_t size = sizeof(tv); - pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, &size); - const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); - qDebug() << " timer pending: " << msecs << "ms"; + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); + const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; _dataTimeout->start(msecs); break; } @@ -455,9 +483,11 @@ void McastPGMSocket::handleData() { struct timeval tv; socklen_t size = sizeof(tv); - pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size); - const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); - qDebug() << " rate limit pending: " << msecs << "ms"; + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate limit pending: " << msecs << "ms"; _dataTimeout->start(msecs); break; } @@ -502,7 +532,6 @@ void McastPGMSocket::canSend() if (_finished) return; - // qDebug() << "canSend()"; if (_priv->send_notif) { @@ -536,17 +565,26 @@ void McastPGMSocket::canSend() { _priv->send_notif->setEnabled(true); } + else if(status == PGM_IO_STATUS_CONGESTION) + { + qDebug() << " congested..."; + // wait a short time (10ms?) + _sendTimeout->start(10); + } else if(status == PGM_IO_STATUS_RATE_LIMITED) { struct timeval tv; socklen_t size = sizeof(tv); - pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size); - int msecs = (tv.tv_sec * 1000) + ((tv.tv_sec + 999) / 1000); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + ((tv.tv_usec + 999) / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate_limited, waiting" << msecs << "ms"; _sendTimeout->start(msecs); } else { - qCritical() << "Unhandled status in canSend()"; + qCritical() << "Unhandled status in canSend():" << status; } } diff --git a/src/net/mcast/McastSender.cpp b/src/net/mcast/McastSender.cpp index e25ec86..3fec6a4 100644 --- a/src/net/mcast/McastSender.cpp +++ b/src/net/mcast/McastSender.cpp @@ -18,11 +18,14 @@ #include "McastConstants.h" #include <QDataStream> +#include <QTimer> #include <pgm/pgm.h> // OpenPGM #defines bool. This is bad in C++. #undef bool +#define MCASTFT_START_DEFER_TIME 2000 /* msec */ + McastSender::McastSender(QIODevice* iodev, McastConfiguration const* config, QObject* parent) : QObject(parent), _config(config ? new McastConfiguration(*config) : new McastConfiguration()), @@ -42,16 +45,24 @@ McastSender::~McastSender() void McastSender::start() { _socket = new McastPGMSocket(this); - connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend())); + connect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart())); _socket->open(_config, McastPGMSocket::PSOCK_WRITE); } void McastSender::start(McastPGMSocket* socket) { _socket = socket; - connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend())); Q_ASSERT(_socket->isOpen()); - readyToSend(); + deferredStart(); +} + +void McastSender::deferredStart() +{ + // Wait some time, to give the PGM library the chance to generate some + // undisturbed SPM messages: + QTimer::singleShot(MCASTFT_START_DEFER_TIME, this, SLOT(readyToSend())); + disconnect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart())); + connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend())); } void McastSender::readyToSend() diff --git a/src/net/mcast/McastSender.h b/src/net/mcast/McastSender.h index dd5154c..0c5e29f 100644 --- a/src/net/mcast/McastSender.h +++ b/src/net/mcast/McastSender.h @@ -57,6 +57,7 @@ public slots: void close(); private slots: + void deferredStart(); void readyToSend(); void socketFinished(); diff --git a/src/net/pvsOutgoingMulticastTransfer.cpp b/src/net/pvsOutgoingMulticastTransfer.cpp index 2f24d49..4df4986 100644 --- a/src/net/pvsOutgoingMulticastTransfer.cpp +++ b/src/net/pvsOutgoingMulticastTransfer.cpp @@ -121,7 +121,7 @@ void PVSOutgoingMulticastTransfer::prepare() _socketInacceptable = false; // announce the transfer: QFileInfo info(*_file); - QString message = QString("%1:%2:%3:%4:%5").arg(_senderName).arg(_id).arg(info.baseName()).arg(info.size()).arg(_config->multicastUDPPortBase()); + QString message = QString("%1:%2:%3:%4:%5").arg(_senderName).arg(_id).arg(info.fileName()).arg(info.size()).arg(_config->multicastUDPPortBase()); PVSMsg msg(PVSCOMMAND, "MCASTFTANNOUNCE", message); emit announce(msg); _prepareTimer->start(5000); |