diff options
| author | Fabian Schillinger | 2010-11-01 17:35:27 +0100 |
|---|---|---|
| committer | Fabian Schillinger | 2010-11-01 17:35:27 +0100 |
| commit | ea3fb17345e5f82db9f2e98a8062e95797700ace (patch) | |
| tree | 1da0d1a8ec9455364386af78762d0f6fed187824 /src/net/mcast | |
| parent | Process start/stop/view functionality (diff) | |
| parent | [PVSGUI] No X required for --help and --version (diff) | |
| download | pvs-ea3fb17345e5f82db9f2e98a8062e95797700ace.tar.gz pvs-ea3fb17345e5f82db9f2e98a8062e95797700ace.tar.xz pvs-ea3fb17345e5f82db9f2e98a8062e95797700ace.zip | |
Merge branch 'master' of openslx.org:pvs
Conflicts:
CMakeLists.txt
src/core/pvsConnectionManager.cpp
src/pvs.cpp
src/pvs.h
Diffstat (limited to 'src/net/mcast')
| -rw-r--r-- | src/net/mcast/CMakeLists.txt | 61 | ||||
| -rw-r--r-- | src/net/mcast/McastConfiguration.cpp | 57 | ||||
| -rw-r--r-- | src/net/mcast/McastConfiguration.h | 204 | ||||
| -rw-r--r-- | src/net/mcast/McastConstants.h | 36 | ||||
| -rw-r--r-- | src/net/mcast/McastPGMSocket.cpp | 666 | ||||
| -rw-r--r-- | src/net/mcast/McastPGMSocket.h | 81 | ||||
| -rw-r--r-- | src/net/mcast/McastReceiver.cpp | 179 | ||||
| -rw-r--r-- | src/net/mcast/McastReceiver.h | 80 | ||||
| -rw-r--r-- | src/net/mcast/McastSender.cpp | 127 | ||||
| -rw-r--r-- | src/net/mcast/McastSender.h | 73 | ||||
| -rw-r--r-- | src/net/mcast/trial_programs/CMakeLists.txt | 38 | ||||
| -rw-r--r-- | src/net/mcast/trial_programs/McastConfigArgParser.cpp | 165 | ||||
| -rw-r--r-- | src/net/mcast/trial_programs/McastConfigArgParser.h | 26 | ||||
| -rw-r--r-- | src/net/mcast/trial_programs/mcastreceive.cpp | 150 | ||||
| -rw-r--r-- | src/net/mcast/trial_programs/mcastreceive.h | 44 | ||||
| -rw-r--r-- | src/net/mcast/trial_programs/mcastsend.cpp | 122 | ||||
| -rw-r--r-- | src/net/mcast/trial_programs/mcastsend.h | 42 |
17 files changed, 2151 insertions, 0 deletions
diff --git a/src/net/mcast/CMakeLists.txt b/src/net/mcast/CMakeLists.txt new file mode 100644 index 0000000..e92b090 --- /dev/null +++ b/src/net/mcast/CMakeLists.txt @@ -0,0 +1,61 @@ +INCLUDE(../../../OpenPGMConfig.cmake) + +ADD_DEFINITIONS( + ${LIBPGM_CXXFLAGS} + -D__STDC_CONSTANT_MACROS + -D__STDC_LIMIT_MACROS +) + +# OpenPGM uses the C99 restrict keyword which g++ does not recognize: +#IF(CMAKE_COMPILER_IS_GNUCXX) +# ADD_DEFINITIONS(${LIBPGM_CXXFLAGS}) +#ENDIF(CMAKE_COMPILER_IS_GNUCXX) + +INCLUDE(${QT_USE_FILE}) + +SET(pvsmcast_MOC_HDRS + McastConfiguration.h + McastPGMSocket.h + McastReceiver.h + McastSender.h +) + +SET(pvsmcast_HDRS + McastConfiguration.h + McastPGMSocket.h + McastReceiver.h + McastSender.h +) + +SET(pvsmcast_SRCS + McastConfiguration.cpp + McastPGMSocket.cpp + McastReceiver.cpp + McastSender.cpp +) + +QT4_WRAP_CPP( + pvsmcast_MOC_SRCS + ${pvsmcast_MOC_HDRS} +) + +SET_SOURCE_FILES_PROPERTIES(${pvsmcast_SRCS} ${pvsmcast_MOC_SRCS} + PROPERTIES + OBJECT_DEPENDS "3rdparty/libpgm.a" # Make sure libpgm gets unpacked before building C++ files +) + +ADD_LIBRARY( + pvsmcast + STATIC + ${pvsmcast_HDRS} + ${pvsmcast_SRCS} + ${pvsmcast_MOC_SRCS} +) + +TARGET_LINK_LIBRARIES( + pvsmcast + pgm + ${QT_LIBRARIES} +) + +ADD_SUBDIRECTORY(trial_programs) diff --git a/src/net/mcast/McastConfiguration.cpp b/src/net/mcast/McastConfiguration.cpp new file mode 100644 index 0000000..6c5e620 --- /dev/null +++ b/src/net/mcast/McastConfiguration.cpp @@ -0,0 +1,57 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastConfiguration.cpp +# - hold Multicast protocol configuration data +# ----------------------------------------------------------------------------- +*/ + +#include <QSettings> + +#include "McastConfiguration.h" + +void McastConfiguration::loadFrom(QSettings* _settings, char const* group) +{ + if (group) + _settings->beginGroup(group); + + _multicastAddress = _settings->value("groupAddress", DEFAULT_MULTICAST_ADDRESS).toString(); + _multicastInterface = _settings->value("interface", DEFAULT_MULTICAST_INTERFACE).toString(); + _multicastMTU = _settings->value("mtu", DEFAULT_MULTICAST_MTU).value<quint16>(); + _multicastRate = _settings->value("rate", DEFAULT_MULTICAST_RATE).value<quint32>(); + _multicastUseUDP = _settings->value("use-udp", DEFAULT_MULTICAST_USEUDP).toBool(); + _multicastWinSize = _settings->value("winsize", DEFAULT_MULTICAST_WSIZ).value<quint16>(); + _multicastUDPPortBase = _settings->value("portbase", DEFAULT_MULTICAST_UDPPORT).value<quint16>(); + _multicastDPort = _settings->value("dport", DEFAULT_MULTICAST_DPORT).value<quint16>(); + _multicastSPort = _settings->value("sport", DEFAULT_MULTICAST_SPORT).value<quint16>(); + + if (group) + _settings->endGroup(); +} + +void McastConfiguration::writeTo(QSettings* _settings, char const* group) const +{ + if (group) + _settings->beginGroup(group); + + _settings->setValue("groupAddress", _multicastAddress); + _settings->setValue("interface", _multicastInterface); + _settings->setValue("mtu", _multicastMTU); + _settings->setValue("rate", _multicastRate); + _settings->setValue("use-udp", _multicastUseUDP); + _settings->setValue("winsize", _multicastWinSize); + _settings->setValue("portbase", _multicastUDPPortBase); + _settings->setValue("dport", _multicastDPort); + _settings->setValue("sport", _multicastSPort); + + if (group) + _settings->endGroup(); +} diff --git a/src/net/mcast/McastConfiguration.h b/src/net/mcast/McastConfiguration.h new file mode 100644 index 0000000..53f7a54 --- /dev/null +++ b/src/net/mcast/McastConfiguration.h @@ -0,0 +1,204 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastConfiguration.h +# - hold Multicast protocol configuration data +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTCONFIGURATION_H_ +#define MCASTCONFIGURATION_H_ + +#include <QObject> +#include <QString> +#include <QtGlobal> + +#include "McastConstants.h" + +class QSettings; + +class McastConfiguration: public QObject +{ + Q_OBJECT +public: + McastConfiguration(QObject* parent = 0) : + QObject(parent), + _multicastInterface(DEFAULT_MULTICAST_INTERFACE), + _multicastAddress(DEFAULT_MULTICAST_ADDRESS), + _multicastRate(DEFAULT_MULTICAST_RATE), + _multicastSPort(DEFAULT_MULTICAST_SPORT), + _multicastDPort(DEFAULT_MULTICAST_DPORT), + _multicastWinSize(DEFAULT_MULTICAST_WSIZ), + _multicastMTU(DEFAULT_MULTICAST_MTU), + _multicastUDPPortBase(DEFAULT_MULTICAST_UDPPORT), + _multicastUseUDP(DEFAULT_MULTICAST_USEUDP) + { + } + + McastConfiguration(McastConfiguration const& other, QObject* parent = 0) : + QObject(parent), + _multicastInterface(other._multicastInterface), + _multicastAddress(other._multicastAddress), + _multicastRate(other._multicastRate), + _multicastSPort(other._multicastSPort), + _multicastDPort(other._multicastDPort), + _multicastWinSize(other._multicastWinSize), + _multicastMTU(other._multicastMTU), + _multicastUDPPortBase(other._multicastUDPPortBase), + _multicastUseUDP(other._multicastUseUDP) + { + } + + virtual ~McastConfiguration() + { + } + + QString multicastAddress() const + { + return _multicastAddress; + } + McastConfiguration* multicastAddress(QString const& address) + { + _multicastAddress = address; + return this; + } + + quint16 multicastSPort() const + { + return _multicastSPort; + } + McastConfiguration* multicastSPort(quint16 port) + { + _multicastSPort = port; + return this; + } + + quint16 multicastDPort() const + { + return _multicastDPort; + } + McastConfiguration* multicastDPort(quint16 port) + { + _multicastDPort = port; + return this; + } + + quint32 multicastRate() const + { + return _multicastRate; + } + McastConfiguration* multicastRate(quint32 rate) + { + _multicastRate = rate; + return this; + } + + quint16 multicastWinSize() const + { + return _multicastWinSize; + } + McastConfiguration* multicastWinSize(quint16 size) + { + _multicastWinSize = size; + return this; + } + + quint16 multicastMTU() const + { + return _multicastMTU; + } + McastConfiguration* multicastMTU(quint16 mtu) + { + _multicastMTU = mtu; + return this; + } + + bool multicastUseUDP() const + { + return _multicastUseUDP; + } + McastConfiguration* multicastUseUDP(bool useUDP) + { + _multicastUseUDP = useUDP; + return this; + } + + quint16 multicastUDPPortBase() const + { + return _multicastUDPPortBase; + } + McastConfiguration* multicastUDPPortBase(quint16 port) + { + _multicastUDPPortBase = port; + return this; + } + + QString multicastInterface() const + { + return _multicastInterface; + } + McastConfiguration* multicastInterface(QString const& interface) + { + _multicastInterface = interface; + return this; + } + + quint16 multicastUDPUPort() const + { + return _multicastUDPPortBase; + } + + quint16 multicastUDPMPort() const + { + return _multicastUDPPortBase; + } + + void commit() + { + emit changed(); + } + + McastConfiguration& operator=(McastConfiguration const& source) + { + if(this != &source) + { + _multicastInterface = source._multicastInterface; + _multicastAddress = source._multicastAddress; + _multicastRate = source._multicastRate; + _multicastSPort = source._multicastSPort; + _multicastDPort = source._multicastDPort; + _multicastWinSize = source._multicastWinSize; + _multicastMTU = source._multicastMTU; + _multicastUDPPortBase = source._multicastUDPPortBase; + _multicastUseUDP = source._multicastUseUDP; + } + return *this; + } + + void loadFrom(QSettings* settings, char const* group = 0); + void writeTo(QSettings* settings, char const* group = 0) const; + +signals: + void changed(); + +private: + QString _multicastInterface; + QString _multicastAddress; + quint32 _multicastRate; + quint16 _multicastSPort; + quint16 _multicastDPort; + quint16 _multicastWinSize; + quint16 _multicastMTU; + quint16 _multicastUDPPortBase; + bool _multicastUseUDP; +}; + +#endif /* MCASTCONFIGURATION_H_ */ diff --git a/src/net/mcast/McastConstants.h b/src/net/mcast/McastConstants.h new file mode 100644 index 0000000..624e195 --- /dev/null +++ b/src/net/mcast/McastConstants.h @@ -0,0 +1,36 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastMagic.h +# - Specify the magic numbers for the McastFT protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTMAGIC_H_ +#define MCASTMAGIC_H_ + +#include <stdint.h> + +#define MCASTFT_MAGIC UINT64_C(0x6d60ad83825fb7f9) +#define DEFAULT_MULTICAST_INTERFACE "" +#define DEFAULT_MULTICAST_ADDRESS "239.255.220.207" +#define DEFAULT_MULTICAST_SPORT 6964 +#define DEFAULT_MULTICAST_DPORT 6965 +#define DEFAULT_MULTICAST_USEUDP true +#define DEFAULT_MULTICAST_UDPPORT 6966 +#define DEFAULT_MULTICAST_WSIZ 30 +#define DEFAULT_MULTICAST_RATE (100*1024) +#define DEFAULT_MULTICAST_MTU 1400 +#define DEFAULT_MULTICAST_APDU 1200 +#define DEFAULT_MULTICAST_CHUNK 1024 +#define DEFAULT_MULTICAST_SHUTDOWN_TIMEOUT 10000 + +#endif /* MCASTMAGIC_H_ */ diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp new file mode 100644 index 0000000..731fc13 --- /dev/null +++ b/src/net/mcast/McastPGMSocket.cpp @@ -0,0 +1,666 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastPGMSocket.cpp +# - wrap OpenPGM Sockets in a nicer interface -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include <cstdlib> + +#include <sys/poll.h> +#include <sys/socket.h> + +#include <QByteArray> +#include <QList> +#include <QtDebug> +#include <QPointer> +#include <QSocketNotifier> +#include <QTimer> + +#include <pgm/pgm.h> + +#include "McastPGMSocket.h" + +using namespace std; + +class McastPGMSocket_priv +{ +public: + McastPGMSocket_priv() : + socket(0), + send_notif(0) + { + } + ~McastPGMSocket_priv() + { + if (socket) + pgm_close(socket, 0); + Q_FOREACH(QSocketNotifier* notif, _notifs) + { + delete notif; + } + if (send_notif) + delete send_notif; + } + + pgm_sock_t* socket; + McastPGMSocket::Direction direction; + QSocketNotifier* send_notif; + QList<QSocketNotifier*> _notifs; + + QSocketNotifier* notifier_for(int fd) { + Q_FOREACH(QSocketNotifier* notif, _notifs) + { + if(notif->socket() == fd) + { + return notif; + } + } + return 0; + } +}; + +static void _ensurePGMInited() +{ + if (!pgm_supported()) + { + pgm_error_t* err; + int good = pgm_init(&err); + if (!good) + { + qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)"); + std::exit(1); + } + } +} + +McastPGMSocket::McastPGMSocket(QObject* parent) : + QObject(parent), + _priv(new McastPGMSocket_priv), + _opened(false), + _finished(false), + _nakTimeout(new QTimer()), + _dataTimeout(new QTimer()), + _sendTimeout(new QTimer()), + _shutdownTimer(0), + _shutdown_timeout(0) +{ + _ensurePGMInited(); + + _nakTimeout->setSingleShot(true); + _dataTimeout->setSingleShot(true); + _sendTimeout->setSingleShot(true); + + connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout())); + connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout())); + connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend())); +} + +McastPGMSocket::~McastPGMSocket() +{ + delete _priv; + delete _nakTimeout; + delete _dataTimeout; + delete _sendTimeout; +} + +bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) +{ + _priv->direction = direction; + + pgm_error_t* err = 0; + int good; + + pgm_addrinfo_t* addrinfo; + // parse the address string + good = pgm_getaddrinfo((config->multicastInterface() + ";" + config->multicastAddress()).toLatin1().constData(), + 0, &addrinfo, &err); + if (!good) + { + qCritical() << "Could not parse address info: PGM Error: " + << err->message; + return false; + } + + sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family; + + if(config->multicastUseUDP()) + { + good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_UDP, &err); + } + else + { + good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err); + } + + if (!good) + { + qCritical() << "Could not open socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + unsigned const ambient_spm = 2000 * 1000; // every one hundred milliseconds (approx.) + + /* Options for sending data */ + const int spm_heartbeat[] = + { 512 * 1000, + 1024 * 1000, + 2048 * 1000, + 4096 * 1000 }, + max_rate = 0, + max_window = config->multicastWinSize() * config->multicastRate() / config->multicastMTU(); + // const int max_window_sqns = 3000; + qDebug() << "Computed window size " << max_window << " packets"; + +// pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, +// sizeof(send_only)); + + // SPM messages + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); + + // Transmit window + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); +// pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_SQNS, &max_window, sizeof(max_window)); + + /* Options for receiving data */ + const int passive = 0, + spmr_expiry = 500 * 1000, + nak_bo_ivl = 200 * 1000, + nak_rpt_ivl = 500 * 1000, + nak_rdata_ivl = 500 * 1000, + nak_data_retries = 50, + nak_ncf_retries = 50; + qDebug() << "Computed window size " << max_window << " packets"; + +// pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive)); +// pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate)); +// pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_RXW_SQNS, &max_window, sizeof(max_window)); + + /* Try using PGMCC */ + const struct pgm_pgmccinfo_t pgmccinfo = { + 100 /* usecs */ * 1000 /* msecs */, + 75 /* from OpenPGM examples */, + 500 /* from PGMCC internet-draft */ + }; + good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo)); + if(!good) + { + qCritical() << "Could not enable PGMCC"; + return false; + } + +// /* Forward Error Correction */ +// const struct pgm_fecinfo_t pgmfecinfo = { +// 255 /* from OpenPGM examples */, +// 2 /* send two proactive packets */, +// 8 /* from OpenPGM examples */, +// 1 /* enable on-demand parity */, +// 1 /* enable variable packet length */ +// }; +// good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_FEC, &pgmfecinfo, sizeof(pgmfecinfo)); +// if(!good) +// { +// qCritical() << "Could not enable FEC"; +// return false; +// } + + // Peer Expiry: We will give 1 minute. + int const peer_expiry = 60 /* seconds */ * 1000000 /* microseconds */; + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); + + // MTU + int const mtu = config->multicastMTU(); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MTU, &mtu, sizeof(mtu)); + + pgm_sockaddr_t addr; + addr.sa_addr.sport = config->multicastSPort(); + addr.sa_port = config->multicastDPort(); + good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err); + if (!good) + { + qCritical() << "Could not generate a GSI: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + struct pgm_interface_req_t ifreq; + ifreq.ir_interface = addrinfo->ai_send_addrs[0].gsr_interface; + ifreq.ir_scope_id = 0; + if (AF_INET6 == family) + { + ifreq.ir_scope_id = ((struct sockaddr_in6*)&addrinfo->ai_send_addrs[0])->sin6_scope_id; + } + + // UDP Encapsulation + if(config->multicastUseUDP()) + { + const int uport = config->multicastUDPUPort(); + const int mport = config->multicastUDPMPort(); + + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport)); + } + + good = pgm_bind3(_priv->socket, &addr, sizeof(addr), &ifreq , sizeof(ifreq), &ifreq, sizeof(ifreq), &err); + if (!good) + { + qCritical() << "Could not bind socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + // qDebug() << "Max APDU is " << _priv->socket->max_apdu; + // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu; + // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment; + // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns; + + // join the group + for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++) + { + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_JOIN_GROUP, + &addrinfo->ai_recv_addrs[i], sizeof(struct group_req)); + } + + // set send address + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], + sizeof(struct group_req)); + + // IP parameters + const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16; + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, + sizeof(multicast_loop)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, + sizeof(multicast_hops)); + pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, + sizeof(nonblocking)); + + good = pgm_connect(_priv->socket, &err); + if (!good) + { + qCritical() << "Could not connect socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + _opened = true; + + setupNotifiers(); + + pgm_freeaddrinfo(addrinfo); + + /* Prime the generation of SPM packets during the waiting period */ + if(_priv->direction == PSOCK_WRITE) + QTimer::singleShot(0, this, SLOT(handleNak())); + + return true; +} + +void McastPGMSocket::setupNotifiers() +{ + int recv_sock, repair_sock, pending_sock; + char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int)); + + struct pollfd pollin[10]; + int in_nfds = 10; + pgm_poll_info(_priv->socket, pollin, &in_nfds, POLLIN); + for(int i = 0; i < in_nfds; i++) + { + QSocketNotifier* notif = new QSocketNotifier(pollin[i].fd, QSocketNotifier::Read, this); + _priv->_notifs.append(notif); + connect(notif, SIGNAL(activated(int)), this, slotname); + } + + if(_priv->direction == PSOCK_WRITE) + { + struct pollfd pfd; + int nfds = 1; + pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT); + _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this); + connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend())); + } +} + +void McastPGMSocket::handleNak(int fd) +{ + qDebug() << "handleNak(" << fd << ")"; + + QSocketNotifier* notif = _priv->notifier_for(fd); + notif->setEnabled(false); + + if (_shutdownTimer) + { + _shutdownTimer->start(_shutdown_timeout); + qDebug() << "Started shutdown timer"; + } + + handleNak(); + + notif->setEnabled(true); +} + +void McastPGMSocket::handleNak() +{ + if (_finished) + return; + + // QTimer::singleShot(1000, this, SLOT(handleNakTimeout())); + + // to handle NAKs in OpenPGM, we need to pgm_recv: + char buf[4096]; + pgm_error_t* err = 0; + + int status; + // while we don't block: + do + { + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err); + + if(status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); + const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate limited: " << msecs << "ms"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " wouldblock"; + break; + } + else + { + if(err) + { + qCritical() << "Could not handle NAKs: PGM Error: " << err->message; + pgm_error_free(err); + err = 0; + } + } + } + while (true); +} + +void McastPGMSocket::handleNakTimeout() +{ + qDebug() << "handleNakTimeout()"; + + handleNak(); +} + +void McastPGMSocket::handleData(int fd) +{ + // need to guard against destruction in finish() via signals/slots + QPointer<QSocketNotifier> notif(_priv->notifier_for(fd)); + notif->setEnabled(false); + + handleData(); + + if (notif) + notif->setEnabled(true); +} + +void McastPGMSocket::handleData() +{ + if (_finished) { + return; + } + + int status; + do + { + char buf[4096]; + size_t size; + pgm_error_t* err = 0; + + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err); + + if (status == PGM_IO_STATUS_NORMAL) + { + qDebug() << " normally received"; + if(size > 0) + { + QByteArray bytes(buf, size); + emit receivedPacket(bytes); + } + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " would block"; + // nothing more to do this time + break; + } + else if (status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size); + const long usecs = tv.tv_sec * 1000000 + tv.tv_usec; + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate limit pending: " << msecs << "ms"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RESET) + { + qDebug() << " connection reset"; + emit connectionReset(); + qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)"); + pgm_error_free(err); + break; + } + else if (status == PGM_IO_STATUS_FIN) + { + qDebug() << " connection finished"; + emit connectionFinished(); + break; + } + else + { + if(err) + { + qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)"); + break; + } + } + + // the socket might have been closed from under us + if (!_priv->socket) + break; + } + while (true); +} + +void McastPGMSocket::handleDataTimeout() +{ + qDebug() << "handleDataTimeout()"; + + handleData(); +} + +void McastPGMSocket::canSend() +{ + if (_finished) + return; + + + if (_priv->send_notif) + { + _priv->send_notif->setEnabled(false); + } + + if(_q.isEmpty()) + { + emit readyToSend(); + } + else + { + QByteArray const packet(_q.head()); + int status; + + status = pgm_send(_priv->socket, packet.constData(), packet.size(), 0); + + if(status == PGM_IO_STATUS_NORMAL) + { + _q.dequeue(); + if(!_q.isEmpty()) + { + _priv->send_notif->setEnabled(true); + } + else + { + emit readyToSend(); + } + } + else if(status == PGM_IO_STATUS_WOULD_BLOCK) + { + _priv->send_notif->setEnabled(true); + } + else if(status == PGM_IO_STATUS_CONGESTION) + { + qDebug() << " congested..."; + // wait a short time (10ms?) + _sendTimeout->start(10); + } + else if(status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + socklen_t size = sizeof(tv); + pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size); + int msecs = (tv.tv_sec * 1000) + ((tv.tv_usec + 999) / 1000); + if(msecs == 0) + msecs = 1; + qDebug() << " rate_limited, waiting" << msecs << "ms"; + _sendTimeout->start(msecs); + } + else + { + qCritical() << "Unhandled status in canSend():" << status; + } + } + + if (_shutdownTimer) + _shutdownTimer->start(_shutdown_timeout); +} + +void McastPGMSocket::sendPacket(QByteArray const& bytes) +{ + if(_shutdownTimer) + { + qCritical() << "Logic error: sendPacket() after shutdown()"; + } + + _q.enqueue(bytes); + _priv->send_notif->setEnabled(true); +} + +void McastPGMSocket::finish() +{ + if (_finished) + { + return; + } + + qDebug() << "finish()"; + + Q_FOREACH(QSocketNotifier* notif, _priv->_notifs) + { + notif->setEnabled(false); + delete notif; + } + _priv->_notifs.clear(); + + if(_priv->send_notif) + { + delete _priv->send_notif; + _priv->send_notif = 0; + } + + if (_priv->socket) + { + pgm_close(_priv->socket, 1); + _priv->socket = 0; + } + + _finished = true; + + emit connectionFinished(); + + qDebug() << "Socket finished"; +} + +bool McastPGMSocket::finished() const +{ + return _finished; +} + +bool McastPGMSocket::isOpen() const +{ + return _opened && !_finished; +} + +void McastPGMSocket::shutdown(int interval) +{ + if(_priv->direction == PSOCK_READ) + return; + + _shutdown_timeout = interval; + _shutdownTimer = new QTimer(this); + connect(_shutdownTimer, SIGNAL(timeout()), this, SLOT(finish())); + if (_q.isEmpty()) + { + _shutdownTimer->start(_shutdown_timeout); + qDebug() << "Started shutdown timer"; + } +} diff --git a/src/net/mcast/McastPGMSocket.h b/src/net/mcast/McastPGMSocket.h new file mode 100644 index 0000000..4ccf931 --- /dev/null +++ b/src/net/mcast/McastPGMSocket.h @@ -0,0 +1,81 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastPGMSocket.h +# - wrap OpenPGM Sockets in a nicer interface -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTPGMSOCKET_H_ +#define MCASTPGMSOCKET_H_ + +#include <QByteArray> +#include <QObject> +#include <QQueue> + +#include <src/net/mcast/McastConfiguration.h> +#include <src/net/mcast/McastConstants.h> + +class McastPGMSocket_priv; +class QTimer; + +class McastPGMSocket : public QObject +{ + Q_OBJECT +public: + enum Direction { + PSOCK_READ, + PSOCK_WRITE + }; + + McastPGMSocket(QObject* parent = 0); + virtual ~McastPGMSocket(); + + bool open(McastConfiguration const* config, Direction direction); + bool finished() const; + bool isOpen() const; + void shutdown(int interval = DEFAULT_MULTICAST_SHUTDOWN_TIMEOUT); + +signals: + void readyToSend(); + void receivedPacket(QByteArray const& bytes); + void connectionReset(); + void connectionFinished(); + void shutdownComplete(); + +public slots: + void sendPacket(QByteArray const& bytes); + void finish(); + +private slots: + void handleNak(int fd); + void handleData(int fd); + void handleNak(); + void handleData(); + void handleNakTimeout(); + void handleDataTimeout(); + void canSend(); + +private: + McastPGMSocket_priv* _priv; + QQueue<QByteArray> _q; + bool _finished; + bool _opened; + QTimer* _nakTimeout; + QTimer* _dataTimeout; + QTimer* _sendTimeout; + QTimer* _shutdownTimer; + int _shutdown_timeout; + + void setupNotifiers(); +}; + +#endif /* MCASTPGMSOCKET_H_ */ diff --git a/src/net/mcast/McastReceiver.cpp b/src/net/mcast/McastReceiver.cpp new file mode 100644 index 0000000..1f27127 --- /dev/null +++ b/src/net/mcast/McastReceiver.cpp @@ -0,0 +1,179 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include <QDataStream> +#include <QtDebug> +#include <QtGlobal> + +#include <pgm/pgm.h> +// OpenPGM #defines bool. This is bad in C++. +#undef bool + +#include "McastConstants.h" +#include "McastReceiver.h" + +McastReceiver::McastReceiver(QIODevice* iodev, McastConfiguration* config, QObject* parent) : + QObject(parent), + _config(config ? new McastConfiguration(*config) : 0), + _socket(0), + _curoffs(0), + _closed(false), + _hash(QCryptographicHash::Md5), + _iodev(iodev) +{ + _config->setParent(this); +} + +McastReceiver::~McastReceiver() +{ + if (_config) + delete _config; +} + +void McastReceiver::config(McastConfiguration const* config) +{ + if (_config) + delete _config; + _config = new McastConfiguration(*config, this); +} + +bool McastReceiver::start() +{ + McastConfiguration *config = _config; + if (!config) + config = new McastConfiguration(); + + if (_socket) + { + delete _socket; + } + _socket = new McastPGMSocket(this); + + connect(_socket, SIGNAL(receivedPacket(QByteArray)), SLOT(receivedPacket(QByteArray))); + connect(_socket, SIGNAL(connectionReset()), SLOT(connectionReset())); + // connect(_socket, SIGNAL(connectionFinished()), this, SLOT(connectionFinished())); + if (_socket->open(_config, McastPGMSocket::PSOCK_READ)) + { + return true; + } + else + { + disconnect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray))); + disconnect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset())); + return false; + } +} + +void McastReceiver::abort() +{ + if (_socket) + { + delete _socket; + _socket = 0; + } + + if (_iodev) + { + _iodev->close(); + } +} + +void McastReceiver::receivedPacket(QByteArray const& bytes) +{ + if(_closed) + return; + + quint16 checksum_should = qChecksum(bytes.constData(), bytes.size() - 2); + + QDataStream strm(bytes); + strm.setByteOrder(QDataStream::BigEndian); + + // read the packet + quint64 magic; + quint64 offset; + quint16 checksum; + + + strm >> magic; + if(magic != MCASTFT_MAGIC) + { + qCritical() << "Received packet whose magic number does not match. Ignoring."; + return; + } + + strm >> offset; + qDebug() << " Received packet for offset" << offset; + + if (offset == UINT64_C(0xffffffffffffffff)) + { + // this is the end of the data stream. + QByteArray md5; + strm >> md5; + + quint16 fchecksum; + strm >> fchecksum; + + // compare the hash value + if ((fchecksum != checksum_should) || (md5 != _hash.result())) + { + _close(RES_MD5_MISMATCH); + } + else + { + _close(RES_OK); + } + + return; + } + else if (offset != _curoffs) + { + qCritical() << "Packet loss or double delivery. PGM should have prevented this. Bailing out."; + _close(RES_OFFSET_MISMATCH); + return; + } + + QByteArray contents; + strm >> contents; + _curoffs += contents.size(); + + strm >> checksum; + if(checksum != checksum_should) + { + qCritical() << "Checksum does not match. Bailing out."; + _close(RES_CHECKSUM_MISMATCH); + return; + } + + _hash.addData(contents); + + _iodev->write(contents); + + emit progress(_curoffs); +} + +void McastReceiver::connectionReset() +{ + _close(RES_CONNECTION_RESET); +} + +void McastReceiver::_close(Result result) +{ + _iodev->close(); + _socket->finish(); + + _closed = true; + emit finished(result); +} diff --git a/src/net/mcast/McastReceiver.h b/src/net/mcast/McastReceiver.h new file mode 100644 index 0000000..247733d --- /dev/null +++ b/src/net/mcast/McastReceiver.h @@ -0,0 +1,80 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTRECEIVER_H_ +#define MCASTRECEIVER_H_ + +#include <QByteArray> +#include <QCryptographicHash> +#include <QIODevice> +#include <QObject> +#include <QtGlobal> + +#include <src/net/mcast/McastConfiguration.h> +#include <src/net/mcast/McastPGMSocket.h> + +class McastReceiver : public QObject +{ + Q_OBJECT +public: + enum Result { + RES_OK, + RES_ABORTED, + RES_OFFSET_MISMATCH, + RES_CHECKSUM_MISMATCH, + RES_MD5_MISMATCH, + RES_CONNECTION_RESET + }; + + McastReceiver(QIODevice* iodev, McastConfiguration* config = 0, QObject* parent = 0); + virtual ~McastReceiver(); + + McastConfiguration* config() + { + return _config; + } + + void config(McastConfiguration const* config); + + static inline bool is_error(Result result) + { + return result != RES_OK; + } + +signals: + void finished(int result); + void progress(quint64 offset); + +public slots: + bool start(); + void abort(); + +private: + McastConfiguration* _config; + McastPGMSocket* _socket; + quint64 _curoffs; + bool _closed; + QCryptographicHash _hash; + QIODevice* _iodev; + +private slots: + void receivedPacket(QByteArray const& bytes); + void connectionReset(); + + void _close(Result result); +}; + +#endif /* MCASTRECEIVER_H_ */ diff --git a/src/net/mcast/McastSender.cpp b/src/net/mcast/McastSender.cpp new file mode 100644 index 0000000..3fec6a4 --- /dev/null +++ b/src/net/mcast/McastSender.cpp @@ -0,0 +1,127 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include "McastSender.h" +#include "McastConstants.h" + +#include <QDataStream> +#include <QTimer> + +#include <pgm/pgm.h> +// OpenPGM #defines bool. This is bad in C++. +#undef bool + +#define MCASTFT_START_DEFER_TIME 2000 /* msec */ + +McastSender::McastSender(QIODevice* iodev, McastConfiguration const* config, QObject* parent) : + QObject(parent), + _config(config ? new McastConfiguration(*config) : new McastConfiguration()), + _socket(0), + _iodev(iodev), + _curoffs(0), + _hash(QCryptographicHash::Md5), + _finished(false) +{ +} + +McastSender::~McastSender() +{ + delete _config; +} + +void McastSender::start() +{ + _socket = new McastPGMSocket(this); + connect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart())); + _socket->open(_config, McastPGMSocket::PSOCK_WRITE); +} + +void McastSender::start(McastPGMSocket* socket) +{ + _socket = socket; + Q_ASSERT(_socket->isOpen()); + deferredStart(); +} + +void McastSender::deferredStart() +{ + // Wait some time, to give the PGM library the chance to generate some + // undisturbed SPM messages: + QTimer::singleShot(MCASTFT_START_DEFER_TIME, this, SLOT(readyToSend())); + disconnect(_socket, SIGNAL(readyToSend()), this, SLOT(deferredStart())); + connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend())); +} + +void McastSender::readyToSend() +{ + if(_finished) + return; + + if(_iodev->atEnd()) + { + QByteArray fpdu; + QDataStream strm(&fpdu, QIODevice::WriteOnly); + strm.setByteOrder(QDataStream::BigEndian); + + strm << (quint64)MCASTFT_MAGIC << (quint64)UINT64_C(0xffffffffffffffff) << _hash.result(); + strm << qChecksum(fpdu.constData(), fpdu.size()); + + _socket->sendPacket(fpdu); + connect(_socket, SIGNAL(connectionFinished()), this, SLOT(socketFinished())); + _socket->shutdown(); + + _finished = true; + + _iodev->close(); + + emit allSent(); + } + else + { + QByteArray barr(DEFAULT_MULTICAST_APDU, '\0'); + qint64 len_read; + len_read = _iodev->read(barr.data(), barr.capacity()); + barr.resize((int)len_read); + + _hash.addData(barr); + + QByteArray pdu; + QDataStream strm(&pdu, QIODevice::WriteOnly); + strm.setByteOrder(QDataStream::BigEndian); + + strm << (quint64)MCASTFT_MAGIC << _curoffs; + strm << barr; + quint16 checksum = qChecksum(pdu.constData(), pdu.size()); + strm << checksum; + + _curoffs += len_read; + + _socket->sendPacket(pdu); + + emit progress(_curoffs); + } +} + +void McastSender::close() +{ + _socket->finish(); +} + +void McastSender::socketFinished() +{ + _socket->deleteLater(); + emit finished(); +} diff --git a/src/net/mcast/McastSender.h b/src/net/mcast/McastSender.h new file mode 100644 index 0000000..0c5e29f --- /dev/null +++ b/src/net/mcast/McastSender.h @@ -0,0 +1,73 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the sender-side multicast file transfer protocol -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTSENDER_H_ +#define MCASTSENDER_H_ + +#include <QCryptographicHash> +#include <QIODevice> +#include <QObject> + +#include "McastConfiguration.h" +#include "McastPGMSocket.h" + +class McastSender : public QObject +{ + Q_OBJECT +public: + McastSender(QIODevice* iodev = 0, McastConfiguration const* config = 0, QObject* parent = 0); + virtual ~McastSender(); + + McastConfiguration* config() + { + return _config; + } + + QIODevice* iodevice() const + { + return _iodev; + } + + void setIODevice(QIODevice* iodevice) + { + _iodev = iodevice; + } + +signals: + void finished(); + void progress(quint64 offset); + void allSent(); + +public slots: + void start(); + void start(McastPGMSocket* openSocket); + void close(); + +private slots: + void deferredStart(); + void readyToSend(); + void socketFinished(); + +private: + McastConfiguration* _config; + McastPGMSocket* _socket; + QIODevice* _iodev; + quint64 _curoffs; + QCryptographicHash _hash; + bool _finished; +}; + +#endif /* MCASTSENDER_H_ */ diff --git a/src/net/mcast/trial_programs/CMakeLists.txt b/src/net/mcast/trial_programs/CMakeLists.txt new file mode 100644 index 0000000..d0f68fa --- /dev/null +++ b/src/net/mcast/trial_programs/CMakeLists.txt @@ -0,0 +1,38 @@ +INCLUDE(${QT_USE_FILE}) + +QT4_WRAP_CPP( + mcastsend_MOC + mcastsend.h +) + +QT4_WRAP_CPP( + mcastreceive_MOC + mcastreceive.h +) + +SET(argparser_SRC + McastConfigArgParser.h + McastConfigArgParser.cpp +) + +ADD_EXECUTABLE(mcastsend + mcastsend.cpp + mcastsend.h + ${argparser_SRC} + ${mcastsend_MOC} +) + +ADD_EXECUTABLE(mcastreceive + mcastreceive.cpp + mcastreceive.h + ${argparser_SRC} + ${mcastreceive_MOC} +) + +TARGET_LINK_LIBRARIES(mcastsend + pvsmcast +) + +TARGET_LINK_LIBRARIES(mcastreceive + pvsmcast +)
\ No newline at end of file diff --git a/src/net/mcast/trial_programs/McastConfigArgParser.cpp b/src/net/mcast/trial_programs/McastConfigArgParser.cpp new file mode 100644 index 0000000..881f728 --- /dev/null +++ b/src/net/mcast/trial_programs/McastConfigArgParser.cpp @@ -0,0 +1,165 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/McastConfigArgParser.cpp +# - Parse common Multicast Configuration CLI arguments +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> + +#include "McastConfigArgParser.h" + +using namespace std; + +bool parseMcastConfigArg(QStringList::iterator& i, QStringList::iterator const& end, McastConfiguration* config) +{ + QString arg = *i; + + if (arg == "-addr") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + config->multicastAddress(*i); + } + else if (arg == "-dport") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 dport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: dport is not an integer" << endl; + return false; + } + config->multicastDPort(dport); + } + else if (arg == "-sport") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 sport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: sport is not an integer" << endl; + return false; + } + config->multicastSPort(sport); + } + else if (arg == "-mtu") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 mtu = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: MTU is not an integer" << endl; + return false; + } + config->multicastMTU(mtu); + } + else if (arg == "-rate") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint32 rate = i->toInt(&ok); + if (!ok) + { + cerr << "Error: Rate is not an integer" << endl; + return false; + } + config->multicastRate(rate); + } + else if (arg == "-winsize") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 winsize = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: Winsize is not an integer" << endl; + return false; + } + config->multicastWinSize(winsize); + } + else if (arg == "-udp") + { + config->multicastUseUDP(true); + } + else if (arg == "-no-udp") + { + config->multicastUseUDP(false); + } + else if (arg == "-udp-port") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 udpport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: UDP-Port is not an integer" << endl; + return false; + } + config->multicastUDPPortBase(udpport); + } + else if (arg == "-intf") + { + i++; + if (i == end) + { + cerr << "Option " << arg.toLatin1().constData() << "is missing argument" << endl; + return false; + } + config->multicastInterface(*i); + } + else + { + return false; + } + + return true; +} diff --git a/src/net/mcast/trial_programs/McastConfigArgParser.h b/src/net/mcast/trial_programs/McastConfigArgParser.h new file mode 100644 index 0000000..4fb18a7 --- /dev/null +++ b/src/net/mcast/trial_programs/McastConfigArgParser.h @@ -0,0 +1,26 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/McastConfigArgParser.h +# - Parse common Multicast Configuration CLI arguments +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTCONFIGARGPARSER_H_ +#define MCASTCONFIGARGPARSER_H_ + +#include <QStringList> + +#include "../McastConfiguration.h" + +bool parseMcastConfigArg(QStringList::iterator& i, QStringList::iterator const& end, McastConfiguration* config); + +#endif /* MCASTCONFIGARGPARSER_H_ */ diff --git a/src/net/mcast/trial_programs/mcastreceive.cpp b/src/net/mcast/trial_programs/mcastreceive.cpp new file mode 100644 index 0000000..48a0f10 --- /dev/null +++ b/src/net/mcast/trial_programs/mcastreceive.cpp @@ -0,0 +1,150 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Receive a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> +#include <QFile> +#include <QStringList> +#include <QTimer> + +#include "mcastreceive.h" +#include "McastConfigArgParser.h" +#include "../McastConfiguration.h" +#include "../McastReceiver.h" + +using namespace std; + +int main(int argc, char** argv) +{ + QCoreApplication app(argc, argv); + McastReceive me; + + QTimer::singleShot(0, &me, SLOT(run())); + + return app.exec(); +} + +void McastReceive::run() +{ + QStringList args = QCoreApplication::arguments(); + QStringList::iterator i = args.begin(); + QStringList::iterator const end = args.end(); + + QString filename(""); + + McastConfiguration config; + + ++i; + while (i != end) + { + QString arg = *i; + + cerr << "Arg: " << arg.toLatin1().constData() << endl; + + if (arg == "-file") + { + ++i; + if (i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing its argument" << endl; + QCoreApplication::exit(1); + return; + } + filename = *i; + } + else if (arg == "-help") + { + cerr << "Options:" << endl << endl + << " -file <FILE> Receive to file FILE" << endl + << " -addr <ADDR> Use ADDR as address specification" << endl + << " -dport <PORT> Send to port PORT" << endl + << " -sport <PORT> Send from port PORT" << endl + << " -mtu <BYTES> Set MTU to BYTES" << endl + << " -rate <BYTES> Send BYTES per second" << endl + << " -winsize <SECONDS> Set Window Size to SECONDS" << endl + << " -udp Use UDP encapsulation" << endl + << " -udp-port PORT Use UDP port PORT" << endl; + QCoreApplication::quit(); + return; + } + else + { + if (!parseMcastConfigArg(i, end, &config)) + { + cerr << "Unknown argument: " << arg.toLatin1().constData() << endl; + QCoreApplication::exit(1); + return; + } + } + + ++i; + } + + if (filename == "") + { + cerr << "No Filename given" << endl; + QCoreApplication::exit(1); + return; + } + + _target = new QFile(filename, this); + _target->open(QIODevice::WriteOnly); + + McastReceiver* recv = new McastReceiver(_target, &config, this); + + connect(recv, SIGNAL(finished(int)), this, SLOT(finished(int))); + + QTimer::singleShot(0, recv, SLOT(start())); +} + +void McastReceive::finished(int state) +{ + cerr << "finished: "; + + switch(state) + { + case McastReceiver::RES_OK: + cerr << "OK." << endl; + break; + case McastReceiver::RES_ABORTED: + cerr << "Aborted." << endl; + goto failed; + case McastReceiver::RES_CHECKSUM_MISMATCH: + cerr << "Checksum mismatch." << endl; + goto failed; + case McastReceiver::RES_CONNECTION_RESET: + cerr << "Connection reset." << endl; + goto failed; + case McastReceiver::RES_MD5_MISMATCH: + cerr << "MD5 mismatch." << endl; + goto failed; + case McastReceiver::RES_OFFSET_MISMATCH: + cerr << "Offset mismatch. Undetected packet loss?" << endl; + goto failed; + default: + cerr << "Unknown error code!" << endl; + goto failed; + } + + QCoreApplication::quit(); + return; +failed: + cerr << "Deleting file." << endl; + _target->remove(); + QCoreApplication::exit(1); + return; +} diff --git a/src/net/mcast/trial_programs/mcastreceive.h b/src/net/mcast/trial_programs/mcastreceive.h new file mode 100644 index 0000000..3e72d4c --- /dev/null +++ b/src/net/mcast/trial_programs/mcastreceive.h @@ -0,0 +1,44 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Receive a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTRECEIVE_H_ +#define MCASTRECEIVE_H_ + +#include <QObject> + +class QFile; +class McastReceiver; + +class McastReceive : public QObject +{ + Q_OBJECT +public: + McastReceive() : + QObject(), + _receiver(0) + { + } + +public slots: + void run(); + void finished(int state); + +private: + McastReceiver* _receiver; + QFile* _target; +}; + +#endif /* MCASTRECEIVE_H_ */ diff --git a/src/net/mcast/trial_programs/mcastsend.cpp b/src/net/mcast/trial_programs/mcastsend.cpp new file mode 100644 index 0000000..f78a9ce --- /dev/null +++ b/src/net/mcast/trial_programs/mcastsend.cpp @@ -0,0 +1,122 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Send a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> +#include <QFile> +#include <QStringList> +#include <QTimer> + +#include <src/net/mcast/McastSender.h> +#include "mcastsend.h" +#include "McastConfigArgParser.h" +#include "../McastConstants.h" +#include "../McastConfiguration.h" + +using namespace std; + +int +main(int argc, char**argv) +{ + QCoreApplication app(argc, argv); + McastSend me; + + QTimer::singleShot(0, &me, SLOT(run())); + + return app.exec(); +} + +void McastSend::run() +{ + QStringList args = QCoreApplication::arguments(); + QStringList::iterator i = args.begin(); + QStringList::iterator const end = args.end(); + QString filename(""); + McastConfiguration config; + + ++i; + while(i != end) + { + // parse command line arguments + + QString arg = *i; + + cerr << "Arg: " << arg.toLatin1().constData() << endl; + + if (arg == "-file") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + QCoreApplication::exit(1); + return; + } + filename = *i; + } + else if (arg == "-help") + { + cerr << "Options:" << endl << endl + << " -file <FILE> Send FILE to the listeners" << endl + << " -addr <ADDR> Use ADDR as address specification" << endl + << " -dport <PORT> Send to port PORT" << endl + << " -sport <PORT> Send from port PORT" << endl + << " -mtu <BYTES> Set MTU to BYTES" << endl + << " -rate <BYTES> Send BYTES per second" << endl + << " -winsize <SECONDS> Set Window Size to SECONDS" << endl + << " -udp Use UDP encapsulation" << endl + << " -udp-port PORT Use UDP port PORT" << endl; + QCoreApplication::quit(); + return; + } + else + { + if (!parseMcastConfigArg(i, end, &config)) + { + cerr << "Unknown command line argument: " << arg.toLatin1().constData() << endl; + QCoreApplication::exit(1); + return; + } + } + + ++i; + } + + if(filename == "") + { + cerr << "No filename given" << endl; + QCoreApplication::exit(1); + return; + } + + // now, do it. + QFile* file = new QFile(filename); + file->open(QIODevice::ReadOnly); + + McastSender* sender = new McastSender(file, &config, this); + file->setParent(sender); + + connect(sender, SIGNAL(finished()), this, SLOT(finished())); + + QTimer::singleShot(0, sender, SLOT(start())); +} + +void McastSend::finished() +{ + cerr << "finished." << endl; + QCoreApplication::quit(); +} diff --git a/src/net/mcast/trial_programs/mcastsend.h b/src/net/mcast/trial_programs/mcastsend.h new file mode 100644 index 0000000..ae15eb4 --- /dev/null +++ b/src/net/mcast/trial_programs/mcastsend.h @@ -0,0 +1,42 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Send a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTSEND_H_ +#define MCASTSEND_H_ + +#include <QObject> + +#include "../McastSender.h" + +class McastSend : public QObject +{ + Q_OBJECT +public: + McastSend() : + QObject(), + _sender(0) + { + } + +public slots: + void run(); + void finished(); + +private: + McastSender* _sender; +}; + +#endif /* MCASTSEND_H_ */ |
