summaryrefslogtreecommitdiffstats
path: root/src/net/mcast/McastPGMSocket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/mcast/McastPGMSocket.cpp')
-rw-r--r--src/net/mcast/McastPGMSocket.cpp601
1 files changed, 601 insertions, 0 deletions
diff --git a/src/net/mcast/McastPGMSocket.cpp b/src/net/mcast/McastPGMSocket.cpp
new file mode 100644
index 0000000..0d6b694
--- /dev/null
+++ b/src/net/mcast/McastPGMSocket.cpp
@@ -0,0 +1,601 @@
+/*
+# Copyright (c) 2009 - OpenSLX Project, Computer Center University of Freiburg
+#
+# This program is free software distributed under the GPL version 2.
+# See http://openslx.org/COPYING
+#
+# If you have any feedback please consult http://openslx.org/feedback and
+# send your suggestions, praise, or complaints to feedback@openslx.org
+#
+# General information about OpenSLX can be found at http://openslx.org/
+# -----------------------------------------------------------------------------
+# src/net/mcast/McastPGMSocket.cpp
+# - wrap OpenPGM Sockets in a nicer interface -- implementation
+# -----------------------------------------------------------------------------
+*/
+
+#include <cstdlib>
+
+#include <QByteArray>
+#include <QtDebug>
+#include <QPointer>
+#include <QSocketNotifier>
+#include <QTimer>
+
+#include <pgm/pgm.h>
+// #include <stdint.h>
+// #define SIZE_MAX UINT64_MAX
+// #include <impl/socket.h>
+// pgm redefined bool to int. Undo that.
+#undef bool
+
+#include <sys/poll.h>
+
+#include "McastPGMSocket.h"
+
+class McastPGMSocket_priv
+{
+public:
+ McastPGMSocket_priv() :
+ socket(0),
+ recv_notif(0),
+ repair_notif(0),
+ pending_notif(0),
+ send_notif(0)
+ {
+ }
+ ~McastPGMSocket_priv()
+ {
+ if (socket)
+ pgm_close(socket, 0);
+ if (recv_notif)
+ delete recv_notif;
+ if (repair_notif)
+ delete repair_notif;
+ if (pending_notif)
+ delete pending_notif;
+ if (send_notif)
+ delete send_notif;
+ }
+
+ pgm_sock_t* socket;
+ McastPGMSocket::Direction direction;
+ QSocketNotifier* recv_notif;
+ QSocketNotifier* repair_notif;
+ QSocketNotifier* pending_notif;
+ QSocketNotifier* send_notif;
+
+ QSocketNotifier* notifier_for(int fd) {
+ if (recv_notif && (fd == recv_notif->socket()))
+ {
+ return recv_notif;
+ }
+ else if (repair_notif && (fd == repair_notif->socket()))
+ {
+ return repair_notif;
+ }
+ else if (pending_notif && (fd == pending_notif->socket()))
+ {
+ return pending_notif;
+ }
+ return 0;
+ }
+};
+
+static void _ensurePGMInited()
+{
+ if (!pgm_supported())
+ {
+ pgm_error_t* err;
+ int good = pgm_init(&err);
+ if (!good)
+ {
+ qCritical() << "Could not init OpenPGM library: PGM Error: " << (err->message ? err->message : "(null)");
+ std::exit(1);
+ }
+ }
+}
+
+McastPGMSocket::McastPGMSocket(QObject* parent) :
+ QObject(parent),
+ _priv(new McastPGMSocket_priv),
+ _finished(false),
+ _nakTimeout(new QTimer()),
+ _dataTimeout(new QTimer()),
+ _sendTimeout(new QTimer())
+{
+ _ensurePGMInited();
+
+ _nakTimeout->setSingleShot(true);
+ _dataTimeout->setSingleShot(true);
+ _sendTimeout->setSingleShot(true);
+
+ connect(_nakTimeout, SIGNAL(timeout()), this, SLOT(handleNakTimeout()));
+ connect(_dataTimeout, SIGNAL(timeout()), this, SLOT(handleDataTimeout()));
+ connect(_sendTimeout, SIGNAL(timeout()), this, SLOT(canSend()));
+}
+
+McastPGMSocket::~McastPGMSocket()
+{
+ delete _priv;
+ delete _nakTimeout;
+ delete _dataTimeout;
+ delete _sendTimeout;
+}
+
+bool McastPGMSocket::open(McastConfiguration const* config, Direction direction)
+{
+ _priv->direction = direction;
+
+ pgm_error_t* err = 0;
+ int good;
+
+ pgm_addrinfo_t* addrinfo;
+ // parse the address string
+ good = pgm_getaddrinfo(config->multicastAddress().toLatin1().constData(),
+ 0, &addrinfo, &err);
+ if (!good)
+ {
+ qCritical() << "Could not parse address info: PGM Error: "
+ << err->message;
+ }
+
+ sa_family_t family = addrinfo->ai_send_addrs[0].gsr_group.ss_family;
+
+ good
+ = pgm_socket(&_priv->socket, family, SOCK_SEQPACKET, IPPROTO_PGM, &err);
+ if (!good)
+ {
+ qCritical() << "Could not open socket: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ unsigned const ambient_spm = 4096 * 1000; // every four seconds (approx.)
+
+ // set parameters
+ if (direction == PSOCK_WRITE)
+ {
+ // write-only socket
+ const int send_only = 1,
+ spm_heartbeat[] =
+ { 16 * 1000, 16 * 1000, 16 * 1000, 16 * 1000, 32 * 1000, 64 * 1000, 128
+ * 1000, 256 * 1000, 512 * 1000, 1024 * 1000, 2048 * 1000, 4096
+ * 1000 },
+ max_rate = config->multicastRate(),
+ max_window = config->multicastWinSize();
+ // const int max_window_sqns = 3000;
+
+ pgm_setsockopt(_priv->socket, PGM_SEND_ONLY, &send_only,
+ sizeof(send_only));
+
+ // SPM messages
+ pgm_setsockopt(_priv->socket, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm));
+ pgm_setsockopt(_priv->socket, PGM_HEARTBEAT_SPM, &spm_heartbeat, sizeof(spm_heartbeat));
+
+ // Transmit window
+ pgm_setsockopt(_priv->socket, PGM_TXW_MAX_RTE, &max_rate, sizeof(max_rate));
+ // pgm_setsockopt(_priv->socket, PGM_TXW_SECS, &max_window, sizeof(max_window));
+ pgm_setsockopt(_priv->socket, PGM_TXW_SQNS, &max_window, sizeof(max_window));
+ }
+ else
+ {
+ // readonly
+ const int recv_only = 1,
+ passive = 0,
+ max_window = config->multicastWinSize(),
+ max_winsqns = 0,
+ peer_expiry = ambient_spm * 5,
+ spmr_expiry = 250 * 1000,
+ nak_bo_ivl = 500 * 1000,
+ nak_rpt_ivl = 500 * 1000,
+ nak_rdata_ivl = 2000 * 1000,
+ nak_data_retries = 50,
+ nak_ncf_retries = 50;
+ pgm_setsockopt(_priv->socket, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
+ pgm_setsockopt(_priv->socket, PGM_PASSIVE, &passive, sizeof(passive));
+ // pgm_setsockopt(_priv->socket, PGM_RXW_SECS, &max_window, sizeof(max_window));
+ pgm_setsockopt(_priv->socket, PGM_RXW_SQNS, &max_window, sizeof(max_window));
+ pgm_setsockopt(_priv->socket, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
+ pgm_setsockopt(_priv->socket, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
+ pgm_setsockopt(_priv->socket, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
+ pgm_setsockopt(_priv->socket, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
+ pgm_setsockopt(_priv->socket, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
+ pgm_setsockopt(_priv->socket, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
+ pgm_setsockopt(_priv->socket, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
+ }
+
+ // MTU
+ int const mtu = config->multicastMTU();
+ pgm_setsockopt(_priv->socket, PGM_MTU, &mtu, sizeof(mtu));
+
+ pgm_sockaddr_t addr;
+ addr.sa_addr.sport = config->multicastSPort();
+ addr.sa_port = config->multicastDPort();
+ good = pgm_gsi_create_from_hostname(&addr.sa_addr.gsi, &err);
+ if (!good)
+ {
+ qCritical() << "Could not generate a GSI: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ good = pgm_bind3(_priv->socket, &addr, sizeof(addr), (struct group_req*)&addrinfo->ai_send_addrs[0], sizeof(struct group_req), (struct group_req*)&addrinfo->ai_recv_addrs[0], sizeof(struct group_req), &err);
+ if (!good)
+ {
+ qCritical() << "Could not bind socket: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ // qDebug() << "Max APDU is " << _priv->socket->max_apdu;
+ // qDebug() << "Max TPDU is " << _priv->socket->max_tpdu;
+ // qDebug() << "Max TSDU Fragment is " << _priv->socket->max_tsdu_fragment;
+ // qDebug() << "TXW_SQNS is " << _priv->socket->txw_sqns;
+
+ // join the group
+ for (unsigned i = 0; i < addrinfo->ai_recv_addrs_len; i++)
+ {
+ pgm_setsockopt(_priv->socket, PGM_JOIN_GROUP,
+ &addrinfo->ai_recv_addrs[i], sizeof(struct group_req));
+ }
+
+ // set send address
+ pgm_setsockopt(_priv->socket, PGM_SEND_GROUP, &addrinfo->ai_send_addrs[0],
+ sizeof(struct group_req));
+
+ // IP parameters
+ const int nonblocking = 1, multicast_loop = 0, multicast_hops = 16;
+ pgm_setsockopt(_priv->socket, PGM_MULTICAST_LOOP, &multicast_loop,
+ sizeof(multicast_loop));
+ pgm_setsockopt(_priv->socket, PGM_MULTICAST_HOPS, &multicast_hops,
+ sizeof(multicast_hops));
+ pgm_setsockopt(_priv->socket, PGM_NOBLOCK, &nonblocking,
+ sizeof(nonblocking));
+
+ good = pgm_connect(_priv->socket, &err);
+ if (!good)
+ {
+ qCritical() << "Could not connect socket: PGM Error: " << err->message;
+ pgm_error_free(err);
+ return false;
+ }
+
+ setupNotifiers();
+
+ pgm_freeaddrinfo(addrinfo);
+
+ return true;
+}
+
+void McastPGMSocket::setupNotifiers()
+{
+ int recv_sock, repair_sock, pending_sock;
+ char const* slotname = (_priv->direction == PSOCK_WRITE) ? SLOT(handleNak(int)) : SLOT(handleData(int));
+
+ pgm_getsockopt(_priv->socket, PGM_RECV_SOCK, &recv_sock, sizeof(recv_sock));
+ _priv->recv_notif = new QSocketNotifier(recv_sock, QSocketNotifier::Read,
+ this);
+ connect(_priv->recv_notif, SIGNAL(activated(int)), this, slotname);
+
+ pgm_getsockopt(_priv->socket, PGM_REPAIR_SOCK, &repair_sock, sizeof(repair_sock));
+ _priv->repair_notif = new QSocketNotifier(repair_sock,
+ QSocketNotifier::Read, this);
+ connect(_priv->repair_notif, SIGNAL(activated(int)), this, slotname);
+
+ pgm_getsockopt(_priv->socket, PGM_PENDING_SOCK, &pending_sock, sizeof(pending_sock));
+ _priv->pending_notif = new QSocketNotifier(pending_sock,
+ QSocketNotifier::Read, this);
+ connect(_priv->pending_notif, SIGNAL(activated(int)), this, slotname);
+
+ if(_priv->direction == PSOCK_WRITE)
+ {
+ struct pollfd pfd;
+ int nfds = 1;
+ pgm_poll_info(_priv->socket, &pfd, &nfds, POLLOUT);
+ _priv->send_notif = new QSocketNotifier(pfd.fd, QSocketNotifier::Write, this);
+ connect(_priv->send_notif, SIGNAL(activated(int)), this, SLOT(canSend()));
+ }
+}
+
+void McastPGMSocket::handleNak(int fd)
+{
+ qDebug() << "handleNak(int)";
+
+ QSocketNotifier* notif = _priv->notifier_for(fd);
+ notif->setEnabled(false);
+
+ handleNak();
+
+ notif->setEnabled(true);
+}
+
+void McastPGMSocket::handleNak()
+{
+ if (_finished)
+ return;
+
+ qDebug() << "handleNak()";
+
+ QTimer::singleShot(1000, this, SLOT(handleNakTimeout()));
+
+ // to handle NAKs in OpenPGM, we need to pgm_recv:
+ char buf[4096];
+ pgm_error_t* err = 0;
+
+ int status;
+ // while we don't block:
+ do
+ {
+ status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, 0, &err);
+
+ if(status == PGM_IO_STATUS_TIMER_PENDING)
+ {
+ struct timeval tv;
+ pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, sizeof(tv));
+ const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ qDebug() << " timer pending: " << msecs << "ms";
+ _nakTimeout->start(msecs);
+ break;
+ }
+ else if(status == PGM_IO_STATUS_RATE_LIMITED)
+ {
+ struct timeval tv;
+ pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv));
+ const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ qDebug() << " rate limited: " << msecs << "ms";
+ _nakTimeout->start(msecs);
+ break;
+ }
+ else if(status == PGM_IO_STATUS_WOULD_BLOCK)
+ {
+ qDebug() << " wouldblock";
+ break;
+ }
+ else
+ {
+ if(err)
+ {
+ qCritical() << "Could not handle NAKs: PGM Error: " << err->message;
+ pgm_error_free(err);
+ err = 0;
+ }
+ }
+ }
+ while (true);
+}
+
+void McastPGMSocket::handleNakTimeout()
+{
+ qDebug() << "handleNakTimeout()";
+
+ handleNak();
+}
+
+void McastPGMSocket::handleData(int fd)
+{
+ // need to guard against destruction in finish() via signals/slots
+ QPointer<QSocketNotifier> notif(_priv->notifier_for(fd));
+ notif->setEnabled(false);
+
+ handleData();
+
+ if (notif)
+ notif->setEnabled(true);
+}
+
+void McastPGMSocket::handleData()
+{
+ qDebug() << "handleData()";
+
+ if (_finished) {
+ qDebug() << " finished!";
+ return;
+ }
+
+ int status;
+ do
+ {
+ char buf[4096];
+ size_t size;
+ pgm_error_t* err;
+
+ status = pgm_recv(_priv->socket, buf, sizeof(buf), MSG_DONTWAIT, &size, &err);
+
+ if (status == PGM_IO_STATUS_NORMAL)
+ {
+ qDebug() << " normally received";
+ if(size > 0)
+ {
+ QByteArray bytes(buf, size);
+ emit receivedPacket(bytes);
+ }
+ }
+ else if (status == PGM_IO_STATUS_WOULD_BLOCK)
+ {
+ qDebug() << " would block";
+ // nothing more to do this time
+ break;
+ }
+ else if (status == PGM_IO_STATUS_TIMER_PENDING)
+ {
+ struct timeval tv;
+ pgm_getsockopt(_priv->socket, PGM_TIME_REMAIN, &tv, sizeof(tv));
+ const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ qDebug() << " timer pending: " << msecs << "ms";
+ _dataTimeout->start(msecs);
+ break;
+ }
+ else if (status == PGM_IO_STATUS_RATE_LIMITED)
+ {
+ struct timeval tv;
+ pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv));
+ const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ qDebug() << " rate limit pending: " << msecs << "ms";
+ _dataTimeout->start(msecs);
+ break;
+ }
+ else if (status == PGM_IO_STATUS_RESET)
+ {
+ qDebug() << " connection reset";
+ emit connectionReset();
+ qCritical() << "Connection Reset: PGM Error: " << (err ? err->message : "(null)");
+ break;
+ }
+ else if (status == PGM_IO_STATUS_FIN)
+ {
+ qDebug() << " connection finished";
+ emit connectionFinished();
+ break;
+ }
+ else
+ {
+ if(err)
+ {
+ qCritical() << "Could not read packet: PGM Error: " << (err ? err->message: "(null)");
+ break;
+ }
+ }
+
+ // the socket might have been closed from under us
+ if (!_priv->socket)
+ break;
+ }
+ while (true);
+}
+
+void McastPGMSocket::handleDataTimeout()
+{
+ qDebug() << "handleDataTimeout()";
+
+ handleData();
+}
+
+void McastPGMSocket::canSend()
+{
+ if (_finished)
+ return;
+
+ qDebug() << "canSend()";
+
+ if (_priv->send_notif)
+ {
+ _priv->send_notif->setEnabled(false);
+ }
+
+ bool reenable = true;
+
+ while (!_q.isEmpty())
+ {
+ int status;
+ QByteArray const nextPacket(_q.head());
+ status = pgm_send(_priv->socket, nextPacket.constData(), nextPacket.size(), 0);
+ if (status == PGM_IO_STATUS_ERROR || status == PGM_IO_STATUS_RESET)
+ {
+ qCritical() << "Could not send packet: PGM Error.";
+ continue;
+ }
+ else if (status == PGM_IO_STATUS_WOULD_BLOCK)
+ {
+ qDebug() << " would block";
+ break;
+ }
+ else if (status == PGM_IO_STATUS_RATE_LIMITED)
+ {
+ struct timeval tv;
+ pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv));
+ const int msecs = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
+ qDebug() << " rate limited:" << msecs << "ms";
+ _sendTimeout->start((msecs > 0) ? msecs : 1);
+ reenable = false;
+ break;
+ }
+ else if (status == PGM_IO_STATUS_NORMAL)
+ {
+ qDebug() << " sent";
+ _q.dequeue();
+ continue;
+ }
+ else
+ {
+ qCritical() << "Unhandled condition in McastPGMSocket::canSend()";
+ }
+ }
+
+ if (_priv->send_notif && reenable)
+ {
+ emit readyToSend();
+
+ qDebug() << " reenable notifier";
+ _priv->send_notif->setEnabled(true);
+ }
+}
+
+void McastPGMSocket::sendPacket(QByteArray const& bytes)
+{
+ if(_q.isEmpty())
+ {
+ int status = pgm_send(_priv->socket, bytes.constData(), bytes.size(), 0);
+
+ if (status == PGM_IO_STATUS_ERROR || status == PGM_IO_STATUS_RESET)
+ {
+ qCritical() << "Could not send packet: PGM Error.";
+ return;
+ }
+ else if (status == PGM_IO_STATUS_WOULD_BLOCK)
+ {
+ _q.enqueue(bytes);
+ }
+ else if (status == PGM_IO_STATUS_RATE_LIMITED)
+ {
+ _q.enqueue(bytes);
+ struct timeval tv;
+ pgm_getsockopt(_priv->socket, PGM_RATE_REMAIN, &tv, sizeof(tv));
+ _dataTimeout->start((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
+ }
+ else if (status == PGM_IO_STATUS_NORMAL)
+ {
+ return;
+ }
+ else
+ {
+ qCritical() << "Unhandled condition in McastPGMSocket::sendPacket()";
+ }
+ } else {
+ _q.enqueue(bytes);
+ }
+}
+
+void McastPGMSocket::finish()
+{
+ if(_priv->pending_notif)
+ {
+ delete _priv->pending_notif;
+ _priv->pending_notif = 0;
+ }
+ if(_priv->recv_notif)
+ {
+ delete _priv->recv_notif;
+ _priv->recv_notif = 0;
+ }
+ if(_priv->repair_notif)
+ {
+ delete _priv->repair_notif;
+ _priv->repair_notif = 0;
+ }
+ if(_priv->send_notif)
+ {
+ delete _priv->send_notif;
+ _priv->send_notif = 0;
+ }
+
+ pgm_close(_priv->socket, 1);
+ _priv->socket = 0;
+
+ _finished = true;
+}
+
+bool McastPGMSocket::finished() const
+{
+ return _finished;
+}