summaryrefslogtreecommitdiffstats
path: root/src/net
diff options
context:
space:
mode:
authorSebastien Braun2010-07-19 11:53:55 +0200
committerSebastien Braun2010-07-19 11:53:55 +0200
commit755f07b1a25c414b0c7cbe28db4a7ecbc32975c7 (patch)
treea9dc99869865bc0f1f0b3e19c488459b3bc8a7fd /src/net
parentRemember to delete outgoing transfers when they are finished or failed (diff)
downloadpvs-755f07b1a25c414b0c7cbe28db4a7ecbc32975c7.tar.gz
pvs-755f07b1a25c414b0c7cbe28db4a7ecbc32975c7.tar.xz
pvs-755f07b1a25c414b0c7cbe28db4a7ecbc32975c7.zip
Implement initial multicast receive functionality in PVS daemon
Diffstat (limited to 'src/net')
-rw-r--r--src/net/mcast/McastReceiver.cpp49
-rw-r--r--src/net/mcast/McastReceiver.h5
-rw-r--r--src/net/pvsIncomingMulticastTransfer.cpp134
-rw-r--r--src/net/pvsIncomingMulticastTransfer.h71
4 files changed, 253 insertions, 6 deletions
diff --git a/src/net/mcast/McastReceiver.cpp b/src/net/mcast/McastReceiver.cpp
index 6070208..1f27127 100644
--- a/src/net/mcast/McastReceiver.cpp
+++ b/src/net/mcast/McastReceiver.cpp
@@ -27,7 +27,7 @@
McastReceiver::McastReceiver(QIODevice* iodev, McastConfiguration* config, QObject* parent) :
QObject(parent),
- _config(config ? new McastConfiguration(*config) : new McastConfiguration()),
+ _config(config ? new McastConfiguration(*config) : 0),
_socket(0),
_curoffs(0),
_closed(false),
@@ -43,13 +43,52 @@ McastReceiver::~McastReceiver()
delete _config;
}
-void McastReceiver::start()
+void McastReceiver::config(McastConfiguration const* config)
{
+ if (_config)
+ delete _config;
+ _config = new McastConfiguration(*config, this);
+}
+
+bool McastReceiver::start()
+{
+ McastConfiguration *config = _config;
+ if (!config)
+ config = new McastConfiguration();
+
+ if (_socket)
+ {
+ delete _socket;
+ }
_socket = new McastPGMSocket(this);
- connect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray)));
- connect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset()));
+
+ connect(_socket, SIGNAL(receivedPacket(QByteArray)), SLOT(receivedPacket(QByteArray)));
+ connect(_socket, SIGNAL(connectionReset()), SLOT(connectionReset()));
// connect(_socket, SIGNAL(connectionFinished()), this, SLOT(connectionFinished()));
- _socket->open(_config, McastPGMSocket::PSOCK_READ);
+ if (_socket->open(_config, McastPGMSocket::PSOCK_READ))
+ {
+ return true;
+ }
+ else
+ {
+ disconnect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray)));
+ disconnect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset()));
+ return false;
+ }
+}
+
+void McastReceiver::abort()
+{
+ if (_socket)
+ {
+ delete _socket;
+ _socket = 0;
+ }
+
+ if (_iodev)
+ {
+ _iodev->close();
+ }
}
void McastReceiver::receivedPacket(QByteArray const& bytes)
diff --git a/src/net/mcast/McastReceiver.h b/src/net/mcast/McastReceiver.h
index 48ff7c5..247733d 100644
--- a/src/net/mcast/McastReceiver.h
+++ b/src/net/mcast/McastReceiver.h
@@ -47,6 +47,8 @@ public:
return _config;
}
+ void config(McastConfiguration const* config);
+
static inline bool is_error(Result result)
{
return result != RES_OK;
@@ -57,7 +59,8 @@ signals:
void progress(quint64 offset);
public slots:
- void start();
+ bool start();
+ void abort();
private:
McastConfiguration* _config;
diff --git a/src/net/pvsIncomingMulticastTransfer.cpp b/src/net/pvsIncomingMulticastTransfer.cpp
new file mode 100644
index 0000000..01507a9
--- /dev/null
+++ b/src/net/pvsIncomingMulticastTransfer.cpp
@@ -0,0 +1,134 @@
+/*
+# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg
+#
+# This program is free software distributed under the GPL version 2.
+# See http://openslx.org/COPYING
+#
+# If you have any feedback please consult http://openslx.org/feedback and
+# send your suggestions, praise, or complaints to feedback@openslx.org
+#
+# General information about OpenSLX can be found at http://openslx.org/
+# -----------------------------------------------------------------------------
+# src/net/pcsIncomingMulticastTransfer.h
+# - wrap McastReceiver functionality in PVS daemon
+# -----------------------------------------------------------------------------
+*/
+
+#include <QDir>
+#include <QTemporaryFile>
+#include <QTimer>
+
+#include "pvsIncomingMulticastTransfer.h"
+#include <src/net/mcast/McastReceiver.h>
+
+PVSIncomingMulticastTransfer::PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size,
+ ushort port, McastConfiguration const* configTemplate, QObject* parent) :
+ QObject(parent),
+ _sender(sender),
+ _transferID(transferID),
+ _bytes(0),
+ _size(size),
+ _port(port),
+ _temporaryFile(new QTemporaryFile(QFileInfo(QDir::homePath(), "incoming.mcastft.XXXXXX").absolutePath(), this)),
+ _finalFile(0),
+ _receiver(0),
+ _config(configTemplate ?
+ new McastConfiguration(*configTemplate) :
+ new McastConfiguration()),
+ _progressTimer(new QTimer(this))
+{
+ _config->multicastUDPPortBase(port);
+ _config->multicastDPort(port);
+ _config->multicastSPort(port);
+
+ connect(_progressTimer, SIGNAL(timeout()), SLOT(updateProgress()));
+}
+
+PVSIncomingMulticastTransfer::~PVSIncomingMulticastTransfer()
+{
+ // TODO Auto-generated destructor stub
+}
+
+bool PVSIncomingMulticastTransfer::start()
+{
+ QFile *dest = _finalFile ? _finalFile : _temporaryFile;
+ _receiver = new McastReceiver(dest, new McastConfiguration(*_config), this);
+ connect(_receiver, SIGNAL(finished(int)), SLOT(receiverFinished(int)));
+ connect(_receiver, SIGNAL(progress(quint64)), SLOT(receiverProgress(quint64)));
+
+ if (!_receiver->start())
+ {
+ emit retry(_sender, _transferID);
+ return false;
+ }
+ else
+ {
+ _progressTimer->start(333);
+ return true;
+ }
+}
+
+void PVSIncomingMulticastTransfer::abort()
+{
+ delete _receiver;
+ _receiver = 0;
+
+ delete _progressTimer;
+ _progressTimer = 0;
+
+ if (_temporaryFile)
+ {
+ _temporaryFile->remove();
+ }
+ delete _temporaryFile;
+
+ if (_finalFile)
+ {
+ _finalFile->remove();
+ }
+ delete _finalFile;
+}
+
+void PVSIncomingMulticastTransfer::updatePort(ushort port)
+{
+ _config->multicastUDPPortBase(port);
+ _config->multicastSPort(port);
+ _config->multicastDPort(port);
+}
+
+void PVSIncomingMulticastTransfer::receiverProgressed(quint64 bytes)
+{
+ _bytes = bytes;
+}
+
+void PVSIncomingMulticastTransfer::receiverFinished(int how)
+{
+ switch(how)
+ {
+ case McastReceiver::RES_OK:
+ emit finished(_transferID);
+ break;
+ case McastReceiver::RES_ABORTED:
+ emit failed(_transferID, tr("Aborted"));
+ break;
+ case McastReceiver::RES_MD5_MISMATCH:
+ case McastReceiver::RES_CHECKSUM_MISMATCH:
+ emit failed(_transferID, tr("Unrecoverable data corruption"));
+ break;
+ case McastReceiver::RES_CONNECTION_RESET:
+ emit failed(_transferID, tr("Connection was reset"));
+ break;
+ case McastReceiver::RES_OFFSET_MISMATCH:
+ emit failed(_transferID, tr("Unrecoverable data loss. Try a lower transfer rate"));
+ break;
+ }
+}
+
+void PVSIncomingMulticastTransfer::updateProgress()
+{
+ if (!_started)
+ {
+ emit started(_transferID);
+ }
+ emit progress(_transferID, _bytes, _size);
+}
diff --git a/src/net/pvsIncomingMulticastTransfer.h b/src/net/pvsIncomingMulticastTransfer.h
new file mode 100644
index 0000000..9a33348
--- /dev/null
+++ b/src/net/pvsIncomingMulticastTransfer.h
@@ -0,0 +1,71 @@
+/*
+# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg
+#
+# This program is free software distributed under the GPL version 2.
+# See http://openslx.org/COPYING
+#
+# If you have any feedback please consult http://openslx.org/feedback and
+# send your suggestions, praise, or complaints to feedback@openslx.org
+#
+# General information about OpenSLX can be found at http://openslx.org/
+# -----------------------------------------------------------------------------
+# src/net/pcsIncomingMulticastTransfer.h
+# - wrap McastReceiver functionality in PVS daemon
+# -----------------------------------------------------------------------------
+*/
+
+#ifndef PVSINCOMINGMULTICASTTRANSFER_H_
+#define PVSINCOMINGMULTICASTTRANSFER_H_
+
+#include <QObject>
+#include <QString>
+
+#include <src/net/pvsMsg.h>
+
+class McastConfiguration;
+class McastReceiver;
+class QFile;
+class QTimer;
+
+class PVSIncomingMulticastTransfer : public QObject
+{
+ Q_OBJECT
+public:
+ PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size, ushort port,
+ McastConfiguration const* configTemplate, QObject* parent = 0);
+ virtual ~PVSIncomingMulticastTransfer();
+
+ void setFinalFile(QString const& filename);
+
+signals:
+ void retry(QString const& sender, qulonglong transferID);
+ void started(qulonglong transferID);
+ void progress(qulonglong transferID, qulonglong bytes, qulonglong of);
+ void finished(qulonglong transferID);
+ void failed(qulonglong transferID, QString const& reason);
+
+public slots:
+ void updatePort(ushort port);
+ bool start();
+ void abort();
+
+private slots:
+ void receiverProgressed(quint64 bytes);
+ void receiverFinished(int reason);
+ void updateProgress();
+
+private:
+ QString _sender;
+ qulonglong _transferID;
+ qulonglong _bytes;
+ qulonglong _size;
+ ushort _port;
+ QFile* _temporaryFile;
+ QFile* _finalFile;
+ McastReceiver* _receiver;
+ McastConfiguration* _config;
+ bool _started;
+ QTimer* _progressTimer;
+};
+
+#endif /* PVSINCOMINGMULTICASTTRANSFER_H_ */