diff options
Diffstat (limited to '3rdparty/openpgm-svn-r1085/pgm/test/async.c')
-rw-r--r-- | 3rdparty/openpgm-svn-r1085/pgm/test/async.c | 572 |
1 files changed, 572 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1085/pgm/test/async.c b/3rdparty/openpgm-svn-r1085/pgm/test/async.c new file mode 100644 index 0000000..cc2c1ba --- /dev/null +++ b/3rdparty/openpgm-svn-r1085/pgm/test/async.c @@ -0,0 +1,572 @@ +/* vim:ts=8:sts=8:sw=4:noai:noexpandtab + * + * Asynchronous queue for receiving packets in a separate managed thread. + * + * Copyright (c) 2006-2010 Miru Limited. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + + +#include <stdio.h> +#include <time.h> +#include <unistd.h> +#include <glib.h> +#include <glib/gi18n-lib.h> +#include <pgm/pgm.h> +#include "async.h" + + +//#define ASYNC_DEBUG + +#ifndef ASYNC_DEBUG +# define g_trace(...) while (0) +#else +#include <ctype.h> +# define g_trace(...) g_debug(__VA_ARGS__) +#endif + + +/* globals */ + + +/* global locals */ + +typedef struct pgm_event_t pgm_event_t; + +struct pgm_event_t { + gpointer data; + guint len; +}; + + +/* external: Glib event loop GSource of pgm contiguous data */ +struct pgm_watch_t { + GSource source; + GPollFD pollfd; + pgm_async_t* async; +}; + +typedef struct pgm_watch_t pgm_watch_t; + + +static gboolean pgm_src_prepare (GSource*, gint*); +static gboolean pgm_src_check (GSource*); +static gboolean pgm_src_dispatch (GSource*, GSourceFunc, gpointer); + +static GSourceFuncs g_pgm_watch_funcs = { + .prepare = pgm_src_prepare, + .check = pgm_src_check, + .dispatch = pgm_src_dispatch, + .finalize = NULL, + .closure_callback = NULL +}; + + +static inline gpointer pgm_event_alloc (pgm_async_t* const) G_GNUC_MALLOC; +static PGMAsyncError pgm_async_error_from_errno (const gint); + + +static inline +gpointer +pgm_event_alloc ( + pgm_async_t* const async + ) +{ + g_return_val_if_fail (async != NULL, NULL); + return g_slice_alloc (sizeof(pgm_event_t)); +} + +/* release event memory for custom async queue dispatch handlers + */ + +static inline +void +pgm_event_unref ( + pgm_async_t* const async, + pgm_event_t* const event + ) +{ + g_return_if_fail (async != NULL); + g_return_if_fail (event != NULL); + g_slice_free1 (sizeof(pgm_event_t), event); +} + +/* internal receiver thread, sits in a loop processing incoming packets + */ + +static +gpointer +pgm_receiver_thread ( + gpointer data + ) +{ + g_assert (NULL != data); + + pgm_async_t* async = (pgm_async_t*)data; + g_async_queue_ref (async->commit_queue); + +/* incoming message buffer */ + struct pgm_msgv_t msgv; + gsize bytes_read = 0; + struct timeval tv; + + do { +/* blocking read */ + const int status = pgm_recvmsg (async->transport, &msgv, 0, &bytes_read, NULL); + switch (status) { + case PGM_IO_STATUS_NORMAL: + { +/* queue a copy to receiver */ + pgm_event_t* event = pgm_event_alloc (async); + event->data = bytes_read > 0 ? g_malloc (bytes_read) : NULL; + event->len = bytes_read; + gpointer dst = event->data; + guint i = 0; + while (bytes_read) { + const struct pgm_sk_buff_t* skb = msgv.msgv_skb[i++]; + g_assert (NULL != skb); + g_assert (skb->len > 0); + g_assert (skb->len <= bytes_read); + memcpy (dst, skb->data, skb->len); + dst = (char*)dst + skb->len; + bytes_read -= skb->len; + } +/* prod pipe on edge */ + g_async_queue_lock (async->commit_queue); + g_async_queue_push_unlocked (async->commit_queue, event); + if (g_async_queue_length_unlocked (async->commit_queue) == 1) + pgm_notify_send (&async->commit_notify); + g_async_queue_unlock (async->commit_queue); + break; + } + + case PGM_IO_STATUS_TIMER_PENDING: + { + pgm_transport_get_timer_pending (async->transport, &tv); + goto block; + } + + case PGM_IO_STATUS_RATE_LIMITED: + { + pgm_transport_get_rate_remaining (async->transport, &tv); + } +/* fall through */ + case PGM_IO_STATUS_WOULD_BLOCK: +block: + { +#ifdef CONFIG_HAVE_POLL + const int timeout = PGM_IO_STATUS_WOULD_BLOCK == status ? -1 : ((tv.tv_sec * 1000) + (tv.tv_usec / 1000)); + int n_fds = 3; + struct pollfd fds[1+n_fds]; + memset (fds, 0, sizeof(fds)); + fds[0].fd = pgm_notify_get_fd (&async->destroy_notify); + fds[0].events = POLLIN; + if (-1 == pgm_transport_poll_info (async->transport, &fds[1], &n_fds, POLLIN)) { + g_trace ("poll_info returned errno=%i",errno); + goto cleanup; + } + const int ready = poll (fds, 1 + n_fds, timeout); +#else /* HAVE_SELECT */ + fd_set readfds; + int fd = pgm_notify_get_fd (&async->destroy_notify), n_fds = 1 + fd; + FD_ZERO(&readfds); + FD_SET(fd, &readfds); + if (-1 == pgm_transport_select_info (async->transport, &readfds, NULL, &n_fds)) { + g_trace ("select_info returned errno=%i",errno); + goto cleanup; + } + const int ready = select (n_fds, &readfds, NULL, NULL, PGM_IO_STATUS_RATE_LIMITED == status ? &tv : NULL); +#endif + if (-1 == ready) { + g_trace ("block returned errno=%i",errno); + goto cleanup; + } +#ifdef CONFIG_HAVE_POLL + if (ready > 0 && fds[0].revents) +#else + if (ready > 0 && FD_ISSET(fd, &readfds)) +#endif + goto cleanup; + break; + } + + case PGM_IO_STATUS_ERROR: + case PGM_IO_STATUS_EOF: + goto cleanup; + + case PGM_IO_STATUS_RESET: + if (async->transport->is_abort_on_reset) + goto cleanup; + break; + +/* TODO: report to user */ + case PGM_IO_STATUS_FIN: + break; + + default: + g_assert_not_reached(); + } + } while (!async->is_destroyed); + +cleanup: + g_async_queue_unref (async->commit_queue); + return NULL; +} + +/* create asynchronous thread handler + * + * on success, 0 is returned. on error, -1 is returned, and errno set appropriately. + * on invalid parameters, -EINVAL is returned. + */ + +gboolean +pgm_async_create ( + pgm_async_t** async, + pgm_transport_t* const transport, + GError** error + ) +{ + pgm_async_t* new_async; + + g_return_val_if_fail (NULL != async, FALSE); + g_return_val_if_fail (NULL != transport, FALSE); + + g_trace ("create (async:%p transport:%p error:%p)", + (gpointer)async, (gpointer)transport, (gpointer)error); + + if (!g_thread_supported()) + g_thread_init (NULL); + + new_async = g_new0 (pgm_async_t, 1); + new_async->transport = transport; + if (0 != pgm_notify_init (&new_async->commit_notify) || + 0 != pgm_notify_init (&new_async->destroy_notify)) + { + g_set_error (error, + PGM_ASYNC_ERROR, + pgm_async_error_from_errno (errno), + _("Creating async notification channels: %s"), + g_strerror (errno)); + g_free (new_async); + return FALSE; + } + new_async->commit_queue = g_async_queue_new(); +/* setup new thread */ + new_async->thread = g_thread_create_full (pgm_receiver_thread, + new_async, + 0, + TRUE, + TRUE, + G_THREAD_PRIORITY_HIGH, + error); + if (NULL == new_async->thread) { + g_async_queue_unref (new_async->commit_queue); + pgm_notify_destroy (&new_async->commit_notify); + g_free (new_async); + return FALSE; + } + +/* return new object */ + *async = new_async; + return TRUE; +} + +/* tell async thread to stop, wait for it to stop, then cleanup. + * + * on success, 0 is returned. if async is invalid, -EINVAL is returned. + */ + +gboolean +pgm_async_destroy ( + pgm_async_t* const async + ) +{ + g_return_val_if_fail (NULL != async, FALSE); + g_return_val_if_fail (!async->is_destroyed, FALSE); + + async->is_destroyed = TRUE; + pgm_notify_send (&async->destroy_notify); + if (async->thread) + g_thread_join (async->thread); + if (async->commit_queue) { + g_async_queue_unref (async->commit_queue); + async->commit_queue = NULL; + } + pgm_notify_destroy (&async->destroy_notify); + pgm_notify_destroy (&async->commit_notify); + g_free (async); + return TRUE; +} + +/* queue to GSource and GMainLoop */ + +GSource* +pgm_async_create_watch ( + pgm_async_t* async + ) +{ + g_return_val_if_fail (async != NULL, NULL); + + GSource *source = g_source_new (&g_pgm_watch_funcs, sizeof(pgm_watch_t)); + pgm_watch_t *watch = (pgm_watch_t*)source; + + watch->async = async; + watch->pollfd.fd = pgm_async_get_fd (async); + watch->pollfd.events = G_IO_IN; + + g_source_add_poll (source, &watch->pollfd); + + return source; +} + +/* pgm transport attaches to the callees context: the default context instead of + * any internal contexts. + */ + +int +pgm_async_add_watch_full ( + pgm_async_t* async, + gint priority, + pgm_eventfn_t function, + gpointer user_data, + GDestroyNotify notify + ) +{ + g_return_val_if_fail (async != NULL, -EINVAL); + g_return_val_if_fail (function != NULL, -EINVAL); + + GSource* source = pgm_async_create_watch (async); + + if (priority != G_PRIORITY_DEFAULT) + g_source_set_priority (source, priority); + + g_source_set_callback (source, (GSourceFunc)function, user_data, notify); + + guint id = g_source_attach (source, NULL); + g_source_unref (source); + + return id; +} + +int +pgm_async_add_watch ( + pgm_async_t* async, + pgm_eventfn_t function, + gpointer user_data + ) +{ + return pgm_async_add_watch_full (async, G_PRIORITY_HIGH, function, user_data, NULL); +} + +/* returns TRUE if source has data ready, i.e. async queue is not empty + * + * called before event loop poll() + */ + +static +gboolean +pgm_src_prepare ( + GSource* source, + gint* timeout + ) +{ + pgm_watch_t* watch = (pgm_watch_t*)source; + +/* infinite timeout */ + *timeout = -1; + + return ( g_async_queue_length(watch->async->commit_queue) > 0 ); +} + +/* called after event loop poll() + * + * return TRUE if ready to dispatch. + */ + +static +gboolean +pgm_src_check ( + GSource* source + ) +{ + pgm_watch_t* watch = (pgm_watch_t*)source; + + return ( g_async_queue_length(watch->async->commit_queue) > 0 ); +} + +/* called when TRUE returned from prepare or check + */ + +static gboolean +pgm_src_dispatch ( + GSource* source, + GSourceFunc callback, + gpointer user_data + ) +{ + g_trace ("pgm_src_dispatch (source:%p callback:() user-data:%p)", + (gpointer)source, user_data); + + const pgm_eventfn_t function = (pgm_eventfn_t)callback; + pgm_watch_t* watch = (pgm_watch_t*)source; + pgm_async_t* async = watch->async; + +/* empty pipe */ + pgm_notify_read (&async->commit_notify); + +/* purge only one message from the asynchronous queue */ + pgm_event_t* event = g_async_queue_try_pop (async->commit_queue); + if (event) + { +/* important that callback occurs out of lock to allow PGM layer to add more messages */ + (*function) (event->data, event->len, user_data); + +/* return memory to receive window */ + if (event->len) g_free (event->data); + pgm_event_unref (async, event); + } + + return TRUE; +} + +/* synchronous reading from the queue. + * + * returns GIOStatus with success, error, again, or eof. + */ + +GIOStatus +pgm_async_recv ( + pgm_async_t* const async, + gpointer data, + const gsize len, + gsize* const bytes_read, + const int flags, /* MSG_DONTWAIT for non-blocking */ + GError** error + ) +{ + g_return_val_if_fail (NULL != async, G_IO_STATUS_ERROR); + if (len) g_return_val_if_fail (NULL != data, G_IO_STATUS_ERROR); + + g_trace ("pgm_async_recv (async:%p data:%p len:%" G_GSIZE_FORMAT" bytes-read:%p flags:%d error:%p)", + (gpointer)async, data, len, (gpointer)bytes_read, flags, (gpointer)error); + + pgm_event_t* event = NULL; + g_async_queue_lock (async->commit_queue); + if (g_async_queue_length_unlocked (async->commit_queue) == 0) + { + g_async_queue_unlock (async->commit_queue); + if (flags & MSG_DONTWAIT || async->is_nonblocking) + return G_IO_STATUS_AGAIN; +#ifdef CONFIG_HAVE_POLL + struct pollfd fds[1]; + int ready; + do { + memset (fds, 0, sizeof(fds)); + fds[0].fd = pgm_notify_get_fd (&async->commit_notify); + fds[0].events = POLLIN; + ready = poll (fds, G_N_ELEMENTS(fds), -1); + if (-1 == ready || async->is_destroyed) /* errno = EINTR */ + return G_IO_STATUS_ERROR; + } while (ready <= 0); +#else + fd_set readfds; + int n_fds, ready, fd = pgm_notify_get_fd (&async->commit_notify); + do { + FD_ZERO(&readfds); + FD_SET(fd, &readfds); + n_fds = fd + 1; + ready = select (n_fds, &readfds, NULL, NULL, NULL); + if (-1 == ready || async->is_destroyed) /* errno = EINTR */ + return G_IO_STATUS_ERROR; + } while (ready <= 0); +#endif + pgm_notify_read (&async->commit_notify); + g_async_queue_lock (async->commit_queue); + } + event = g_async_queue_pop_unlocked (async->commit_queue); + g_async_queue_unlock (async->commit_queue); + +/* pass data back to callee */ + if (event->len > len) { + *bytes_read = len; + memcpy (data, event->data, *bytes_read); + g_set_error (error, + PGM_ASYNC_ERROR, + PGM_ASYNC_ERROR_OVERFLOW, + _("Message too large to be stored in buffer.")); + pgm_event_unref (async, event); + return G_IO_STATUS_ERROR; + } + + if (bytes_read) + *bytes_read = event->len; + memcpy (data, event->data, event->len); + +/* cleanup */ + if (event->len) g_free (event->data); + pgm_event_unref (async, event); + return G_IO_STATUS_NORMAL; +} + +gboolean +pgm_async_set_nonblocking ( + pgm_async_t* const async, + const gboolean nonblocking + ) +{ + g_return_val_if_fail (NULL != async, FALSE); + async->is_nonblocking = nonblocking; + return TRUE; +} + +GQuark +pgm_async_error_quark (void) +{ + return g_quark_from_static_string ("pgm-async-error-quark"); +} + +static +PGMAsyncError +pgm_async_error_from_errno ( + const gint err_no + ) +{ + switch (err_no) { +#ifdef EFAULT + case EFAULT: + return PGM_ASYNC_ERROR_FAULT; + break; +#endif + +#ifdef EMFILE + case EMFILE: + return PGM_ASYNC_ERROR_MFILE; + break; +#endif + +#ifdef ENFILE + case ENFILE: + return PGM_ASYNC_ERROR_NFILE; + break; +#endif + + default : + return PGM_ASYNC_ERROR_FAILED; + break; + } +} + +/* eof */ |