summaryrefslogtreecommitdiffstats
path: root/src/net/mcast/McastPGMSocket.cpp
diff options
context:
space:
mode:
authorSebastien Braun2010-10-06 00:04:49 +0200
committerSebastien Braun2010-10-06 00:04:49 +0200
commitf07fc3b426815e28fde23313242fbbb998a08d45 (patch)
treeba9eda1a83135a1727d2d35661d6facabee53b95 /src/net/mcast/McastPGMSocket.cpp
parentFix recognition of letters in keyboard handler (diff)
parentMerge remote branch 'openslx/master' into mcastft (diff)
downloadpvs-f07fc3b426815e28fde23313242fbbb998a08d45.tar.gz
pvs-f07fc3b426815e28fde23313242fbbb998a08d45.tar.xz
pvs-f07fc3b426815e28fde23313242fbbb998a08d45.zip
Merge remote branch 'openslx/mcastft' into input
Conflicts: CMakeLists.txt i18n/pvs_ar_JO.ts i18n/pvs_de_DE.ts i18n/pvs_es_MX.ts i18n/pvs_fr_FR.ts i18n/pvs_pl_PL.ts i18n/pvsmgr_ar_JO.ts i18n/pvsmgr_de_DE.ts i18n/pvsmgr_es_MX.ts i18n/pvsmgr_fr_FR.ts i18n/pvsmgr_pl_PL.ts icons/README pvsmgr.qrc src/gui/mainWindow.cpp src/pvs.cpp src/pvs.h src/pvsDaemon.cpp src/util/clientGUIUtils.h
Diffstat (limited to 'src/net/mcast/McastPGMSocket.cpp')
-rw-r--r--src/net/mcast/McastPGMSocket.cpp666
1 files changed, 666 insertions, 0 deletions
diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp
new file mode 100644
index 0000000..731fc13
--- /dev/null
+++ b/src/net/mcast/McastPGMSocket.cpp
@@ -0,0 +1,666 @@
+/*
+# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg
+#
+# This program is free software distributed under the GPL version 2.
+# See http://openslx.org/COPYING
+#
+# If you have any feedback please consult http://openslx.org/feedback and
+# send your suggestions, praise, or complaints to feedback@openslx.org
+#
+# General information about OpenSLX can be found at http://openslx.org/
+# -----------------------------------------------------------------------------
+# src/net/mcast/McastPGMSocket.cpp
+# - wrap OpenPGM Sockets in a nicer interface -- implementation
+# -----------------------------------------------------------------------------
+*/
+
+#include <cstdlib>
+
+#include <sys/poll.h>
+#include <sys/socket.h>
+
+#include <QByteArray>
+#include <QList>
+#include <QtDebug>
+#include <QPointer>
+#include <QSocketNotifier>
+#include <QTimer>
+
+#include <pgm/pgm.h>
+
+#include "McastPGMSocket.h"
+
+using namespace std;
+
+class McastPGMSocket_priv
+{
+public:
+ McastPGMSocket_priv() :
+ socket(0),
+ send_notif(0)
+ {
+ }
+ ~McastPGMSocket_priv()
+ {
+ if (socket)
+ pgm_close(socket, 0);
+ Q_FOREACH(QSocketNotifier* notif, _notifs)
+ {
+ delete notif;
+ }
+ if (send_notif)
+ delete send_notif;
+ }
+
+ pgm_sock_t* socket;
+ McastPGMSocket::Direction direction;
+ QSocketNotifier* send_notif;
+ QList<QSocketNotifier*> _notifs;
+
+ QSocketNotifier* notifier_for(int fd) {
+ Q_FOREACH(QSocketNotifier* notif, _notifs)
+ {
+ if(notif->socket() == fd)
+ {
+ return notif;
+ }
+ }
+ return 0;
+ }
+};
+
+static void _ensurePGMInited()
+{
+ if (!pgm_supported())
+ {
+ pgm_error_t* err;
+ int good = pgm_init(&err);
+ if (!good)
+ {
+ qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)");
+ std::exit(1);
+ }
+ }
+}
+
+McastPGMSocket::McastPGMSocket(QObject* parent) :
+ QObject(parent),
+ _priv(new McastPGMSocket_priv),
+ _opened(false),
+ _finished(false),
+ _nakTimeout(new QTimer()),
+ _dataTimeout(new QTimer()),
+ _sendTimeout(new QTimer()),
+ _shutdownTimer(0),
+ _shutdown_timeout(0)
+{
+ _ensurePGMInited();
+
+ _nakTimeout->setSingleShot(true);
+ _dataTimeout->setSingleShot(true);
+ _sendTimeout->setSingleShot(true);
+
+ connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout()));
+ connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout()));
+ connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend()));
+}
+
+McastPGMSocket::~McastPGMSocket()
+{
+ delete _priv;
+ delete _nakTimeout;
+ delete _dataTimeout;
+ delete _sendTimeout;
+}
+
+bool McastPGMSocket::open(McastConfiguration const* config, Direction direction)
+{
+ _priv->direction = direction;
+
+ pgm_error_t* err = 0;
+ int good;
+
+ pgm_addrinfo_t* addrinfo;
+ // parse the address string
+ good = pgm_getaddrinfo((config->multicastInterface() + ";" + config->multicastAddress()).toLatin1().constData(),
+ 0, &addrinfo, &err);
+ if (!good)
+ {
+ qCritical() << "Could not parse address info: PGM Error: "
+ << err->message;
+ return false;
+ }
+
+ sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family;
+
+ if(config->multicastUseUDP())
+ {
+ good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_UDP, &err);
+ }
+ else
+ {
+ good = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err);
+ }
+
+ if (!good)
+ {
+ qCritical() << "Could not open socket: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ unsigned const ambient_spm = 2000 * 1000; // every one hundred milliseconds (approx.)
+
+ /* Options for sending data */
+ const int spm_heartbeat[] =
+ { 512 * 1000,
+ 1024 * 1000,
+ 2048 * 1000,
+ 4096 * 1000 },
+ max_rate = 0,
+ max_window = config->multicastWinSize() * config->multicastRate() / config->multicastMTU();
+ // const int max_window_sqns = 3000;
+ qDebug() << "Computed window size " << max_window << " packets";
+
+// pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only,
+// sizeof(send_only));
+
+ // SPM messages
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat));
+
+ // Transmit window
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate));
+// pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_TXW_SQNS, &max_window, sizeof(max_window));
+
+ /* Options for receiving data */
+ const int passive = 0,
+ spmr_expiry = 500 * 1000,
+ nak_bo_ivl = 200 * 1000,
+ nak_rpt_ivl = 500 * 1000,
+ nak_rdata_ivl = 500 * 1000,
+ nak_data_retries = 50,
+ nak_ncf_retries = 50;
+ qDebug() << "Computed window size " << max_window << " packets";
+
+// pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive));
+// pgm_setsockopt(_priv->socket, PGM_RXW_MAX_RTE, &max_rate, sizeof(max_rate));
+// pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_RXW_SQNS, &max_window, sizeof(max_window));
+
+ /* Try using PGMCC */
+ const struct pgm_pgmccinfo_t pgmccinfo = {
+ 100 /* usecs */ * 1000 /* msecs */,
+ 75 /* from OpenPGM examples */,
+ 500 /* from PGMCC internet-draft */
+ };
+ good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo));
+ if(!good)
+ {
+ qCritical() << "Could not enable PGMCC";
+ return false;
+ }
+
+// /* Forward Error Correction */
+// const struct pgm_fecinfo_t pgmfecinfo = {
+// 255 /* from OpenPGM examples */,
+// 2 /* send two proactive packets */,
+// 8 /* from OpenPGM examples */,
+// 1 /* enable on-demand parity */,
+// 1 /* enable variable packet length */
+// };
+// good = pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_USE_FEC, &pgmfecinfo, sizeof(pgmfecinfo));
+// if(!good)
+// {
+// qCritical() << "Could not enable FEC";
+// return false;
+// }
+
+ // Peer Expiry: We will give 1 minute.
+ int const peer_expiry = 60 /* seconds */ * 1000000 /* microseconds */;
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
+
+ // MTU
+ int const mtu = config->multicastMTU();
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MTU, &mtu, sizeof(mtu));
+
+ pgm_sockaddr_t addr;
+ addr.sa_addr.sport = config->multicastSPort();
+ addr.sa_port = config->multicastDPort();
+ good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err);
+ if (!good)
+ {
+ qCritical() << "Could not generate a GSI: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ struct pgm_interface_req_t ifreq;
+ ifreq.ir_interface = addrinfo->ai_send_addrs[0].gsr_interface;
+ ifreq.ir_scope_id = 0;
+ if (AF_INET6 == family)
+ {
+ ifreq.ir_scope_id = ((struct sockaddr_in6*)&addrinfo->ai_send_addrs[0])->sin6_scope_id;
+ }
+
+ // UDP Encapsulation
+ if(config->multicastUseUDP())
+ {
+ const int uport = config->multicastUDPUPort();
+ const int mport = config->multicastUDPMPort();
+
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &mport, sizeof(mport));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &uport, sizeof(uport));
+ }
+
+ good = pgm_bind3(_priv->socket, &addr, sizeof(addr), &ifreq , sizeof(ifreq), &ifreq, sizeof(ifreq), &err);
+ if (!good)
+ {
+ qCritical() << "Could not bind socket: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ // qDebug() << "Max APDU is " << _priv->socket->max_apdu;
+ // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu;
+ // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment;
+ // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns;
+
+ // join the group
+ for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++)
+ {
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_JOIN_GROUP,
+ &addrinfo->ai_recv_addrs[i], sizeof(struct group_req));
+ }
+
+ // set send address
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0],
+ sizeof(struct group_req));
+
+ // IP parameters
+ const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16;
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop,
+ sizeof(multicast_loop));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops,
+ sizeof(multicast_hops));
+ pgm_setsockopt(_priv->socket, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking,
+ sizeof(nonblocking));
+
+ good = pgm_connect(_priv->socket, &err);
+ if (!good)
+ {
+ qCritical() << "Could not connect socket: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ _opened = true;
+
+ setupNotifiers();
+
+ pgm_freeaddrinfo(addrinfo);
+
+ /* Prime the generation of SPM packets during the waiting period */
+ if(_priv->direction == PSOCK_WRITE)
+ QTimer::singleShot(0, this, SLOT(handleNak()));
+
+ return true;
+}
+
+void McastPGMSocket::setupNotifiers()
+{
+ int recv_sock, repair_sock, pending_sock;
+ char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int));
+
+ struct pollfd pollin[10];
+ int in_nfds = 10;
+ pgm_poll_info(_priv->socket, pollin, &in_nfds, POLLIN);
+ for(int i = 0; i < in_nfds; i++)
+ {
+ QSocketNotifier* notif = new QSocketNotifier(pollin[i].fd, QSocketNotifier::Read, this);
+ _priv->_notifs.append(notif);
+ connect(notif, SIGNAL(activated(int)), this, slotname);
+ }
+
+ if(_priv->direction == PSOCK_WRITE)
+ {
+ struct pollfd pfd;
+ int nfds = 1;
+ pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT);
+ _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this);
+ connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend()));
+ }
+}
+
+void McastPGMSocket::handleNak(int fd)
+{
+ qDebug() << "handleNak(" << fd << ")";
+
+ QSocketNotifier* notif = _priv->notifier_for(fd);
+ notif->setEnabled(false);
+
+ if (_shutdownTimer)
+ {
+ _shutdownTimer->start(_shutdown_timeout);
+ qDebug() << "Started shutdown timer";
+ }
+
+ handleNak();
+
+ notif->setEnabled(true);
+}
+
+void McastPGMSocket::handleNak()
+{
+ if (_finished)
+ return;
+
+ // QTimer::singleShot(1000, this, SLOT(handleNakTimeout()));
+
+ // to handle NAKs in OpenPGM, we need to pgm_recv:
+ char buf[4096];
+ pgm_error_t* err = 0;
+
+ int status;
+ // while we don't block:
+ do
+ {
+ status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err);
+
+ if(status == PGM_IO_STATUS_TIMER_PENDING)
+ {
+ struct timeval tv;
+ socklen_t size = sizeof(tv);
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size);
+ const long usecs = tv.tv_sec * 1000000 + tv.tv_usec;
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)";
+ _nakTimeout->start(msecs);
+ break;
+ }
+ else if(status == PGM_IO_STATUS_RATE_LIMITED)
+ {
+ struct timeval tv;
+ socklen_t size = sizeof(tv);
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size);
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " rate limited: " << msecs << "ms";
+ _nakTimeout->start(msecs);
+ break;
+ }
+ else if(status == PGM_IO_STATUS_WOULD_BLOCK)
+ {
+ qDebug() << " wouldblock";
+ break;
+ }
+ else
+ {
+ if(err)
+ {
+ qCritical() << "Could not handle NAKs: PGM Error: " << err->message;
+ pgm_error_free(err);
+ err = 0;
+ }
+ }
+ }
+ while (true);
+}
+
+void McastPGMSocket::handleNakTimeout()
+{
+ qDebug() << "handleNakTimeout()";
+
+ handleNak();
+}
+
+void McastPGMSocket::handleData(int fd)
+{
+ // need to guard against destruction in finish() via signals/slots
+ QPointer<QSocketNotifier> notif(_priv->notifier_for(fd));
+ notif->setEnabled(false);
+
+ handleData();
+
+ if (notif)
+ notif->setEnabled(true);
+}
+
+void McastPGMSocket::handleData()
+{
+ if (_finished) {
+ return;
+ }
+
+ int status;
+ do
+ {
+ char buf[4096];
+ size_t size;
+ pgm_error_t* err = 0;
+
+ status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err);
+
+ if (status == PGM_IO_STATUS_NORMAL)
+ {
+ qDebug() << " normally received";
+ if(size > 0)
+ {
+ QByteArray bytes(buf, size);
+ emit receivedPacket(bytes);
+ }
+ }
+ else if (status == PGM_IO_STATUS_WOULD_BLOCK)
+ {
+ qDebug() << " would block";
+ // nothing more to do this time
+ break;
+ }
+ else if (status == PGM_IO_STATUS_TIMER_PENDING)
+ {
+ struct timeval tv;
+ socklen_t size = sizeof(tv);
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &size);
+ const long usecs = tv.tv_sec * 1000000 + tv.tv_usec;
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " timer pending: " << usecs << "us (rounded to " << msecs << "ms)";
+ _dataTimeout->start(msecs);
+ break;
+ }
+ else if (status == PGM_IO_STATUS_RATE_LIMITED)
+ {
+ struct timeval tv;
+ socklen_t size = sizeof(tv);
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size);
+ int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " rate limit pending: " << msecs << "ms";
+ _dataTimeout->start(msecs);
+ break;
+ }
+ else if (status == PGM_IO_STATUS_RESET)
+ {
+ qDebug() << " connection reset";
+ emit connectionReset();
+ qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)");
+ pgm_error_free(err);
+ break;
+ }
+ else if (status == PGM_IO_STATUS_FIN)
+ {
+ qDebug() << " connection finished";
+ emit connectionFinished();
+ break;
+ }
+ else
+ {
+ if(err)
+ {
+ qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)");
+ break;
+ }
+ }
+
+ // the socket might have been closed from under us
+ if (!_priv->socket)
+ break;
+ }
+ while (true);
+}
+
+void McastPGMSocket::handleDataTimeout()
+{
+ qDebug() << "handleDataTimeout()";
+
+ handleData();
+}
+
+void McastPGMSocket::canSend()
+{
+ if (_finished)
+ return;
+
+
+ if (_priv->send_notif)
+ {
+ _priv->send_notif->setEnabled(false);
+ }
+
+ if(_q.isEmpty())
+ {
+ emit readyToSend();
+ }
+ else
+ {
+ QByteArray const packet(_q.head());
+ int status;
+
+ status = pgm_send(_priv->socket, packet.constData(), packet.size(), 0);
+
+ if(status == PGM_IO_STATUS_NORMAL)
+ {
+ _q.dequeue();
+ if(!_q.isEmpty())
+ {
+ _priv->send_notif->setEnabled(true);
+ }
+ else
+ {
+ emit readyToSend();
+ }
+ }
+ else if(status == PGM_IO_STATUS_WOULD_BLOCK)
+ {
+ _priv->send_notif->setEnabled(true);
+ }
+ else if(status == PGM_IO_STATUS_CONGESTION)
+ {
+ qDebug() << " congested...";
+ // wait a short time (10ms?)
+ _sendTimeout->start(10);
+ }
+ else if(status == PGM_IO_STATUS_RATE_LIMITED)
+ {
+ struct timeval tv;
+ socklen_t size = sizeof(tv);
+ pgm_getsockopt(_priv->socket, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &size);
+ int msecs = (tv.tv_sec * 1000) + ((tv.tv_usec + 999) / 1000);
+ if(msecs == 0)
+ msecs = 1;
+ qDebug() << " rate_limited, waiting" << msecs << "ms";
+ _sendTimeout->start(msecs);
+ }
+ else
+ {
+ qCritical() << "Unhandled status in canSend():" << status;
+ }
+ }
+
+ if (_shutdownTimer)
+ _shutdownTimer->start(_shutdown_timeout);
+}
+
+void McastPGMSocket::sendPacket(QByteArray const& bytes)
+{
+ if(_shutdownTimer)
+ {
+ qCritical() << "Logic error: sendPacket() after shutdown()";
+ }
+
+ _q.enqueue(bytes);
+ _priv->send_notif->setEnabled(true);
+}
+
+void McastPGMSocket::finish()
+{
+ if (_finished)
+ {
+ return;
+ }
+
+ qDebug() << "finish()";
+
+ Q_FOREACH(QSocketNotifier* notif, _priv->_notifs)
+ {
+ notif->setEnabled(false);
+ delete notif;
+ }
+ _priv->_notifs.clear();
+
+ if(_priv->send_notif)
+ {
+ delete _priv->send_notif;
+ _priv->send_notif = 0;
+ }
+
+ if (_priv->socket)
+ {
+ pgm_close(_priv->socket, 1);
+ _priv->socket = 0;
+ }
+
+ _finished = true;
+
+ emit connectionFinished();
+
+ qDebug() << "Socket finished";
+}
+
+bool McastPGMSocket::finished() const
+{
+ return _finished;
+}
+
+bool McastPGMSocket::isOpen() const
+{
+ return _opened && !_finished;
+}
+
+void McastPGMSocket::shutdown(int interval)
+{
+ if(_priv->direction == PSOCK_READ)
+ return;
+
+ _shutdown_timeout = interval;
+ _shutdownTimer = new QTimer(this);
+ connect(_shutdownTimer, SIGNAL(timeout()), this, SLOT(finish()));
+ if (_q.isEmpty())
+ {
+ _shutdownTimer->start(_shutdown_timeout);
+ qDebug() << "Started shutdown timer";
+ }
+}