diff options
Diffstat (limited to '3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc')
-rw-r--r-- | 3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc | 1059 |
1 files changed, 1059 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc b/3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc new file mode 100644 index 0000000..38ac560 --- /dev/null +++ b/3rdparty/openpgm-svn-r1085/pgm/examples/pgmping.cc @@ -0,0 +1,1059 @@ +/* vim:ts=8:sts=8:sw=4:noai:noexpandtab + * + * Simple send/reply ping tool using the PGM transport. + * + * With no arguments, one message is sent per second. + * + * Copyright (c) 2006-2010 Miru Limited. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/* c99 compatibility for c++ */ +#define __STDC_LIMIT_MACROS + +/* Must be first for Sun */ +#include "ping.pb.h" + +/* c99 compatibility for c++ */ +#define __STDC_FORMAT_MACROS +#define restrict + +#include <cerrno> +#include <clocale> +#include <csignal> +#include <cstdio> +#include <cstdlib> +#include <cstring> + +#include <inttypes.h> +#include <math.h> +#include <unistd.h> +#include <time.h> +#include <sys/time.h> +#ifdef CONFIG_HAVE_EPOLL +# include <sys/epoll.h> +#endif +#include <sys/types.h> +#ifndef _WIN32 +# include <netdb.h> +# include <netinet/in.h> +# include <sched.h> +# include <sys/socket.h> +# include <arpa/inet.h> +#endif +#include <glib.h> +#include <pgm/pgm.h> +#ifdef CONFIG_WITH_HTTP +# include <pgm/http.h> +#endif +#ifdef CONFIG_WITH_SNMP +# include <pgm/snmp.h> +#endif + +/* PGM internal time keeper */ +typedef pgm_time_t (*pgm_time_update_func)(void); +extern pgm_time_update_func pgm_time_update_now; +extern "C" { + size_t pgm_pkt_offset (bool, sa_family_t); +} + +/* example dependencies */ +#include <pgm/backtrace.h> +#include <pgm/log.h> +#include <pgm/signal.h> + + +using namespace std; + + +/* globals */ + +static int g_port = 0; +static const char* g_network = ""; +static int g_udp_encap_port = 0; + +static int g_odata_rate = 0; +static int g_odata_interval = 0; +static guint32 g_payload = 0; +static int g_max_tpdu = 1500; +static int g_max_rte = 16*1000*1000; +static int g_sqns = 200; + +static gboolean g_use_pgmcc = FALSE; +static sa_family_t g_pgmcc_family = 0; /* 0 = disabled */ + +static gboolean g_use_fec = FALSE; +static int g_rs_k = 8; +static int g_rs_n = 255; + +static enum { + PGMPING_MODE_SOURCE, + PGMPING_MODE_RECEIVER, + PGMPING_MODE_INITIATOR, + PGMPING_MODE_REFLECTOR +} g_mode = PGMPING_MODE_INITIATOR; + +static pgm_sock_t* g_sock = NULL; + +/* stats */ +static guint64 g_msg_sent = 0; +static guint64 g_msg_received = 0; +static pgm_time_t g_interval_start = 0; +static pgm_time_t g_latency_current = 0; +static guint64 g_latency_seqno = 0; +static guint64 g_last_seqno = 0; +static double g_latency_total = 0.0; +static double g_latency_square_total = 0.0; +static guint64 g_latency_count = 0; +static double g_latency_max = 0.0; +#ifdef INFINITY +static double g_latency_min = INFINITY; +#else +static double g_latency_min = INT64_MAX; +#endif +static double g_latency_running_average = 0.0; +static guint64 g_out_total = 0; +static guint64 g_in_total = 0; + +static GMainLoop* g_loop = NULL; +static GThread* g_sender_thread = NULL; +static GThread* g_receiver_thread = NULL; +static gboolean g_quit; +#ifdef G_OS_UNIX +static int g_quit_pipe[2]; +static void on_signal (int, gpointer); +#else +static HANDLE g_quit_event; +static BOOL on_console_ctrl (DWORD); +#endif + +static gboolean on_startup (gpointer); +static gboolean on_shutdown (gpointer); +static gboolean on_mark (gpointer); + +static void send_odata (void); +static int on_msgv (struct pgm_msgv_t*, size_t); + +static gpointer sender_thread (gpointer); +static gpointer receiver_thread (gpointer); + + +G_GNUC_NORETURN static void +usage (const char* bin) +{ + fprintf (stderr, "Usage: %s [options]\n", bin); + fprintf (stderr, " -n <network> : Multicast group or unicast IP address\n"); + fprintf (stderr, " -s <port> : IP port\n"); + fprintf (stderr, " -p <port> : Encapsulate PGM in UDP on IP port\n"); + fprintf (stderr, " -d <seconds> : Terminate transport after duration.\n"); + fprintf (stderr, " -m <frequency> : Number of message to send per second\n"); + fprintf (stderr, " -o : Send-only mode (default send & receive mode)\n"); + fprintf (stderr, " -l : Listen-only mode\n"); + fprintf (stderr, " -e : Relect mode\n"); + fprintf (stderr, " -r <rate> : Regulate to rate bytes per second\n"); + fprintf (stderr, " -c : Enable PGMCC\n"); + fprintf (stderr, " -f <type> : Enable FEC with either proactive or ondemand parity\n"); + fprintf (stderr, " -K <k> : Configure Reed-Solomon code (n, k)\n"); + fprintf (stderr, " -N <n>\n"); + fprintf (stderr, " -H : Enable HTTP administrative interface\n"); + fprintf (stderr, " -S : Enable SNMP interface\n"); + exit (1); +} + +int +main ( + int argc, + char *argv[] + ) +{ + GError* err = NULL; + pgm_error_t* pgm_err = NULL; + gboolean enable_http = FALSE; + gboolean enable_snmpx = FALSE; + int timeout = 0; + + GOOGLE_PROTOBUF_VERIFY_VERSION; + + setlocale (LC_ALL, ""); + setenv ("PGM_TIMER", "GTOD", 1); + setenv ("PGM_SLEEP", "USLEEP", 1); + + log_init (); + g_message ("pgmping"); + + g_thread_init (NULL); + + if (!pgm_init (&pgm_err)) { + g_error ("Unable to start PGM engine: %s", pgm_err->message); + pgm_error_free (pgm_err); + return EXIT_FAILURE; + } + +/* parse program arguments */ + const char* binary_name = g_get_prgname(); + int c; + while ((c = getopt (argc, argv, "s:n:p:m:old:r:cfeK:N:HSh")) != -1) + { + switch (c) { + case 'n': g_network = optarg; break; + case 's': g_port = atoi (optarg); break; + case 'p': g_udp_encap_port = atoi (optarg); break; + case 'r': g_max_rte = atoi (optarg); break; + + case 'c': g_use_pgmcc = TRUE; break; + + case 'f': g_use_fec = TRUE; break; + case 'K': g_rs_k = atoi (optarg); break; + case 'N': g_rs_n = atoi (optarg); break; + + case 'H': enable_http = TRUE; break; + case 'S': enable_snmpx = TRUE; break; + + case 'm': g_odata_rate = atoi (optarg); + g_odata_interval = (1000 * 1000) / g_odata_rate; break; + case 'd': timeout = 1000 * atoi (optarg); break; + + case 'o': g_mode = PGMPING_MODE_SOURCE; break; + case 'l': g_mode = PGMPING_MODE_RECEIVER; break; + case 'e': g_mode = PGMPING_MODE_REFLECTOR; break; + + case 'h': + case '?': usage (binary_name); + } + } + + if (g_use_fec && ( !g_rs_k || !g_rs_n )) { + g_error ("Invalid Reed-Solomon parameters."); + usage (binary_name); + } + +#ifdef CONFIG_WITH_HTTP + if (enable_http) { + if (!pgm_http_init (PGM_HTTP_DEFAULT_SERVER_PORT, &pgm_err)) { + g_error ("Unable to start HTTP interface: %s", pgm_err->message); + pgm_error_free (pgm_err); + pgm_shutdown (); + return EXIT_FAILURE; + } + } +#endif +#ifdef CONFIG_WITH_SNMP + if (enable_snmpx) { + if (!pgm_snmp_init (&pgm_err)) { + g_error ("Unable to start SNMP interface: %s", pgm_err->message); + pgm_error_free (pgm_err); +#ifdef CONFIG_WITH_HTTP + if (enable_http) + pgm_http_shutdown (); +#endif + pgm_shutdown (); + return EXIT_FAILURE; + } + } +#endif + + g_loop = g_main_loop_new (NULL, FALSE); + + g_quit = FALSE; + +/* setup signal handlers */ + signal (SIGSEGV, on_sigsegv); +#ifdef SIGHUP + signal (SIGHUP, SIG_IGN); +#endif +#ifdef G_OS_UNIX + pipe (g_quit_pipe); + pgm_signal_install (SIGINT, on_signal, g_loop); + pgm_signal_install (SIGTERM, on_signal, g_loop); +#else + g_quit_event = CreateEvent (NULL, TRUE, FALSE, TEXT("QuitEvent")); + SetConsoleCtrlHandler ((PHANDLER_ROUTINE)on_console_ctrl, TRUE); +#endif /* !G_OS_UNIX */ + +/* delayed startup */ + g_message ("scheduling startup."); + g_timeout_add (0, (GSourceFunc)on_startup, g_loop); + + if (timeout) { + g_message ("scheduling shutdown."); + g_timeout_add (timeout, (GSourceFunc)on_shutdown, g_loop); + } + +/* dispatch loop */ + g_message ("entering main event loop ... "); + g_main_loop_run (g_loop); + + g_message ("event loop terminated, cleaning up."); + +/* cleanup */ + g_quit = TRUE; +#ifdef G_OS_UNIX + const char one = '1'; + write (g_quit_pipe[1], &one, sizeof(one)); + if (PGMPING_MODE_SOURCE == g_mode || PGMPING_MODE_INITIATOR == g_mode) + g_thread_join (g_sender_thread); + g_thread_join (g_receiver_thread); + close (g_quit_pipe[0]); + close (g_quit_pipe[1]); +#else + SetEvent (g_quit_event); + if (PGMPING_MODE_SOURCE == g_mode || PGMPING_MODE_INITIATOR == g_mode) + g_thread_join (g_sender_thread); + g_thread_join (g_receiver_thread); + CloseHandle (g_quit_event); +#endif + + g_main_loop_unref (g_loop); + g_loop = NULL; + + if (g_sock) { + g_message ("closing PGM socket."); + pgm_close (g_sock, TRUE); + g_sock = NULL; + } + +#ifdef CONFIG_WITH_HTTP + if (enable_http) + pgm_http_shutdown(); +#endif +#ifdef CONFIG_WITH_SNMP + if (enable_snmpx) + pgm_snmp_shutdown(); +#endif + + google::protobuf::ShutdownProtobufLibrary(); + + g_message ("PGM engine shutdown."); + pgm_shutdown (); + g_message ("finished."); + return EXIT_SUCCESS; +} + +#ifdef G_OS_UNIX +static +void +on_signal ( + int signum, + gpointer user_data + ) +{ + GMainLoop* loop = (GMainLoop*)user_data; + g_message ("on_signal (signum:%d user-data:%p)", + signum, user_data); + g_main_loop_quit (loop); +} +#else +static +BOOL +on_console_ctrl ( + DWORD dwCtrlType + ) +{ + g_message ("on_console_ctrl (dwCtrlType:%lu)", (unsigned long)dwCtrlType); + g_main_loop_quit (g_loop); + return TRUE; +} +#endif /* !G_OS_UNIX */ + +static +gboolean +on_shutdown ( + gpointer user_data + ) +{ + GMainLoop* loop = (GMainLoop*)user_data; + g_message ("on_shutdown (user-data:%p)", user_data); + g_main_loop_quit (loop); + return FALSE; +} + +static +gboolean +on_startup ( + gpointer user_data + ) +{ + GMainLoop* loop = (GMainLoop*)user_data; + struct pgm_addrinfo_t* res = NULL; + GError* err = NULL; + pgm_error_t* pgm_err = NULL; + sa_family_t sa_family = AF_UNSPEC; + + g_message ("startup."); + +/* parse network parameter into transport address structure */ + if (!pgm_getaddrinfo (g_network, NULL, &res, &pgm_err)) { + g_error ("parsing network parameter: %s", pgm_err->message); + goto err_abort; + } + + sa_family = res->ai_send_addrs[0].gsr_group.ss_family; + if (g_use_pgmcc) + g_pgmcc_family = sa_family; + + if (g_udp_encap_port) { + g_message ("create PGM/UDP socket."); + if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, &pgm_err)) { + g_error ("socket: %s", pgm_err->message); + goto err_abort; + } + pgm_setsockopt (g_sock, PGM_UDP_ENCAP_UCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port)); + pgm_setsockopt (g_sock, PGM_UDP_ENCAP_MCAST_PORT, &g_udp_encap_port, sizeof(g_udp_encap_port)); + } else { + g_message ("create PGM/IP socket."); + if (!pgm_socket (&g_sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, &pgm_err)) { + g_error ("socket: %s", pgm_err->message); + goto err_abort; + } + } + +/* Use RFC 2113 tagging for PGM Router Assist */ + { + const int no_router_assist = 0; + pgm_setsockopt (g_sock, PGM_IP_ROUTER_ALERT, &no_router_assist, sizeof(no_router_assist)); + } + + pgm_drop_superuser(); + +/* set PGM parameters */ + if (PGMPING_MODE_SOURCE == g_mode || + PGMPING_MODE_INITIATOR == g_mode || + PGMPING_MODE_REFLECTOR == g_mode) + { + const int send_only = PGMPING_MODE_SOURCE == g_mode ? 1 : 0, + ambient_spm = pgm_secs (30), + heartbeat_spm[] = { pgm_msecs (100), + pgm_msecs (100), + pgm_msecs (100), + pgm_msecs (100), + pgm_msecs (1300), + pgm_secs (7), + pgm_secs (16), + pgm_secs (25), + pgm_secs (30) }; + + pgm_setsockopt (g_sock, PGM_SEND_ONLY, &send_only, sizeof(send_only)); + pgm_setsockopt (g_sock, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu)); + pgm_setsockopt (g_sock, PGM_TXW_SQNS, &g_sqns, sizeof(g_sqns)); + pgm_setsockopt (g_sock, PGM_TXW_MAX_RTE, &g_max_rte, sizeof(g_max_rte)); + pgm_setsockopt (g_sock, PGM_AMBIENT_SPM, &ambient_spm, sizeof(ambient_spm)); + pgm_setsockopt (g_sock, PGM_HEARTBEAT_SPM, &heartbeat_spm, sizeof(heartbeat_spm)); + } + if (PGMPING_MODE_RECEIVER == g_mode || + PGMPING_MODE_INITIATOR == g_mode || + PGMPING_MODE_REFLECTOR == g_mode) + { + const int recv_only = PGMPING_MODE_RECEIVER == g_mode ? 1 : 0, + passive = 0, + peer_expiry = pgm_secs (300), + spmr_expiry = pgm_msecs (250), + nak_bo_ivl = pgm_msecs (50), + nak_rpt_ivl = pgm_msecs (200), //pgm_secs (2), + nak_rdata_ivl = pgm_msecs (200), //pgm_secs (2), + nak_data_retries = 50, + nak_ncf_retries = 50; + + pgm_setsockopt (g_sock, PGM_RECV_ONLY, &recv_only, sizeof(recv_only)); + pgm_setsockopt (g_sock, PGM_PASSIVE, &passive, sizeof(passive)); + pgm_setsockopt (g_sock, PGM_MTU, &g_max_tpdu, sizeof(g_max_tpdu)); + pgm_setsockopt (g_sock, PGM_RXW_SQNS, &g_sqns, sizeof(g_sqns)); + pgm_setsockopt (g_sock, PGM_PEER_EXPIRY, &peer_expiry, sizeof(peer_expiry)); + pgm_setsockopt (g_sock, PGM_SPMR_EXPIRY, &spmr_expiry, sizeof(spmr_expiry)); + pgm_setsockopt (g_sock, PGM_NAK_BO_IVL, &nak_bo_ivl, sizeof(nak_bo_ivl)); + pgm_setsockopt (g_sock, PGM_NAK_RPT_IVL, &nak_rpt_ivl, sizeof(nak_rpt_ivl)); + pgm_setsockopt (g_sock, PGM_NAK_RDATA_IVL, &nak_rdata_ivl, sizeof(nak_rdata_ivl)); + pgm_setsockopt (g_sock, PGM_NAK_DATA_RETRIES, &nak_data_retries, sizeof(nak_data_retries)); + pgm_setsockopt (g_sock, PGM_NAK_NCF_RETRIES, &nak_ncf_retries, sizeof(nak_ncf_retries)); + } + +#ifdef I_UNDERSTAND_PGMCC_AND_FEC_ARE_NOT_SUPPORTED +/* PGMCC congestion control */ + if (g_use_pgmcc) { + struct pgm_pgmccinfo_t pgmccinfo; + pgmccinfo.ack_bo_ivl = pgm_msecs (50); + pgmccinfo.ack_c = 75; + pgmccinfo.ack_c_p = 500; + pgm_setsockopt (g_sock, PGM_USE_PGMCC, &pgmccinfo, sizeof(pgmccinfo)); + } + +/* Reed Solomon forward error correction */ + if (g_use_fec) { + struct pgm_fecinfo_t fecinfo; + fecinfo.block_size = g_rs_n; + fecinfo.proactive_packets = 0; + fecinfo.group_size = g_rs_k; + fecinfo.ondemand_parity_enabled = TRUE; + fecinfo.var_pktlen_enabled = TRUE; + pgm_setsockopt (g_sock, PGM_USE_FEC, &fecinfo, sizeof(fecinfo)); + } +#endif + +/* create global session identifier */ + struct pgm_sockaddr_t addr; + memset (&addr, 0, sizeof(addr)); + addr.sa_port = g_port ? g_port : DEFAULT_DATA_DESTINATION_PORT; + addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; + if (!pgm_gsi_create_from_hostname (&addr.sa_addr.gsi, &pgm_err)) { + g_error ("creating GSI: %s", pgm_err->message); + goto err_abort; + } + +/* assign socket to specified address */ + struct pgm_interface_req_t if_req; + memset (&if_req, 0, sizeof(if_req)); + if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface; + if_req.ir_scope_id = 0; + if (AF_INET6 == sa_family) { + struct sockaddr_in6 sa6; + memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof(sa6)); + if_req.ir_scope_id = sa6.sin6_scope_id; + } + if (!pgm_bind3 (g_sock, + &addr, sizeof(addr), + &if_req, sizeof(if_req), /* tx interface */ + &if_req, sizeof(if_req), /* rx interface */ + &pgm_err)) + { + g_error ("binding PGM socket: %s", pgm_err->message); + goto err_abort; + } + +/* join IP multicast groups */ + for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) + pgm_setsockopt (g_sock, PGM_JOIN_GROUP, &res->ai_recv_addrs[i], sizeof(struct group_req)); + pgm_setsockopt (g_sock, PGM_SEND_GROUP, &res->ai_send_addrs[0], sizeof(struct group_req)); + pgm_freeaddrinfo (res); + +/* set IP parameters */ + { + const int nonblocking = 1, + multicast_loop = 0, + multicast_hops = 16, + dscp = 0x2e << 2; /* Expedited Forwarding PHB for network elements, no ECN. */ + + pgm_setsockopt (g_sock, PGM_MULTICAST_LOOP, &multicast_loop, sizeof(multicast_loop)); + pgm_setsockopt (g_sock, PGM_MULTICAST_HOPS, &multicast_hops, sizeof(multicast_hops)); + if (AF_INET6 != sa_family) + pgm_setsockopt (g_sock, PGM_TOS, &dscp, sizeof(dscp)); + pgm_setsockopt (g_sock, PGM_NOBLOCK, &nonblocking, sizeof(nonblocking)); + } + + if (!pgm_connect (g_sock, &pgm_err)) { + g_error ("connecting PGM socket: %s", pgm_err->message); + goto err_abort; + } + +/* period timer to indicate some form of life */ +// TODO: Gnome 2.14: replace with g_timeout_add_seconds() + g_timeout_add (2 * 1000, (GSourceFunc)on_mark, NULL); + + if (PGMPING_MODE_SOURCE == g_mode || PGMPING_MODE_INITIATOR == g_mode) + { + g_sender_thread = g_thread_create_full (sender_thread, + g_sock, + 0, + TRUE, + TRUE, + G_THREAD_PRIORITY_NORMAL, + &err); + if (!g_sender_thread) { + g_critical ("g_thread_create_full failed errno %i: \"%s\"", err->code, err->message); + goto err_abort; + } + } + + { + g_receiver_thread = g_thread_create_full (receiver_thread, + g_sock, + 0, + TRUE, + TRUE, + G_THREAD_PRIORITY_HIGH, + &err); + if (!g_receiver_thread) { + g_critical ("g_thread_create_full failed errno %i: \"%s\"", err->code, err->message); + goto err_abort; + } + } + + g_message ("startup complete."); + return FALSE; + +err_abort: + if (NULL != g_sock) { + pgm_close (g_sock, FALSE); + g_sock = NULL; + } + if (NULL != res) { + pgm_freeaddrinfo (res); + res = NULL; + } + if (NULL != pgm_err) { + pgm_error_free (pgm_err); + pgm_err = NULL; + } + return FALSE; +} + +static +gpointer +sender_thread ( + gpointer user_data + ) +{ + pgm_sock_t* tx_sock = (pgm_sock_t*)user_data; + example::Ping ping; + string subject("PING.PGM.TEST."); + char hostname[NI_MAXHOST + 1]; + const long payload_len = 1000; + char payload[payload_len]; + gpointer buffer = NULL; + guint64 latency, now, last; + +#ifdef CONFIG_HAVE_EPOLL + const long ev_len = 1; + struct epoll_event events[ev_len]; + + int efd_again = epoll_create (IP_MAX_MEMBERSHIPS); + if (efd_again < 0) { + g_error ("epoll_create failed errno %i: \"%s\"", errno, strerror(errno)); + g_main_loop_quit (g_loop); + return NULL; + } +/* Add write event to epoll domain in order to re-enable as required by return + * value. We use one-shot flag to disable ASAP, as we don't want such events + * until triggered. + */ + if (pgm_epoll_ctl (tx_sock, efd_again, EPOLL_CTL_ADD, EPOLLOUT | EPOLLONESHOT) < 0) { + g_error ("pgm_epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno)); + g_main_loop_quit (g_loop); + return NULL; + } + struct epoll_event event; + memset (&event, 0, sizeof(event)); + event.events = EPOLLIN; + event.data.fd = g_quit_pipe[0]; + if (epoll_ctl (efd_again, EPOLL_CTL_ADD, g_quit_pipe[0], &event) < 0) { + g_error ("epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno)); + g_main_loop_quit (g_loop); + return NULL; + } +#elif defined(CONFIG_HAVE_POLL) + int n_fds = 2; + struct pollfd fds[ 2 + 1 ]; +#endif /* !CONFIG_HAVE_EPOLL */ + + gethostname (hostname, sizeof(hostname)); + subject.append(hostname); + memset (payload, 0, sizeof(payload)); + + ping.mutable_subscription_header()->set_subject (subject); + ping.mutable_market_data_header()->set_msg_type (example::MarketDataHeader::MSG_VERIFY); + ping.mutable_market_data_header()->set_rec_type (example::MarketDataHeader::PING); + ping.mutable_market_data_header()->set_rec_status (example::MarketDataHeader::STATUS_OK); + ping.set_time (last); + + last = now = pgm_time_update_now(); + do { + if (g_msg_sent && g_latency_seqno + 1 == g_msg_sent) + latency = g_latency_current; + else + latency = g_odata_interval; + + ping.set_seqno (g_msg_sent); + ping.set_latency (latency); + ping.set_payload (payload, sizeof(payload)); + + const size_t header_size = pgm_pkt_offset (FALSE, g_pgmcc_family); + const size_t apdu_size = ping.ByteSize(); + struct pgm_sk_buff_t* skb = pgm_alloc_skb (g_max_tpdu); + pgm_skb_reserve (skb, header_size); + pgm_skb_put (skb, apdu_size); + +/* wait on packet rate limit */ + if ((last + g_odata_interval) > now) { +#ifndef _WIN32 + const unsigned int usec = g_odata_interval - (now - last); + usleep (usec); +#else + const DWORD msec = usecs_to_msecs (g_odata_interval - (now - last)); + Sleep (msec); +#endif + now = pgm_time_update_now(); + } + last += g_odata_interval; + ping.set_time (now); + ping.SerializeToArray (skb->data, skb->len); + + struct timeval tv; + int timeout; + size_t bytes_written; + int status; +again: + status = pgm_send_skbv (tx_sock, &skb, 1, TRUE, &bytes_written); + switch (status) { +/* rate control */ + case PGM_IO_STATUS_RATE_LIMITED: + { + socklen_t optlen = sizeof (tv); + pgm_getsockopt (tx_sock, PGM_RATE_REMAIN, &tv, &optlen); + timeout = (tv.tv_sec * 1000) + ((tv.tv_usec + 500) / 1000); +/* busy wait under 2ms */ + if (timeout < 2) timeout = 0; +#ifdef CONFIG_HAVE_EPOLL + const int ready = epoll_wait (efd_again, events, G_N_ELEMENTS(events), timeout /* ms */); +#elif defined(CONFIG_HAVE_POLL) + memset (fds, 0, sizeof(fds)); + fds[0].fd = g_quit_pipe[0]; + fds[0].events = POLLIN; + pgm_poll_info (tx_sock, &fds[1], &n_fds, POLLIN); + poll (fds, 1 + n_fds, timeout /* ms */); +#endif /* !CONFIG_HAVE_EPOLL */ + if (G_UNLIKELY(g_quit)) + break; + goto again; + } +/* congestion control */ + case PGM_IO_STATUS_CONGESTION: +/* kernel feedback */ + case PGM_IO_STATUS_WOULD_BLOCK: + { +#ifdef CONFIG_HAVE_EPOLL +/* re-enable write event for one-shot */ + if (pgm_epoll_ctl (tx_sock, efd_again, EPOLL_CTL_MOD, EPOLLOUT | EPOLLONESHOT) < 0) + { + g_error ("pgm_epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno)); + g_main_loop_quit (g_loop); + return NULL; + } + const int ready = epoll_wait (efd_again, events, G_N_ELEMENTS(events), -1 /* ms */); + if (G_UNLIKELY(g_quit)) + break; +#elif defined(CONFIG_HAVE_POLL) + memset (fds, 0, sizeof(fds)); + fds[0].fd = g_quit_pipe[0]; + fds[0].events = POLLIN; + pgm_poll_info (g_sock, &fds[1], &n_fds, POLLOUT); + poll (fds, 1 + n_fds, -1 /* ms */); +#endif /* !CONFIG_HAVE_EPOLL */ + goto again; + } +/* successful delivery */ + case PGM_IO_STATUS_NORMAL: +// g_message ("sent payload: %s", ping.DebugString().c_str()); +// g_message ("sent %u bytes", (unsigned)bytes_written); + break; + default: + g_warning ("pgm_send_skbv failed, status:%i", status); + g_main_loop_quit (g_loop); + return NULL; + } + g_out_total += bytes_written; + g_msg_sent++; + } while (!g_quit); + +#ifdef CONFIG_HAVE_EPOLL + close (efd_again); +#endif + return NULL; +} + +static +gpointer +receiver_thread ( + gpointer data + ) +{ + pgm_sock_t* rx_sock = (pgm_sock_t*)data; + const long iov_len = 20; + struct pgm_msgv_t msgv[iov_len]; + pgm_time_t lost_tstamp = 0; + pgm_tsi_t lost_tsi; + guint32 lost_count = 0; + +#ifdef CONFIG_HAVE_EPOLL + const long ev_len = 1; + struct epoll_event events[ev_len]; + + int efd = epoll_create (IP_MAX_MEMBERSHIPS); + if (efd < 0) { + g_error ("epoll_create failed errno %i: \"%s\"", errno, strerror(errno)); + g_main_loop_quit (g_loop); + return NULL; + } + if (pgm_epoll_ctl (rx_sock, efd, EPOLL_CTL_ADD, EPOLLIN) < 0) { + g_error ("pgm_epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno)); + g_main_loop_quit (g_loop); + return NULL; + } + struct epoll_event event; + memset (&event, 0, sizeof(event)); + event.events = EPOLLIN; + if (epoll_ctl (efd, EPOLL_CTL_ADD, g_quit_pipe[0], &event) < 0) { + g_error ("epoll_ctl failed errno %i: \"%s\"", errno, strerror(errno)); + g_main_loop_quit (g_loop); + return NULL; + } +#elif defined(CONFIG_HAVE_POLL) + int n_fds = 3; + struct pollfd fds[ 3 + 1 ]; +#endif /* !CONFIG_HAVE_EPOLL */ + + memset (&lost_tsi, 0, sizeof(lost_tsi)); + + do { + struct timeval tv; + int timeout; + size_t len; + pgm_error_t* pgm_err = NULL; + const int status = pgm_recvmsgv (rx_sock, + msgv, + G_N_ELEMENTS(msgv), + MSG_ERRQUEUE, + &len, + &pgm_err); + if (lost_count) { + pgm_time_t elapsed = pgm_time_update_now() - lost_tstamp; + if (elapsed >= pgm_secs(1)) { + g_warning ("pgm data lost %" G_GUINT32_FORMAT " packets detected from %s", + lost_count, pgm_tsi_print (&lost_tsi)); + lost_count = 0; + } + } + + switch (status) { + case PGM_IO_STATUS_NORMAL: +// g_message ("recv %u bytes", (unsigned)len); + on_msgv (msgv, len); + break; + case PGM_IO_STATUS_TIMER_PENDING: + { + socklen_t optlen = sizeof (tv); + pgm_getsockopt (g_sock, PGM_TIME_REMAIN, &tv, &optlen); + } + goto block; + case PGM_IO_STATUS_RATE_LIMITED: + { + socklen_t optlen = sizeof (tv); + pgm_getsockopt (g_sock, PGM_RATE_REMAIN, &tv, &optlen); + } +/* fall through */ + case PGM_IO_STATUS_WOULD_BLOCK: +block: + timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000)); +/* busy wait under 2ms */ + if (timeout > 0 && timeout < 2) timeout = 0; +#ifdef CONFIG_HAVE_EPOLL + epoll_wait (efd, events, G_N_ELEMENTS(events), timeout /* ms */); +#elif defined(CONFIG_HAVE_POLL) + memset (fds, 0, sizeof(fds)); + fds[0].fd = g_quit_pipe[0]; + fds[0].events = POLLIN; + pgm_transport_poll_info (g_transport, &fds[1], &n_fds, POLLIN); + poll (fds, 1 + n_fds, timeout /* ms */); +#endif /* !CONFIG_HAVE_EPOLL */ + break; + case PGM_IO_STATUS_RESET: + { + struct pgm_sk_buff_t* skb = msgv[0].msgv_skb[0]; + lost_tstamp = skb->tstamp; + if (pgm_tsi_equal (&skb->tsi, &lost_tsi)) + lost_count += skb->sequence; + else { + lost_count = skb->sequence; + memcpy (&lost_tsi, &skb->tsi, sizeof(pgm_tsi_t)); + } + pgm_free_skb (skb); + break; + } + default: + if (pgm_err) { + g_warning ("%s", pgm_err->message); + pgm_error_free (pgm_err); + pgm_err = NULL; + } + break; + } + } while (!g_quit); + +#ifdef CONFIG_HAVE_EPOLL + close (efd); +#endif + return NULL; +} + +static +int +on_msgv ( + struct pgm_msgv_t* msgv, /* an array of msgvs */ + size_t len + ) +{ + example::Ping ping; + guint i = 0; + static pgm_time_t last_time = pgm_time_update_now(); + + while (len) + { + const struct pgm_sk_buff_t* pskb = msgv[i].msgv_skb[0]; + gsize apdu_len = 0; + for (unsigned j = 0; j < msgv[i].msgv_len; j++) + apdu_len += msgv[i].msgv_skb[j]->len; + + if (PGMPING_MODE_REFLECTOR == g_mode) + { + int status; +again: + status = pgm_send (g_sock, pskb->data, pskb->len, NULL); + switch (status) { + case PGM_IO_STATUS_RATE_LIMITED: + case PGM_IO_STATUS_CONGESTION: + case PGM_IO_STATUS_WOULD_BLOCK: +/* busy wait always as reflector */ + goto again; + + case PGM_IO_STATUS_NORMAL: + break; + + default: + g_warning ("pgm_send_skbv failed"); + g_main_loop_quit (g_loop); + return 0; + } + goto next_msg; + } + +/* only parse first fragment of each apdu */ + if (!ping.ParseFromArray (pskb->data, pskb->len)) + goto next_msg; +// g_message ("payload: %s", ping.DebugString().c_str()); + + { + const pgm_time_t send_time = ping.time(); + const pgm_time_t recv_time = pskb->tstamp; + const guint64 seqno = ping.seqno(); + const guint64 latency = ping.latency(); + + if (seqno < g_latency_seqno) { + g_message ("seqno replay?"); + goto next_msg; + } + + g_in_total += pskb->len; + g_msg_received++; + +/* handle ping */ + const pgm_time_t now = pgm_time_update_now(); + if (send_time > now) + g_warning ("send time %" PGM_TIME_FORMAT " newer than now %" PGM_TIME_FORMAT, + send_time, now); + if (recv_time > now) + g_warning ("recv time %" PGM_TIME_FORMAT " newer than now %" PGM_TIME_FORMAT, + recv_time, now); + if (send_time >= recv_time){ + g_message ("timer mismatch, send time = recv time + %.3f ms (last time + %.3f ms)", + pgm_to_msecsf(send_time - recv_time), + pgm_to_msecsf(last_time - send_time)); + goto next_msg; + } + g_latency_current = pgm_to_secs(recv_time - send_time); + g_latency_seqno = seqno; + + const double elapsed = pgm_to_usecsf (recv_time - send_time); + g_latency_total += elapsed; + g_latency_square_total += elapsed * elapsed; + + if (elapsed > g_latency_max) + g_latency_max = elapsed; + if (elapsed < g_latency_min) + g_latency_min = elapsed; + + g_latency_running_average += elapsed; + g_latency_count++; + last_time = recv_time; + } + +/* move onto next apdu */ +next_msg: + i++; + len -= apdu_len; + } + + return 0; +} + +/* idle log notification + */ + +static +gboolean +on_mark ( + G_GNUC_UNUSED gpointer data + ) +{ + const pgm_time_t now = pgm_time_update_now (); + const double interval = pgm_to_secsf(now - g_interval_start); + g_interval_start = now; + +/* receiving a ping */ + if (g_latency_count) + { + const double average = g_latency_total / g_latency_count; + const double variance = g_latency_square_total / g_latency_count + - average * average; + const double standard_deviation = sqrt (variance); + + if (g_latency_count < 10) + { + if (average < 1000.0) + g_message ("seqno=%" G_GUINT64_FORMAT " time=%.01f us", + g_latency_seqno, average); + else + g_message ("seqno=%" G_GUINT64_FORMAT " time=%.01f ms", + g_latency_seqno, average / 1000); + } + else + { + double seq_rate = (g_latency_seqno - g_last_seqno) / interval; + double out_rate = g_out_total * 8.0 / 1000000.0 / interval; + double in_rate = g_in_total * 8.0 / 1000000.0 / interval; + if (g_latency_min < 1000.0) + g_message ("s=%.01f avg=%.01f min=%.01f max=%.01f stddev=%0.1f us o=%.2f i=%.2f mbit", + seq_rate, average, g_latency_min, g_latency_max, standard_deviation, out_rate, in_rate); + else + g_message ("s=%.01f avg=%.01f min=%.01f max=%.01f stddev=%0.1f ms o=%.2f i=%.2f mbit", + seq_rate, average / 1000, g_latency_min / 1000, g_latency_max / 1000, standard_deviation / 1000, out_rate, in_rate); + } + +/* reset interval counters */ + g_latency_total = 0.0; + g_latency_square_total = 0.0; + g_latency_count = 0; + g_last_seqno = g_latency_seqno; +#ifdef INFINITY + g_latency_min = INFINITY; +#else + g_latency_min = INT64_MAX; +#endif + g_latency_max = 0.0; + g_out_total = 0; + g_in_total = 0; + } + + return TRUE; +} + +/* eof */ |