summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt2
-rw-r--r--src/net/mcast/McastReceiver.cpp49
-rw-r--r--src/net/mcast/McastReceiver.h5
-rw-r--r--src/net/pvsIncomingMulticastTransfer.cpp134
-rw-r--r--src/net/pvsIncomingMulticastTransfer.h71
-rw-r--r--src/pvs.cpp85
-rw-r--r--src/pvs.h11
7 files changed, 351 insertions, 6 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 93e281e..3f2089a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -100,6 +100,7 @@ SET( PVS_SRCS
src/util/TextFile.cpp
src/util/serviceDiscoveryUtil.cpp
src/net/pvsOutgoingMulticastTransfer.cpp
+ src/net/pvsIncomingMulticastTransfer.cpp
)
# pvsgui
@@ -194,6 +195,7 @@ SET( PVS_MOC_HDRS
src/net/pvsServiceDiscovery.h
src/net/pvsDiscoveredServer.h
src/net/pvsOutgoingMulticastTransfer.h
+ src/net/pvsIncomingMulticastTransfer.h
)
SET( PVSGUI_MOC_HDRS
diff --git a/src/net/mcast/McastReceiver.cpp b/src/net/mcast/McastReceiver.cpp
index 6070208..1f27127 100644
--- a/src/net/mcast/McastReceiver.cpp
+++ b/src/net/mcast/McastReceiver.cpp
@@ -27,7 +27,7 @@
McastReceiver::McastReceiver(QIODevice* iodev, McastConfiguration* config, QObject* parent) :
QObject(parent),
- _config(config ? new McastConfiguration(*config) : new McastConfiguration()),
+ _config(config ? new McastConfiguration(*config) : 0),
_socket(0),
_curoffs(0),
_closed(false),
@@ -43,13 +43,52 @@ McastReceiver::~McastReceiver()
delete _config;
}
-void McastReceiver::start()
+void McastReceiver::config(McastConfiguration const* config)
{
+ if (_config)
+ delete _config;
+ _config = new McastConfiguration(*config, this);
+}
+
+bool McastReceiver::start()
+{
+ McastConfiguration *config = _config;
+ if (!config)
+ config = new McastConfiguration();
+
+ if (_socket)
+ {
+ delete _socket;
+ }
_socket = new McastPGMSocket(this);
- connect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray)));
- connect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset()));
+
+ connect(_socket, SIGNAL(receivedPacket(QByteArray)), SLOT(receivedPacket(QByteArray)));
+ connect(_socket, SIGNAL(connectionReset()), SLOT(connectionReset()));
// connect(_socket, SIGNAL(connectionFinished()), this, SLOT(connectionFinished()));
- _socket->open(_config, McastPGMSocket::PSOCK_READ);
+ if (_socket->open(_config, McastPGMSocket::PSOCK_READ))
+ {
+ return true;
+ }
+ else
+ {
+ disconnect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray)));
+ disconnect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset()));
+ return false;
+ }
+}
+
+void McastReceiver::abort()
+{
+ if (_socket)
+ {
+ delete _socket;
+ _socket = 0;
+ }
+
+ if (_iodev)
+ {
+ _iodev->close();
+ }
}
void McastReceiver::receivedPacket(QByteArray const& bytes)
diff --git a/src/net/mcast/McastReceiver.h b/src/net/mcast/McastReceiver.h
index 48ff7c5..247733d 100644
--- a/src/net/mcast/McastReceiver.h
+++ b/src/net/mcast/McastReceiver.h
@@ -47,6 +47,8 @@ public:
return _config;
}
+ void config(McastConfiguration const* config);
+
static inline bool is_error(Result result)
{
return result != RES_OK;
@@ -57,7 +59,8 @@ signals:
void progress(quint64 offset);
public slots:
- void start();
+ bool start();
+ void abort();
private:
McastConfiguration* _config;
diff --git a/src/net/pvsIncomingMulticastTransfer.cpp b/src/net/pvsIncomingMulticastTransfer.cpp
new file mode 100644
index 0000000..01507a9
--- /dev/null
+++ b/src/net/pvsIncomingMulticastTransfer.cpp
@@ -0,0 +1,134 @@
+/*
+# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg
+#
+# This program is free software distributed under the GPL version 2.
+# See http://openslx.org/COPYING
+#
+# If you have any feedback please consult http://openslx.org/feedback and
+# send your suggestions, praise, or complaints to feedback@openslx.org
+#
+# General information about OpenSLX can be found at http://openslx.org/
+# -----------------------------------------------------------------------------
+# src/net/pcsIncomingMulticastTransfer.h
+# - wrap McastReceiver functionality in PVS daemon
+# -----------------------------------------------------------------------------
+*/
+
+#include <QDir>
+#include <QTemporaryFile>
+#include <QTimer>
+
+#include "pvsIncomingMulticastTransfer.h"
+#include <src/net/mcast/McastReceiver.h>
+
+PVSIncomingMulticastTransfer::PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size,
+ ushort port, McastConfiguration const* configTemplate, QObject* parent) :
+ QObject(parent),
+ _sender(sender),
+ _transferID(transferID),
+ _bytes(0),
+ _size(size),
+ _port(port),
+ _temporaryFile(new QTemporaryFile(QFileInfo(QDir::homePath(), "incoming.mcastft.XXXXXX").absolutePath(), this)),
+ _finalFile(0),
+ _receiver(0),
+ _config(configTemplate ?
+ new McastConfiguration(*configTemplate) :
+ new McastConfiguration()),
+ _progressTimer(new QTimer(this))
+{
+ _config->multicastUDPPortBase(port);
+ _config->multicastDPort(port);
+ _config->multicastSPort(port);
+
+ connect(_progressTimer, SIGNAL(timeout()), SLOT(updateProgress()));
+}
+
+PVSIncomingMulticastTransfer::~PVSIncomingMulticastTransfer()
+{
+ // TODO Auto-generated destructor stub
+}
+
+bool PVSIncomingMulticastTransfer::start()
+{
+ QFile *dest = _finalFile ? _finalFile : _temporaryFile;
+ _receiver = new McastReceiver(dest, new McastConfiguration(*_config), this);
+ connect(_receiver, SIGNAL(finished(int)), SLOT(receiverFinished(int)));
+ connect(_receiver, SIGNAL(progress(quint64)), SLOT(receiverProgress(quint64)));
+
+ if (!_receiver->start())
+ {
+ emit retry(_sender, _transferID);
+ return false;
+ }
+ else
+ {
+ _progressTimer->start(333);
+ return true;
+ }
+}
+
+void PVSIncomingMulticastTransfer::abort()
+{
+ delete _receiver;
+ _receiver = 0;
+
+ delete _progressTimer;
+ _progressTimer = 0;
+
+ if (_temporaryFile)
+ {
+ _temporaryFile->remove();
+ }
+ delete _temporaryFile;
+
+ if (_finalFile)
+ {
+ _finalFile->remove();
+ }
+ delete _finalFile;
+}
+
+void PVSIncomingMulticastTransfer::updatePort(ushort port)
+{
+ _config->multicastUDPPortBase(port);
+ _config->multicastSPort(port);
+ _config->multicastDPort(port);
+}
+
+void PVSIncomingMulticastTransfer::receiverProgressed(quint64 bytes)
+{
+ _bytes = bytes;
+}
+
+void PVSIncomingMulticastTransfer::receiverFinished(int how)
+{
+ switch(how)
+ {
+ case McastReceiver::RES_OK:
+ emit finished(_transferID);
+ break;
+ case McastReceiver::RES_ABORTED:
+ emit failed(_transferID, tr("Aborted"));
+ break;
+ case McastReceiver::RES_MD5_MISMATCH:
+ case McastReceiver::RES_CHECKSUM_MISMATCH:
+ emit failed(_transferID, tr("Unrecoverable data corruption"));
+ break;
+ case McastReceiver::RES_CONNECTION_RESET:
+ emit failed(_transferID, tr("Connection was reset"));
+ break;
+ case McastReceiver::RES_OFFSET_MISMATCH:
+ emit failed(_transferID, tr("Unrecoverable data loss. Try a lower transfer rate"));
+ break;
+ }
+}
+
+void PVSIncomingMulticastTransfer::updateProgress()
+{
+ if (!_started)
+ {
+ emit started(_transferID);
+ }
+ emit progress(_transferID, _bytes, _size);
+}
diff --git a/src/net/pvsIncomingMulticastTransfer.h b/src/net/pvsIncomingMulticastTransfer.h
new file mode 100644
index 0000000..9a33348
--- /dev/null
+++ b/src/net/pvsIncomingMulticastTransfer.h
@@ -0,0 +1,71 @@
+/*
+# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg
+#
+# This program is free software distributed under the GPL version 2.
+# See http://openslx.org/COPYING
+#
+# If you have any feedback please consult http://openslx.org/feedback and
+# send your suggestions, praise, or complaints to feedback@openslx.org
+#
+# General information about OpenSLX can be found at http://openslx.org/
+# -----------------------------------------------------------------------------
+# src/net/pcsIncomingMulticastTransfer.h
+# - wrap McastReceiver functionality in PVS daemon
+# -----------------------------------------------------------------------------
+*/
+
+#ifndef PVSINCOMINGMULTICASTTRANSFER_H_
+#define PVSINCOMINGMULTICASTTRANSFER_H_
+
+#include <QObject>
+#include <QString>
+
+#include <src/net/pvsMsg.h>
+
+class McastConfiguration;
+class McastReceiver;
+class QFile;
+class QTimer;
+
+class PVSIncomingMulticastTransfer : public QObject
+{
+ Q_OBJECT
+public:
+ PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size, ushort port,
+ McastConfiguration const* configTemplate, QObject* parent = 0);
+ virtual ~PVSIncomingMulticastTransfer();
+
+ void setFinalFile(QString const& filename);
+
+signals:
+ void retry(QString const& sender, qulonglong transferID);
+ void started(qulonglong transferID);
+ void progress(qulonglong transferID, qulonglong bytes, qulonglong of);
+ void finished(qulonglong transferID);
+ void failed(qulonglong transferID, QString const& reason);
+
+public slots:
+ void updatePort(ushort port);
+ bool start();
+ void abort();
+
+private slots:
+ void receiverProgressed(quint64 bytes);
+ void receiverFinished(int reason);
+ void updateProgress();
+
+private:
+ QString _sender;
+ qulonglong _transferID;
+ qulonglong _bytes;
+ qulonglong _size;
+ ushort _port;
+ QFile* _temporaryFile;
+ QFile* _finalFile;
+ McastReceiver* _receiver;
+ McastConfiguration* _config;
+ bool _started;
+ QTimer* _progressTimer;
+};
+
+#endif /* PVSINCOMINGMULTICASTTRANSFER_H_ */
diff --git a/src/pvs.cpp b/src/pvs.cpp
index af5958e..8f06f74 100644
--- a/src/pvs.cpp
+++ b/src/pvs.cpp
@@ -17,6 +17,7 @@
#include "src/net/pvsDiscoveredServer.h"
#include "src/net/mcast/McastConfiguration.h"
#include "src/net/pvsOutgoingMulticastTransfer.h"
+#include "src/net/pvsIncomingMulticastTransfer.h"
// D-Bus
#include "pvsadaptor.h"
@@ -190,6 +191,45 @@ void PVS::onCommand(PVSMsg cmdMessage)
transfer->retry();
}
}
+ if (ident.compare("MCASTFTANNOUNCE") == 0)
+ {
+ QStringList fields = message.split(':');
+ bool ok;
+ QString sender;
+ qulonglong transferID;
+ QString basename;
+ qulonglong size;
+ ushort port;
+
+ if (!fields.size() == 5)
+ {
+ goto malformedAnnounce;
+ }
+ sender = fields[0];
+ transferID = fields[1].toULongLong(&ok);
+ if (!ok)
+ {
+ goto malformedAnnounce;
+ }
+ basename = fields[2];
+ size = fields[3].toULongLong(&ok);
+ if (!ok)
+ {
+ goto malformedAnnounce;
+ }
+ port = fields[4].toUShort(&ok);
+ if (!ok)
+ {
+ goto malformedAnnounce;
+ }
+
+ onIncomingMulticastTransfer(sender, transferID, basename, size, port);
+ return;
+
+ malformedAnnounce:
+ qDebug() << "Ignoring malformed MCASTFTANNOUNCE command: " << message;
+ return;
+ }
#ifdef never
// prototype
@@ -676,6 +716,39 @@ void PVS::cancelOutgoingMulticastTransfer(quint64 transferID)
}
}
+void PVS::onIncomingMulticastTransfer(QString const& sender, qulonglong transferID,
+ QString const& basename, qulonglong size, ushort port)
+{
+ PVSIncomingMulticastTransfer* transfer;
+ if (_incomingTransfers.value(transferID, 0))
+ {
+ transfer->updatePort(port);
+ QTimer::singleShot(0, transfer, SLOT(start()));
+ }
+ else
+ {
+ transfer = new PVSIncomingMulticastTransfer(sender, transferID, size, port, _masterMcastConfig, this);
+ _incomingTransfers.insert(transferID, transfer);
+
+ connect(transfer, SIGNAL(retry(QString const&, qulonglong)), SLOT(onIncomingMulticastTransferRetry(QString const&, qulonglong)));
+ connect(transfer, SIGNAL(started(qulonglong)), SIGNAL(incomingMulticastTransferStarted(qulonglong)));
+ connect(transfer, SIGNAL(progress(qulonglong, qulonglong, qulonglong)), SIGNAL(incomingMulticastTransferProgress(qulonglong, qulonglong, qulonglong)));
+ connect(transfer, SIGNAL(finished(qulonglong)), SIGNAL(incomingMulticastTransferFinished(qulonglong)));
+ connect(transfer, SIGNAL(failed(qulonglong, QString const&)), SIGNAL(incomingMulticastTransferFailed(qulonglong, QString)));
+ connect(transfer, SIGNAL(finished(qulonglong)), SLOT(incomingMulticastTransferDelete(qulonglong)));
+ connect(transfer, SIGNAL(failed(qulonglong, QString const&)), SLOT(incomingMulticastTransferDelete(qulonglong)));
+
+ emit incomingMulticastTransferNew(transferID, sender, basename, size);
+ QTimer::singleShot(0, transfer, SLOT(start()));
+ }
+}
+
+void PVS::onIncomingMulticastTransferRetry(QString const& sender, qulonglong transferID)
+{
+ PVSMsg retryMessage(PVSCOMMAND, "MCASTFTRETRY", QString("%1:%2").arg(sender).arg(transferID));
+ _pvsServerConnection->sendMessage(retryMessage);
+}
+
quint64 PVS::generateMcastTransferID()
{
static quint64 nodeID = 0;
@@ -698,3 +771,15 @@ void PVS::outgoingMulticastTransferDelete(qulonglong transferID)
_outgoingTransfers.remove(transferID);
transfer->deleteLater();
}
+
+void PVS::incomingMulticastTransferDelete(qulonglong transferID)
+{
+ PVSIncomingMulticastTransfer* transfer = _incomingTransfers.value(transferID, 0);
+ if (!transfer)
+ {
+ return;
+ }
+
+ _incomingTransfers.remove(transferID);
+ transfer->deleteLater();
+}
diff --git a/src/pvs.h b/src/pvs.h
index 4e07fd8..dc272f0 100644
--- a/src/pvs.h
+++ b/src/pvs.h
@@ -30,6 +30,7 @@ class PVSServiceDiscovery;
class PVSDiscoveredServer;
class McastConfiguration;
class PVSOutgoingMulticastTransfer;
+class PVSIncomingMulticastTransfer;
/**
* PVSClient
@@ -105,6 +106,11 @@ Q_SIGNALS:
void outgoingMulticastTransferProgress(qulonglong transferID, qulonglong bytes, qulonglong of);
void outgoingMulticastTransferFinished(qulonglong transferID);
void outgoingMulticastTransferFailed(qulonglong transferID, QString reason);
+ void incomingMulticastTransferNew(qulonglong transferID, QString sender, QString basename, qulonglong size);
+ void incomingMulticastTransferStarted(qulonglong transferID);
+ void incomingMulticastTransferProgress(qulonglong transferID, qulonglong bytes, qulonglong of);
+ void incomingMulticastTransferFinished(qulonglong transferID);
+ void incomingMulticastTransferFailed(qulonglong transferID, QString reason);
protected:
@@ -157,10 +163,15 @@ private:
McastConfiguration* _masterMcastConfig;
QHash<quint64, PVSOutgoingMulticastTransfer*> _outgoingTransfers;
+ QHash<quint64, PVSIncomingMulticastTransfer*> _incomingTransfers;
+ void onIncomingMulticastTransfer(QString const& sender, qulonglong transferID, QString const& basename, qulonglong size, ushort port);
+ void onIncomingMulticastTransferRetry(QString const& sender, qulonglong transferID);
static quint64 generateMcastTransferID();
+
private Q_SLOTS:
// housekeeping
void outgoingMulticastTransferDelete(qulonglong transferID);
+ void incomingMulticastTransferDelete(qulonglong transferID);
};
#endif /* PVSCLIENT_H_ */