diff options
Diffstat (limited to '3rdparty/openpgm-svn-r1135/pgm/examples/enonblocksyncrecvmsg.c')
-rw-r--r-- | 3rdparty/openpgm-svn-r1135/pgm/examples/enonblocksyncrecvmsg.c | 382 |
1 files changed, 382 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1135/pgm/examples/enonblocksyncrecvmsg.c b/3rdparty/openpgm-svn-r1135/pgm/examples/enonblocksyncrecvmsg.c new file mode 100644 index 0000000..43deb89 --- /dev/null +++ b/3rdparty/openpgm-svn-r1135/pgm/examples/enonblocksyncrecvmsg.c @@ -0,0 +1,382 @@ +/* vim:ts=8:sts=8:sw=4:noai:noexpandtab + * + * Simple PGM receiver: blocking synchronous receiver with scatter/gather io + * + * Copyright (c) 2006-2010 Miru Limited. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include <errno.h> +#include <locale.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> +#include <sys/epoll.h> +#include <sys/time.h> +#include <sys/types.h> +#include <glib.h> +#ifdef G_OS_UNIX +# include <netdb.h> +# include <arpa/inet.h> +# include <netinet/in.h> +# include <sys/socket.h> +#endif +#include <pgm/pgm.h> + +/* example dependencies */ +#include <pgm/backtrace.h> +#include <pgm/log.h> + + +/* typedefs */ + +/* globals */ + +static int g_port = 0; +static const char* g_network = ""; +static gboolean g_multicast_loop = FALSE; +static int g_udp_encap_port = 0; + +static int g_max_tpdu = 1500; +static int g_sqns = 100; + +static pgm_sock_t* g_sock = NULL; +static gboolean g_quit = FALSE; + +static void on_signal (int); +static gboolean on_startup (void); + +static int on_datav (struct pgm_msgv_t*, size_t); + + +G_GNUC_NORETURN static +void +usage ( + const char* bin + ) +{ + fprintf (stderr, "Usage: %s [options]\n", bin); + fprintf (stderr, " -n <network> : Multicast group or unicast IP address\n"); + fprintf (stderr, " -s <port> : IP port\n"); + fprintf (stderr, " -p <port> : Encapsulate PGM in UDP on IP port\n"); + fprintf (stderr, " -l : Enable multicast loopback and address sharing\n"); + exit (1); +} + +int +main ( + int argc, + char* argv[] + ) +{ + pgm_error_t* pgm_err = NULL; + + setlocale (LC_ALL, ""); + + log_init (); + g_message ("enonblocksyncrecvmsg"); + + if (!pgm_init (&pgm_err)) { + g_error ("Unable to start PGM engine: %s", pgm_err->message); + pgm_error_free (pgm_err); + return EXIT_FAILURE; + } + +/* parse program arguments */ + const char* binary_name = strrchr (argv[0], '/'); + int c; + while ((c = getopt (argc, argv, "s:n:p:lh")) != -1) + { + switch (c) { + case 'n': g_network = optarg; break; + case 's': g_port = atoi (optarg); break; + case 'p': g_udp_encap_port = atoi (optarg); break; + case 'l': g_multicast_loop = TRUE; break; + + case 'h': + case '?': usage (binary_name); + } + } + +/* setup signal handlers */ + signal (SIGSEGV, on_sigsegv); + signal (SIGINT, on_signal); + signal (SIGTERM, on_signal); +#ifdef SIGHUP + signal (SIGHUP, SIG_IGN); +#endif + + if (!on_startup ()) { + g_error ("startup failed"); + return EXIT_FAILURE; + } + +/* epoll file descriptor */ + int efd = epoll_create (IP_MAX_MEMBERSHIPS); + if (efd < 0) { + g_error ("epoll_create failed errno %i: \"%s\"", errno, strerror(errno)); + return EXIT_FAILURE; + } + + int retval = pgm_epoll_ctl (g_sock, efd, EPOLL_CTL_ADD, EPOLLIN); + if (retval < 0) { + g_error ("pgm_epoll_ctl failed."); + return EXIT_FAILURE; + } + +/* incoming message buffer */ + struct pgm_msgv_t msgv; + struct epoll_event events[1]; /* wait for maximum 1 event */ + +/* dispatch loop */ + g_message ("entering PGM message loop ... "); + do { + struct timeval tv; + int timeout; + size_t len; + const int status = pgm_recvmsg (g_sock, + &msgv, + 0, + &len, + &pgm_err); + switch (status) { + case PGM_IO_STATUS_NORMAL: + on_datav (&msgv, len); + break; + + case PGM_IO_STATUS_TIMER_PENDING: + { + socklen_t optlen = sizeof (tv); + pgm_getsockopt (g_sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen); + } + goto block; + case PGM_IO_STATUS_RATE_LIMITED: + { + socklen_t optlen = sizeof (tv); + pgm_getsockopt (g_sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen); + } +/* fall through */ + case PGM_IO_STATUS_WOULD_BLOCK: +/* poll for next event */ +block: + timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000)); + epoll_wait (efd, events, G_N_ELEMENTS(events), timeout /* ms */); + break; + + default: + if (pgm_err) { + g_warning ("%s", pgm_err->message); + pgm_error_free (pgm_err); + pgm_err = NULL; + } + if (PGM_IO_STATUS_ERROR == status) + break; + } + } while (!g_quit); + + g_message ("message loop terminated, cleaning up."); + +/* cleanup */ + close (efd); + if (g_sock) { + g_message ("closing PGM socket."); + pgm_close (g_sock, TRUE); + g_sock = NULL; + } + + g_message ("PGM engine shutdown."); + pgm_shutdown (); + g_message ("finished."); + return EXIT_SUCCESS; +} + +static +void +on_signal ( + int signum + ) +{ + g_message ("on_signal (signum:%d)", signum); + g_quit = TRUE; +} + +static +gboolean +on_startup (void) +{ + struct pgm_addrinfo_t* res = NULL; + pgm_error_t* pgm_err = NULL; + sa_family_t sa_family = AF_UNSPEC; + + g_message ("startup."); + +/* parse network parameter into transport address structure */ + if (!pgm_getaddrinfo (g_network, NULL, &res, &pgm_err)) { + g_error ("parsing network parameter: %s", pgm_err->message); + goto err_abort; + } + + sa_family = res->ai_send_addrs[0].gsr_group.ss_family; + + if (g_udp_encap_port) { + g_message ("create PGM/UDP socket."); + if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) { + g_error ("socket: %s", pgm_err->message); + goto err_abort; + } + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port)); + } else { + g_message ("create PGM/IP socket."); + if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) { + g_error ("socket: %s", pgm_err->message); + goto err_abort; + } + } + +/* Use RFC 2113 tagging for PGM Router Assist */ + const int no_router_assist = 0; + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist)); + + pgm_drop_superuser(); + +/* set PGM parameters */ + const int recv_only = 1, + passive = 0, + peer_expiry = pgm_secs (300), + spmr_expiry = pgm_msecs (250), + nak_bo_ivl = pgm_msecs (50), + nak_rpt_ivl = pgm_secs (2), + nak_rdata_ivl = pgm_secs (2), + nak_data_retries = 50, + nak_ncf_retries = 50; + + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PASSIVE, &passive, sizeof(passive)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_RXW_SQNS, &g_sqns, sizeof(g_sqns)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); + +/* create global session identifier */ + struct pgm_sockaddr_t addr; + memset (&addr, 0, sizeof(addr)); + addr.sa_port = g_port ? g_port : DEFAULT_DATA_DESTINATION_PORT; + addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; + if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) { + g_error ("creating GSI: %s", pgm_err->message); + goto err_abort; + } + +/* assign socket to specified address */ + struct pgm_interface_req_t if_req; + memset (&if_req, 0, sizeof(if_req)); + if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface; + if_req.ir_scope_id = 0; + if (AF_INET6 == sa_family) { + struct sockaddr_in6 sa6; + memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6)); + if_req.ir_scope_id = sa6.sin6_scope_id; + } + if (!pgm_bind3 (g_sock, + &addr, sizeof(addr), + &if_req, sizeof(if_req), /* tx interface */ + &if_req, sizeof(if_req), /* rx interface */ + &pgm_err)) + { + g_error ("binding PGM socket: %s", pgm_err->message); + goto err_abort; + } + +/* join IP multicast groups */ + for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req)); + pgm_freeaddrinfo (res); + +/* set IP parameters */ + const int nonblocking = 1, + multicast_loop = g_multicast_loop ? 1 : 0, + multicast_hops = 16, + dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */ + + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops)); + if (AF_INET6 != sa_family) + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_TOS, &dscp, sizeof(dscp)); + pgm_setsockopt (g_sock, IPPROTO_PGM, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking)); + + if (!pgm_connect (g_sock, &pgm_err)) { + g_error ("connecting PGM socket: %s", pgm_err->message); + goto err_abort; + } + + g_message ("startup complete."); + return TRUE; + +err_abort: + if (NULL != g_sock) { + pgm_close (g_sock, FALSE); + g_sock = NULL; + } + if (NULL != res) { + pgm_freeaddrinfo (res); + res = NULL; + } + if (NULL != pgm_err) { + pgm_error_free (pgm_err); + pgm_err = NULL; + } + return FALSE; +} + +static +int +on_datav ( + struct pgm_msgv_t* datav, /* one msgv object */ + size_t len + ) +{ + char tsi[PGM_TSISTRLEN]; + pgm_tsi_print_r (&datav->msgv_skb[0]->tsi, tsi, sizeof(tsi)); + g_message ("(%u bytes from %s)", (unsigned)len, tsi); + +/* protect against non-null terminated strings */ + const struct pgm_sk_buff_t* skb = datav->msgv_skb[0]; + int i = 0; + while (len) + { + char buf[1024]; + const size_t buflen = MIN( sizeof(buf) - 1, skb->len ); + strncpy (buf, (const char*)skb->data, buflen); + buf[buflen] = '\0'; + g_message ("\t%i: %s (%" G_GUINT16_FORMAT " bytes)", ++i, buf, skb->len); + len -= skb->len; + skb++; + } + + return 0; +} + +/* eof */ |