diff options
| author | Fabian Schillinger | 2010-11-01 17:35:27 +0100 |
|---|---|---|
| committer | Fabian Schillinger | 2010-11-01 17:35:27 +0100 |
| commit | ea3fb17345e5f82db9f2e98a8062e95797700ace (patch) | |
| tree | 1da0d1a8ec9455364386af78762d0f6fed187824 /src/net | |
| parent | Process start/stop/view functionality (diff) | |
| parent | [PVSGUI] No X required for --help and --version (diff) | |
| download | pvs-ea3fb17345e5f82db9f2e98a8062e95797700ace.tar.gz pvs-ea3fb17345e5f82db9f2e98a8062e95797700ace.tar.xz pvs-ea3fb17345e5f82db9f2e98a8062e95797700ace.zip | |
Merge branch 'master' of openslx.org:pvs
Conflicts:
CMakeLists.txt
src/core/pvsConnectionManager.cpp
src/pvs.cpp
src/pvs.h
Diffstat (limited to 'src/net')
26 files changed, 2846 insertions, 3 deletions
diff --git a/src/net/mcast/CMakeLists.txt b/src/net/mcast/CMakeLists.txt new file mode 100644 index 0000000..e92b090 --- /dev/null +++ b/src/net/mcast/CMakeLists.txt @@ -0,0 +1,61 @@ +INCLUDE(../../../OpenPGMConfig.cmake) + +ADD_DEFINITIONS( + ${LIBPGM_CXXFLAGS} + -D__STDC_CONSTANT_MACROS + -D__STDC_LIMIT_MACROS +) + +# OpenPGM uses the C99 restrict keyword which g++ does not recognize: +#IF(CMAKE_COMPILER_IS_GNUCXX) +# ADD_DEFINITIONS(${LIBPGM_CXXFLAGS}) +#ENDIF(CMAKE_COMPILER_IS_GNUCXX) + +INCLUDE(${QT_USE_FILE}) + +SET(pvsmcast_MOC_HDRS + McastConfiguration.h + McastPGMSocket.h + McastReceiver.h + McastSender.h +) + +SET(pvsmcast_HDRS + McastConfiguration.h + McastPGMSocket.h + McastReceiver.h + McastSender.h +) + +SET(pvsmcast_SRCS + McastConfiguration.cpp + McastPGMSocket.cpp + McastReceiver.cpp + McastSender.cpp +) + +QT4_WRAP_CPP( + pvsmcast_MOC_SRCS + ${pvsmcast_MOC_HDRS} +) + +SET_SOURCE_FILES_PROPERTIES(${pvsmcast_SRCS} ${pvsmcast_MOC_SRCS} + PROPERTIES + OBJECT_DEPENDS "3rdparty/libpgm.a" # Make sure libpgm gets unpacked before building C++ files +) + +ADD_LIBRARY( + pvsmcast + STATIC + ${pvsmcast_HDRS} + ${pvsmcast_SRCS} + ${pvsmcast_MOC_SRCS} +) + +TARGET_LINK_LIBRARIES( + pvsmcast + pgm + ${QT_LIBRARIES} +) + +ADD_SUBDIRECTORY(trial_programs) diff --git a/src/net/mcast/McastConfiguration.cpp b/src/net/mcast/McastConfiguration.cpp new file mode 100644 index 0000000..6c5e620 --- /dev/null +++ b/src/net/mcast/McastConfiguration.cpp @@ -0,0 +1,57 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastConfiguration.cpp +# - hold Multicast protocol configuration data +# ----------------------------------------------------------------------------- +*/ + +#include <QSettings> + +#include "McastConfiguration.h" + +void McastConfiguration::loadFrom(QSettings* _settings, char const* group) +{ + if (group) + _settings->beginGroup(group); + + _multicastAddress = _settings->value("groupAddress", DEFAULT_MULTICAST_ADDRESS).toString(); + _multicastInterface = _settings->value("interface", DEFAULT_MULTICAST_INTERFACE).toString(); + _multicastMTU = _settings->value("mtu", DEFAULT_MULTICAST_MTU).value<quint16>(); + _multicastRate = _settings->value("rate", DEFAULT_MULTICAST_RATE).value<quint32>(); + _multicastUseUDP = _settings->value("use-udp", DEFAULT_MULTICAST_USEUDP).toBool(); + _multicastWinSize = _settings->value("winsize", DEFAULT_MULTICAST_WSIZ).value<quint16>(); + _multicastUDPPortBase = _settings->value("portbase", DEFAULT_MULTICAST_UDPPORT).value<quint16>(); + _multicastDPort = _settings->value("dport", DEFAULT_MULTICAST_DPORT).value<quint16>(); + _multicastSPort = _settings->value("sport", DEFAULT_MULTICAST_SPORT).value<quint16>(); + + if (group) + _settings->endGroup(); +} + +void McastConfiguration::writeTo(QSettings* _settings, char const* group) const +{ + if (group) + _settings->beginGroup(group); + + _settings->setValue("groupAddress", _multicastAddress); + _settings->setValue("interface", _multicastInterface); + _settings->setValue("mtu", _multicastMTU); + _settings->setValue("rate", _multicastRate); + _settings->setValue("use-udp", _multicastUseUDP); + _settings->setValue("winsize", _multicastWinSize); + _settings->setValue("portbase", _multicastUDPPortBase); + _settings->setValue("dport", _multicastDPort); + _settings->setValue("sport", _multicastSPort); + + if (group) + _settings->endGroup(); +} diff --git a/src/net/mcast/McastConfiguration.h b/src/net/mcast/McastConfiguration.h new file mode 100644 index 0000000..53f7a54 --- /dev/null +++ b/src/net/mcast/McastConfiguration.h @@ -0,0 +1,204 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastConfiguration.h +# - hold Multicast protocol configuration data +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTCONFIGURATION_H_ +#define MCASTCONFIGURATION_H_ + +#include <QObject> +#include <QString> +#include <QtGlobal> + +#include "McastConstants.h" + +class QSettings; + +class McastConfiguration: public QObject +{ + Q_OBJECT +public: + McastConfiguration(QObject* parent = 0) : + QObject(parent), + _multicastInterface(DEFAULT_MULTICAST_INTERFACE), + _multicastAddress(DEFAULT_MULTICAST_ADDRESS), + _multicastRate(DEFAULT_MULTICAST_RATE), + _multicastSPort(DEFAULT_MULTICAST_SPORT), + _multicastDPort(DEFAULT_MULTICAST_DPORT), + _multicastWinSize(DEFAULT_MULTICAST_WSIZ), + _multicastMTU(DEFAULT_MULTICAST_MTU), + _multicastUDPPortBase(DEFAULT_MULTICAST_UDPPORT), + _multicastUseUDP(DEFAULT_MULTICAST_USEUDP) + { + } + + McastConfiguration(McastConfiguration const& other, QObject* parent = 0) : + QObject(parent), + _multicastInterface(other._multicastInterface), + _multicastAddress(other._multicastAddress), + _multicastRate(other._multicastRate), + _multicastSPort(other._multicastSPort), + _multicastDPort(other._multicastDPort), + _multicastWinSize(other._multicastWinSize), + _multicastMTU(other._multicastMTU), + _multicastUDPPortBase(other._multicastUDPPortBase), + _multicastUseUDP(other._multicastUseUDP) + { + } + + virtual ~McastConfiguration() + { + } + + QString multicastAddress() const + { + return _multicastAddress; + } + McastConfiguration* multicastAddress(QString const& address) + { + _multicastAddress = address; + return this; + } + + quint16 multicastSPort() const + { + return _multicastSPort; + } + McastConfiguration* multicastSPort(quint16 port) + { + _multicastSPort = port; + return this; + } + + quint16 multicastDPort() const + { + return _multicastDPort; + } + McastConfiguration* multicastDPort(quint16 port) + { + _multicastDPort = port; + return this; + } + + quint32 multicastRate() const + { + return _multicastRate; + } + McastConfiguration* multicastRate(quint32 rate) + { + _multicastRate = rate; + return this; + } + + quint16 multicastWinSize() const + { + return _multicastWinSize; + } + McastConfiguration* multicastWinSize(quint16 size) + { + _multicastWinSize = size; + return this; + } + + quint16 multicastMTU() const + { + return _multicastMTU; + } + McastConfiguration* multicastMTU(quint16 mtu) + { + _multicastMTU = mtu; + return this; + } + + bool multicastUseUDP() const + { + return _multicastUseUDP; + } + McastConfiguration* multicastUseUDP(bool useUDP) + { + _multicastUseUDP = useUDP; + return this; + } + + quint16 multicastUDPPortBase() const + { + return _multicastUDPPortBase; + } + McastConfiguration* multicastUDPPortBase(quint16 port) + { + _multicastUDPPortBase = port; + return this; + } + + QString multicastInterface() const + { + return _multicastInterface; + } + McastConfiguration* multicastInterface(QString const& interface) + { + _multicastInterface = interface; + return this; + } + + quint16 multicastUDPUPort() const + { + return _multicastUDPPortBase; + } + + quint16 multicastUDPMPort() const + { + return _multicastUDPPortBase; + } + + void commit() + { + emit changed(); + } + + McastConfiguration& operator=(McastConfiguration const& source) + { + if(this != &source) + { + _multicastInterface = source._multicastInterface; + _multicastAddress = source._multicastAddress; + _multicastRate = source._multicastRate; + _multicastSPort = source._multicastSPort; + _multicastDPort = source._multicastDPort; + _multicastWinSize = source._multicastWinSize; + _multicastMTU = source._multicastMTU; + _multicastUDPPortBase = source._multicastUDPPortBase; + _multicastUseUDP = source._multicastUseUDP; + } + return *this; + } + + void loadFrom(QSettings* settings, char const* group = 0); + void writeTo(QSettings* settings, char const* group = 0) const; + +signals: + void changed(); + +private: + QString _multicastInterface; + QString _multicastAddress; + quint32 _multicastRate; + quint16 _multicastSPort; + quint16 _multicastDPort; + quint16 _multicastWinSize; + quint16 _multicastMTU; + quint16 _multicastUDPPortBase; + bool _multicastUseUDP; +}; + +#endif /* MCASTCONFIGURATION_H_ */ diff --git a/src/net/mcast/McastConstants.h b/src/net/mcast/McastConstants.h new file mode 100644 index 0000000..624e195 --- /dev/null +++ b/src/net/mcast/McastConstants.h @@ -0,0 +1,36 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastMagic.h +# - Specify the magic numbers for the McastFT protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTMAGIC_H_ +#define MCASTMAGIC_H_ + +#include <stdint.h> + +#define MCASTFT_MAGIC UINT64_C(0x6d60ad83825fb7f9) +#define DEFAULT_MULTICAST_INTERFACE "" +#define DEFAULT_MULTICAST_ADDRESS "239.255.220.207" +#define DEFAULT_MULTICAST_SPORT 6964 +#define DEFAULT_MULTICAST_DPORT 6965 +#define DEFAULT_MULTICAST_USEUDP true +#define DEFAULT_MULTICAST_UDPPORT 6966 +#define DEFAULT_MULTICAST_WSIZ 30 +#define DEFAULT_MULTICAST_RATE (100*1024) +#define DEFAULT_MULTICAST_MTU 1400 +#define DEFAULT_MULTICAST_APDU 1200 +#define DEFAULT_MULTICAST_CHUNK 1024 +#define DEFAULT_MULTICAST_SHUTDOWN_TIMEOUT 10000 + +#endif /* MCASTMAGIC_H_ */ diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp new file mode 100644 index 0000000..731fc13 --- /dev/null +++ b/src/net/mcast/McastPGMSocket.cpp @@ -0,0 +1,666 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastPGMSocket.cpp +# - wrap OpenPGM Sockets in a nicer interface -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include <cstdlib> + +#include <sys/poll.h> +#include <sys/socket.h> + +#include <QByteArray> +#include <QList> +#include <QtDebug> +#include <QPointer> +#include <QSocketNotifier> +#include <QTimer> + +#include <pgm/pgm.h> + +#include "McastPGMSocket.h" + +using namespace std; + +class McastPGMSocket_priv +{ +public: + McastPGMSocket_priv() : + socket(0), + send_notif(0) + { + } + ~McastPGMSocket_priv() + { + if (socket) + pgm_close(socket, 0); + Q_FOREACH(QSocketNotifier* notif, _notifs) + { + delete notif; + } + if (send_notif) + delete send_notif; + } + + pgm_sock_t* socket; + McastPGMSocket::Direction direction; + QSocketNotifier* send_notif; + QList<QSocketNotifier*> _notifs; + + QSocketNotifier* notifier_for(int fd) { + Q_FOREACH(QSocketNotifier* notif, _notifs) + { + if(notif->socket() == fd) + { + return notif; + } + } + return 0; + } +}; + +static void _ensurePGMInited() +{ + if (!pgm_supported()) + { + pgm_error_t* err; + int good = pgm_init(&err); + if (!good) + { + qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)"); + std::exit(1); + } + } +} + +McastPGMSocket::McastPGMSocket(QObject* parent) : + QObject(parent), + _priv(new McastPGMSocket_priv), + _opened(false), + _finished(false), + _nakTimeout(new QTimer()), + _dataTimeout(new QTimer()), + _sendTimeout(new QTimer()), + _shutdownTimer(0), + _shutdown_timeout(0) +{ + _ensurePGMInited(); + + _nakTimeout->setSingleShot(true); + _dataTimeout->setSingleShot(true); + _sendTimeout->setSingleShot(true); + + connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout())); + connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout())); + connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend())); +} + +McastPGMSocket::~McastPGMSocket() +{ + delete _priv; + delete _nakTimeout; + delete _dataTimeout; + delete _sendTimeout; +} + +bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) +{ + _priv->direction = direction; + + pgm_error_t* err = 0; + int good; + + pgm_addrinfo_t* addrinfo; + // parse the address string + good = pgm_getaddrinfo((config->multicastInterface() + ";" + config->multicastAddress()).toLatin1().constData(), + 0, &addrinfo, &err); + if (!good) + { + qCritical() << "Could not parse address info: PGM Error: " + << err->message; + return false; + } + + sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family; + + if(config->multicastUseUDP()) + { + good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_UDP, &err); + } + else + { + good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err); + } + + if (!good) + { + qCritical() << "Could not open socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + unsigned const ambient_spm = 2000 * 1000; // every one hundred milliseconds (approx.) + + /* Options for sending data */ + const int spm_heartbeat[] = + { 512 * 1000, + 1024 * 1000, + 2048 * 1000, + 4096 * 1000 }, + max_rate = 0, + max_window = config->multicastWinSize() * config->multicastRate() / config->multicastMTU(); + // const int max_window_sqns = 3000; + qDebug() << "Computed window size " << max_window << " packets"; + +// pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, +// sizeof(send_only)); + + // SPM messages + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); + + // Transmit window + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); +// pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_SQNS, &max_window, sizeof(max_window)); + + /* Options for receiving data */ + const int passive = 0, + spmr_expiry = 500 * 1000, + nak_bo_ivl = 200 * 1000, + nak_rpt_ivl = 500 * 1000, + nak_rdata_ivl = 500 * 1000, + nak_data_retries = 50, + nak_ncf_retries = 50; + qDebug() << "Computed window size " << max_window << " packets"; + +// pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive)); +// pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate)); +// pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_RXW_SQNS, &max_window, sizeof(max_window)); + + /* Try using PGMCC */ + const struct pgm_pgmccinfo_t pgmccinfo = { + 100 /* usecs */ * 1000 /* msecs */, + 75 /* from OpenPGM examples */, + 500 /* from PGMCC internet-draft */ + }; + good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo)); + if(!good) + { + qCritical() << "Could not enable PGMCC"; + return false; + } + +// /* Forward Error Correction */ +// const struct pgm_fecinfo_t pgmfecinfo = { +// 255 /* from OpenPGM examples */, +// 2 /* send two proactive packets */, +// 8 /* from OpenPGM examples */, +// 1 /* enable on-demand parity */, +// 1 /* enable variable packet length */ +// }; +// good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_FEC, &pgmfecinfo, sizeof(pgmfecinfo)); +// if(!good) +// { +// qCritical() << "Could not enable FEC"; +// return false; +// } + + // Peer Expiry: We will give 1 minute. + int const peer_expiry = 60 /* seconds */ * 1000000 /* microseconds */; + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); + + // MTU + int const mtu = config->multicastMTU(); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MTU, &mtu, sizeof(mtu)); + + pgm_sockaddr_t addr; + addr.sa_addr.sport = config->multicastSPort(); + addr.sa_port = config->multicastDPort(); + good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err); + if (!good) + { + qCritical() << "Could not generate a GSI: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + struct pgm_interface_req_t ifreq; + ifreq.ir_interface = addrinfo->ai_send_addrs[0].gsr_interface; + ifreq.ir_scope_id = 0; + if (AF_INET6 == family) + { + ifreq.ir_scope_id = ((struct sockaddr_in6*)&addrinfo->ai_send_addrs[0])->sin6_scope_id; + } + + // UDP Encapsulation + if(config->multicastUseUDP()) + { + const int uport = config->multicastUDPUPort(); + const int mport = config->multicastUDPMPort(); + + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport)); + } + + good = pgm_bind3(_priv->socket, &addr, sizeof(addr), &ifreq , sizeof(ifreq), &ifreq, sizeof(ifreq), &err); + if (!good) + { + qCritical() << "Could not bind socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + // qDebug() << "Max APDU is " << _priv->socket->max_apdu; + // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu; + // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment; + // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns; + + // join the group + for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++) + { + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_JOIN_GROUP, + &addrinfo->ai_recv_addrs[i], sizeof(struct group_req)); + } + + // set send address + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], + sizeof(struct group_req)); + + // IP parameters + const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16; + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, + sizeof(multicast_loop)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, + sizeof(multicast_hops)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, + sizeof(nonblocking)); + + good = pgm_connect(_priv->socket, &err); + if (!good) + { + qCritical() << "Could not connect socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + _opened = true; + + setupNotifiers(); + + pgm_freeaddrinfo(addrinfo); + + /* Prime the generation of SPM packets during the waiting period */ + if(_priv->direction == PSOCK_WRITE) + QTimer::singleShot(0, this, SLOT(handleNak())); + + return true; +} + +void McastPGMSocket::setupNotifiers() +{ + int recv_sock, repair_sock, pending_sock; + char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int)); + + struct pollfd pollin[10]; + int in_nfds = 10; + pgm_poll_info(_priv->socket, pollin, &in_nfds, POLLIN); + for(int i = 0; i < in_nfds; i++) + { + QSocketNotifier* notif = new QSocketNotifier(pollin[i].fd, QSocketNotifier::Read, this); + _priv->_notifs.append(notif); + connect(notif, SIGNAL(activated(int)), this, slotname); + } + + if(_priv->direction == PSOCK_WRITE) + { + struct pollfd pfd; + int nfds = 1; + pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT); + _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this); + connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend())); + } +} + +void McastPGMSocket::handleNak(int fd) +{ + qDebug() << "handleNak(" << fd << ")"; + + QSocketNotifier* notif = _priv->notifier_for(fd); + notif->setEnabled(false); + + if (_shutdownTimer) + { + _shutdownTimer->start(_shutdown_timeout); + qDebug() << "Started shutdown timer"; + } + + handleNak(); + + notif->setEnabled(true); +} + +void McastPGMSocket::handleNak() +{ + if (_finished) + return; + + // QTimer::singleShot(1000, this, SLOT(handleNakTimeout())); + + // to handle NAKs in OpenPGM, we need to pgm_recv: + char buf[4096]; + pgm_error_t* err = 0; + + int status; + // while we don't block: + do + { + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err); + + if(status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); + const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate limited: " << msecs << "ms"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " wouldblock"; + break; + } + else + { + if(err) + { + qCritical() << "Could not handle NAKs: PGM Error: " << err->message; + pgm_error_free(err); + err = 0; + } + } + } + while (true); +} + +void McastPGMSocket::handleNakTimeout() +{ + qDebug() << "handleNakTimeout()"; + + handleNak(); +} + +void McastPGMSocket::handleData(int fd) +{ + // need to guard against destruction in finish() via signals/slots + QPointer<QSocketNotifier> notif(_priv->notifier_for(fd)); + notif->setEnabled(false); + + handleData(); + + if (notif) + notif->setEnabled(true); +} + +void McastPGMSocket::handleData() +{ + if (_finished) { + return; + } + + int status; + do + { + char buf[4096]; + size_t size; + pgm_error_t* err = 0; + + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err); + + if (status == PGM_IO_STATUS_NORMAL) + { + qDebug() << " normally received"; + if(size > 0) + { + QByteArray bytes(buf, size); + emit receivedPacket(bytes); + } + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " would block"; + // nothing more to do this time + break; + } + else if (status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); + const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate limit pending: " << msecs << "ms"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RESET) + { + qDebug() << " connection reset"; + emit connectionReset(); + qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)"); + pgm_error_free(err); + break; + } + else if (status == PGM_IO_STATUS_FIN) + { + qDebug() << " connection finished"; + emit connectionFinished(); + break; + } + else + { + if(err) + { + qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)"); + break; + } + } + + // the socket might have been closed from under us + if (!_priv->socket) + break; + } + while (true); +} + +void McastPGMSocket::handleDataTimeout() +{ + qDebug() << "handleDataTimeout()"; + + handleData(); +} + +void McastPGMSocket::canSend() +{ + if (_finished) + return; + + + if (_priv->send_notif) + { + _priv->send_notif->setEnabled(false); + } + + if(_q.isEmpty()) + { + emit readyToSend(); + } + else + { + QByteArray const packet(_q.head()); + int status; + + status = pgm_send(_priv->socket, packet.constData(), packet.size(), 0); + + if(status == PGM_IO_STATUS_NORMAL) + { + _q.dequeue(); + if(!_q.isEmpty()) + { + _priv->send_notif->setEnabled(true); + } + else + { + emit readyToSend(); + } + } + else if(status == PGM_IO_STATUS_WOULD_BLOCK) + { + _priv->send_notif->setEnabled(true); + } + else if(status == PGM_IO_STATUS_CONGESTION) + { + qDebug() << " congested..."; + // wait a short time (10ms?) + _sendTimeout->start(10); + } + else if(status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + ((tv.tv_usec + 999) / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate_limited, waiting" << msecs << "ms"; + _sendTimeout->start(msecs); + } + else + { + qCritical() << "Unhandled status in canSend():" << status; + } + } + + if (_shutdownTimer) + _shutdownTimer->start(_shutdown_timeout); +} + +void McastPGMSocket::sendPacket(QByteArray const& bytes) +{ + if(_shutdownTimer) + { + qCritical() << "Logic error: sendPacket() after shutdown()"; + } + + _q.enqueue(bytes); + _priv->send_notif->setEnabled(true); +} + +void McastPGMSocket::finish() +{ + if (_finished) + { + return; + } + + qDebug() << "finish()"; + + Q_FOREACH(QSocketNotifier* notif, _priv->_notifs) + { + notif->setEnabled(false); + delete notif; + } + _priv->_notifs.clear(); + + if(_priv->send_notif) + { + delete _priv->send_notif; + _priv->send_notif = 0; + } + + if (_priv->socket) + { + pgm_close(_priv->socket, 1); + _priv->socket = 0; + } + + _finished = true; + + emit connectionFinished(); + + qDebug() << "Socket finished"; +} + +bool McastPGMSocket::finished() const +{ + return _finished; +} + +bool McastPGMSocket::isOpen() const +{ + return _opened && !_finished; +} + +void McastPGMSocket::shutdown(int interval) +{ + if(_priv->direction == PSOCK_READ) + return; + + _shutdown_timeout = interval; + _shutdownTimer = new QTimer(this); + connect(_shutdownTimer, SIGNAL(timeout()), this, SLOT(finish())); + if (_q.isEmpty()) + { + _shutdownTimer->start(_shutdown_timeout); + qDebug() << "Started shutdown timer"; + } +} diff --git a/src/net/mcast/McastPGMSocket.h b/src/net/mcast/McastPGMSocket.h new file mode 100644 index 0000000..4ccf931 --- /dev/null +++ b/src/net/mcast/McastPGMSocket.h @@ -0,0 +1,81 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastPGMSocket.h +# - wrap OpenPGM Sockets in a nicer interface -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTPGMSOCKET_H_ +#define MCASTPGMSOCKET_H_ + +#include <QByteArray> +#include <QObject> +#include <QQueue> + +#include <src/net/mcast/McastConfiguration.h> +#include <src/net/mcast/McastConstants.h> + +class McastPGMSocket_priv; +class QTimer; + +class McastPGMSocket : public QObject +{ + Q_OBJECT +public: + enum Direction { + PSOCK_READ, + PSOCK_WRITE + }; + + McastPGMSocket(QObject* parent = 0); + virtual ~McastPGMSocket(); + + bool open(McastConfiguration const* config, Direction direction); + bool finished() const; + bool isOpen() const; + void shutdown(int interval = DEFAULT_MULTICAST_SHUTDOWN_TIMEOUT); + +signals: + void readyToSend(); + void receivedPacket(QByteArray const& bytes); + void connectionReset(); + void connectionFinished(); + void shutdownComplete(); + +public slots: + void sendPacket(QByteArray const& bytes); + void finish(); + +private slots: + void handleNak(int fd); + void handleData(int fd); + void handleNak(); + void handleData(); + void handleNakTimeout(); + void handleDataTimeout(); + void canSend(); + +private: + McastPGMSocket_priv* _priv; + QQueue<QByteArray> _q; + bool _finished; + bool _opened; + QTimer* _nakTimeout; + QTimer* _dataTimeout; + QTimer* _sendTimeout; + QTimer* _shutdownTimer; + int _shutdown_timeout; + + void setupNotifiers(); +}; + +#endif /* MCASTPGMSOCKET_H_ */ diff --git a/src/net/mcast/McastReceiver.cpp b/src/net/mcast/McastReceiver.cpp new file mode 100644 index 0000000..1f27127 --- /dev/null +++ b/src/net/mcast/McastReceiver.cpp @@ -0,0 +1,179 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include <QDataStream> +#include <QtDebug> +#include <QtGlobal> + +#include <pgm/pgm.h> +// OpenPGM #defines bool. This is bad in C++. +#undef bool + +#include "McastConstants.h" +#include "McastReceiver.h" + +McastReceiver::McastReceiver(QIODevice* iodev, McastConfiguration* config, QObject* parent) : + QObject(parent), + _config(config ? new McastConfiguration(*config) : 0), + _socket(0), + _curoffs(0), + _closed(false), + _hash(QCryptographicHash::Md5), + _iodev(iodev) +{ + _config->setParent(this); +} + +McastReceiver::~McastReceiver() +{ + if (_config) + delete _config; +} + +void McastReceiver::config(McastConfiguration const* config) +{ + if (_config) + delete _config; + _config = new McastConfiguration(*config, this); +} + +bool McastReceiver::start() +{ + McastConfiguration *config = _config; + if (!config) + config = new McastConfiguration(); + + if (_socket) + { + delete _socket; + } + _socket = new McastPGMSocket(this); + + connect(_socket, SIGNAL(receivedPacket(QByteArray)), SLOT(receivedPacket(QByteArray))); + connect(_socket, SIGNAL(connectionReset()), SLOT(connectionReset())); + // connect(_socket, SIGNAL(connectionFinished()), this, SLOT(connectionFinished())); + if (_socket->open(_config, McastPGMSocket::PSOCK_READ)) + { + return true; + } + else + { + disconnect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray))); + disconnect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset())); + return false; + } +} + +void McastReceiver::abort() +{ + if (_socket) + { + delete _socket; + _socket = 0; + } + + if (_iodev) + { + _iodev->close(); + } +} + +void McastReceiver::receivedPacket(QByteArray const& bytes) +{ + if(_closed) + return; + + quint16 checksum_should = qChecksum(bytes.constData(), bytes.size() - 2); + + QDataStream strm(bytes); + strm.setByteOrder(QDataStream::BigEndian); + + // read the packet + quint64 magic; + quint64 offset; + quint16 checksum; + + + strm >> magic; + if(magic != MCASTFT_MAGIC) + { + qCritical() << "Received packet whose magic number does not match. Ignoring."; + return; + } + + strm >> offset; + qDebug() << " Received packet for offset" << offset; + + if (offset == UINT64_C(0xffffffffffffffff)) + { + // this is the end of the data stream. + QByteArray md5; + strm >> md5; + + quint16 fchecksum; + strm >> fchecksum; + + // compare the hash value + if ((fchecksum != checksum_should) || (md5 != _hash.result())) + { + _close(RES_MD5_MISMATCH); + } + else + { + _close(RES_OK); + } + + return; + } + else if (offset != _curoffs) + { + qCritical() << "Packet loss or double delivery. PGM should have prevented this. Bailing out."; + _close(RES_OFFSET_MISMATCH); + return; + } + + QByteArray contents; + strm >> contents; + _curoffs += contents.size(); + + strm >> checksum; + if(checksum != checksum_should) + { + qCritical() << "Checksum does not match. Bailing out."; + _close(RES_CHECKSUM_MISMATCH); + return; + } + + _hash.addData(contents); + + _iodev->write(contents); + + emit progress(_curoffs); +} + +void McastReceiver::connectionReset() +{ + _close(RES_CONNECTION_RESET); +} + +void McastReceiver::_close(Result result) +{ + _iodev->close(); + _socket->finish(); + + _closed = true; + emit finished(result); +} diff --git a/src/net/mcast/McastReceiver.h b/src/net/mcast/McastReceiver.h new file mode 100644 index 0000000..247733d --- /dev/null +++ b/src/net/mcast/McastReceiver.h @@ -0,0 +1,80 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTRECEIVER_H_ +#define MCASTRECEIVER_H_ + +#include <QByteArray> +#include <QCryptographicHash> +#include <QIODevice> +#include <QObject> +#include <QtGlobal> + +#include <src/net/mcast/McastConfiguration.h> +#include <src/net/mcast/McastPGMSocket.h> + +class McastReceiver : public QObject +{ + Q_OBJECT +public: + enum Result { + RES_OK, + RES_ABORTED, + RES_OFFSET_MISMATCH, + RES_CHECKSUM_MISMATCH, + RES_MD5_MISMATCH, + RES_CONNECTION_RESET + }; + + McastReceiver(QIODevice* iodev, McastConfiguration* config = 0, QObject* parent = 0); + virtual ~McastReceiver(); + + McastConfiguration* config() + { + return _config; + } + + void config(McastConfiguration const* config); + + static inline bool is_error(Result result) + { + return result != RES_OK; + } + +signals: + void finished(int result); + void progress(quint64 offset); + +public slots: + bool start(); + void abort(); + +private: + McastConfiguration* _config; + McastPGMSocket* _socket; + quint64 _curoffs; + bool _closed; + QCryptographicHash _hash; + QIODevice* _iodev; + +private slots: + void receivedPacket(QByteArray const& bytes); + void connectionReset(); + + void _close(Result result); +}; + +#endif /* MCASTRECEIVER_H_ */ diff --git a/src/net/mcast/McastSender.cpp b/src/net/mcast/McastSender.cpp new file mode 100644 index 0000000..3fec6a4 --- /dev/null +++ b/src/net/mcast/McastSender.cpp @@ -0,0 +1,127 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include "McastSender.h" +#include "McastConstants.h" + +#include <QDataStream> +#include <QTimer> + +#include <pgm/pgm.h> +// OpenPGM #defines bool. This is bad in C++. +#undef bool + +#define MCASTFT_START_DEFER_TIME 2000 /* msec */ + +McastSender::McastSender(QIODevice* iodev, McastConfiguration const* config, QObject* parent) : + QObject(parent), + _config(config ? new McastConfiguration(*config) : new McastConfiguration()), + _socket(0), + _iodev(iodev), + _curoffs(0), + _hash(QCryptographicHash::Md5), + _finished(false) +{ +} + +McastSender::~McastSender() +{ + delete _config; +} + +void McastSender::start() +{ + _socket = new McastPGMSocket(this); + connect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart())); + _socket->open(_config, McastPGMSocket::PSOCK_WRITE); +} + +void McastSender::start(McastPGMSocket* socket) +{ + _socket = socket; + Q_ASSERT(_socket->isOpen()); + deferredStart(); +} + +void McastSender::deferredStart() +{ + // Wait some time, to give the PGM library the chance to generate some + // undisturbed SPM messages: + QTimer::singleShot(MCASTFT_START_DEFER_TIME, this, SLOT(readyToSend())); + disconnect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart())); + connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend())); +} + +void McastSender::readyToSend() +{ + if(_finished) + return; + + if(_iodev->atEnd()) + { + QByteArray fpdu; + QDataStream strm(&fpdu, QIODevice::WriteOnly); + strm.setByteOrder(QDataStream::BigEndian); + + strm << (quint64)MCASTFT_MAGIC << (quint64)UINT64_C(0xffffffffffffffff) << _hash.result(); + strm << qChecksum(fpdu.constData(), fpdu.size()); + + _socket->sendPacket(fpdu); + connect(_socket, SIGNAL(connectionFinished()), this, SLOT(socketFinished())); + _socket->shutdown(); + + _finished = true; + + _iodev->close(); + + emit allSent(); + } + else + { + QByteArray barr(DEFAULT_MULTICAST_APDU, '\0'); + qint64 len_read; + len_read = _iodev->read(barr.data(), barr.capacity()); + barr.resize((int)len_read); + + _hash.addData(barr); + + QByteArray pdu; + QDataStream strm(&pdu, QIODevice::WriteOnly); + strm.setByteOrder(QDataStream::BigEndian); + + strm << (quint64)MCASTFT_MAGIC << _curoffs; + strm << barr; + quint16 checksum = qChecksum(pdu.constData(), pdu.size()); + strm << checksum; + + _curoffs += len_read; + + _socket->sendPacket(pdu); + + emit progress(_curoffs); + } +} + +void McastSender::close() +{ + _socket->finish(); +} + +void McastSender::socketFinished() +{ + _socket->deleteLater(); + emit finished(); +} diff --git a/src/net/mcast/McastSender.h b/src/net/mcast/McastSender.h new file mode 100644 index 0000000..0c5e29f --- /dev/null +++ b/src/net/mcast/McastSender.h @@ -0,0 +1,73 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the sender-side multicast file transfer protocol -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTSENDER_H_ +#define MCASTSENDER_H_ + +#include <QCryptographicHash> +#include <QIODevice> +#include <QObject> + +#include "McastConfiguration.h" +#include "McastPGMSocket.h" + +class McastSender : public QObject +{ + Q_OBJECT +public: + McastSender(QIODevice* iodev = 0, McastConfiguration const* config = 0, QObject* parent = 0); + virtual ~McastSender(); + + McastConfiguration* config() + { + return _config; + } + + QIODevice* iodevice() const + { + return _iodev; + } + + void setIODevice(QIODevice* iodevice) + { + _iodev = iodevice; + } + +signals: + void finished(); + void progress(quint64 offset); + void allSent(); + +public slots: + void start(); + void start(McastPGMSocket* openSocket); + void close(); + +private slots: + void deferredStart(); + void readyToSend(); + void socketFinished(); + +private: + McastConfiguration* _config; + McastPGMSocket* _socket; + QIODevice* _iodev; + quint64 _curoffs; + QCryptographicHash _hash; + bool _finished; +}; + +#endif /* MCASTSENDER_H_ */ diff --git a/src/net/mcast/trial_programs/CMakeLists.txt b/src/net/mcast/trial_programs/CMakeLists.txt new file mode 100644 index 0000000..d0f68fa --- /dev/null +++ b/src/net/mcast/trial_programs/CMakeLists.txt @@ -0,0 +1,38 @@ +INCLUDE(${QT_USE_FILE}) + +QT4_WRAP_CPP( + mcastsend_MOC + mcastsend.h +) + +QT4_WRAP_CPP( + mcastreceive_MOC + mcastreceive.h +) + +SET(argparser_SRC + McastConfigArgParser.h + McastConfigArgParser.cpp +) + +ADD_EXECUTABLE(mcastsend + mcastsend.cpp + mcastsend.h + ${argparser_SRC} + ${mcastsend_MOC} +) + +ADD_EXECUTABLE(mcastreceive + mcastreceive.cpp + mcastreceive.h + ${argparser_SRC} + ${mcastreceive_MOC} +) + +TARGET_LINK_LIBRARIES(mcastsend + pvsmcast +) + +TARGET_LINK_LIBRARIES(mcastreceive + pvsmcast +)
\ No newline at end of file diff --git a/src/net/mcast/trial_programs/McastConfigArgParser.cpp b/src/net/mcast/trial_programs/McastConfigArgParser.cpp new file mode 100644 index 0000000..881f728 --- /dev/null +++ b/src/net/mcast/trial_programs/McastConfigArgParser.cpp @@ -0,0 +1,165 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/McastConfigArgParser.cpp +# - Parse common Multicast Configuration CLI arguments +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> + +#include "McastConfigArgParser.h" + +using namespace std; + +bool parseMcastConfigArg(QStringList::iterator& i, QStringList::iterator const& end, McastConfiguration* config) +{ + QString arg = *i; + + if (arg == "-addr") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + config->multicastAddress(*i); + } + else if (arg == "-dport") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 dport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: dport is not an integer" << endl; + return false; + } + config->multicastDPort(dport); + } + else if (arg == "-sport") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 sport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: sport is not an integer" << endl; + return false; + } + config->multicastSPort(sport); + } + else if (arg == "-mtu") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 mtu = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: MTU is not an integer" << endl; + return false; + } + config->multicastMTU(mtu); + } + else if (arg == "-rate") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint32 rate = i->toInt(&ok); + if (!ok) + { + cerr << "Error: Rate is not an integer" << endl; + return false; + } + config->multicastRate(rate); + } + else if (arg == "-winsize") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 winsize = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: Winsize is not an integer" << endl; + return false; + } + config->multicastWinSize(winsize); + } + else if (arg == "-udp") + { + config->multicastUseUDP(true); + } + else if (arg == "-no-udp") + { + config->multicastUseUDP(false); + } + else if (arg == "-udp-port") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 udpport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: UDP-Port is not an integer" << endl; + return false; + } + config->multicastUDPPortBase(udpport); + } + else if (arg == "-intf") + { + i++; + if (i == end) + { + cerr << "Option " << arg.toLatin1().constData() << "is missing argument" << endl; + return false; + } + config->multicastInterface(*i); + } + else + { + return false; + } + + return true; +} diff --git a/src/net/mcast/trial_programs/McastConfigArgParser.h b/src/net/mcast/trial_programs/McastConfigArgParser.h new file mode 100644 index 0000000..4fb18a7 --- /dev/null +++ b/src/net/mcast/trial_programs/McastConfigArgParser.h @@ -0,0 +1,26 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/McastConfigArgParser.h +# - Parse common Multicast Configuration CLI arguments +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTCONFIGARGPARSER_H_ +#define MCASTCONFIGARGPARSER_H_ + +#include <QStringList> + +#include "../McastConfiguration.h" + +bool parseMcastConfigArg(QStringList::iterator& i, QStringList::iterator const& end, McastConfiguration* config); + +#endif /* MCASTCONFIGARGPARSER_H_ */ diff --git a/src/net/mcast/trial_programs/mcastreceive.cpp b/src/net/mcast/trial_programs/mcastreceive.cpp new file mode 100644 index 0000000..48a0f10 --- /dev/null +++ b/src/net/mcast/trial_programs/mcastreceive.cpp @@ -0,0 +1,150 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Receive a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> +#include <QFile> +#include <QStringList> +#include <QTimer> + +#include "mcastreceive.h" +#include "McastConfigArgParser.h" +#include "../McastConfiguration.h" +#include "../McastReceiver.h" + +using namespace std; + +int main(int argc, char** argv) +{ + QCoreApplication app(argc, argv); + McastReceive me; + + QTimer::singleShot(0, &me, SLOT(run())); + + return app.exec(); +} + +void McastReceive::run() +{ + QStringList args = QCoreApplication::arguments(); + QStringList::iterator i = args.begin(); + QStringList::iterator const end = args.end(); + + QString filename(""); + + McastConfiguration config; + + ++i; + while (i != end) + { + QString arg = *i; + + cerr << "Arg: " << arg.toLatin1().constData() << endl; + + if (arg == "-file") + { + ++i; + if (i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing its argument" << endl; + QCoreApplication::exit(1); + return; + } + filename = *i; + } + else if (arg == "-help") + { + cerr << "Options:" << endl << endl + << " -file <FILE> Receive to file FILE" << endl + << " -addr <ADDR> Use ADDR as address specification" << endl + << " -dport <PORT> Send to port PORT" << endl + << " -sport <PORT> Send from port PORT" << endl + << " -mtu <BYTES> Set MTU to BYTES" << endl + << " -rate <BYTES> Send BYTES per second" << endl + << " -winsize <SECONDS> Set Window Size to SECONDS" << endl + << " -udp Use UDP encapsulation" << endl + << " -udp-port PORT Use UDP port PORT" << endl; + QCoreApplication::quit(); + return; + } + else + { + if (!parseMcastConfigArg(i, end, &config)) + { + cerr << "Unknown argument: " << arg.toLatin1().constData() << endl; + QCoreApplication::exit(1); + return; + } + } + + ++i; + } + + if (filename == "") + { + cerr << "No Filename given" << endl; + QCoreApplication::exit(1); + return; + } + + _target = new QFile(filename, this); + _target->open(QIODevice::WriteOnly); + + McastReceiver* recv = new McastReceiver(_target, &config, this); + + connect(recv, SIGNAL(finished(int)), this, SLOT(finished(int))); + + QTimer::singleShot(0, recv, SLOT(start())); +} + +void McastReceive::finished(int state) +{ + cerr << "finished: "; + + switch(state) + { + case McastReceiver::RES_OK: + cerr << "OK." << endl; + break; + case McastReceiver::RES_ABORTED: + cerr << "Aborted." << endl; + goto failed; + case McastReceiver::RES_CHECKSUM_MISMATCH: + cerr << "Checksum mismatch." << endl; + goto failed; + case McastReceiver::RES_CONNECTION_RESET: + cerr << "Connection reset." << endl; + goto failed; + case McastReceiver::RES_MD5_MISMATCH: + cerr << "MD5 mismatch." << endl; + goto failed; + case McastReceiver::RES_OFFSET_MISMATCH: + cerr << "Offset mismatch. Undetected packet loss?" << endl; + goto failed; + default: + cerr << "Unknown error code!" << endl; + goto failed; + } + + QCoreApplication::quit(); + return; +failed: + cerr << "Deleting file." << endl; + _target->remove(); + QCoreApplication::exit(1); + return; +} diff --git a/src/net/mcast/trial_programs/mcastreceive.h b/src/net/mcast/trial_programs/mcastreceive.h new file mode 100644 index 0000000..3e72d4c --- /dev/null +++ b/src/net/mcast/trial_programs/mcastreceive.h @@ -0,0 +1,44 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Receive a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTRECEIVE_H_ +#define MCASTRECEIVE_H_ + +#include <QObject> + +class QFile; +class McastReceiver; + +class McastReceive : public QObject +{ + Q_OBJECT +public: + McastReceive() : + QObject(), + _receiver(0) + { + } + +public slots: + void run(); + void finished(int state); + +private: + McastReceiver* _receiver; + QFile* _target; +}; + +#endif /* MCASTRECEIVE_H_ */ diff --git a/src/net/mcast/trial_programs/mcastsend.cpp b/src/net/mcast/trial_programs/mcastsend.cpp new file mode 100644 index 0000000..f78a9ce --- /dev/null +++ b/src/net/mcast/trial_programs/mcastsend.cpp @@ -0,0 +1,122 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Send a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> +#include <QFile> +#include <QStringList> +#include <QTimer> + +#include <src/net/mcast/McastSender.h> +#include "mcastsend.h" +#include "McastConfigArgParser.h" +#include "../McastConstants.h" +#include "../McastConfiguration.h" + +using namespace std; + +int +main(int argc, char**argv) +{ + QCoreApplication app(argc, argv); + McastSend me; + + QTimer::singleShot(0, &me, SLOT(run())); + + return app.exec(); +} + +void McastSend::run() +{ + QStringList args = QCoreApplication::arguments(); + QStringList::iterator i = args.begin(); + QStringList::iterator const end = args.end(); + QString filename(""); + McastConfiguration config; + + ++i; + while(i != end) + { + // parse command line arguments + + QString arg = *i; + + cerr << "Arg: " << arg.toLatin1().constData() << endl; + + if (arg == "-file") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + QCoreApplication::exit(1); + return; + } + filename = *i; + } + else if (arg == "-help") + { + cerr << "Options:" << endl << endl + << " -file <FILE> Send FILE to the listeners" << endl + << " -addr <ADDR> Use ADDR as address specification" << endl + << " -dport <PORT> Send to port PORT" << endl + << " -sport <PORT> Send from port PORT" << endl + << " -mtu <BYTES> Set MTU to BYTES" << endl + << " -rate <BYTES> Send BYTES per second" << endl + << " -winsize <SECONDS> Set Window Size to SECONDS" << endl + << " -udp Use UDP encapsulation" << endl + << " -udp-port PORT Use UDP port PORT" << endl; + QCoreApplication::quit(); + return; + } + else + { + if (!parseMcastConfigArg(i, end, &config)) + { + cerr << "Unknown command line argument: " << arg.toLatin1().constData() << endl; + QCoreApplication::exit(1); + return; + } + } + + ++i; + } + + if(filename == "") + { + cerr << "No filename given" << endl; + QCoreApplication::exit(1); + return; + } + + // now, do it. + QFile* file = new QFile(filename); + file->open(QIODevice::ReadOnly); + + McastSender* sender = new McastSender(file, &config, this); + file->setParent(sender); + + connect(sender, SIGNAL(finished()), this, SLOT(finished())); + + QTimer::singleShot(0, sender, SLOT(start())); +} + +void McastSend::finished() +{ + cerr << "finished." << endl; + QCoreApplication::quit(); +} diff --git a/src/net/mcast/trial_programs/mcastsend.h b/src/net/mcast/trial_programs/mcastsend.h new file mode 100644 index 0000000..ae15eb4 --- /dev/null +++ b/src/net/mcast/trial_programs/mcastsend.h @@ -0,0 +1,42 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Send a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTSEND_H_ +#define MCASTSEND_H_ + +#include <QObject> + +#include "../McastSender.h" + +class McastSend : public QObject +{ + Q_OBJECT +public: + McastSend() : + QObject(), + _sender(0) + { + } + +public slots: + void run(); + void finished(); + +private: + McastSender* _sender; +}; + +#endif /* MCASTSEND_H_ */ diff --git a/src/net/pvsIncomingMulticastTransfer.cpp b/src/net/pvsIncomingMulticastTransfer.cpp new file mode 100644 index 0000000..10b5307 --- /dev/null +++ b/src/net/pvsIncomingMulticastTransfer.cpp @@ -0,0 +1,133 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/pcsIncomingMulticastTransfer.h +# - wrap McastReceiver functionality in PVS daemon +# ----------------------------------------------------------------------------- +*/ + +#include <QDir> +#include <QTemporaryFile> +#include <QTimer> + +#include "pvsIncomingMulticastTransfer.h" +#include <src/net/mcast/McastReceiver.h> + +PVSIncomingMulticastTransfer::PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size, QString const& filename, + ushort port, McastConfiguration const* configTemplate, QObject* parent) : + QObject(parent), + _sender(sender), + _transferID(transferID), + _bytes(0), + _size(size), + _port(port), + _file(new QFile(filename, this)), + _receiver(0), + _config(configTemplate ? + new McastConfiguration(*configTemplate) : + new McastConfiguration()), + _progressTimer(new QTimer(this)) +{ + _file->open(QIODevice::WriteOnly); + + _config->multicastUDPPortBase(port); + // _config->multicastDPort(port+1); + // _config->multicastSPort(port+2); + + connect(_progressTimer, SIGNAL(timeout()), SLOT(updateProgress())); + connect(this, SIGNAL(failed(qulonglong, QString const&)), SLOT(removeFile())); +} + +PVSIncomingMulticastTransfer::~PVSIncomingMulticastTransfer() +{ + // TODO Auto-generated destructor stub +} + +bool PVSIncomingMulticastTransfer::start() +{ + _file->open(QIODevice::WriteOnly); + _receiver = new McastReceiver(_file, new McastConfiguration(*_config), this); + connect(_receiver, SIGNAL(finished(int)), SLOT(receiverFinished(int))); + connect(_receiver, SIGNAL(progress(quint64)), SLOT(receiverProgressed(quint64))); + + if (!_receiver->start()) + { + emit retry(_sender, _transferID); + return false; + } + else + { + _progressTimer->start(333); + return true; + } +} + +void PVSIncomingMulticastTransfer::abort() +{ + delete _receiver; + _receiver = 0; + + delete _progressTimer; + _progressTimer = 0; + + if(_file) + delete _file; +} + +void PVSIncomingMulticastTransfer::updatePort(ushort port) +{ + _config->multicastUDPPortBase(port); + _config->multicastSPort(port); + _config->multicastDPort(port); +} + +void PVSIncomingMulticastTransfer::receiverProgressed(quint64 bytes) +{ + _bytes = bytes; +} + +void PVSIncomingMulticastTransfer::receiverFinished(int how) +{ + switch(how) + { + case McastReceiver::RES_OK: + emit finished(_transferID); + break; + case McastReceiver::RES_ABORTED: + emit failed(_transferID, tr("Aborted")); + break; + case McastReceiver::RES_MD5_MISMATCH: + case McastReceiver::RES_CHECKSUM_MISMATCH: + emit failed(_transferID, tr("Unrecoverable data corruption")); + break; + case McastReceiver::RES_CONNECTION_RESET: + emit failed(_transferID, tr("Connection was reset")); + break; + case McastReceiver::RES_OFFSET_MISMATCH: + emit failed(_transferID, tr("Unrecoverable data loss. Try a lower transfer rate")); + break; + } +} + +void PVSIncomingMulticastTransfer::removeFile() +{ + if(_file) + _file->remove(); +} + +void PVSIncomingMulticastTransfer::updateProgress() +{ + if (!_started) + { + emit started(_transferID); + } + emit progress(_transferID, _bytes, _size); +} diff --git a/src/net/pvsIncomingMulticastTransfer.h b/src/net/pvsIncomingMulticastTransfer.h new file mode 100644 index 0000000..f96e176 --- /dev/null +++ b/src/net/pvsIncomingMulticastTransfer.h @@ -0,0 +1,71 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/pcsIncomingMulticastTransfer.h +# - wrap McastReceiver functionality in PVS daemon +# ----------------------------------------------------------------------------- +*/ + +#ifndef PVSINCOMINGMULTICASTTRANSFER_H_ +#define PVSINCOMINGMULTICASTTRANSFER_H_ + +#include <QObject> +#include <QString> + +#include <src/net/pvsMsg.h> + +class McastConfiguration; +class McastReceiver; +class QFile; +class QTimer; + +class PVSIncomingMulticastTransfer : public QObject +{ + Q_OBJECT +public: + PVSIncomingMulticastTransfer(QString const& sender, qulonglong transferID, qulonglong size, QString const& filename, ushort port, + McastConfiguration const* configTemplate, QObject* parent = 0); + virtual ~PVSIncomingMulticastTransfer(); + + void setFinalFile(QString const& filename); + +signals: + void retry(QString const& sender, qulonglong transferID); + void started(qulonglong transferID); + void progress(qulonglong transferID, qulonglong bytes, qulonglong of); + void finished(qulonglong transferID); + void failed(qulonglong transferID, QString const& reason); + +public slots: + void updatePort(ushort port); + bool start(); + void abort(); + +private slots: + void receiverProgressed(quint64 bytes); + void receiverFinished(int reason); + void updateProgress(); + void removeFile(); + +private: + QString _sender; + qulonglong _transferID; + qulonglong _bytes; + qulonglong _size; + ushort _port; + QFile* _file; + McastReceiver* _receiver; + McastConfiguration* _config; + bool _started; + QTimer* _progressTimer; +}; + +#endif /* PVSINCOMINGMULTICASTTRANSFER_H_ */ diff --git a/src/net/pvsListenServer.cpp b/src/net/pvsListenServer.cpp index 58d4aee..90c2dfb 100644 --- a/src/net/pvsListenServer.cpp +++ b/src/net/pvsListenServer.cpp @@ -21,9 +21,13 @@ #include "pvsClientConnection.h" #include "src/util/consoleLogger.h" #include <QtNetwork/QSslSocket> +#include <QBuffer> +#include <QByteArray> +#include <QSettings> #include "SslServer.h" #include <cassert> //#define verbose +#include "mcast/McastConfiguration.h" // Create listener PVSListenServer::PVSListenServer(int port, int clients) @@ -36,6 +40,7 @@ PVSListenServer::PVSListenServer(int port, int clients) else _clientsMax = clients; _port = port; + _mcastConfig = 0; init(); } @@ -178,6 +183,7 @@ void PVSListenServer::onClientConnected(PVSClientConnection* connected) { connected->setServerID(_id); connected->setID(generateID()); + connected->push_back_send(mcastConfigMessage()); } void PVSListenServer::onClientDisconnected(PVSClientConnection* disconnected) @@ -227,6 +233,11 @@ PVSClientConnection* PVSListenServer::getConnectionFromID(int id) // Initialize listening socket bool PVSListenServer::init() { + if (_mcastConfig) + delete _mcastConfig; + _mcastConfig = new McastConfiguration(this); + loadMcastConfig(); + if (_listenSocket != NULL) shutdown(); @@ -305,3 +316,52 @@ bool PVSListenServer::isListening() { return _listenSocket != NULL && _listenSocket->isListening(); } + +void PVSListenServer::loadMcastConfig() +{ + QSettings settings; + _mcastConfig->loadFrom(&settings, "multicast-filetransfer"); +} + +void PVSListenServer::saveMcastConfig() +{ + QSettings settings; + _mcastConfig->writeTo(&settings, "multicast-filetransfer"); + settings.sync(); +} + +PVSMsg PVSListenServer::mcastConfigMessage() +{ + // If anything is changed here, do not forget to + // 1. assign a new version number + // 2. adapt PVS::onCommand(PVSMsg) in pvs.cpp + QByteArray ba; + QDataStream strm(&ba, QIODevice::WriteOnly); + strm << (quint16)1 // version + << _mcastConfig->multicastAddress() + << _mcastConfig->multicastUDPPortBase() + << _mcastConfig->multicastDPort() + << _mcastConfig->multicastSPort() + << _mcastConfig->multicastMTU() + << _mcastConfig->multicastWinSize() + << _mcastConfig->multicastRate() + << _mcastConfig->multicastUseUDP(); + + QByteArray b64 = ba.toBase64(); + QString message = QString::fromAscii(b64.constData(), b64.length()); + PVSMsg msg(PVSCOMMAND, "MCASTFTCONFIG", message); + return msg; +} + +void PVSListenServer::multicastReconfigure(McastConfiguration const* source) +{ + _mcastConfig->multicastAddress(source->multicastAddress()); + *_mcastConfig = *source; + saveMcastConfig(); + sendToAll(mcastConfigMessage()); +} + +McastConfiguration const* PVSListenServer::getMulticastConfiguration() +{ + return _mcastConfig; +} diff --git a/src/net/pvsListenServer.h b/src/net/pvsListenServer.h index b43b730..ca9977c 100644 --- a/src/net/pvsListenServer.h +++ b/src/net/pvsListenServer.h @@ -10,6 +10,7 @@ class SslServer; class PVSClientConnection; class PVSMsg; +class McastConfiguration; class PVSListenServer : public QObject { @@ -40,6 +41,12 @@ private: bool init(); unsigned int generateID(); + McastConfiguration* _mcastConfig; + + void loadMcastConfig(); + void saveMcastConfig(); + PVSMsg mcastConfigMessage(); + protected: void timerEvent(QTimerEvent *event); @@ -60,6 +67,9 @@ public: bool disconnectClient(PVSClientConnection* delinquent); void onConnectionRemoved(PVSClientConnection* delinquent); + void multicastReconfigure(McastConfiguration const* source); + McastConfiguration const* getMulticastConfiguration(); + std::list<PVSClientConnection*>* getClientListPtr() { return &_clients; diff --git a/src/net/pvsNetworkInterfaceListModel.cpp b/src/net/pvsNetworkInterfaceListModel.cpp new file mode 100644 index 0000000..67d0c0a --- /dev/null +++ b/src/net/pvsNetworkInterfaceListModel.cpp @@ -0,0 +1,81 @@ +/* + * pvsNetworkInterfaceListModel.cpp + * + * Created on: 04.08.2010 + * Author: brs + */ + +#include "pvsNetworkInterfaceListModel.h" +#include <QStringList> + +PVSNetworkInterfaceListModel::PVSNetworkInterfaceListModel(QObject* parent) : + QAbstractListModel(parent) +{ + reloadInterfaceList(); +} + +PVSNetworkInterfaceListModel::~PVSNetworkInterfaceListModel() +{ +} + +void PVSNetworkInterfaceListModel::reloadInterfaceList() +{ + _interfaces = QNetworkInterface::allInterfaces(); + reset(); +} + +QVariant PVSNetworkInterfaceListModel::data(QModelIndex const& index, int role) const +{ + int i = index.row(); + if(0 > i || i >= _interfaces.size()) + { + return QVariant(); + } + QNetworkInterface intf = _interfaces.at(i); + + switch(role) + { + case Qt::DisplayRole: + { + QString name = intf.humanReadableName(); + QList<QNetworkAddressEntry> addresses = intf.addressEntries(); + QStringList l; + + foreach(QNetworkAddressEntry addr, addresses) + { + l.append(addr.ip().toString()); + } + + return QString("%1 (%2)").arg(name).arg(l.join(", ")); + } + case Qt::EditRole: + case Qt::UserRole: + return intf.name(); + default: + return QVariant(); + } +} + +QVariant PVSNetworkInterfaceListModel::headerData(int section, Qt::Orientation orientation, int role) const +{ + if(section == 0 && orientation == Qt::Vertical && role == Qt::DisplayRole) + { + return tr("Interface"); + } + else + { + return QVariant(); + } +} + +int PVSNetworkInterfaceListModel::rowCount(QModelIndex const& parent) const +{ + if(parent.isValid()) + { + return 0; + } + else + { + return _interfaces.size(); + } +} diff --git a/src/net/pvsNetworkInterfaceListModel.h b/src/net/pvsNetworkInterfaceListModel.h new file mode 100644 index 0000000..3a9b95d --- /dev/null +++ b/src/net/pvsNetworkInterfaceListModel.h @@ -0,0 +1,35 @@ +/* + * pvsNetworkInterfaceListModel.h + * + * Created on: 04.08.2010 + * Author: brs + */ + +#ifndef PVSNETWORKINTERFACELISTMODEL_H_ +#define PVSNETWORKINTERFACELISTMODEL_H_ + +#include <QAbstractListModel> +#include <QList> +#include <QNetworkInterface> +#include <QVariant> + +class PVSNetworkInterfaceListModel : public QAbstractListModel +{ + Q_OBJECT + +public: + PVSNetworkInterfaceListModel(QObject* parent = 0); + virtual ~PVSNetworkInterfaceListModel(); + + QVariant data(QModelIndex const& index, int role = Qt::DisplayRole) const; + QVariant headerData(int section, Qt::Orientation orientation, int role = Qt::DisplayRole) const; + int rowCount(QModelIndex const&) const; + +private: + QList<QNetworkInterface> _interfaces; + +public slots: + void reloadInterfaceList(); +}; + +#endif /* PVSNETWORKINTERFACELISTMODEL_H_ */ diff --git a/src/net/pvsOutgoingMulticastTransfer.cpp b/src/net/pvsOutgoingMulticastTransfer.cpp new file mode 100644 index 0000000..4df4986 --- /dev/null +++ b/src/net/pvsOutgoingMulticastTransfer.cpp @@ -0,0 +1,209 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/pvsOutgoingMulticastTransfer.cpp +# - wrap McastSender functionality in PVS daemon +# ----------------------------------------------------------------------------- +*/ + +#include "pvsOutgoingMulticastTransfer.h" + +#include <QDataStream> +#include <QFile> +#include <QFileInfo> +#include <QHostInfo> +#include <QSettings> +#include <QTimer> + +#include <src/net/pvsMsg.h> +#include <src/net/mcast/McastConfiguration.h> +#include <src/net/mcast/McastPGMSocket.h> +#include <src/net/mcast/McastSender.h> +#include <src/util/consoleLogger.h> + +PVSOutgoingMulticastTransfer::PVSOutgoingMulticastTransfer(QString senderName, quint64 id, QString filename, QObject* parent) : + QObject(parent), + _file(0), + _progress(0), + _config(0), + _socket(0), + _progressTimer(0), + _prepareTimer(0), + _senderName(senderName), + _id(id), + _error(false) +{ + QFileInfo finfo(filename); + if(!finfo.exists()) + { + error("File does not exist"); + } + + if(!finfo.isReadable()) + { + error("File is not readable"); + } + + _file = new QFile(filename, this); + _length = _file->size(); +} + +PVSOutgoingMulticastTransfer::~PVSOutgoingMulticastTransfer() +{ + if(_file) + delete _file; + if(_config) + delete _config; +} + +void PVSOutgoingMulticastTransfer::prepare() +{ + if (_socket && !_socketInacceptable) + { + _prepareTimer->stop(); + delete _prepareTimer; + _prepareTimer = 0; + + QTimer::singleShot(0, this, SLOT(doStart())); + return; + } + else if (_socket) + { + delete _socket; + _socket = 0; + } + + QSettings settings; + quint16 portbase = settings.value("multicast/port-base", "6966").value<quint16>(); + quint16 portlimit = settings.value("multicast/port-limit", "7966").value<quint16>(); + + int tries_remaining = 5; + while(tries_remaining > 0) + { + quint16 port = portbase + (qrand() % (portlimit - portbase + 1)); + + if (!_config) + { + _config = new McastConfiguration(); + } + _config->loadFrom(&settings, "multicast"); + _config->multicastUDPPortBase(port); + + _socket = new McastPGMSocket(this); + if(_socket->open(_config, McastPGMSocket::PSOCK_WRITE)) + { + break; + } + else + { + delete _socket; + _socket = 0; + } + } + + if (!_socket) + { + emit failed(_id, "Could not open socket"); + delete _prepareTimer; + _prepareTimer = 0; + return; + } + else + { + _socketInacceptable = false; + // announce the transfer: + QFileInfo info(*_file); + QString message = QString("%1:%2:%3:%4:%5").arg(_senderName).arg(_id).arg(info.fileName()).arg(info.size()).arg(_config->multicastUDPPortBase()); + PVSMsg msg(PVSCOMMAND, "MCASTFTANNOUNCE", message); + emit announce(msg); + _prepareTimer->start(5000); + } +} + +void PVSOutgoingMulticastTransfer::doStart() +{ + ConsoleLog writeLine(QString("Starting multicast transfer %1").arg(_id)); + + _file->open(QIODevice::ReadOnly); + + _sender = new McastSender(_file, _config, this); + connect(_sender, SIGNAL(finished()), SLOT(senderFinished())); + connect(_sender, SIGNAL(progress(quint64)), SLOT(senderProgress(quint64))); + // connect(_sender, SIGNAL(allSent()), SIGNAL(allSent())); + _socket->setParent(_sender); + _sender->start(_socket); + + emit started(_id); + + _progressTimer = new QTimer(this); + connect(_progressTimer, SIGNAL(timeout()), SLOT(reportTimeout())); + _progressTimer->setInterval(333); + _progressTimer->start(); +} + +void PVSOutgoingMulticastTransfer::senderProgress(quint64 bytes) + +{ + _progress = bytes; +} + +void PVSOutgoingMulticastTransfer::senderFinished() +{ + if(_progressTimer) + { + _progressTimer->stop(); + delete _progressTimer; + _progressTimer = 0; + } + emit finished(_id); + _sender->close(); + delete _sender; +} + +void PVSOutgoingMulticastTransfer::reportTimeout() +{ + emit progress(_id, _progress, _length); +} + +void PVSOutgoingMulticastTransfer::retry() +{ + bool first = !_socketInacceptable; + _socketInacceptable = true; + + if(first) + { + _prepareTimer->setInterval(1000); + } +} + +void PVSOutgoingMulticastTransfer::start() +{ + if (!_prepareTimer) + { + _prepareTimer = new QTimer(this); + _prepareTimer->setSingleShot(true); + connect(_prepareTimer, SIGNAL(timeout()), SLOT(prepare())); + } + QTimer::singleShot(0, this, SLOT(prepare())); +} + +void PVSOutgoingMulticastTransfer::abort() +{ + if (_sender) + _sender->close(); +} + +void PVSOutgoingMulticastTransfer::error(QString const& reason) +{ + qCritical() << "Could not create an outgoing mcast transfer: " << reason; + _error = true; + _reason = reason; +} diff --git a/src/net/pvsOutgoingMulticastTransfer.h b/src/net/pvsOutgoingMulticastTransfer.h new file mode 100644 index 0000000..5fd6a3d --- /dev/null +++ b/src/net/pvsOutgoingMulticastTransfer.h @@ -0,0 +1,92 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/pvsOutgoingMulticastTransfer.h +# - wrap McastSender functionality in PVS daemon +# ----------------------------------------------------------------------------- +*/ + +#ifndef PVSOUTGOINGMULTICASTTRANSFER_H_ +#define PVSOUTGOINGMULTICASTTRANSFER_H_ + +#include <QtGlobal> +#include <QObject> +#include <QString> + +#include <src/net/pvsMsg.h> + +class QFile; +class QTimer; +class McastConfiguration; +class McastPGMSocket; +class McastSender; + +class PVSOutgoingMulticastTransfer : public QObject +{ + Q_OBJECT +public: + PVSOutgoingMulticastTransfer(QString senderName, quint64 id, QString filename, QObject* parent = 0); + virtual ~PVSOutgoingMulticastTransfer(); + + quint64 id() const + { + return _id; + } + + bool isError() const + { + return _error; + } + + QString reason() const + { + return _reason; + } + +signals: + void started(qulonglong id); + void progress(qulonglong id, qulonglong bytes, qulonglong of); + void allSent(qulonglong id); + void finished(qulonglong id); + void failed(qulonglong id, QString const reason); + void announce(PVSMsg announcement); + +private slots: + void senderProgress(quint64 bytes); + void senderFinished(); + void reportTimeout(); + void doStart(); + void prepare(); + +public slots: + void start(); + void abort(); + void retry(); + +private: + QFile* _file; + quint64 _length; + quint64 _progress; + McastConfiguration* _config; + McastPGMSocket* _socket; + McastSender* _sender; + QTimer* _progressTimer; + QTimer* _prepareTimer; + QString _senderName; + quint64 _id; + bool _error; + QString _reason; + bool _socketInacceptable; + + void error(QString const& reason); +}; + +#endif /* PVSOUTGOINGMULTICASTTRANSFER_H_ */ diff --git a/src/net/pvsServerConnection.h b/src/net/pvsServerConnection.h index 0669d88..c6ef015 100644 --- a/src/net/pvsServerConnection.h +++ b/src/net/pvsServerConnection.h @@ -11,9 +11,9 @@ #define _PVSSERVERCONNECTION_H_ #include "src/util/dispatcher.h" +#include "src/net/pvsMsg.h" #include <QtNetwork/QSslSocket> -class PVSMsg; class PVS; class PVSDiscoveredServer; @@ -30,8 +30,6 @@ public: return _socket != NULL && _socket->state() == QAbstractSocket::ConnectedState; } - void sendMessage(PVSMsg newMessage); - void ping(); QString getServerName(); @@ -63,6 +61,9 @@ public: _commandDispatcher.removeListener(ident, who, func); }; +public Q_SLOTS: + void sendMessage(PVSMsg newMessage); + protected: void timerEvent(QTimerEvent *event); |
