diff options
-rw-r--r-- | CMakeLists.txt | 2 | ||||
-rw-r--r-- | src/net/mcast/McastReceiver.cpp | 49 | ||||
-rw-r--r-- | src/net/mcast/McastReceiver.h | 5 | ||||
-rw-r--r-- | src/net/pvsIncomingMulticastTransfer.cpp | 134 | ||||
-rw-r--r-- | src/net/pvsIncomingMulticastTransfer.h | 71 | ||||
-rw-r--r-- | src/pvs.cpp | 85 | ||||
-rw-r--r-- | src/pvs.h | 11 |
7 files changed, 351 insertions, 6 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 93e281e..3f2089a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -100,6 +100,7 @@ SET( PVS_SRCS src/util/TextFile.cpp src/util/serviceDiscoveryUtil.cpp src/net/pvsOutgoingMulticastTransfer.cpp + src/net/pvsIncomingMulticastTransfer.cpp ) # pvsgui @@ -194,6 +195,7 @@ SET( PVS_MOC_HDRS src/net/pvsServiceDiscovery.h src/net/pvsDiscoveredServer.h src/net/pvsOutgoingMulticastTransfer.h + src/net/pvsIncomingMulticastTransfer.h ) SET( PVSGUI_MOC_HDRS diff --git a/src/net/mcast/McastReceiver.cpp b/src/net/mcast/McastReceiver.cpp index 6070208..1f27127 100644 --- a/src/net/mcast/McastReceiver.cpp +++ b/src/net/mcast/McastReceiver.cpp @@ -27,7 +27,7 @@ McastReceiver::McastReceiver(QIODevice* iodev, McastConfiguration* config, QObject* parent) : QObject(parent), - _config(config ? new McastConfiguration(*config) : new McastConfiguration()), + _config(config ? new McastConfiguration(*config) : 0), _socket(0), _curoffs(0), _closed(false), @@ -43,13 +43,52 @@ McastReceiver::~McastReceiver() delete _config; } -void McastReceiver::start() +void McastReceiver::config(McastConfiguration const* config) { + if (_config) + delete _config; + _config = new McastConfiguration(*config, this); +} + +bool McastReceiver::start() +{ + McastConfiguration *config = _config; + if (!config) + config = new McastConfiguration(); + + if (_socket) + { + delete _socket; + } _socket = new McastPGMSocket(this); - connect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray))); - connect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset())); + + connect(_socket, SIGNAL(receivedPacket(QByteArray)), SLOT(receivedPacket(QByteArray))); + connect(_socket, SIGNAL(connectionReset()), SLOT(connectionReset())); // connect(_socket, SIGNAL(connectionFinished()), this, SLOT(connectionFinished())); - _socket->open(_config, McastPGMSocket::PSOCK_READ); + if (_socket->open(_config, McastPGMSocket::PSOCK_READ)) + { + return true; + } + else + { + disconnect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray))); + disconnect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset())); + return false; + } +} + +void McastReceiver::abort() +{ + if (_socket) + { + delete _socket; + _socket = 0; + } + + if (_iodev) + { + _iodev->close(); + } } void McastReceiver::receivedPacket(QByteArray const& bytes) diff --git a/src/net/mcast/McastReceiver.h b/src/net/mcast/McastReceiver.h index 48ff7c5..247733d 100644 --- a/src/net/mcast/McastReceiver.h +++ b/src/net/mcast/McastReceiver.h @@ -47,6 +47,8 @@ public: return _config; } + void config(McastConfiguration const* config); + static inline bool is_error(Result result) { return result != RES_OK; @@ -57,7 +59,8 @@ signals: void progress(quint64 offset); public slots: - void start(); + bool start(); + void abort(); private: McastConfiguration* _config; diff --git a/src/net/pvsIncomingMulticastTransfer.cpp b/src/net/pvsIncomingMulticastTransfer.cpp new file mode 100644 index 0000000..01507a9 --- /dev/null +++ b/src/net/pvsIncomingMulticastTransfer.cpp @@ -0,0 +1,134 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/pcsIncomingMulticastTransfer.h +# - wrap McastReceiver functionality in PVS daemon +# ----------------------------------------------------------------------------- +*/ + +#include <QDir> +#include <QTemporaryFile> +#include <QTimer> + +#include "pvsIncomingMulticastTransfer.h" +#include <src/net/mcast/McastReceiver.h> + +PVSIncomingMulticastTransfer::PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size, + ushort port, McastConfiguration const* configTemplate, QObject* parent) : + QObject(parent), + _sender(sender), + _transferID(transferID), + _bytes(0), + _size(size), + _port(port), + _temporaryFile(new QTemporaryFile(QFileInfo(QDir::homePath(), "incoming.mcastft.XXXXXX").absolutePath(), this)), + _finalFile(0), + _receiver(0), + _config(configTemplate ? + new McastConfiguration(*configTemplate) : + new McastConfiguration()), + _progressTimer(new QTimer(this)) +{ + _config->multicastUDPPortBase(port); + _config->multicastDPort(port); + _config->multicastSPort(port); + + connect(_progressTimer, SIGNAL(timeout()), SLOT(updateProgress())); +} + +PVSIncomingMulticastTransfer::~PVSIncomingMulticastTransfer() +{ + // TODO Auto-generated destructor stub +} + +bool PVSIncomingMulticastTransfer::start() +{ + QFile *dest = _finalFile ? _finalFile : _temporaryFile; + _receiver = new McastReceiver(dest, new McastConfiguration(*_config), this); + connect(_receiver, SIGNAL(finished(int)), SLOT(receiverFinished(int))); + connect(_receiver, SIGNAL(progress(quint64)), SLOT(receiverProgress(quint64))); + + if (!_receiver->start()) + { + emit retry(_sender, _transferID); + return false; + } + else + { + _progressTimer->start(333); + return true; + } +} + +void PVSIncomingMulticastTransfer::abort() +{ + delete _receiver; + _receiver = 0; + + delete _progressTimer; + _progressTimer = 0; + + if (_temporaryFile) + { + _temporaryFile->remove(); + } + delete _temporaryFile; + + if (_finalFile) + { + _finalFile->remove(); + } + delete _finalFile; +} + +void PVSIncomingMulticastTransfer::updatePort(ushort port) +{ + _config->multicastUDPPortBase(port); + _config->multicastSPort(port); + _config->multicastDPort(port); +} + +void PVSIncomingMulticastTransfer::receiverProgressed(quint64 bytes) +{ + _bytes = bytes; +} + +void PVSIncomingMulticastTransfer::receiverFinished(int how) +{ + switch(how) + { + case McastReceiver::RES_OK: + emit finished(_transferID); + break; + case McastReceiver::RES_ABORTED: + emit failed(_transferID, tr("Aborted")); + break; + case McastReceiver::RES_MD5_MISMATCH: + case McastReceiver::RES_CHECKSUM_MISMATCH: + emit failed(_transferID, tr("Unrecoverable data corruption")); + break; + case McastReceiver::RES_CONNECTION_RESET: + emit failed(_transferID, tr("Connection was reset")); + break; + case McastReceiver::RES_OFFSET_MISMATCH: + emit failed(_transferID, tr("Unrecoverable data loss. Try a lower transfer rate")); + break; + } +} + +void PVSIncomingMulticastTransfer::updateProgress() +{ + if (!_started) + { + emit started(_transferID); + } + emit progress(_transferID, _bytes, _size); +} diff --git a/src/net/pvsIncomingMulticastTransfer.h b/src/net/pvsIncomingMulticastTransfer.h new file mode 100644 index 0000000..9a33348 --- /dev/null +++ b/src/net/pvsIncomingMulticastTransfer.h @@ -0,0 +1,71 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/pcsIncomingMulticastTransfer.h +# - wrap McastReceiver functionality in PVS daemon +# ----------------------------------------------------------------------------- +*/ + +#ifndef PVSINCOMINGMULTICASTTRANSFER_H_ +#define PVSINCOMINGMULTICASTTRANSFER_H_ + +#include <QObject> +#include <QString> + +#include <src/net/pvsMsg.h> + +class McastConfiguration; +class McastReceiver; +class QFile; +class QTimer; + +class PVSIncomingMulticastTransfer : public QObject +{ + Q_OBJECT +public: + PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size, ushort port, + McastConfiguration const* configTemplate, QObject* parent = 0); + virtual ~PVSIncomingMulticastTransfer(); + + void setFinalFile(QString const& filename); + +signals: + void retry(QString const& sender, qulonglong transferID); + void started(qulonglong transferID); + void progress(qulonglong transferID, qulonglong bytes, qulonglong of); + void finished(qulonglong transferID); + void failed(qulonglong transferID, QString const& reason); + +public slots: + void updatePort(ushort port); + bool start(); + void abort(); + +private slots: + void receiverProgressed(quint64 bytes); + void receiverFinished(int reason); + void updateProgress(); + +private: + QString _sender; + qulonglong _transferID; + qulonglong _bytes; + qulonglong _size; + ushort _port; + QFile* _temporaryFile; + QFile* _finalFile; + McastReceiver* _receiver; + McastConfiguration* _config; + bool _started; + QTimer* _progressTimer; +}; + +#endif /* PVSINCOMINGMULTICASTTRANSFER_H_ */ diff --git a/src/pvs.cpp b/src/pvs.cpp index af5958e..8f06f74 100644 --- a/src/pvs.cpp +++ b/src/pvs.cpp @@ -17,6 +17,7 @@ #include "src/net/pvsDiscoveredServer.h" #include "src/net/mcast/McastConfiguration.h" #include "src/net/pvsOutgoingMulticastTransfer.h" +#include "src/net/pvsIncomingMulticastTransfer.h" // D-Bus #include "pvsadaptor.h" @@ -190,6 +191,45 @@ void PVS::onCommand(PVSMsg cmdMessage) transfer->retry(); } } + if (ident.compare("MCASTFTANNOUNCE") == 0) + { + QStringList fields = message.split(':'); + bool ok; + QString sender; + qulonglong transferID; + QString basename; + qulonglong size; + ushort port; + + if (!fields.size() == 5) + { + goto malformedAnnounce; + } + sender = fields[0]; + transferID = fields[1].toULongLong(&ok); + if (!ok) + { + goto malformedAnnounce; + } + basename = fields[2]; + size = fields[3].toULongLong(&ok); + if (!ok) + { + goto malformedAnnounce; + } + port = fields[4].toUShort(&ok); + if (!ok) + { + goto malformedAnnounce; + } + + onIncomingMulticastTransfer(sender, transferID, basename, size, port); + return; + + malformedAnnounce: + qDebug() << "Ignoring malformed MCASTFTANNOUNCE command: " << message; + return; + } #ifdef never // prototype @@ -676,6 +716,39 @@ void PVS::cancelOutgoingMulticastTransfer(quint64 transferID) } } +void PVS::onIncomingMulticastTransfer(QString const& sender, qulonglong transferID, + QString const& basename, qulonglong size, ushort port) +{ + PVSIncomingMulticastTransfer* transfer; + if (_incomingTransfers.value(transferID, 0)) + { + transfer->updatePort(port); + QTimer::singleShot(0, transfer, SLOT(start())); + } + else + { + transfer = new PVSIncomingMulticastTransfer(sender, transferID, size, port, _masterMcastConfig, this); + _incomingTransfers.insert(transferID, transfer); + + connect(transfer, SIGNAL(retry(QString const&, qulonglong)), SLOT(onIncomingMulticastTransferRetry(QString const&, qulonglong))); + connect(transfer, SIGNAL(started(qulonglong)), SIGNAL(incomingMulticastTransferStarted(qulonglong))); + connect(transfer, SIGNAL(progress(qulonglong, qulonglong, qulonglong)), SIGNAL(incomingMulticastTransferProgress(qulonglong, qulonglong, qulonglong))); + connect(transfer, SIGNAL(finished(qulonglong)), SIGNAL(incomingMulticastTransferFinished(qulonglong))); + connect(transfer, SIGNAL(failed(qulonglong, QString const&)), SIGNAL(incomingMulticastTransferFailed(qulonglong, QString))); + connect(transfer, SIGNAL(finished(qulonglong)), SLOT(incomingMulticastTransferDelete(qulonglong))); + connect(transfer, SIGNAL(failed(qulonglong, QString const&)), SLOT(incomingMulticastTransferDelete(qulonglong))); + + emit incomingMulticastTransferNew(transferID, sender, basename, size); + QTimer::singleShot(0, transfer, SLOT(start())); + } +} + +void PVS::onIncomingMulticastTransferRetry(QString const& sender, qulonglong transferID) +{ + PVSMsg retryMessage(PVSCOMMAND, "MCASTFTRETRY", QString("%1:%2").arg(sender).arg(transferID)); + _pvsServerConnection->sendMessage(retryMessage); +} + quint64 PVS::generateMcastTransferID() { static quint64 nodeID = 0; @@ -698,3 +771,15 @@ void PVS::outgoingMulticastTransferDelete(qulonglong transferID) _outgoingTransfers.remove(transferID); transfer->deleteLater(); } + +void PVS::incomingMulticastTransferDelete(qulonglong transferID) +{ + PVSIncomingMulticastTransfer* transfer = _incomingTransfers.value(transferID, 0); + if (!transfer) + { + return; + } + + _incomingTransfers.remove(transferID); + transfer->deleteLater(); +} @@ -30,6 +30,7 @@ class PVSServiceDiscovery; class PVSDiscoveredServer; class McastConfiguration; class PVSOutgoingMulticastTransfer; +class PVSIncomingMulticastTransfer; /** * PVSClient @@ -105,6 +106,11 @@ Q_SIGNALS: void outgoingMulticastTransferProgress(qulonglong transferID, qulonglong bytes, qulonglong of); void outgoingMulticastTransferFinished(qulonglong transferID); void outgoingMulticastTransferFailed(qulonglong transferID, QString reason); + void incomingMulticastTransferNew(qulonglong transferID, QString sender, QString basename, qulonglong size); + void incomingMulticastTransferStarted(qulonglong transferID); + void incomingMulticastTransferProgress(qulonglong transferID, qulonglong bytes, qulonglong of); + void incomingMulticastTransferFinished(qulonglong transferID); + void incomingMulticastTransferFailed(qulonglong transferID, QString reason); protected: @@ -157,10 +163,15 @@ private: McastConfiguration* _masterMcastConfig; QHash<quint64, PVSOutgoingMulticastTransfer*> _outgoingTransfers; + QHash<quint64, PVSIncomingMulticastTransfer*> _incomingTransfers; + void onIncomingMulticastTransfer(QString const& sender, qulonglong transferID, QString const& basename, qulonglong size, ushort port); + void onIncomingMulticastTransferRetry(QString const& sender, qulonglong transferID); static quint64 generateMcastTransferID(); + private Q_SLOTS: // housekeeping void outgoingMulticastTransferDelete(qulonglong transferID); + void incomingMulticastTransferDelete(qulonglong transferID); }; #endif /* PVSCLIENT_H_ */ |