diff options
-rw-r--r-- | 3rdparty/CMakeLists.txt | 110 | ||||
-rw-r--r-- | CMakeLists.txt | 10 | ||||
-rw-r--r-- | OpenPGMConfig.cmake | 53 | ||||
-rw-r--r-- | src/net/mcast/CMakeLists.txt | 65 | ||||
-rw-r--r-- | src/net/mcast/McastConfiguration.cpp | 9 | ||||
-rw-r--r-- | src/net/mcast/McastConfiguration.h | 159 | ||||
-rw-r--r-- | src/net/mcast/McastConstants.h | 33 | ||||
-rw-r--r-- | src/net/mcast/McastPGMSocket.cpp | 601 | ||||
-rw-r--r-- | src/net/mcast/McastPGMSocket.h | 74 | ||||
-rw-r--r-- | src/net/mcast/McastReceiver.cpp | 138 | ||||
-rw-r--r-- | src/net/mcast/McastReceiver.h | 76 | ||||
-rw-r--r-- | src/net/mcast/McastSender.cpp | 96 | ||||
-rw-r--r-- | src/net/mcast/McastSender.h | 68 | ||||
-rw-r--r-- | src/net/mcast/trial_programs/CMakeLists.txt | 38 | ||||
-rw-r--r-- | src/net/mcast/trial_programs/McastConfigArgParser.cpp | 151 | ||||
-rw-r--r-- | src/net/mcast/trial_programs/McastConfigArgParser.h | 26 | ||||
-rw-r--r-- | src/net/mcast/trial_programs/mcastreceive.cpp | 150 | ||||
-rw-r--r-- | src/net/mcast/trial_programs/mcastreceive.h | 44 | ||||
-rw-r--r-- | src/net/mcast/trial_programs/mcastsend.cpp | 123 | ||||
-rw-r--r-- | src/net/mcast/trial_programs/mcastsend.h | 42 |
20 files changed, 2008 insertions, 58 deletions
diff --git a/3rdparty/CMakeLists.txt b/3rdparty/CMakeLists.txt index 84f366b..8a4cea0 100644 --- a/3rdparty/CMakeLists.txt +++ b/3rdparty/CMakeLists.txt @@ -2,15 +2,9 @@ # Build OpenPGM ################################################################################ -# We need GLib -INCLUDE(${CMAKE_ROOT}/Modules/FindPkgConfig.cmake) +INCLUDE(../OpenPGMConfig.cmake) -PKG_CHECK_MODULES(GLIB glib-2.0>=2.10) -IF(NOT GLIB_FOUND) - MESSAGE(FATAL_ERROR "You don't seem to have GLib2 installed.") -ELSE(NOT GLIB_FOUND) - MESSAGE("-- GLib2 found. Libraries: ${GLIB_LIBRARIES}, CFLAGS: ${GLIB_CFLAGS}") -ENDIF(NOT GLIB_FOUND) +ADD_DEFINITIONS(${LIBPGM_CFLAGS}) # Set up build SET(pgm_VERSION @@ -19,7 +13,7 @@ SET(pgm_VERSION # OpenPGM will be built in the binary tree SET(pgm - ${CMAKE_CURRENT_BINARY_DIR}/libpgm-${pgm_VERSION}/openpgm/pgm + ${CMAKE_CURRENT_BINARY_DIR}/libpgm-src/openpgm/pgm ) # This has been adapted from SConscript.libpgm @@ -66,6 +60,35 @@ SET(pgm_SRCS ${pgm}/histogram.c ) +SET(pgm_HDRS + ${pgm}/include/pgm/atomic.h + ${pgm}/include/pgm/backtrace.h + ${pgm}/include/pgm/engine.h + ${pgm}/include/pgm/error.h + ${pgm}/include/pgm/gsi.h + ${pgm}/include/pgm/http.h + ${pgm}/include/pgm/if.h + ${pgm}/include/pgm/list.h + ${pgm}/include/pgm/log.h + ${pgm}/include/pgm/macros.h + ${pgm}/include/pgm/mem.h + ${pgm}/include/pgm/messages.h + ${pgm}/include/pgm/msgv.h + ${pgm}/include/pgm/packet.h + ${pgm}/include/pgm/pgm.h + ${pgm}/include/pgm/signal.h + ${pgm}/include/pgm/skbuff.h + ${pgm}/include/pgm/snmp.h + ${pgm}/include/pgm/socket.h + ${pgm}/include/pgm/time.h + ${pgm}/include/pgm/tsi.h + ${pgm}/include/pgm/types.h + ${pgm}/include/pgm/version.h + ${pgm}/include/pgm/winint.h + ${pgm}/include/pgm/wininttypes.h +) + + SET(pgm_GENERATED ${CMAKE_CURRENT_BINARY_DIR}/version.c ${CMAKE_CURRENT_BINARY_DIR}/galois_tables.c @@ -86,10 +109,24 @@ ADD_CUSTOM_COMMAND(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/version.c # ... and we need to unpack the tree. ADD_CUSTOM_COMMAND( OUTPUT - ${pgm_SRCS} + ${pgm_SRCS} + ${pgm_HDRS} ${pgm}/galois_generator.pl ${pgm}/version_generator.py - COMMAND bzip2 -dc ${CMAKE_CURRENT_SOURCE_DIR}/libpgm-${pgm_VERSION}.tar.bz2 | tar -C ${CMAKE_CURRENT_BINARY_DIR} -x + DEPENDS + "${CMAKE_CURRENT_SOURCE_DIR}/libpgm-${pgm_VERSION}.tar.bz2" + COMMAND rm -rf "${CMAKE_CURRENT_BINARY_DIR}/libpgm-src" + COMMAND bzip2 -dc "${CMAKE_CURRENT_SOURCE_DIR}/libpgm-${pgm_VERSION}.tar.bz2" | tar -C "${CMAKE_CURRENT_BINARY_DIR}" -x + COMMAND mv "${CMAKE_CURRENT_BINARY_DIR}/libpgm-${pgm_VERSION}" "${CMAKE_CURRENT_BINARY_DIR}/libpgm-src" +) + +ADD_CUSTOM_COMMAND(TARGET clean + COMMAND rm -rf "${CMAKE_CURRENT_BINARY_DIR}/libpgm-src" +) + +ADD_CUSTOM_TARGET( + unpack_libpgm + DEPENDS ${pgm_SRCS} ${pgm_HDRS} ) INCLUDE_DIRECTORIES( @@ -101,9 +138,14 @@ ADD_LIBRARY( pgm STATIC ${pgm_SRCS} + ${pgm_HDRS} ${pgm_GENERATED} ) +TARGET_LINK_LIBRARIES(pgm + ${LIBPGM_LIBRARIES} +) + LINK_DIRECTORIES( ${GLIB_LIBRARY_DIRS} ) @@ -112,49 +154,3 @@ ADD_DEFINITIONS( ${GLIB_CFLAGS} ) -IF(UNIX) - IF(CMAKE_COMPILER_IS_GNUCC) - # The scripts are fine for Linux/GCC, other platforms may or may - # not work. - ADD_DEFINITIONS( - -std=gnu99 - -D_XOPEN_SOURCE=600 - -D_BSD_SOURCE - -D_REENTRANT - -DCONFIG_HAVE_GETPROTOBYNAME_R2 - -DCONFIG_HAVE_ISO_VARARGS - -DCONFIG_HAVE_ALLOCA_H - -DCONFIG_16BIT_CHECKSUM - -DCONFIG_HAVE_PROC - -DCONFIG_HAVE_BACKTRACE - -DCONFIG_HAVE_PSELECT - -DCONFIG_HAVE_RTC - -DCONFIG_HAVE_TSC - -DCONFIG_HAVE_HPET - -DCONFIG_HAVE_POLL - -DCONFIG_HAVE_EPOLL - -DCONFIG_HAVE_GETIFADDRS - -DCONFIG_HAVE_IFR_NETMASK - -DCONFIG_HAVE_MCAST_JOIN - -DCONFIG_HAVE_IP_MREQN - -DCONFIG_HAVE_SPRINTF_GROUPING - -DCONFIG_HAVE_VASPRINTF - -DCONFIG_HAVE_DSO_VISIBILITY - -DCONFIG_BIND_INADDR_ANY - -DCONFIG_GALOIS_MUL_LUT - -DCONFIG_HAVE_GETOPT - ) - - TARGET_LINK_LIBRARIES(pgm - m rt - ${GLIB_LIBRARIES}) - - SET(_SYSTEM_SPECIFICS_SET 1) - ENDIF(CMAKE_COMPILER_IS_GNUCC) -ENDIF(UNIX) - -# Complain if this is NOT Linux/GCC. -IF(NOT _SYSTEM_SPECIFICS_SET) - MESSAGE(FATAL_ERROR "Can only build libpgm on Unix with gcc.") -ENDIF(NOT _SYSTEM_SPECIFICS_SET) - diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e63636..8eb1961 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,8 @@ CMAKE_MINIMUM_REQUIRED( VERSION 2.6.2 ) # set compiler optimizations for debug and release SET(CMAKE_BUILD_TYPE Debug) +SET(CMAKE_C_FLAGS_DEBUG "-O0 -g -Wall") +SET(CMAKE_C_FLAGS_RELEASE "-O3 -march=native") SET(CMAKE_CXX_FLAGS_DEBUG "-O0 -g -Wall") SET(CMAKE_CXX_FLAGS_RELEASE "-O3 -march=native") # -DQT_NO_DEBUG_OUTPUT") @@ -38,6 +40,12 @@ INCLUDE_DIRECTORIES( ADD_SUBDIRECTORY(3rdparty) ################################################################################ +# Common multicast transfer module +################################################################################ + +ADD_SUBDIRECTORY(src/net/mcast) + +################################################################################ # Variables ################################################################################ @@ -338,7 +346,7 @@ INSTALL( PROGRAMS misc/pvs-vncsrv DESTINATION bin) INSTALL( FILES ${CMAKE_BINARY_DIR}/org.openslx.pvs.service DESTINATION share/dbus-1/services ) # add package target to our makefile -SET( CPACK_GENERATOR "DEB" ) +SET( CPACK_GENERATOR "DEB;RPM" ) SET( CPACK_SET_DESTDIR "ON" ) SET( CPACK_PACKAGE_NAME "pvs" ) SET( CPACK_PACKAGE_VERSION_MAJOR "2" ) diff --git a/OpenPGMConfig.cmake b/OpenPGMConfig.cmake new file mode 100644 index 0000000..074e65b --- /dev/null +++ b/OpenPGMConfig.cmake @@ -0,0 +1,53 @@ +INCLUDE(${CMAKE_ROOT}/Modules/FindPkgConfig.cmake) + +PKG_CHECK_MODULES(GLIB glib-2.0>=2.10) +IF(NOT GLIB_FOUND) + MESSAGE(FATAL_ERROR "You don't seem to have GLib2 installed.") +ENDIF(NOT GLIB_FOUND) + +IF(UNIX) + IF(CMAKE_COMPILER_IS_GNUCC) + # The scripts are fine for Linux/GCC, other platforms may or may + # not work. + SET(LIBPGM_CFLAGS + -std=gnu99 + -D_XOPEN_SOURCE=600 + -D_BSD_SOURCE + -D_REENTRANT + -DCONFIG_HAVE_GETPROTOBYNAME_R2 + -DCONFIG_HAVE_ISO_VARARGS + -DCONFIG_HAVE_ALLOCA_H + -DCONFIG_16BIT_CHECKSUM + -DCONFIG_HAVE_PROC + -DCONFIG_HAVE_BACKTRACE + -DCONFIG_HAVE_PSELECT + -DCONFIG_HAVE_RTC + -DCONFIG_HAVE_TSC + -DCONFIG_HAVE_HPET + -DCONFIG_HAVE_POLL + -DCONFIG_HAVE_EPOLL + -DCONFIG_HAVE_GETIFADDRS + -DCONFIG_HAVE_IFR_NETMASK + -DCONFIG_HAVE_MCAST_JOIN + -DCONFIG_HAVE_IP_MREQN + -DCONFIG_HAVE_SPRINTF_GROUPING + -DCONFIG_HAVE_VASPRINTF + -DCONFIG_HAVE_DSO_VISIBILITY + -DCONFIG_BIND_INADDR_ANY + -DCONFIG_GALOIS_MUL_LUT + -DCONFIG_HAVE_GETOPT + ) + + SET(LIBPGM_LIBRARIES + m rt + ${GLIB_LIBRARIES}) + + SET(_SYSTEM_SPECIFICS_SET 1) + ENDIF(CMAKE_COMPILER_IS_GNUCC) +ENDIF(UNIX) + +# Complain if this is NOT Linux/GCC. +IF(NOT _SYSTEM_SPECIFICS_SET) + MESSAGE(FATAL_ERROR "Can only build libpgm on Unix with gcc.") +ENDIF(NOT _SYSTEM_SPECIFICS_SET) + diff --git a/src/net/mcast/CMakeLists.txt b/src/net/mcast/CMakeLists.txt new file mode 100644 index 0000000..e418c64 --- /dev/null +++ b/src/net/mcast/CMakeLists.txt @@ -0,0 +1,65 @@ +INCLUDE(../../../OpenPGMConfig.cmake) + +ADD_DEFINITIONS( + ${LIBPGM_CFLAGS} + -D__STDC_CONSTANT_MACROS + -D__STDC_LIMIT_MACROS +) + +# OpenPGM uses the C99 restrict keyword which g++ does not recognize: +IF(CMAKE_COMPILER_IS_GNUCXX) + ADD_DEFINITIONS(-Drestrict=__restrict__) +ENDIF(CMAKE_COMPILER_IS_GNUCXX) + +INCLUDE(${QT_USE_FILE}) + +SET(pvsmcast_MOC_HDRS + McastConfiguration.h + McastPGMSocket.h + McastReceiver.h + McastSender.h +) + +SET(pvsmcast_HDRS + McastConfiguration.h + McastPGMSocket.h + McastReceiver.h + McastSender.h +) + +SET(pvsmcast_SRCS + McastConfiguration.cpp + McastPGMSocket.cpp + McastReceiver.cpp + McastSender.cpp +) + +INCLUDE_DIRECTORIES( + ${CMAKE_BINARY_DIR}/3rdparty/libpgm-src/openpgm/pgm/include +) + +QT4_WRAP_CPP( + pvsmcast_MOC_SRCS + ${pvsmcast_MOC_HDRS} +) + +SET_SOURCE_FILES_PROPERTIES(${pvsmcast_SRCS} ${pvsmcast_MOC_SRCS} + PROPERTIES + OBJECT_DEPENDS "3rdparty/libpgm.a" # Make sure libpgm gets unpacked before building C++ files +) + +ADD_LIBRARY( + pvsmcast + STATIC + ${pvsmcast_HDRS} + ${pvsmcast_SRCS} + ${pvsmcast_MOC_SRCS} +) + +TARGET_LINK_LIBRARIES( + pvsmcast + pgm + ${QT_LIBRARIES} +) + +ADD_SUBDIRECTORY(trial_programs) diff --git a/src/net/mcast/McastConfiguration.cpp b/src/net/mcast/McastConfiguration.cpp new file mode 100644 index 0000000..1a1c0a8 --- /dev/null +++ b/src/net/mcast/McastConfiguration.cpp @@ -0,0 +1,9 @@ +/* + * McastConfiguration.cpp + * + * Created on: Jul 10, 2010 + * Author: brs + */ + +#include "McastConfiguration.h" + diff --git a/src/net/mcast/McastConfiguration.h b/src/net/mcast/McastConfiguration.h new file mode 100644 index 0000000..a609ce1 --- /dev/null +++ b/src/net/mcast/McastConfiguration.h @@ -0,0 +1,159 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastConfiguration.h +# - hold Multicast protocol configuration data +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTCONFIGURATION_H_ +#define MCASTCONFIGURATION_H_ + +#include <QObject> +#include <QString> +#include <QtGlobal> + +#include "McastConstants.h" + +class McastConfiguration: public QObject +{ +Q_OBJECT +public: + McastConfiguration(QObject* parent = 0) : + QObject(parent), + _multicastAddress(DEFAULT_MULTICAST_ADDRESS), + _multicastRate(DEFAULT_MULTICAST_RATE), + _multicastSPort(DEFAULT_MULTICAST_SPORT), + _multicastDPort(DEFAULT_MULTICAST_DPORT), + _multicastWinSize(DEFAULT_MULTICAST_WSIZ), + _multicastMTU(DEFAULT_MULTICAST_MTU), + _multicastUDPPort(DEFAULT_MULTICAST_UDPPORT), + _multicastUseUDP(false) + { + } + + McastConfiguration(McastConfiguration const& other) : + QObject(), + _multicastAddress(other._multicastAddress), + _multicastRate(other._multicastRate), + _multicastSPort(other._multicastSPort), + _multicastDPort(other._multicastDPort), + _multicastWinSize(other._multicastWinSize), + _multicastMTU(other._multicastMTU), + _multicastUDPPort(other._multicastUDPPort), + _multicastUseUDP(other._multicastUseUDP) + { + } + + virtual ~McastConfiguration() + { + } + + QString multicastAddress() const + { + return _multicastAddress; + } + McastConfiguration* multicastAddress(QString const& address) + { + _multicastAddress = address; + return this; + } + + quint16 multicastSPort() const + { + return _multicastSPort; + } + McastConfiguration* multicastSPort(quint16 port) + { + _multicastSPort = port; + return this; + } + + quint16 multicastDPort() const + { + return _multicastDPort; + } + McastConfiguration* multicastDPort(quint16 port) + { + _multicastDPort = port; + return this; + } + + quint32 multicastRate() const + { + return _multicastRate; + } + McastConfiguration* multicastRate(quint32 rate) + { + _multicastRate = rate; + return this; + } + + quint16 multicastWinSize() const + { + return _multicastWinSize; + } + McastConfiguration* multicastWinSize(quint16 size) + { + _multicastWinSize = size; + return this; + } + + quint16 multicastMTU() const + { + return _multicastMTU; + } + McastConfiguration* multicastMTU(quint16 mtu) + { + _multicastMTU = mtu; + return this; + } + + bool multicastUseUDP() const + { + return _multicastUseUDP; + } + McastConfiguration* multicastUseUDP(bool useUDP) + { + _multicastUseUDP = useUDP; + return this; + } + + bool multicastUDPPort() const + { + return _multicastUDPPort; + } + McastConfiguration* multicastUDPPort(quint16 port) + { + _multicastUDPPort = port; + return this; + } + + void commit() + { + emit changed(); + } + +signals: + void changed(); + +private: + QString _multicastAddress; + quint32 _multicastRate; + quint16 _multicastSPort; + quint16 _multicastDPort; + quint16 _multicastWinSize; + quint16 _multicastMTU; + quint16 _multicastUDPPort; + bool _multicastUseUDP; +}; + +#endif /* MCASTCONFIGURATION_H_ */ diff --git a/src/net/mcast/McastConstants.h b/src/net/mcast/McastConstants.h new file mode 100644 index 0000000..712a0d5 --- /dev/null +++ b/src/net/mcast/McastConstants.h @@ -0,0 +1,33 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastMagic.h +# - Specify the magic numbers for the McastFT protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTMAGIC_H_ +#define MCASTMAGIC_H_ + +#include <stdint.h> + +#define MCASTFT_MAGIC UINT64_C(0x6d60ad83825fb7f9) +#define DEFAULT_MULTICAST_ADDRESS ";239.255.220.207" +#define DEFAULT_MULTICAST_SPORT 6964 +#define DEFAULT_MULTICAST_DPORT 6965 +#define DEFAULT_MULTICAST_UDPPORT 6966 +#define DEFAULT_MULTICAST_RATE (100*1024) +#define DEFAULT_MULTICAST_WSIZ 3000 +#define DEFAULT_MULTICAST_MTU 1400 +#define DEFAULT_MULTICAST_APDU 1200 +#define DEFAULT_MULTICAST_CHUNK 1024 + +#endif /* MCASTMAGIC_H_ */ diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp new file mode 100644 index 0000000..0d6b694 --- /dev/null +++ b/src/net/mcast/McastPGMSocket.cpp @@ -0,0 +1,601 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastPGMSocket.cpp +# - wrap OpenPGM Sockets in a nicer interface -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include <cstdlib> + +#include <QByteArray> +#include <QtDebug> +#include <QPointer> +#include <QSocketNotifier> +#include <QTimer> + +#include <pgm/pgm.h> +// #include <stdint.h> +// #define SIZE_MAX UINT64_MAX +// #include <impl/socket.h> +// pgm redefined bool to int. Undo that. +#undef bool + +#include <sys/poll.h> + +#include "McastPGMSocket.h" + +class McastPGMSocket_priv +{ +public: + McastPGMSocket_priv() : + socket(0), + recv_notif(0), + repair_notif(0), + pending_notif(0), + send_notif(0) + { + } + ~McastPGMSocket_priv() + { + if (socket) + pgm_close(socket, 0); + if (recv_notif) + delete recv_notif; + if (repair_notif) + delete repair_notif; + if (pending_notif) + delete pending_notif; + if (send_notif) + delete send_notif; + } + + pgm_sock_t* socket; + McastPGMSocket::Direction direction; + QSocketNotifier* recv_notif; + QSocketNotifier* repair_notif; + QSocketNotifier* pending_notif; + QSocketNotifier* send_notif; + + QSocketNotifier* notifier_for(int fd) { + if (recv_notif && (fd == recv_notif->socket())) + { + return recv_notif; + } + else if (repair_notif && (fd == repair_notif->socket())) + { + return repair_notif; + } + else if (pending_notif && (fd == pending_notif->socket())) + { + return pending_notif; + } + return 0; + } +}; + +static void _ensurePGMInited() +{ + if (!pgm_supported()) + { + pgm_error_t* err; + int good = pgm_init(&err); + if (!good) + { + qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)"); + std::exit(1); + } + } +} + +McastPGMSocket::McastPGMSocket(QObject* parent) : + QObject(parent), + _priv(new McastPGMSocket_priv), + _finished(false), + _nakTimeout(new QTimer()), + _dataTimeout(new QTimer()), + _sendTimeout(new QTimer()) +{ + _ensurePGMInited(); + + _nakTimeout->setSingleShot(true); + _dataTimeout->setSingleShot(true); + _sendTimeout->setSingleShot(true); + + connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout())); + connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout())); + connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend())); +} + +McastPGMSocket::~McastPGMSocket() +{ + delete _priv; + delete _nakTimeout; + delete _dataTimeout; + delete _sendTimeout; +} + +bool McastPGMSocket::open(McastConfiguration const* config, Direction direction) +{ + _priv->direction = direction; + + pgm_error_t* err = 0; + int good; + + pgm_addrinfo_t* addrinfo; + // parse the address string + good = pgm_getaddrinfo(config->multicastAddress().toLatin1().constData(), + 0, &addrinfo, &err); + if (!good) + { + qCritical() << "Could not parse address info: PGM Error: " + << err->message; + } + + sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family; + + good + = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err); + if (!good) + { + qCritical() << "Could not open socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + unsigned const ambient_spm = 4096 * 1000; // every four seconds (approx.) + + // set parameters + if (direction == PSOCK_WRITE) + { + // write-only socket + const int send_only = 1, + spm_heartbeat[] = + { 16 * 1000, 16 * 1000, 16 * 1000, 16 * 1000, 32 * 1000, 64 * 1000, 128 + * 1000, 256 * 1000, 512 * 1000, 1024 * 1000, 2048 * 1000, 4096 + * 1000 }, + max_rate = config->multicastRate(), + max_window = config->multicastWinSize(); + // const int max_window_sqns = 3000; + + pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only, + sizeof(send_only)); + + // SPM messages + pgm_setsockopt(_priv->socket, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); + pgm_setsockopt(_priv->socket, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat)); + + // Transmit window + pgm_setsockopt(_priv->socket, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate)); + // pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, PGM_TXW_SQNS, &max_window, sizeof(max_window)); + } + else + { + // readonly + const int recv_only = 1, + passive = 0, + max_window = config->multicastWinSize(), + max_winsqns = 0, + peer_expiry = ambient_spm * 5, + spmr_expiry = 250 * 1000, + nak_bo_ivl = 500 * 1000, + nak_rpt_ivl = 500 * 1000, + nak_rdata_ivl = 2000 * 1000, + nak_data_retries = 50, + nak_ncf_retries = 50; + pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); + pgm_setsockopt(_priv->socket, PGM_PASSIVE, &passive, sizeof(passive)); + // pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, PGM_RXW_SQNS, &max_window, sizeof(max_window)); + pgm_setsockopt(_priv->socket, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); + pgm_setsockopt(_priv->socket, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); + pgm_setsockopt(_priv->socket, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); + pgm_setsockopt(_priv->socket, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); + pgm_setsockopt(_priv->socket, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); + pgm_setsockopt(_priv->socket, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); + pgm_setsockopt(_priv->socket, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); + } + + // MTU + int const mtu = config->multicastMTU(); + pgm_setsockopt(_priv->socket, PGM_MTU, &mtu, sizeof(mtu)); + + pgm_sockaddr_t addr; + addr.sa_addr.sport = config->multicastSPort(); + addr.sa_port = config->multicastDPort(); + good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err); + if (!good) + { + qCritical() << "Could not generate a GSI: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + good = pgm_bind3(_priv->socket, &addr, sizeof(addr), (struct group_req*)&addrinfo->ai_send_addrs[0], sizeof(struct group_req), (struct group_req*)&addrinfo->ai_recv_addrs[0], sizeof(struct group_req), &err); + if (!good) + { + qCritical() << "Could not bind socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + // qDebug() << "Max APDU is " << _priv->socket->max_apdu; + // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu; + // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment; + // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns; + + // join the group + for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++) + { + pgm_setsockopt(_priv->socket, PGM_JOIN_GROUP, + &addrinfo->ai_recv_addrs[i], sizeof(struct group_req)); + } + + // set send address + pgm_setsockopt(_priv->socket, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0], + sizeof(struct group_req)); + + // IP parameters + const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16; + pgm_setsockopt(_priv->socket, PGM_MULTICAST_LOOP, &multicast_loop, + sizeof(multicast_loop)); + pgm_setsockopt(_priv->socket, PGM_MULTICAST_HOPS, &multicast_hops, + sizeof(multicast_hops)); + pgm_setsockopt(_priv->socket, PGM_NOBLOCK, &nonblocking, + sizeof(nonblocking)); + + good = pgm_connect(_priv->socket, &err); + if (!good) + { + qCritical() << "Could not connect socket: PGM Error: " << err->message; + pgm_error_free(err); + return false; + } + + setupNotifiers(); + + pgm_freeaddrinfo(addrinfo); + + return true; +} + +void McastPGMSocket::setupNotifiers() +{ + int recv_sock, repair_sock, pending_sock; + char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int)); + + pgm_getsockopt(_priv->socket, PGM_RECV_SOCK, &recv_sock, sizeof(recv_sock)); + _priv->recv_notif = new QSocketNotifier(recv_sock, QSocketNotifier::Read, + this); + connect(_priv->recv_notif, SIGNAL(activated(int)), this, slotname); + + pgm_getsockopt(_priv->socket, PGM_REPAIR_SOCK, &repair_sock, sizeof(repair_sock)); + _priv->repair_notif = new QSocketNotifier(repair_sock, + QSocketNotifier::Read, this); + connect(_priv->repair_notif, SIGNAL(activated(int)), this, slotname); + + pgm_getsockopt(_priv->socket, PGM_PENDING_SOCK, &pending_sock, sizeof(pending_sock)); + _priv->pending_notif = new QSocketNotifier(pending_sock, + QSocketNotifier::Read, this); + connect(_priv->pending_notif, SIGNAL(activated(int)), this, slotname); + + if(_priv->direction == PSOCK_WRITE) + { + struct pollfd pfd; + int nfds = 1; + pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT); + _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this); + connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend())); + } +} + +void McastPGMSocket::handleNak(int fd) +{ + qDebug() << "handleNak(int)"; + + QSocketNotifier* notif = _priv->notifier_for(fd); + notif->setEnabled(false); + + handleNak(); + + notif->setEnabled(true); +} + +void McastPGMSocket::handleNak() +{ + if (_finished) + return; + + qDebug() << "handleNak()"; + + QTimer::singleShot(1000, this, SLOT(handleNakTimeout())); + + // to handle NAKs in OpenPGM, we need to pgm_recv: + char buf[4096]; + pgm_error_t* err = 0; + + int status; + // while we don't block: + do + { + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err); + + if(status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " timer pending: " << msecs << "ms"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " rate limited: " << msecs << "ms"; + _nakTimeout->start(msecs); + break; + } + else if(status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " wouldblock"; + break; + } + else + { + if(err) + { + qCritical() << "Could not handle NAKs: PGM Error: " << err->message; + pgm_error_free(err); + err = 0; + } + } + } + while (true); +} + +void McastPGMSocket::handleNakTimeout() +{ + qDebug() << "handleNakTimeout()"; + + handleNak(); +} + +void McastPGMSocket::handleData(int fd) +{ + // need to guard against destruction in finish() via signals/slots + QPointer<QSocketNotifier> notif(_priv->notifier_for(fd)); + notif->setEnabled(false); + + handleData(); + + if (notif) + notif->setEnabled(true); +} + +void McastPGMSocket::handleData() +{ + qDebug() << "handleData()"; + + if (_finished) { + qDebug() << " finished!"; + return; + } + + int status; + do + { + char buf[4096]; + size_t size; + pgm_error_t* err; + + status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err); + + if (status == PGM_IO_STATUS_NORMAL) + { + qDebug() << " normally received"; + if(size > 0) + { + QByteArray bytes(buf, size); + emit receivedPacket(bytes); + } + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " would block"; + // nothing more to do this time + break; + } + else if (status == PGM_IO_STATUS_TIMER_PENDING) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " timer pending: " << msecs << "ms"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " rate limit pending: " << msecs << "ms"; + _dataTimeout->start(msecs); + break; + } + else if (status == PGM_IO_STATUS_RESET) + { + qDebug() << " connection reset"; + emit connectionReset(); + qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)"); + break; + } + else if (status == PGM_IO_STATUS_FIN) + { + qDebug() << " connection finished"; + emit connectionFinished(); + break; + } + else + { + if(err) + { + qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)"); + break; + } + } + + // the socket might have been closed from under us + if (!_priv->socket) + break; + } + while (true); +} + +void McastPGMSocket::handleDataTimeout() +{ + qDebug() << "handleDataTimeout()"; + + handleData(); +} + +void McastPGMSocket::canSend() +{ + if (_finished) + return; + + qDebug() << "canSend()"; + + if (_priv->send_notif) + { + _priv->send_notif->setEnabled(false); + } + + bool reenable = true; + + while (!_q.isEmpty()) + { + int status; + QByteArray const nextPacket(_q.head()); + status = pgm_send(_priv->socket, nextPacket.constData(), nextPacket.size(), 0); + if (status == PGM_IO_STATUS_ERROR || status == PGM_IO_STATUS_RESET) + { + qCritical() << "Could not send packet: PGM Error."; + continue; + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + qDebug() << " would block"; + break; + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); + qDebug() << " rate limited:" << msecs << "ms"; + _sendTimeout->start((msecs > 0) ? msecs : 1); + reenable = false; + break; + } + else if (status == PGM_IO_STATUS_NORMAL) + { + qDebug() << " sent"; + _q.dequeue(); + continue; + } + else + { + qCritical() << "Unhandled condition in McastPGMSocket::canSend()"; + } + } + + if (_priv->send_notif && reenable) + { + emit readyToSend(); + + qDebug() << " reenable notifier"; + _priv->send_notif->setEnabled(true); + } +} + +void McastPGMSocket::sendPacket(QByteArray const& bytes) +{ + if(_q.isEmpty()) + { + int status = pgm_send(_priv->socket, bytes.constData(), bytes.size(), 0); + + if (status == PGM_IO_STATUS_ERROR || status == PGM_IO_STATUS_RESET) + { + qCritical() << "Could not send packet: PGM Error."; + return; + } + else if (status == PGM_IO_STATUS_WOULD_BLOCK) + { + _q.enqueue(bytes); + } + else if (status == PGM_IO_STATUS_RATE_LIMITED) + { + _q.enqueue(bytes); + struct timeval tv; + pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv)); + _dataTimeout->start((tv.tv_sec * 1000) + (tv.tv_usec / 1000)); + } + else if (status == PGM_IO_STATUS_NORMAL) + { + return; + } + else + { + qCritical() << "Unhandled condition in McastPGMSocket::sendPacket()"; + } + } else { + _q.enqueue(bytes); + } +} + +void McastPGMSocket::finish() +{ + if(_priv->pending_notif) + { + delete _priv->pending_notif; + _priv->pending_notif = 0; + } + if(_priv->recv_notif) + { + delete _priv->recv_notif; + _priv->recv_notif = 0; + } + if(_priv->repair_notif) + { + delete _priv->repair_notif; + _priv->repair_notif = 0; + } + if(_priv->send_notif) + { + delete _priv->send_notif; + _priv->send_notif = 0; + } + + pgm_close(_priv->socket, 1); + _priv->socket = 0; + + _finished = true; +} + +bool McastPGMSocket::finished() const +{ + return _finished; +} diff --git a/src/net/mcast/McastPGMSocket.h b/src/net/mcast/McastPGMSocket.h new file mode 100644 index 0000000..b0007a7 --- /dev/null +++ b/src/net/mcast/McastPGMSocket.h @@ -0,0 +1,74 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastPGMSocket.h +# - wrap OpenPGM Sockets in a nicer interface -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTPGMSOCKET_H_ +#define MCASTPGMSOCKET_H_ + +#include <QByteArray> +#include <QObject> +#include <QQueue> + +#include <src/net/mcast/McastConfiguration.h> + +class McastPGMSocket_priv; +class QTimer; + +class McastPGMSocket : public QObject +{ + Q_OBJECT +public: + enum Direction { + PSOCK_READ, + PSOCK_WRITE + }; + + McastPGMSocket(QObject* parent = 0); + virtual ~McastPGMSocket(); + + bool open(McastConfiguration const* config, Direction direction); + bool finished() const; + +signals: + void readyToSend(); + void receivedPacket(QByteArray const& bytes); + void connectionReset(); + void connectionFinished(); + +public slots: + void sendPacket(QByteArray const& bytes); + void finish(); + +private slots: + void handleNak(int fd); + void handleData(int fd); + void handleNak(); + void handleData(); + void handleNakTimeout(); + void handleDataTimeout(); + void canSend(); + +private: + McastPGMSocket_priv* _priv; + QQueue<QByteArray> _q; + bool _finished; + QTimer* _nakTimeout; + QTimer* _dataTimeout; + QTimer* _sendTimeout; + + void setupNotifiers(); +}; + +#endif /* MCASTPGMSOCKET_H_ */ diff --git a/src/net/mcast/McastReceiver.cpp b/src/net/mcast/McastReceiver.cpp new file mode 100644 index 0000000..7480ac2 --- /dev/null +++ b/src/net/mcast/McastReceiver.cpp @@ -0,0 +1,138 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include <QDataStream> +#include <QtDebug> +#include <QtGlobal> + +#include <pgm/pgm.h> +// OpenPGM #defines bool. This is bad in C++. +#undef bool + +#include "McastConstants.h" +#include "McastReceiver.h" + +McastReceiver::McastReceiver(QIODevice* iodev, McastConfiguration* config, QObject* parent) : + QObject(parent), + _config(config ? new McastConfiguration(*config) : new McastConfiguration()), + _socket(0), + _curoffs(0), + _closed(false), + _hash(QCryptographicHash::Md5), + _iodev(iodev) +{ + _config->setParent(this); +} + +McastReceiver::~McastReceiver() +{ + if (_config) + delete _config; +} + +void McastReceiver::start() +{ + _socket = new McastPGMSocket(this); + connect(_socket, SIGNAL(receivedPacket(QByteArray)), this, SLOT(receivedPacket(QByteArray))); + connect(_socket, SIGNAL(connectionReset()), this, SLOT(connectionReset())); + // connect(_socket, SIGNAL(connectionFinished()), this, SLOT(connectionFinished())); + _socket->open(_config, McastPGMSocket::PSOCK_READ); +} + +void McastReceiver::receivedPacket(QByteArray const& bytes) +{ + if(_closed) + return; + + quint16 checksum_should = qChecksum(bytes.constData(), bytes.size() - 2); + + QDataStream strm(bytes); + strm.setByteOrder(QDataStream::BigEndian); + + // read the packet + quint64 magic; + quint64 offset; + quint16 checksum; + + + strm >> magic; + if(magic != MCASTFT_MAGIC) + { + qCritical() << "Received packet whose magic number does not match. Ignoring."; + return; + } + + strm >> offset; + qDebug() << " Received packet for offset" << offset; + + if (offset == UINT64_C(0xffffffffffffffff)) + { + // this is the end of the data stream. + QByteArray md5; + strm >> md5; + + quint16 fchecksum; + strm >> fchecksum; + + // compare the hash value + if ((fchecksum != checksum_should) || (md5 != _hash.result())) + { + _close(RES_MD5_MISMATCH); + } + else + { + _close(RES_OK); + } + + return; + } + else if (offset != _curoffs) + { + qCritical() << "Packet loss or double delivery. PGM should have prevented this. Bailing out."; + _close(RES_OFFSET_MISMATCH); + return; + } + + QByteArray contents; + strm >> contents; + _curoffs += contents.size(); + + strm >> checksum; + if(checksum != checksum_should) + { + qCritical() << "Checksum does not match. Bailing out."; + _close(RES_CHECKSUM_MISMATCH); + return; + } + + _hash.addData(contents); + + _iodev->write(contents); +} + +void McastReceiver::connectionReset() +{ + _close(RES_CONNECTION_RESET); +} + +void McastReceiver::_close(Result result) +{ + _iodev->close(); + _socket->finish(); + + _closed = true; + emit finished(result); +} diff --git a/src/net/mcast/McastReceiver.h b/src/net/mcast/McastReceiver.h new file mode 100644 index 0000000..38e1219 --- /dev/null +++ b/src/net/mcast/McastReceiver.h @@ -0,0 +1,76 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTRECEIVER_H_ +#define MCASTRECEIVER_H_ + +#include <QByteArray> +#include <QCryptographicHash> +#include <QIODevice> +#include <QObject> +#include <QtGlobal> + +#include <src/net/mcast/McastConfiguration.h> +#include <src/net/mcast/McastPGMSocket.h> + +class McastReceiver : public QObject +{ + Q_OBJECT +public: + enum Result { + RES_OK, + RES_ABORTED, + RES_OFFSET_MISMATCH, + RES_CHECKSUM_MISMATCH, + RES_MD5_MISMATCH, + RES_CONNECTION_RESET + }; + + McastReceiver(QIODevice* iodev, McastConfiguration* config = 0, QObject* parent = 0); + virtual ~McastReceiver(); + + McastConfiguration* config() + { + return _config; + } + + static inline bool is_error(Result result) + { + return result != RES_OK; + } + +signals: + void finished(int result); + +public slots: + void start(); + +private: + McastConfiguration* _config; + McastPGMSocket* _socket; + quint64 _curoffs; + bool _closed; + QCryptographicHash _hash; + QIODevice* _iodev; + +private slots: + void receivedPacket(QByteArray const& bytes); + void connectionReset(); + + void _close(Result result); +}; + +#endif /* MCASTRECEIVER_H_ */ diff --git a/src/net/mcast/McastSender.cpp b/src/net/mcast/McastSender.cpp new file mode 100644 index 0000000..24a629c --- /dev/null +++ b/src/net/mcast/McastSender.cpp @@ -0,0 +1,96 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the receiver-side multicast file transfer protocol -- implementation +# ----------------------------------------------------------------------------- +*/ + +#include "McastSender.h" +#include "McastConstants.h" + +#include <QDataStream> + +#include <pgm/pgm.h> +// OpenPGM #defines bool. This is bad in C++. +#undef bool + +McastSender::McastSender(QIODevice* iodev, McastConfiguration const* config, QObject* parent) : + QObject(parent), + _config(config ? new McastConfiguration(*config) : new McastConfiguration()), + _socket(0), + _iodev(iodev), + _curoffs(0), + _hash(QCryptographicHash::Md5), + _finished(false) +{ +} + +McastSender::~McastSender() +{ + delete _config; +} + +void McastSender::start() +{ + _socket = new McastPGMSocket(this); + connect(_socket, SIGNAL(readyToSend()), this, SLOT(readyToSend())); + _socket->open(_config, McastPGMSocket::PSOCK_WRITE); +} + +void McastSender::readyToSend() +{ + if(_finished) + return; + + if(_iodev->atEnd()) + { + QByteArray fpdu; + QDataStream strm(&fpdu, QIODevice::WriteOnly); + strm.setByteOrder(QDataStream::BigEndian); + + strm << (quint64)MCASTFT_MAGIC << (quint64)UINT64_C(0xffffffffffffffff) << _hash.result(); + strm << qChecksum(fpdu.constData(), fpdu.size()); + + _socket->sendPacket(fpdu); + // _socket->finish(); + + _finished = true; + + emit finished(); + return; + } + + QByteArray barr(DEFAULT_MULTICAST_APDU, '\0'); + qint64 len_read; + len_read = _iodev->read(barr.data(), barr.capacity()); + barr.resize((int)len_read); + + _hash.addData(barr); + + QByteArray pdu; + QDataStream strm(&pdu, QIODevice::WriteOnly); + strm.setByteOrder(QDataStream::BigEndian); + + strm << (quint64)MCASTFT_MAGIC << _curoffs; + strm << barr; + quint16 checksum = qChecksum(pdu.constData(), pdu.size()); + strm << checksum; + + _curoffs += len_read; + + _socket->sendPacket(pdu); +} + +void McastSender::close() +{ + _socket->finish(); +} diff --git a/src/net/mcast/McastSender.h b/src/net/mcast/McastSender.h new file mode 100644 index 0000000..e713886 --- /dev/null +++ b/src/net/mcast/McastSender.h @@ -0,0 +1,68 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/McastReceiver.h +# - implement the sender-side multicast file transfer protocol -- interface +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTSENDER_H_ +#define MCASTSENDER_H_ + +#include <QCryptographicHash> +#include <QIODevice> +#include <QObject> + +#include "McastConfiguration.h" +#include "McastPGMSocket.h" + +class McastSender : public QObject +{ + Q_OBJECT +public: + McastSender(QIODevice* iodev = 0, McastConfiguration const* config = 0, QObject* parent = 0); + virtual ~McastSender(); + + McastConfiguration* config() + { + return _config; + } + + QIODevice* iodevice() const + { + return _iodev; + } + + void setIODevice(QIODevice* iodevice) + { + _iodev = iodevice; + } + +signals: + void finished(); + +public slots: + void start(); + void close(); + +private slots: + void readyToSend(); + +private: + McastConfiguration* _config; + McastPGMSocket* _socket; + QIODevice* _iodev; + quint64 _curoffs; + QCryptographicHash _hash; + bool _finished; +}; + +#endif /* MCASTSENDER_H_ */ diff --git a/src/net/mcast/trial_programs/CMakeLists.txt b/src/net/mcast/trial_programs/CMakeLists.txt new file mode 100644 index 0000000..d0f68fa --- /dev/null +++ b/src/net/mcast/trial_programs/CMakeLists.txt @@ -0,0 +1,38 @@ +INCLUDE(${QT_USE_FILE}) + +QT4_WRAP_CPP( + mcastsend_MOC + mcastsend.h +) + +QT4_WRAP_CPP( + mcastreceive_MOC + mcastreceive.h +) + +SET(argparser_SRC + McastConfigArgParser.h + McastConfigArgParser.cpp +) + +ADD_EXECUTABLE(mcastsend + mcastsend.cpp + mcastsend.h + ${argparser_SRC} + ${mcastsend_MOC} +) + +ADD_EXECUTABLE(mcastreceive + mcastreceive.cpp + mcastreceive.h + ${argparser_SRC} + ${mcastreceive_MOC} +) + +TARGET_LINK_LIBRARIES(mcastsend + pvsmcast +) + +TARGET_LINK_LIBRARIES(mcastreceive + pvsmcast +)
\ No newline at end of file diff --git a/src/net/mcast/trial_programs/McastConfigArgParser.cpp b/src/net/mcast/trial_programs/McastConfigArgParser.cpp new file mode 100644 index 0000000..8849544 --- /dev/null +++ b/src/net/mcast/trial_programs/McastConfigArgParser.cpp @@ -0,0 +1,151 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/McastConfigArgParser.cpp +# - Parse common Multicast Configuration CLI arguments +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> + +#include "McastConfigArgParser.h" + +using namespace std; + +bool parseMcastConfigArg(QStringList::iterator& i, QStringList::iterator const& end, McastConfiguration* config) +{ + QString arg = *i; + + if (arg == "-addr") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + config->multicastAddress(*i); + } + else if (arg == "-dport") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 dport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: dport is not an integer" << endl; + return false; + } + config->multicastDPort(dport); + } + else if (arg == "-sport") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 sport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: sport is not an integer" << endl; + return false; + } + config->multicastSPort(sport); + } + else if (arg == "-mtu") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 mtu = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: MTU is not an integer" << endl; + return false; + } + config->multicastMTU(mtu); + } + else if (arg == "-rate") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint32 rate = i->toInt(&ok); + if (!ok) + { + cerr << "Error: Rate is not an integer" << endl; + return false; + } + config->multicastRate(rate); + } + else if (arg == "-winsize") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 winsize = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: Winsize is not an integer" << endl; + return false; + } + config->multicastWinSize(winsize); + } + else if (arg == "-udp") + { + config->multicastUseUDP(true); + } + else if (arg == "-udp-port") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + return false; + } + bool ok; + quint16 udpport = (quint16)i->toInt(&ok); + if (!ok) + { + cerr << "Error: UDP-Port is not an integer" << endl; + return false; + } + config->multicastUDPPort(udpport); + } + else + { + return false; + } + + return true; +} diff --git a/src/net/mcast/trial_programs/McastConfigArgParser.h b/src/net/mcast/trial_programs/McastConfigArgParser.h new file mode 100644 index 0000000..4fb18a7 --- /dev/null +++ b/src/net/mcast/trial_programs/McastConfigArgParser.h @@ -0,0 +1,26 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/McastConfigArgParser.h +# - Parse common Multicast Configuration CLI arguments +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTCONFIGARGPARSER_H_ +#define MCASTCONFIGARGPARSER_H_ + +#include <QStringList> + +#include "../McastConfiguration.h" + +bool parseMcastConfigArg(QStringList::iterator& i, QStringList::iterator const& end, McastConfiguration* config); + +#endif /* MCASTCONFIGARGPARSER_H_ */ diff --git a/src/net/mcast/trial_programs/mcastreceive.cpp b/src/net/mcast/trial_programs/mcastreceive.cpp new file mode 100644 index 0000000..48a0f10 --- /dev/null +++ b/src/net/mcast/trial_programs/mcastreceive.cpp @@ -0,0 +1,150 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Receive a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> +#include <QFile> +#include <QStringList> +#include <QTimer> + +#include "mcastreceive.h" +#include "McastConfigArgParser.h" +#include "../McastConfiguration.h" +#include "../McastReceiver.h" + +using namespace std; + +int main(int argc, char** argv) +{ + QCoreApplication app(argc, argv); + McastReceive me; + + QTimer::singleShot(0, &me, SLOT(run())); + + return app.exec(); +} + +void McastReceive::run() +{ + QStringList args = QCoreApplication::arguments(); + QStringList::iterator i = args.begin(); + QStringList::iterator const end = args.end(); + + QString filename(""); + + McastConfiguration config; + + ++i; + while (i != end) + { + QString arg = *i; + + cerr << "Arg: " << arg.toLatin1().constData() << endl; + + if (arg == "-file") + { + ++i; + if (i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing its argument" << endl; + QCoreApplication::exit(1); + return; + } + filename = *i; + } + else if (arg == "-help") + { + cerr << "Options:" << endl << endl + << " -file <FILE> Receive to file FILE" << endl + << " -addr <ADDR> Use ADDR as address specification" << endl + << " -dport <PORT> Send to port PORT" << endl + << " -sport <PORT> Send from port PORT" << endl + << " -mtu <BYTES> Set MTU to BYTES" << endl + << " -rate <BYTES> Send BYTES per second" << endl + << " -winsize <SECONDS> Set Window Size to SECONDS" << endl + << " -udp Use UDP encapsulation" << endl + << " -udp-port PORT Use UDP port PORT" << endl; + QCoreApplication::quit(); + return; + } + else + { + if (!parseMcastConfigArg(i, end, &config)) + { + cerr << "Unknown argument: " << arg.toLatin1().constData() << endl; + QCoreApplication::exit(1); + return; + } + } + + ++i; + } + + if (filename == "") + { + cerr << "No Filename given" << endl; + QCoreApplication::exit(1); + return; + } + + _target = new QFile(filename, this); + _target->open(QIODevice::WriteOnly); + + McastReceiver* recv = new McastReceiver(_target, &config, this); + + connect(recv, SIGNAL(finished(int)), this, SLOT(finished(int))); + + QTimer::singleShot(0, recv, SLOT(start())); +} + +void McastReceive::finished(int state) +{ + cerr << "finished: "; + + switch(state) + { + case McastReceiver::RES_OK: + cerr << "OK." << endl; + break; + case McastReceiver::RES_ABORTED: + cerr << "Aborted." << endl; + goto failed; + case McastReceiver::RES_CHECKSUM_MISMATCH: + cerr << "Checksum mismatch." << endl; + goto failed; + case McastReceiver::RES_CONNECTION_RESET: + cerr << "Connection reset." << endl; + goto failed; + case McastReceiver::RES_MD5_MISMATCH: + cerr << "MD5 mismatch." << endl; + goto failed; + case McastReceiver::RES_OFFSET_MISMATCH: + cerr << "Offset mismatch. Undetected packet loss?" << endl; + goto failed; + default: + cerr << "Unknown error code!" << endl; + goto failed; + } + + QCoreApplication::quit(); + return; +failed: + cerr << "Deleting file." << endl; + _target->remove(); + QCoreApplication::exit(1); + return; +} diff --git a/src/net/mcast/trial_programs/mcastreceive.h b/src/net/mcast/trial_programs/mcastreceive.h new file mode 100644 index 0000000..3e72d4c --- /dev/null +++ b/src/net/mcast/trial_programs/mcastreceive.h @@ -0,0 +1,44 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Receive a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTRECEIVE_H_ +#define MCASTRECEIVE_H_ + +#include <QObject> + +class QFile; +class McastReceiver; + +class McastReceive : public QObject +{ + Q_OBJECT +public: + McastReceive() : + QObject(), + _receiver(0) + { + } + +public slots: + void run(); + void finished(int state); + +private: + McastReceiver* _receiver; + QFile* _target; +}; + +#endif /* MCASTRECEIVE_H_ */ diff --git a/src/net/mcast/trial_programs/mcastsend.cpp b/src/net/mcast/trial_programs/mcastsend.cpp new file mode 100644 index 0000000..da8ecf4 --- /dev/null +++ b/src/net/mcast/trial_programs/mcastsend.cpp @@ -0,0 +1,123 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Send a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#include <iostream> + +#include <QCoreApplication> +#include <QFile> +#include <QStringList> +#include <QTimer> + +#include <src/net/mcast/McastSender.h> +#include "mcastsend.h" +#include "McastConfigArgParser.h" +#include "../McastConstants.h" +#include "../McastConfiguration.h" + +using namespace std; + +int +main(int argc, char**argv) +{ + QCoreApplication app(argc, argv); + McastSend me; + + QTimer::singleShot(0, &me, SLOT(run())); + + return app.exec(); +} + +void McastSend::run() +{ + QStringList args = QCoreApplication::arguments(); + QStringList::iterator i = args.begin(); + QStringList::iterator const end = args.end(); + QString filename(""); + McastConfiguration config; + + ++i; + while(i != end) + { + // parse command line arguments + + QString arg = *i; + + cerr << "Arg: " << arg.toLatin1().constData() << endl; + + if (arg == "-file") + { + i++; + if(i == end) + { + cerr << "Option " << arg.toLatin1().constData() << " is missing argument" << endl; + QCoreApplication::exit(1); + return; + } + filename = *i; + } + else if (arg == "-help") + { + cerr << "Options:" << endl << endl + << " -file <FILE> Send FILE to the listeners" << endl + << " -addr <ADDR> Use ADDR as address specification" << endl + << " -dport <PORT> Send to port PORT" << endl + << " -sport <PORT> Send from port PORT" << endl + << " -mtu <BYTES> Set MTU to BYTES" << endl + << " -rate <BYTES> Send BYTES per second" << endl + << " -winsize <SECONDS> Set Window Size to SECONDS" << endl + << " -udp Use UDP encapsulation" << endl + << " -udp-port PORT Use UDP port PORT" << endl; + QCoreApplication::quit(); + return; + } + else + { + if (!parseMcastConfigArg(i, end, &config)) + { + cerr << "Unknown command line argument: " << arg.toLatin1().constData() << endl; + QCoreApplication::exit(1); + return; + } + } + + ++i; + } + + if(filename == "") + { + cerr << "No filename given" << endl; + QCoreApplication::exit(1); + return; + } + + // now, do it. + QFile* file = new QFile(filename); + file->open(QIODevice::ReadOnly); + + McastSender* sender = new McastSender(file, &config, this); + file->setParent(sender); + + connect(sender, SIGNAL(finished()), this, SLOT(finished())); + + QTimer::singleShot(0, sender, SLOT(start())); +} + +void McastSend::finished() +{ + cerr << "finished." << endl; + // QTimer::singleShot(30000, QCoreApplication::instance(), SLOT(quit())); + // QCoreApplication::quit(); +} diff --git a/src/net/mcast/trial_programs/mcastsend.h b/src/net/mcast/trial_programs/mcastsend.h new file mode 100644 index 0000000..ae15eb4 --- /dev/null +++ b/src/net/mcast/trial_programs/mcastsend.h @@ -0,0 +1,42 @@ +/* +# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg +# +# This program is free software distributed under the GPL version 2. +# See http://openslx.org/COPYING +# +# If you have any feedback please consult http://openslx.org/feedback and +# send your suggestions, praise, or complaints to feedback@openslx.org +# +# General information about OpenSLX can be found at http://openslx.org/ +# ----------------------------------------------------------------------------- +# src/net/mcast/trial_programs/mcastsend.cpp +# - Send a file via the PVS Mcast protocol +# ----------------------------------------------------------------------------- +*/ + +#ifndef MCASTSEND_H_ +#define MCASTSEND_H_ + +#include <QObject> + +#include "../McastSender.h" + +class McastSend : public QObject +{ + Q_OBJECT +public: + McastSend() : + QObject(), + _sender(0) + { + } + +public slots: + void run(); + void finished(); + +private: + McastSender* _sender; +}; + +#endif /* MCASTSEND_H_ */ |