summaryrefslogtreecommitdiffstats
path: root/3rdparty/openpgm-svn-r1085/pgm/examples/enonblocksyncrecvmsg.c
diff options
context:
space:
mode:
Diffstat (limited to '3rdparty/openpgm-svn-r1085/pgm/examples/enonblocksyncrecvmsg.c')
-rw-r--r--3rdparty/openpgm-svn-r1085/pgm/examples/enonblocksyncrecvmsg.c382
1 files changed, 382 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1085/pgm/examples/enonblocksyncrecvmsg.c b/3rdparty/openpgm-svn-r1085/pgm/examples/enonblocksyncrecvmsg.c
new file mode 100644
index 0000000..9a0e9c6
--- /dev/null
+++ b/3rdparty/openpgm-svn-r1085/pgm/examples/enonblocksyncrecvmsg.c
@@ -0,0 +1,382 @@
+/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
+ *
+ * Simple PGM receiver: blocking synchronous receiver with scatter/gather io
+ *
+ * Copyright (c) 2006-2010 Miru Limited.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <errno.h>
+#include <locale.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <glib.h>
+#ifdef G_OS_UNIX
+# include <netdb.h>
+# include <arpa/inet.h>
+# include <netinet/in.h>
+# include <sys/socket.h>
+#endif
+#include <pgm/pgm.h>
+
+/* example dependencies */
+#include <pgm/backtrace.h>
+#include <pgm/log.h>
+
+
+/* typedefs */
+
+/* globals */
+
+static int g_port = 0;
+static const char* g_network = "";
+static gboolean g_multicast_loop = FALSE;
+static int g_udp_encap_port = 0;
+
+static int g_max_tpdu = 1500;
+static int g_sqns = 100;
+
+static pgm_sock_t* g_sock = NULL;
+static gboolean g_quit = FALSE;
+
+static void on_signal (int);
+static gboolean on_startup (void);
+
+static int on_datav (struct pgm_msgv_t*, size_t);
+
+
+G_GNUC_NORETURN static
+void
+usage (
+ const char* bin
+ )
+{
+ fprintf (stderr, "Usage: %s [options]\n", bin);
+ fprintf (stderr, " -n <network> : Multicast group or unicast IP address\n");
+ fprintf (stderr, " -s <port> : IP port\n");
+ fprintf (stderr, " -p <port> : Encapsulate PGM in UDP on IP port\n");
+ fprintf (stderr, " -l : Enable multicast loopback and address sharing\n");
+ exit (1);
+}
+
+int
+main (
+ int argc,
+ char* argv[]
+ )
+{
+ pgm_error_t* pgm_err = NULL;
+
+ setlocale (LC_ALL, "");
+
+ log_init ();
+ g_message ("enonblocksyncrecvmsg");
+
+ if (!pgm_init (&pgm_err)) {
+ g_error ("Unable to start PGM engine: %s", pgm_err->message);
+ pgm_error_free (pgm_err);
+ return EXIT_FAILURE;
+ }
+
+/* parse program arguments */
+ const char* binary_name = strrchr (argv[0], '/');
+ int c;
+ while ((c = getopt (argc, argv, "s:n:p:lh")) != -1)
+ {
+ switch (c) {
+ case 'n': g_network = optarg; break;
+ case 's': g_port = atoi (optarg); break;
+ case 'p': g_udp_encap_port = atoi (optarg); break;
+ case 'l': g_multicast_loop = TRUE; break;
+
+ case 'h':
+ case '?': usage (binary_name);
+ }
+ }
+
+/* setup signal handlers */
+ signal (SIGSEGV, on_sigsegv);
+ signal (SIGINT, on_signal);
+ signal (SIGTERM, on_signal);
+#ifdef SIGHUP
+ signal (SIGHUP, SIG_IGN);
+#endif
+
+ if (!on_startup ()) {
+ g_error ("startup failed");
+ return EXIT_FAILURE;
+ }
+
+/* epoll file descriptor */
+ int efd = epoll_create (IP_MAX_MEMBERSHIPS);
+ if (efd < 0) {
+ g_error ("epoll_create failed errno %i: \"%s\"", errno, strerror(errno));
+ return EXIT_FAILURE;
+ }
+
+ int retval = pgm_epoll_ctl (g_sock, efd, EPOLL_CTL_ADD, EPOLLIN);
+ if (retval < 0) {
+ g_error ("pgm_epoll_ctl failed.");
+ return EXIT_FAILURE;
+ }
+
+/* incoming message buffer */
+ struct pgm_msgv_t msgv;
+ struct epoll_event events[1]; /* wait for maximum 1 event */
+
+/* dispatch loop */
+ g_message ("entering PGM message loop ... ");
+ do {
+ struct timeval tv;
+ int timeout;
+ size_t len;
+ const int status = pgm_recvmsg (g_sock,
+ &msgv,
+ 0,
+ &len,
+ &pgm_err);
+ switch (status) {
+ case PGM_IO_STATUS_NORMAL:
+ on_datav (&msgv, len);
+ break;
+
+ case PGM_IO_STATUS_TIMER_PENDING:
+ {
+ socklen_t optlen = sizeof (tv);
+ pgm_getsockopt (g_sock, PGM_TIME_REMAIN, &tv, &optlen);
+ }
+ goto block;
+ case PGM_IO_STATUS_RATE_LIMITED:
+ {
+ socklen_t optlen = sizeof (tv);
+ pgm_getsockopt (g_sock, PGM_RATE_REMAIN, &tv, &optlen);
+ }
+/* fall through */
+ case PGM_IO_STATUS_WOULD_BLOCK:
+/* poll for next event */
+block:
+ timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
+ epoll_wait (efd, events, G_N_ELEMENTS(events), timeout /* ms */);
+ break;
+
+ default:
+ if (pgm_err) {
+ g_warning ("%s", pgm_err->message);
+ pgm_error_free (pgm_err);
+ pgm_err = NULL;
+ }
+ if (PGM_IO_STATUS_ERROR == status)
+ break;
+ }
+ } while (!g_quit);
+
+ g_message ("message loop terminated, cleaning up.");
+
+/* cleanup */
+ close (efd);
+ if (g_sock) {
+ g_message ("closing PGM socket.");
+ pgm_close (g_sock, TRUE);
+ g_sock = NULL;
+ }
+
+ g_message ("PGM engine shutdown.");
+ pgm_shutdown ();
+ g_message ("finished.");
+ return EXIT_SUCCESS;
+}
+
+static
+void
+on_signal (
+ int signum
+ )
+{
+ g_message ("on_signal (signum:%d)", signum);
+ g_quit = TRUE;
+}
+
+static
+gboolean
+on_startup (void)
+{
+ struct pgm_addrinfo_t* res = NULL;
+ pgm_error_t* pgm_err = NULL;
+ sa_family_t sa_family = AF_UNSPEC;
+
+ g_message ("startup.");
+
+/* parse network parameter into transport address structure */
+ if (!pgm_getaddrinfo (g_network, NULL, &res, &pgm_err)) {
+ g_error ("parsing network parameter: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+ sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
+
+ if (g_udp_encap_port) {
+ g_message ("create PGM/UDP socket.");
+ if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) {
+ g_error ("socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+ pgm_setsockopt (g_sock, PGM_UDP_ENCAP_UCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
+ pgm_setsockopt (g_sock, PGM_UDP_ENCAP_MCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
+ } else {
+ g_message ("create PGM/IP socket.");
+ if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) {
+ g_error ("socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+ }
+
+/* Use RFC 2113 tagging for PGM Router Assist */
+ const int no_router_assist = 0;
+ pgm_setsockopt (g_sock, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist));
+
+ pgm_drop_superuser();
+
+/* set PGM parameters */
+ const int recv_only = 1,
+ passive = 0,
+ peer_expiry = pgm_secs (300),
+ spmr_expiry = pgm_msecs (250),
+ nak_bo_ivl = pgm_msecs (50),
+ nak_rpt_ivl = pgm_secs (2),
+ nak_rdata_ivl = pgm_secs (2),
+ nak_data_retries = 50,
+ nak_ncf_retries = 50;
+
+ pgm_setsockopt (g_sock, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
+ pgm_setsockopt (g_sock, PGM_PASSIVE, &passive, sizeof(passive));
+ pgm_setsockopt (g_sock, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu));
+ pgm_setsockopt (g_sock, PGM_RXW_SQNS, &g_sqns, sizeof(g_sqns));
+ pgm_setsockopt (g_sock, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
+ pgm_setsockopt (g_sock, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
+ pgm_setsockopt (g_sock, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
+ pgm_setsockopt (g_sock, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
+ pgm_setsockopt (g_sock, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
+ pgm_setsockopt (g_sock, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
+ pgm_setsockopt (g_sock, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
+
+/* create global session identifier */
+ struct pgm_sockaddr_t addr;
+ memset (&addr, 0, sizeof(addr));
+ addr.sa_port = g_port ? g_port : DEFAULT_DATA_DESTINATION_PORT;
+ addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
+ if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) {
+ g_error ("creating GSI: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+/* assign socket to specified address */
+ struct pgm_interface_req_t if_req;
+ memset (&if_req, 0, sizeof(if_req));
+ if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
+ if_req.ir_scope_id = 0;
+ if (AF_INET6 == sa_family) {
+ struct sockaddr_in6 sa6;
+ memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6));
+ if_req.ir_scope_id = sa6.sin6_scope_id;
+ }
+ if (!pgm_bind3 (g_sock,
+ &addr, sizeof(addr),
+ &if_req, sizeof(if_req), /* tx interface */
+ &if_req, sizeof(if_req), /* rx interface */
+ &pgm_err))
+ {
+ g_error ("binding PGM socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+/* join IP multicast groups */
+ for (unsigned i = 0; i < res->ai_recv_addrs_len; i++)
+ pgm_setsockopt (g_sock, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req));
+ pgm_setsockopt (g_sock, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req));
+ pgm_freeaddrinfo (res);
+
+/* set IP parameters */
+ const int nonblocking = 1,
+ multicast_loop = g_multicast_loop ? 1 : 0,
+ multicast_hops = 16,
+ dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */
+
+ pgm_setsockopt (g_sock, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop));
+ pgm_setsockopt (g_sock, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops));
+ if (AF_INET6 != sa_family)
+ pgm_setsockopt (g_sock, PGM_TOS, &dscp, sizeof(dscp));
+ pgm_setsockopt (g_sock, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking));
+
+ if (!pgm_connect (g_sock, &pgm_err)) {
+ g_error ("connecting PGM socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+ g_message ("startup complete.");
+ return TRUE;
+
+err_abort:
+ if (NULL != g_sock) {
+ pgm_close (g_sock, FALSE);
+ g_sock = NULL;
+ }
+ if (NULL != res) {
+ pgm_freeaddrinfo (res);
+ res = NULL;
+ }
+ if (NULL != pgm_err) {
+ pgm_error_free (pgm_err);
+ pgm_err = NULL;
+ }
+ return FALSE;
+}
+
+static
+int
+on_datav (
+ struct pgm_msgv_t* datav, /* one msgv object */
+ size_t len
+ )
+{
+ char tsi[PGM_TSISTRLEN];
+ pgm_tsi_print_r (&datav->msgv_skb[0]->tsi, tsi, sizeof(tsi));
+ g_message ("(%u bytes from %s)", (unsigned)len, tsi);
+
+/* protect against non-null terminated strings */
+ const struct pgm_sk_buff_t* skb = datav->msgv_skb[0];
+ int i = 0;
+ while (len)
+ {
+ char buf[1024];
+ const size_t buflen = MIN( sizeof(buf) - 1, skb->len );
+ strncpy (buf, (const char*)skb->data, buflen);
+ buf[buflen] = '\0';
+ g_message ("\t%i: %s (%" G_GUINT16_FORMAT " bytes)", ++i, buf, skb->len);
+ len -= skb->len;
+ skb++;
+ }
+
+ return 0;
+}
+
+/* eof */