summaryrefslogtreecommitdiffstats
path: root/3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc
diff options
context:
space:
mode:
Diffstat (limited to '3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc')
-rw-r--r--3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc1059
1 files changed, 1059 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc b/3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc
new file mode 100644
index 0000000..38ac560
--- /dev/null
+++ b/3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc
@@ -0,0 +1,1059 @@
+/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
+ *
+ * Simple send/reply ping tool using the PGM transport.
+ *
+ * With no arguments, one message is sent per second.
+ *
+ * Copyright (c) 2006-2010 Miru Limited.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/* c99 compatibility for c++ */
+#define __STDC_LIMIT_MACROS
+
+/* Must be first for Sun */
+#include "ping.pb.h"
+
+/* c99 compatibility for c++ */
+#define __STDC_FORMAT_MACROS
+#define restrict
+
+#include <cerrno>
+#include <clocale>
+#include <csignal>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+
+#include <inttypes.h>
+#include <math.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#ifdef CONFIG_HAVE_EPOLL
+# include <sys/epoll.h>
+#endif
+#include <sys/types.h>
+#ifndef _WIN32
+# include <netdb.h>
+# include <netinet/in.h>
+# include <sched.h>
+# include <sys/socket.h>
+# include <arpa/inet.h>
+#endif
+#include <glib.h>
+#include <pgm/pgm.h>
+#ifdef CONFIG_WITH_HTTP
+# include <pgm/http.h>
+#endif
+#ifdef CONFIG_WITH_SNMP
+# include <pgm/snmp.h>
+#endif
+
+/* PGM internal time keeper */
+typedef pgm_time_t (*pgm_time_update_func)(void);
+extern pgm_time_update_func pgm_time_update_now;
+extern "C" {
+ size_t pgm_pkt_offset (bool, sa_family_t);
+}
+
+/* example dependencies */
+#include <pgm/backtrace.h>
+#include <pgm/log.h>
+#include <pgm/signal.h>
+
+
+using namespace std;
+
+
+/* globals */
+
+static int g_port = 0;
+static const char* g_network = "";
+static int g_udp_encap_port = 0;
+
+static int g_odata_rate = 0;
+static int g_odata_interval = 0;
+static guint32 g_payload = 0;
+static int g_max_tpdu = 1500;
+static int g_max_rte = 16*1000*1000;
+static int g_sqns = 200;
+
+static gboolean g_use_pgmcc = FALSE;
+static sa_family_t g_pgmcc_family = 0; /* 0 = disabled */
+
+static gboolean g_use_fec = FALSE;
+static int g_rs_k = 8;
+static int g_rs_n = 255;
+
+static enum {
+ PGMPING_MODE_SOURCE,
+ PGMPING_MODE_RECEIVER,
+ PGMPING_MODE_INITIATOR,
+ PGMPING_MODE_REFLECTOR
+} g_mode = PGMPING_MODE_INITIATOR;
+
+static pgm_sock_t* g_sock = NULL;
+
+/* stats */
+static guint64 g_msg_sent = 0;
+static guint64 g_msg_received = 0;
+static pgm_time_t g_interval_start = 0;
+static pgm_time_t g_latency_current = 0;
+static guint64 g_latency_seqno = 0;
+static guint64 g_last_seqno = 0;
+static double g_latency_total = 0.0;
+static double g_latency_square_total = 0.0;
+static guint64 g_latency_count = 0;
+static double g_latency_max = 0.0;
+#ifdef INFINITY
+static double g_latency_min = INFINITY;
+#else
+static double g_latency_min = INT64_MAX;
+#endif
+static double g_latency_running_average = 0.0;
+static guint64 g_out_total = 0;
+static guint64 g_in_total = 0;
+
+static GMainLoop* g_loop = NULL;
+static GThread* g_sender_thread = NULL;
+static GThread* g_receiver_thread = NULL;
+static gboolean g_quit;
+#ifdef G_OS_UNIX
+static int g_quit_pipe[2];
+static void on_signal (int, gpointer);
+#else
+static HANDLE g_quit_event;
+static BOOL on_console_ctrl (DWORD);
+#endif
+
+static gboolean on_startup (gpointer);
+static gboolean on_shutdown (gpointer);
+static gboolean on_mark (gpointer);
+
+static void send_odata (void);
+static int on_msgv (struct pgm_msgv_t*, size_t);
+
+static gpointer sender_thread (gpointer);
+static gpointer receiver_thread (gpointer);
+
+
+G_GNUC_NORETURN static void
+usage (const char* bin)
+{
+ fprintf (stderr, "Usage: %s [options]\n", bin);
+ fprintf (stderr, " -n <network> : Multicast group or unicast IP address\n");
+ fprintf (stderr, " -s <port> : IP port\n");
+ fprintf (stderr, " -p <port> : Encapsulate PGM in UDP on IP port\n");
+ fprintf (stderr, " -d <seconds> : Terminate transport after duration.\n");
+ fprintf (stderr, " -m <frequency> : Number of message to send per second\n");
+ fprintf (stderr, " -o : Send-only mode (default send & receive mode)\n");
+ fprintf (stderr, " -l : Listen-only mode\n");
+ fprintf (stderr, " -e : Relect mode\n");
+ fprintf (stderr, " -r <rate> : Regulate to rate bytes per second\n");
+ fprintf (stderr, " -c : Enable PGMCC\n");
+ fprintf (stderr, " -f <type> : Enable FEC with either proactive or ondemand parity\n");
+ fprintf (stderr, " -K <k> : Configure Reed-Solomon code (n, k)\n");
+ fprintf (stderr, " -N <n>\n");
+ fprintf (stderr, " -H : Enable HTTP administrative interface\n");
+ fprintf (stderr, " -S : Enable SNMP interface\n");
+ exit (1);
+}
+
+int
+main (
+ int argc,
+ char *argv[]
+ )
+{
+ GError* err = NULL;
+ pgm_error_t* pgm_err = NULL;
+ gboolean enable_http = FALSE;
+ gboolean enable_snmpx = FALSE;
+ int timeout = 0;
+
+ GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+ setlocale (LC_ALL, "");
+ setenv ("PGM_TIMER", "GTOD", 1);
+ setenv ("PGM_SLEEP", "USLEEP", 1);
+
+ log_init ();
+ g_message ("pgmping");
+
+ g_thread_init (NULL);
+
+ if (!pgm_init (&pgm_err)) {
+ g_error ("Unable to start PGM engine: %s", pgm_err->message);
+ pgm_error_free (pgm_err);
+ return EXIT_FAILURE;
+ }
+
+/* parse program arguments */
+ const char* binary_name = g_get_prgname();
+ int c;
+ while ((c = getopt (argc, argv, "s:n:p:m:old:r:cfeK:N:HSh")) != -1)
+ {
+ switch (c) {
+ case 'n': g_network = optarg; break;
+ case 's': g_port = atoi (optarg); break;
+ case 'p': g_udp_encap_port = atoi (optarg); break;
+ case 'r': g_max_rte = atoi (optarg); break;
+
+ case 'c': g_use_pgmcc = TRUE; break;
+
+ case 'f': g_use_fec = TRUE; break;
+ case 'K': g_rs_k = atoi (optarg); break;
+ case 'N': g_rs_n = atoi (optarg); break;
+
+ case 'H': enable_http = TRUE; break;
+ case 'S': enable_snmpx = TRUE; break;
+
+ case 'm': g_odata_rate = atoi (optarg);
+ g_odata_interval = (1000 * 1000) / g_odata_rate; break;
+ case 'd': timeout = 1000 * atoi (optarg); break;
+
+ case 'o': g_mode = PGMPING_MODE_SOURCE; break;
+ case 'l': g_mode = PGMPING_MODE_RECEIVER; break;
+ case 'e': g_mode = PGMPING_MODE_REFLECTOR; break;
+
+ case 'h':
+ case '?': usage (binary_name);
+ }
+ }
+
+ if (g_use_fec && ( !g_rs_k || !g_rs_n )) {
+ g_error ("Invalid Reed-Solomon parameters.");
+ usage (binary_name);
+ }
+
+#ifdef CONFIG_WITH_HTTP
+ if (enable_http) {
+ if (!pgm_http_init (PGM_HTTP_DEFAULT_SERVER_PORT, &pgm_err)) {
+ g_error ("Unable to start HTTP interface: %s", pgm_err->message);
+ pgm_error_free (pgm_err);
+ pgm_shutdown ();
+ return EXIT_FAILURE;
+ }
+ }
+#endif
+#ifdef CONFIG_WITH_SNMP
+ if (enable_snmpx) {
+ if (!pgm_snmp_init (&pgm_err)) {
+ g_error ("Unable to start SNMP interface: %s", pgm_err->message);
+ pgm_error_free (pgm_err);
+#ifdef CONFIG_WITH_HTTP
+ if (enable_http)
+ pgm_http_shutdown ();
+#endif
+ pgm_shutdown ();
+ return EXIT_FAILURE;
+ }
+ }
+#endif
+
+ g_loop = g_main_loop_new (NULL, FALSE);
+
+ g_quit = FALSE;
+
+/* setup signal handlers */
+ signal (SIGSEGV, on_sigsegv);
+#ifdef SIGHUP
+ signal (SIGHUP, SIG_IGN);
+#endif
+#ifdef G_OS_UNIX
+ pipe (g_quit_pipe);
+ pgm_signal_install (SIGINT, on_signal, g_loop);
+ pgm_signal_install (SIGTERM, on_signal, g_loop);
+#else
+ g_quit_event = CreateEvent (NULL, TRUE, FALSE, TEXT("QuitEvent"));
+ SetConsoleCtrlHandler ((PHANDLER_ROUTINE)on_console_ctrl, TRUE);
+#endif /* !G_OS_UNIX */
+
+/* delayed startup */
+ g_message ("scheduling startup.");
+ g_timeout_add (0, (GSourceFunc)on_startup, g_loop);
+
+ if (timeout) {
+ g_message ("scheduling shutdown.");
+ g_timeout_add (timeout, (GSourceFunc)on_shutdown, g_loop);
+ }
+
+/* dispatch loop */
+ g_message ("entering main event loop ... ");
+ g_main_loop_run (g_loop);
+
+ g_message ("event loop terminated, cleaning up.");
+
+/* cleanup */
+ g_quit = TRUE;
+#ifdef G_OS_UNIX
+ const char one = '1';
+ write (g_quit_pipe[1], &one, sizeof(one));
+ if (PGMPING_MODE_SOURCE == g_mode || PGMPING_MODE_INITIATOR == g_mode)
+ g_thread_join (g_sender_thread);
+ g_thread_join (g_receiver_thread);
+ close (g_quit_pipe[0]);
+ close (g_quit_pipe[1]);
+#else
+ SetEvent (g_quit_event);
+ if (PGMPING_MODE_SOURCE == g_mode || PGMPING_MODE_INITIATOR == g_mode)
+ g_thread_join (g_sender_thread);
+ g_thread_join (g_receiver_thread);
+ CloseHandle (g_quit_event);
+#endif
+
+ g_main_loop_unref (g_loop);
+ g_loop = NULL;
+
+ if (g_sock) {
+ g_message ("closing PGM socket.");
+ pgm_close (g_sock, TRUE);
+ g_sock = NULL;
+ }
+
+#ifdef CONFIG_WITH_HTTP
+ if (enable_http)
+ pgm_http_shutdown();
+#endif
+#ifdef CONFIG_WITH_SNMP
+ if (enable_snmpx)
+ pgm_snmp_shutdown();
+#endif
+
+ google::protobuf::ShutdownProtobufLibrary();
+
+ g_message ("PGM engine shutdown.");
+ pgm_shutdown ();
+ g_message ("finished.");
+ return EXIT_SUCCESS;
+}
+
+#ifdef G_OS_UNIX
+static
+void
+on_signal (
+ int signum,
+ gpointer user_data
+ )
+{
+ GMainLoop* loop = (GMainLoop*)user_data;
+ g_message ("on_signal (signum:%d user-data:%p)",
+ signum, user_data);
+ g_main_loop_quit (loop);
+}
+#else
+static
+BOOL
+on_console_ctrl (
+ DWORD dwCtrlType
+ )
+{
+ g_message ("on_console_ctrl (dwCtrlType:%lu)", (unsigned long)dwCtrlType);
+ g_main_loop_quit (g_loop);
+ return TRUE;
+}
+#endif /* !G_OS_UNIX */
+
+static
+gboolean
+on_shutdown (
+ gpointer user_data
+ )
+{
+ GMainLoop* loop = (GMainLoop*)user_data;
+ g_message ("on_shutdown (user-data:%p)", user_data);
+ g_main_loop_quit (loop);
+ return FALSE;
+}
+
+static
+gboolean
+on_startup (
+ gpointer user_data
+ )
+{
+ GMainLoop* loop = (GMainLoop*)user_data;
+ struct pgm_addrinfo_t* res = NULL;
+ GError* err = NULL;
+ pgm_error_t* pgm_err = NULL;
+ sa_family_t sa_family = AF_UNSPEC;
+
+ g_message ("startup.");
+
+/* parse network parameter into transport address structure */
+ if (!pgm_getaddrinfo (g_network, NULL, &res, &pgm_err)) {
+ g_error ("parsing network parameter: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+ sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
+ if (g_use_pgmcc)
+ g_pgmcc_family = sa_family;
+
+ if (g_udp_encap_port) {
+ g_message ("create PGM/UDP socket.");
+ if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) {
+ g_error ("socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+ pgm_setsockopt (g_sock, PGM_UDP_ENCAP_UCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
+ pgm_setsockopt (g_sock, PGM_UDP_ENCAP_MCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port));
+ } else {
+ g_message ("create PGM/IP socket.");
+ if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) {
+ g_error ("socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+ }
+
+/* Use RFC 2113 tagging for PGM Router Assist */
+ {
+ const int no_router_assist = 0;
+ pgm_setsockopt (g_sock, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist));
+ }
+
+ pgm_drop_superuser();
+
+/* set PGM parameters */
+ if (PGMPING_MODE_SOURCE == g_mode ||
+ PGMPING_MODE_INITIATOR == g_mode ||
+ PGMPING_MODE_REFLECTOR == g_mode)
+ {
+ const int send_only = PGMPING_MODE_SOURCE == g_mode ? 1 : 0,
+ ambient_spm = pgm_secs (30),
+ heartbeat_spm[] = { pgm_msecs (100),
+ pgm_msecs (100),
+ pgm_msecs (100),
+ pgm_msecs (100),
+ pgm_msecs (1300),
+ pgm_secs (7),
+ pgm_secs (16),
+ pgm_secs (25),
+ pgm_secs (30) };
+
+ pgm_setsockopt (g_sock, PGM_SEND_ONLY, &send_only, sizeof(send_only));
+ pgm_setsockopt (g_sock, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu));
+ pgm_setsockopt (g_sock, PGM_TXW_SQNS, &g_sqns, sizeof(g_sqns));
+ pgm_setsockopt (g_sock, PGM_TXW_MAX_RTE, &g_max_rte, sizeof(g_max_rte));
+ pgm_setsockopt (g_sock, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm));
+ pgm_setsockopt (g_sock, PGM_HEARTBEAT_SPM, &heartbeat_spm, sizeof(heartbeat_spm));
+ }
+ if (PGMPING_MODE_RECEIVER == g_mode ||
+ PGMPING_MODE_INITIATOR == g_mode ||
+ PGMPING_MODE_REFLECTOR == g_mode)
+ {
+ const int recv_only = PGMPING_MODE_RECEIVER == g_mode ? 1 : 0,
+ passive = 0,
+ peer_expiry = pgm_secs (300),
+ spmr_expiry = pgm_msecs (250),
+ nak_bo_ivl = pgm_msecs (50),
+ nak_rpt_ivl = pgm_msecs (200), //pgm_secs (2),
+ nak_rdata_ivl = pgm_msecs (200), //pgm_secs (2),
+ nak_data_retries = 50,
+ nak_ncf_retries = 50;
+
+ pgm_setsockopt (g_sock, PGM_RECV_ONLY, &recv_only, sizeof(recv_only));
+ pgm_setsockopt (g_sock, PGM_PASSIVE, &passive, sizeof(passive));
+ pgm_setsockopt (g_sock, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu));
+ pgm_setsockopt (g_sock, PGM_RXW_SQNS, &g_sqns, sizeof(g_sqns));
+ pgm_setsockopt (g_sock, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry));
+ pgm_setsockopt (g_sock, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry));
+ pgm_setsockopt (g_sock, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl));
+ pgm_setsockopt (g_sock, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl));
+ pgm_setsockopt (g_sock, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl));
+ pgm_setsockopt (g_sock, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries));
+ pgm_setsockopt (g_sock, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries));
+ }
+
+#ifdef I_UNDERSTAND_PGMCC_AND_FEC_ARE_NOT_SUPPORTED
+/* PGMCC congestion control */
+ if (g_use_pgmcc) {
+ struct pgm_pgmccinfo_t pgmccinfo;
+ pgmccinfo.ack_bo_ivl = pgm_msecs (50);
+ pgmccinfo.ack_c = 75;
+ pgmccinfo.ack_c_p = 500;
+ pgm_setsockopt (g_sock, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo));
+ }
+
+/* Reed Solomon forward error correction */
+ if (g_use_fec) {
+ struct pgm_fecinfo_t fecinfo;
+ fecinfo.block_size = g_rs_n;
+ fecinfo.proactive_packets = 0;
+ fecinfo.group_size = g_rs_k;
+ fecinfo.ondemand_parity_enabled = TRUE;
+ fecinfo.var_pktlen_enabled = TRUE;
+ pgm_setsockopt (g_sock, PGM_USE_FEC, &fecinfo, sizeof(fecinfo));
+ }
+#endif
+
+/* create global session identifier */
+ struct pgm_sockaddr_t addr;
+ memset (&addr, 0, sizeof(addr));
+ addr.sa_port = g_port ? g_port : DEFAULT_DATA_DESTINATION_PORT;
+ addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
+ if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) {
+ g_error ("creating GSI: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+/* assign socket to specified address */
+ struct pgm_interface_req_t if_req;
+ memset (&if_req, 0, sizeof(if_req));
+ if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
+ if_req.ir_scope_id = 0;
+ if (AF_INET6 == sa_family) {
+ struct sockaddr_in6 sa6;
+ memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6));
+ if_req.ir_scope_id = sa6.sin6_scope_id;
+ }
+ if (!pgm_bind3 (g_sock,
+ &addr, sizeof(addr),
+ &if_req, sizeof(if_req), /* tx interface */
+ &if_req, sizeof(if_req), /* rx interface */
+ &pgm_err))
+ {
+ g_error ("binding PGM socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+/* join IP multicast groups */
+ for (unsigned i = 0; i < res->ai_recv_addrs_len; i++)
+ pgm_setsockopt (g_sock, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req));
+ pgm_setsockopt (g_sock, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req));
+ pgm_freeaddrinfo (res);
+
+/* set IP parameters */
+ {
+ const int nonblocking = 1,
+ multicast_loop = 0,
+ multicast_hops = 16,
+ dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */
+
+ pgm_setsockopt (g_sock, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop));
+ pgm_setsockopt (g_sock, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops));
+ if (AF_INET6 != sa_family)
+ pgm_setsockopt (g_sock, PGM_TOS, &dscp, sizeof(dscp));
+ pgm_setsockopt (g_sock, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking));
+ }
+
+ if (!pgm_connect (g_sock, &pgm_err)) {
+ g_error ("connecting PGM socket: %s", pgm_err->message);
+ goto err_abort;
+ }
+
+/* period timer to indicate some form of life */
+// TODO: Gnome 2.14: replace with g_timeout_add_seconds()
+ g_timeout_add (2 * 1000, (GSourceFunc)on_mark, NULL);
+
+ if (PGMPING_MODE_SOURCE == g_mode || PGMPING_MODE_INITIATOR == g_mode)
+ {
+ g_sender_thread = g_thread_create_full (sender_thread,
+ g_sock,
+ 0,
+ TRUE,
+ TRUE,
+ G_THREAD_PRIORITY_NORMAL,
+ &err);
+ if (!g_sender_thread) {
+ g_critical ("g_thread_create_full failed errno %i: \"%s\"", err->code, err->message);
+ goto err_abort;
+ }
+ }
+
+ {
+ g_receiver_thread = g_thread_create_full (receiver_thread,
+ g_sock,
+ 0,
+ TRUE,
+ TRUE,
+ G_THREAD_PRIORITY_HIGH,
+ &err);
+ if (!g_receiver_thread) {
+ g_critical ("g_thread_create_full failed errno %i: \"%s\"", err->code, err->message);
+ goto err_abort;
+ }
+ }
+
+ g_message ("startup complete.");
+ return FALSE;
+
+err_abort:
+ if (NULL != g_sock) {
+ pgm_close (g_sock, FALSE);
+ g_sock = NULL;
+ }
+ if (NULL != res) {
+ pgm_freeaddrinfo (res);
+ res = NULL;
+ }
+ if (NULL != pgm_err) {
+ pgm_error_free (pgm_err);
+ pgm_err = NULL;
+ }
+ return FALSE;
+}
+
+static
+gpointer
+sender_thread (
+ gpointer user_data
+ )
+{
+ pgm_sock_t* tx_sock = (pgm_sock_t*)user_data;
+ example::Ping ping;
+ string subject("PING.PGM.TEST.");
+ char hostname[NI_MAXHOST + 1];
+ const long payload_len = 1000;
+ char payload[payload_len];
+ gpointer buffer = NULL;
+ guint64 latency, now, last;
+
+#ifdef CONFIG_HAVE_EPOLL
+ const long ev_len = 1;
+ struct epoll_event events[ev_len];
+
+ int efd_again = epoll_create (IP_MAX_MEMBERSHIPS);
+ if (efd_again < 0) {
+ g_error ("epoll_create failed errno %i: \"%s\"", errno, strerror(errno));
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+/* Add write event to epoll domain in order to re-enable as required by return
+ * value. We use one-shot flag to disable ASAP, as we don't want such events
+ * until triggered.
+ */
+ if (pgm_epoll_ctl (tx_sock, efd_again, EPOLL_CTL_ADD, EPOLLOUT | EPOLLONESHOT) < 0) {
+ g_error ("pgm_epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno));
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+ struct epoll_event event;
+ memset (&event, 0, sizeof(event));
+ event.events = EPOLLIN;
+ event.data.fd = g_quit_pipe[0];
+ if (epoll_ctl (efd_again, EPOLL_CTL_ADD, g_quit_pipe[0], &event) < 0) {
+ g_error ("epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno));
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+#elif defined(CONFIG_HAVE_POLL)
+ int n_fds = 2;
+ struct pollfd fds[ 2 + 1 ];
+#endif /* !CONFIG_HAVE_EPOLL */
+
+ gethostname (hostname, sizeof(hostname));
+ subject.append(hostname);
+ memset (payload, 0, sizeof(payload));
+
+ ping.mutable_subscription_header()->set_subject (subject);
+ ping.mutable_market_data_header()->set_msg_type (example::MarketDataHeader::MSG_VERIFY);
+ ping.mutable_market_data_header()->set_rec_type (example::MarketDataHeader::PING);
+ ping.mutable_market_data_header()->set_rec_status (example::MarketDataHeader::STATUS_OK);
+ ping.set_time (last);
+
+ last = now = pgm_time_update_now();
+ do {
+ if (g_msg_sent && g_latency_seqno + 1 == g_msg_sent)
+ latency = g_latency_current;
+ else
+ latency = g_odata_interval;
+
+ ping.set_seqno (g_msg_sent);
+ ping.set_latency (latency);
+ ping.set_payload (payload, sizeof(payload));
+
+ const size_t header_size = pgm_pkt_offset (FALSE, g_pgmcc_family);
+ const size_t apdu_size = ping.ByteSize();
+ struct pgm_sk_buff_t* skb = pgm_alloc_skb (g_max_tpdu);
+ pgm_skb_reserve (skb, header_size);
+ pgm_skb_put (skb, apdu_size);
+
+/* wait on packet rate limit */
+ if ((last + g_odata_interval) > now) {
+#ifndef _WIN32
+ const unsigned int usec = g_odata_interval - (now - last);
+ usleep (usec);
+#else
+ const DWORD msec = usecs_to_msecs (g_odata_interval - (now - last));
+ Sleep (msec);
+#endif
+ now = pgm_time_update_now();
+ }
+ last += g_odata_interval;
+ ping.set_time (now);
+ ping.SerializeToArray (skb->data, skb->len);
+
+ struct timeval tv;
+ int timeout;
+ size_t bytes_written;
+ int status;
+again:
+ status = pgm_send_skbv (tx_sock, &skb, 1, TRUE, &bytes_written);
+ switch (status) {
+/* rate control */
+ case PGM_IO_STATUS_RATE_LIMITED:
+ {
+ socklen_t optlen = sizeof (tv);
+ pgm_getsockopt (tx_sock, PGM_RATE_REMAIN, &tv, &optlen);
+ timeout = (tv.tv_sec * 1000) + ((tv.tv_usec + 500) / 1000);
+/* busy wait under 2ms */
+ if (timeout < 2) timeout = 0;
+#ifdef CONFIG_HAVE_EPOLL
+ const int ready = epoll_wait (efd_again, events, G_N_ELEMENTS(events), timeout /* ms */);
+#elif defined(CONFIG_HAVE_POLL)
+ memset (fds, 0, sizeof(fds));
+ fds[0].fd = g_quit_pipe[0];
+ fds[0].events = POLLIN;
+ pgm_poll_info (tx_sock, &fds[1], &n_fds, POLLIN);
+ poll (fds, 1 + n_fds, timeout /* ms */);
+#endif /* !CONFIG_HAVE_EPOLL */
+ if (G_UNLIKELY(g_quit))
+ break;
+ goto again;
+ }
+/* congestion control */
+ case PGM_IO_STATUS_CONGESTION:
+/* kernel feedback */
+ case PGM_IO_STATUS_WOULD_BLOCK:
+ {
+#ifdef CONFIG_HAVE_EPOLL
+/* re-enable write event for one-shot */
+ if (pgm_epoll_ctl (tx_sock, efd_again, EPOLL_CTL_MOD, EPOLLOUT | EPOLLONESHOT) < 0)
+ {
+ g_error ("pgm_epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno));
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+ const int ready = epoll_wait (efd_again, events, G_N_ELEMENTS(events), -1 /* ms */);
+ if (G_UNLIKELY(g_quit))
+ break;
+#elif defined(CONFIG_HAVE_POLL)
+ memset (fds, 0, sizeof(fds));
+ fds[0].fd = g_quit_pipe[0];
+ fds[0].events = POLLIN;
+ pgm_poll_info (g_sock, &fds[1], &n_fds, POLLOUT);
+ poll (fds, 1 + n_fds, -1 /* ms */);
+#endif /* !CONFIG_HAVE_EPOLL */
+ goto again;
+ }
+/* successful delivery */
+ case PGM_IO_STATUS_NORMAL:
+// g_message ("sent payload: %s", ping.DebugString().c_str());
+// g_message ("sent %u bytes", (unsigned)bytes_written);
+ break;
+ default:
+ g_warning ("pgm_send_skbv failed, status:%i", status);
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+ g_out_total += bytes_written;
+ g_msg_sent++;
+ } while (!g_quit);
+
+#ifdef CONFIG_HAVE_EPOLL
+ close (efd_again);
+#endif
+ return NULL;
+}
+
+static
+gpointer
+receiver_thread (
+ gpointer data
+ )
+{
+ pgm_sock_t* rx_sock = (pgm_sock_t*)data;
+ const long iov_len = 20;
+ struct pgm_msgv_t msgv[iov_len];
+ pgm_time_t lost_tstamp = 0;
+ pgm_tsi_t lost_tsi;
+ guint32 lost_count = 0;
+
+#ifdef CONFIG_HAVE_EPOLL
+ const long ev_len = 1;
+ struct epoll_event events[ev_len];
+
+ int efd = epoll_create (IP_MAX_MEMBERSHIPS);
+ if (efd < 0) {
+ g_error ("epoll_create failed errno %i: \"%s\"", errno, strerror(errno));
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+ if (pgm_epoll_ctl (rx_sock, efd, EPOLL_CTL_ADD, EPOLLIN) < 0) {
+ g_error ("pgm_epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno));
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+ struct epoll_event event;
+ memset (&event, 0, sizeof(event));
+ event.events = EPOLLIN;
+ if (epoll_ctl (efd, EPOLL_CTL_ADD, g_quit_pipe[0], &event) < 0) {
+ g_error ("epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno));
+ g_main_loop_quit (g_loop);
+ return NULL;
+ }
+#elif defined(CONFIG_HAVE_POLL)
+ int n_fds = 3;
+ struct pollfd fds[ 3 + 1 ];
+#endif /* !CONFIG_HAVE_EPOLL */
+
+ memset (&lost_tsi, 0, sizeof(lost_tsi));
+
+ do {
+ struct timeval tv;
+ int timeout;
+ size_t len;
+ pgm_error_t* pgm_err = NULL;
+ const int status = pgm_recvmsgv (rx_sock,
+ msgv,
+ G_N_ELEMENTS(msgv),
+ MSG_ERRQUEUE,
+ &len,
+ &pgm_err);
+ if (lost_count) {
+ pgm_time_t elapsed = pgm_time_update_now() - lost_tstamp;
+ if (elapsed >= pgm_secs(1)) {
+ g_warning ("pgm data lost %" G_GUINT32_FORMAT " packets detected from %s",
+ lost_count, pgm_tsi_print (&lost_tsi));
+ lost_count = 0;
+ }
+ }
+
+ switch (status) {
+ case PGM_IO_STATUS_NORMAL:
+// g_message ("recv %u bytes", (unsigned)len);
+ on_msgv (msgv, len);
+ break;
+ case PGM_IO_STATUS_TIMER_PENDING:
+ {
+ socklen_t optlen = sizeof (tv);
+ pgm_getsockopt (g_sock, PGM_TIME_REMAIN, &tv, &optlen);
+ }
+ goto block;
+ case PGM_IO_STATUS_RATE_LIMITED:
+ {
+ socklen_t optlen = sizeof (tv);
+ pgm_getsockopt (g_sock, PGM_RATE_REMAIN, &tv, &optlen);
+ }
+/* fall through */
+ case PGM_IO_STATUS_WOULD_BLOCK:
+block:
+ timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000));
+/* busy wait under 2ms */
+ if (timeout > 0 && timeout < 2) timeout = 0;
+#ifdef CONFIG_HAVE_EPOLL
+ epoll_wait (efd, events, G_N_ELEMENTS(events), timeout /* ms */);
+#elif defined(CONFIG_HAVE_POLL)
+ memset (fds, 0, sizeof(fds));
+ fds[0].fd = g_quit_pipe[0];
+ fds[0].events = POLLIN;
+ pgm_transport_poll_info (g_transport, &fds[1], &n_fds, POLLIN);
+ poll (fds, 1 + n_fds, timeout /* ms */);
+#endif /* !CONFIG_HAVE_EPOLL */
+ break;
+ case PGM_IO_STATUS_RESET:
+ {
+ struct pgm_sk_buff_t* skb = msgv[0].msgv_skb[0];
+ lost_tstamp = skb->tstamp;
+ if (pgm_tsi_equal (&skb->tsi, &lost_tsi))
+ lost_count += skb->sequence;
+ else {
+ lost_count = skb->sequence;
+ memcpy (&lost_tsi, &skb->tsi, sizeof(pgm_tsi_t));
+ }
+ pgm_free_skb (skb);
+ break;
+ }
+ default:
+ if (pgm_err) {
+ g_warning ("%s", pgm_err->message);
+ pgm_error_free (pgm_err);
+ pgm_err = NULL;
+ }
+ break;
+ }
+ } while (!g_quit);
+
+#ifdef CONFIG_HAVE_EPOLL
+ close (efd);
+#endif
+ return NULL;
+}
+
+static
+int
+on_msgv (
+ struct pgm_msgv_t* msgv, /* an array of msgvs */
+ size_t len
+ )
+{
+ example::Ping ping;
+ guint i = 0;
+ static pgm_time_t last_time = pgm_time_update_now();
+
+ while (len)
+ {
+ const struct pgm_sk_buff_t* pskb = msgv[i].msgv_skb[0];
+ gsize apdu_len = 0;
+ for (unsigned j = 0; j < msgv[i].msgv_len; j++)
+ apdu_len += msgv[i].msgv_skb[j]->len;
+
+ if (PGMPING_MODE_REFLECTOR == g_mode)
+ {
+ int status;
+again:
+ status = pgm_send (g_sock, pskb->data, pskb->len, NULL);
+ switch (status) {
+ case PGM_IO_STATUS_RATE_LIMITED:
+ case PGM_IO_STATUS_CONGESTION:
+ case PGM_IO_STATUS_WOULD_BLOCK:
+/* busy wait always as reflector */
+ goto again;
+
+ case PGM_IO_STATUS_NORMAL:
+ break;
+
+ default:
+ g_warning ("pgm_send_skbv failed");
+ g_main_loop_quit (g_loop);
+ return 0;
+ }
+ goto next_msg;
+ }
+
+/* only parse first fragment of each apdu */
+ if (!ping.ParseFromArray (pskb->data, pskb->len))
+ goto next_msg;
+// g_message ("payload: %s", ping.DebugString().c_str());
+
+ {
+ const pgm_time_t send_time = ping.time();
+ const pgm_time_t recv_time = pskb->tstamp;
+ const guint64 seqno = ping.seqno();
+ const guint64 latency = ping.latency();
+
+ if (seqno < g_latency_seqno) {
+ g_message ("seqno replay?");
+ goto next_msg;
+ }
+
+ g_in_total += pskb->len;
+ g_msg_received++;
+
+/* handle ping */
+ const pgm_time_t now = pgm_time_update_now();
+ if (send_time > now)
+ g_warning ("send time %" PGM_TIME_FORMAT " newer than now %" PGM_TIME_FORMAT,
+ send_time, now);
+ if (recv_time > now)
+ g_warning ("recv time %" PGM_TIME_FORMAT " newer than now %" PGM_TIME_FORMAT,
+ recv_time, now);
+ if (send_time >= recv_time){
+ g_message ("timer mismatch, send time = recv time + %.3f ms (last time + %.3f ms)",
+ pgm_to_msecsf(send_time - recv_time),
+ pgm_to_msecsf(last_time - send_time));
+ goto next_msg;
+ }
+ g_latency_current = pgm_to_secs(recv_time - send_time);
+ g_latency_seqno = seqno;
+
+ const double elapsed = pgm_to_usecsf (recv_time - send_time);
+ g_latency_total += elapsed;
+ g_latency_square_total += elapsed * elapsed;
+
+ if (elapsed > g_latency_max)
+ g_latency_max = elapsed;
+ if (elapsed < g_latency_min)
+ g_latency_min = elapsed;
+
+ g_latency_running_average += elapsed;
+ g_latency_count++;
+ last_time = recv_time;
+ }
+
+/* move onto next apdu */
+next_msg:
+ i++;
+ len -= apdu_len;
+ }
+
+ return 0;
+}
+
+/* idle log notification
+ */
+
+static
+gboolean
+on_mark (
+ G_GNUC_UNUSED gpointer data
+ )
+{
+ const pgm_time_t now = pgm_time_update_now ();
+ const double interval = pgm_to_secsf(now - g_interval_start);
+ g_interval_start = now;
+
+/* receiving a ping */
+ if (g_latency_count)
+ {
+ const double average = g_latency_total / g_latency_count;
+ const double variance = g_latency_square_total / g_latency_count
+ - average * average;
+ const double standard_deviation = sqrt (variance);
+
+ if (g_latency_count < 10)
+ {
+ if (average < 1000.0)
+ g_message ("seqno=%" G_GUINT64_FORMAT " time=%.01f us",
+ g_latency_seqno, average);
+ else
+ g_message ("seqno=%" G_GUINT64_FORMAT " time=%.01f ms",
+ g_latency_seqno, average / 1000);
+ }
+ else
+ {
+ double seq_rate = (g_latency_seqno - g_last_seqno) / interval;
+ double out_rate = g_out_total * 8.0 / 1000000.0 / interval;
+ double in_rate = g_in_total * 8.0 / 1000000.0 / interval;
+ if (g_latency_min < 1000.0)
+ g_message ("s=%.01f avg=%.01f min=%.01f max=%.01f stddev=%0.1f us o=%.2f i=%.2f mbit",
+ seq_rate, average, g_latency_min, g_latency_max, standard_deviation, out_rate, in_rate);
+ else
+ g_message ("s=%.01f avg=%.01f min=%.01f max=%.01f stddev=%0.1f ms o=%.2f i=%.2f mbit",
+ seq_rate, average / 1000, g_latency_min / 1000, g_latency_max / 1000, standard_deviation / 1000, out_rate, in_rate);
+ }
+
+/* reset interval counters */
+ g_latency_total = 0.0;
+ g_latency_square_total = 0.0;
+ g_latency_count = 0;
+ g_last_seqno = g_latency_seqno;
+#ifdef INFINITY
+ g_latency_min = INFINITY;
+#else
+ g_latency_min = INT64_MAX;
+#endif
+ g_latency_max = 0.0;
+ g_out_total = 0;
+ g_in_total = 0;
+ }
+
+ return TRUE;
+}
+
+/* eof */