summaryrefslogtreecommitdiffstats
path: root/src/net
diff options
context:
space:
mode:
authorSebastien Braun2010-10-03 16:14:44 +0200
committerSebastien Braun2010-10-03 16:14:44 +0200
commitffee0868ef1341cfb7622821431cb73c52590962 (patch)
treebc96be65e0346ea25a8effb2118de59b08d54466 /src/net
parentAdd patch for OpenPGM to fix switch() fallthrough (diff)
downloadpvs-ffee0868ef1341cfb7622821431cb73c52590962.tar.gz
pvs-ffee0868ef1341cfb7622821431cb73c52590962.tar.xz
pvs-ffee0868ef1341cfb7622821431cb73c52590962.zip
Assorted Multicast Fixes:
- Upgrade bundled OpenPGM to SVN r1135 - Timing fixes: Make all rate-limited and timer-pending operation wait for at least 1ms to avoid busy-waiting - No distinction between sending and receiving sockets when setting up socket options (Receivers need to be able to send anyway when using PGMCC). - Switch from fixed-rate transmission to using PGMCC for congestion control. - Remove some obnoxious debugging outputs - Some white space fixes - Introduce a short waiting time before actually starting file transmission in order to allow enough SPM messages to be sent so that receivers can initialize properly. - Fix MCASTFTANNOUNCE message to include full file name instead of basename. - Fix generateMcastTransferID in order to gather more random IDs. PVSGUI may become confused if transfer IDs are reused. - Properly dispose of clientFileReceiveDialog when multicast transfer is finished. - Properly display transfer size in clientFileReceiveDialog
Diffstat (limited to 'src/net')
-rw-r--r--src/net/mcast/CMakeLists.txt8
-rw-r--r--src/net/mcast/McastPGMSocket.cpp206
-rw-r--r--src/net/mcast/McastSender.cpp17
-rw-r--r--src/net/mcast/McastSender.h1
-rw-r--r--src/net/pvsOutgoingMulticastTransfer.cpp2
5 files changed, 142 insertions, 92 deletions
diff --git a/src/net/mcast/CMakeLists.txt b/src/net/mcast/CMakeLists.txt
index 914c9d3..e92b090 100644
--- a/src/net/mcast/CMakeLists.txt
+++ b/src/net/mcast/CMakeLists.txt
@@ -1,15 +1,15 @@
INCLUDE(../../../OpenPGMConfig.cmake)
ADD_DEFINITIONS(
- ${LIBPGM_CFLAGS}
+ ${LIBPGM_CXXFLAGS}
-D__STDC_CONSTANT_MACROS
-D__STDC_LIMIT_MACROS
)
# OpenPGM uses the C99 restrict keyword which g++ does not recognize:
-IF(CMAKE_COMPILER_IS_GNUCXX)
- ADD_DEFINITIONS(-Drestrict=__restrict__)
-ENDIF(CMAKE_COMPILER_IS_GNUCXX)
+#IF(CMAKE_COMPILER_IS_GNUCXX)
+# ADD_DEFINITIONS(${LIBPGM_CXXFLAGS})
+#ENDIF(CMAKE_COMPILER_IS_GNUCXX)
INCLUDE(${QT_USE_FILE})
diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp
index 7952f00..bf52bd7 100644
--- a/src/net/mcast/McastPGMSocket.cpp
+++ b/src/net/mcast/McastPGMSocket.cpp
@@ -148,67 +148,88 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction)
return false;
}
- unsigned const ambient_spm = 4096 * 1000; // every four seconds (approx.)
-
- // set parameters
- if (direction == PSOCK_WRITE)
- {
- // write-only socket
- const int send_only = 1,
- spm_heartbeat[] =
- { 256 * 1000,
- 512 * 1000,
- 1024 * 1000,
- 2048 * 1000,
- 4096 * 1000 },
- max_rate = config->multicastRate(),
- max_window = config->multicastWinSize();
- // const int max_window_sqns = 3000;
-
- pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only,
- sizeof(send_only));
-
- // SPM messages
- pgm_setsockopt(_priv->socket, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm));
- pgm_setsockopt(_priv->socket, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat));
-
- // Transmit window
- pgm_setsockopt(_priv->socket, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate));
- pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window));
- // pgm_setsockopt(_priv->socket, PGM_TXW_SQNS, &max_window, sizeof(max_window));
- }
- else
- {
- // readonly
- const int recv_only = 1,
- passive = 0,
- max_window = config->multicastWinSize(),
- max_rate = config->multicastRate(),
- peer_expiry = ambient_spm * 5,
- spmr_expiry = 250 * 1000,
- nak_bo_ivl = 100 * 1000,
- nak_rpt_ivl = 400 * 1000,
- nak_rdata_ivl = 400 * 1000,
+ unsigned const ambient_spm = 2000 * 1000; // every one hundred milliseconds (approx.)
+
+ /* Options for sending data */
+ const int spm_heartbeat[] =
+ { 512 * 1000,
+ 1024 * 1000,
+ 2048 * 1000,
+ 4096 * 1000 },
+ max_rate = 0,
+ max_window = config->multicastWinSize() * config->multicastRate() / config->multicastMTU();
+ // const int max_window_sqns = 3000;
+ qDebug() << "Computed window size " << max_window << " packets";
+
+// pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only,
+// sizeof(send_only));
+
+ // SPM messages
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat));
+
+ // Transmit window
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate));
+// pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_SQNS, &max_window, sizeof(max_window));
+
+ /* Options for receiving data */
+ const int passive = 0,
+ spmr_expiry = 500 * 1000,
+ nak_bo_ivl = 200 * 1000,
+ nak_rpt_ivl = 500 * 1000,
+ nak_rdata_ivl = 500 * 1000,
nak_data_retries = 50,
- nak_ncf_retries = 50,
- no_rxw_sqns = 0;
- pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
- pgm_setsockopt(_priv->socket, PGM_PASSIVE, &passive, sizeof(passive));
- pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate));
- pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window));
- pgm_setsockopt(_priv->socket, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
- pgm_setsockopt(_priv->socket, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
- pgm_setsockopt(_priv->socket, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
- pgm_setsockopt(_priv->socket, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
- pgm_setsockopt(_priv->socket, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
- pgm_setsockopt(_priv->socket, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
- pgm_setsockopt(_priv->socket, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
- pgm_setsockopt(_priv->socket, PGM_RXW_SQNS, &no_rxw_sqns, sizeof(no_rxw_sqns));
+ nak_ncf_retries = 50;
+ qDebug() << "Computed window size " << max_window << " packets";
+
+// pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive));
+// pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate));
+// pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_RXW_SQNS, &max_window, sizeof(max_window));
+
+ /* Try using PGMCC */
+ const struct pgm_pgmccinfo_t pgmccinfo = {
+ 100 /* usecs */ * 1000 /* msecs */,
+ 75 /* from OpenPGM examples */,
+ 500 /* from PGMCC internet-draft */
+ };
+ good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo));
+ if(!good)
+ {
+ qCritical() << "Could not enable PGMCC";
+ return false;
}
+// /* Forward Error Correction */
+// const struct pgm_fecinfo_t pgmfecinfo = {
+// 255 /* from OpenPGM examples */,
+// 2 /* send two proactive packets */,
+// 8 /* from OpenPGM examples */,
+// 1 /* enable on-demand parity */,
+// 1 /* enable variable packet length */
+// };
+// good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_FEC, &pgmfecinfo, sizeof(pgmfecinfo));
+// if(!good)
+// {
+// qCritical() << "Could not enable FEC";
+// return false;
+// }
+
+ // Peer Expiry: We will give 1 minute.
+ int const peer_expiry = 60 /* seconds */ * 1000000 /* microseconds */;
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
+
// MTU
int const mtu = config->multicastMTU();
- pgm_setsockopt(_priv->socket, PGM_MTU, &mtu, sizeof(mtu));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MTU, &mtu, sizeof(mtu));
pgm_sockaddr_t addr;
addr.sa_addr.sport = config->multicastSPort();
@@ -235,8 +256,8 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction)
const int uport = config->multicastUDPUPort();
const int mport = config->multicastUDPMPort();
- pgm_setsockopt(_priv->socket, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport));
- pgm_setsockopt(_priv->socket, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport));
}
good = pgm_bind3(_priv->socket, &addr, sizeof(addr), &ifreq , sizeof(ifreq), &ifreq, sizeof(ifreq), &err);
@@ -255,21 +276,21 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction)
// join the group
for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++)
{
- pgm_setsockopt(_priv->socket, PGM_JOIN_GROUP,
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_JOIN_GROUP,
&addrinfo->ai_recv_addrs[i], sizeof(struct group_req));
}
// set send address
- pgm_setsockopt(_priv->socket, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0],
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0],
sizeof(struct group_req));
// IP parameters
const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16;
- pgm_setsockopt(_priv->socket, PGM_MULTICAST_LOOP, &multicast_loop,
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop,
sizeof(multicast_loop));
- pgm_setsockopt(_priv->socket, PGM_MULTICAST_HOPS, &multicast_hops,
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops,
sizeof(multicast_hops));
- pgm_setsockopt(_priv->socket, PGM_NOBLOCK, &nonblocking,
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking,
sizeof(nonblocking));
good = pgm_connect(_priv->socket, &err);
@@ -286,6 +307,10 @@ bool McastPGMSocket::open(McastConfiguration const* config, Direction direction)
pgm_freeaddrinfo(addrinfo);
+ /* Prime the generation of SPM packets during the waiting period */
+ if(_priv->direction == PSOCK_WRITE)
+ QTimer::singleShot(0, this, SLOT(handleNak()));
+
return true;
}
@@ -337,8 +362,6 @@ void McastPGMSocket::handleNak()
if (_finished)
return;
- qDebug() << "handleNak()";
-
// QTimer::singleShot(1000, this, SLOT(handleNakTimeout()));
// to handle NAKs in OpenPGM, we need to pgm_recv:
@@ -355,9 +378,12 @@ void McastPGMSocket::handleNak()
{
struct timeval tv;
socklen_t size = sizeof(tv);
- pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, &size);
- const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
- qDebug() << " timer pending: " << msecs << "ms";
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size);
+ const long usecs = tv.tv_sec * 1000000 + tv.tv_usec;
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)";
_nakTimeout->start(msecs);
break;
}
@@ -365,9 +391,11 @@ void McastPGMSocket::handleNak()
{
struct timeval tv;
socklen_t size = sizeof(tv);
- pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size);
- const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
- qDebug() << " rate limited: " << msecs << "ms";
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size);
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " rate limited: " << msecs << "ms";
_nakTimeout->start(msecs);
break;
}
@@ -410,10 +438,7 @@ void McastPGMSocket::handleData(int fd)
void McastPGMSocket::handleData()
{
- qDebug() << "handleData()";
-
if (_finished) {
- qDebug() << " finished!";
return;
}
@@ -445,9 +470,12 @@ void McastPGMSocket::handleData()
{
struct timeval tv;
socklen_t size = sizeof(tv);
- pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, &size);
- const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
- qDebug() << " timer pending: " << msecs << "ms";
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size);
+ const long usecs = tv.tv_sec * 1000000 + tv.tv_usec;
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)";
_dataTimeout->start(msecs);
break;
}
@@ -455,9 +483,11 @@ void McastPGMSocket::handleData()
{
struct timeval tv;
socklen_t size = sizeof(tv);
- pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size);
- const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
- qDebug() << " rate limit pending: " << msecs << "ms";
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size);
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " rate limit pending: " << msecs << "ms";
_dataTimeout->start(msecs);
break;
}
@@ -502,7 +532,6 @@ void McastPGMSocket::canSend()
if (_finished)
return;
- // qDebug() << "canSend()";
if (_priv->send_notif)
{
@@ -536,17 +565,26 @@ void McastPGMSocket::canSend()
{
_priv->send_notif->setEnabled(true);
}
+ else if(status == PGM_IO_STATUS_CONGESTION)
+ {
+ qDebug() << " congested...";
+ // wait a short time (10ms?)
+ _sendTimeout->start(10);
+ }
else if(status == PGM_IO_STATUS_RATE_LIMITED)
{
struct timeval tv;
socklen_t size = sizeof(tv);
- pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, &size);
- int msecs = (tv.tv_sec * 1000) + ((tv.tv_sec + 999) / 1000);
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size);
+ int msecs = (tv.tv_sec * 1000) + ((tv.tv_usec + 999) / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " rate_limited, waiting" << msecs << "ms";
_sendTimeout->start(msecs);
}
else
{
- qCritical() << "Unhandled status in canSend()";
+ qCritical() << "Unhandled status in canSend():" << status;
}
}
diff --git a/src/net/mcast/McastSender.cpp b/src/net/mcast/McastSender.cpp
index e25ec86..3fec6a4 100644
--- a/src/net/mcast/McastSender.cpp
+++ b/src/net/mcast/McastSender.cpp
@@ -18,11 +18,14 @@
#include "McastConstants.h"
#include <QDataStream>
+#include <QTimer>
#include <pgm/pgm.h>
// OpenPGM #defines bool. This is bad in C++.
#undef bool
+#define MCASTFT_START_DEFER_TIME 2000 /* msec */
+
McastSender::McastSender(QIODevice* iodev, McastConfiguration const* config, QObject* parent) :
QObject(parent),
_config(config ? new McastConfiguration(*config) : new McastConfiguration()),
@@ -42,16 +45,24 @@ McastSender::~McastSender()
void McastSender::start()
{
_socket = new McastPGMSocket(this);
- connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend()));
+ connect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart()));
_socket->open(_config, McastPGMSocket::PSOCK_WRITE);
}
void McastSender::start(McastPGMSocket* socket)
{
_socket = socket;
- connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend()));
Q_ASSERT(_socket->isOpen());
- readyToSend();
+ deferredStart();
+}
+
+void McastSender::deferredStart()
+{
+ // Wait some time, to give the PGM library the chance to generate some
+ // undisturbed SPM messages:
+ QTimer::singleShot(MCASTFT_START_DEFER_TIME, this, SLOT(readyToSend()));
+ disconnect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart()));
+ connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend()));
}
void McastSender::readyToSend()
diff --git a/src/net/mcast/McastSender.h b/src/net/mcast/McastSender.h
index dd5154c..0c5e29f 100644
--- a/src/net/mcast/McastSender.h
+++ b/src/net/mcast/McastSender.h
@@ -57,6 +57,7 @@ public slots:
void close();
private slots:
+ void deferredStart();
void readyToSend();
void socketFinished();
diff --git a/src/net/pvsOutgoingMulticastTransfer.cpp b/src/net/pvsOutgoingMulticastTransfer.cpp
index 2f24d49..4df4986 100644
--- a/src/net/pvsOutgoingMulticastTransfer.cpp
+++ b/src/net/pvsOutgoingMulticastTransfer.cpp
@@ -121,7 +121,7 @@ void PVSOutgoingMulticastTransfer::prepare()
_socketInacceptable = false;
// announce the transfer:
QFileInfo info(*_file);
- QString message = QString("%1:%2:%3:%4:%5").arg(_senderName).arg(_id).arg(info.baseName()).arg(info.size()).arg(_config->multicastUDPPortBase());
+ QString message = QString("%1:%2:%3:%4:%5").arg(_senderName).arg(_id).arg(info.fileName()).arg(info.size()).arg(_config->multicastUDPPortBase());
PVSMsg msg(PVSCOMMAND, "MCASTFTANNOUNCE", message);
emit announce(msg);
_prepareTimer->start(5000);