summaryrefslogtreecommitdiffstats
path: root/3rdparty/openpgm-svn-r1135/pgm/examples/async.c
diff options
context:
space:
mode:
Diffstat (limited to '3rdparty/openpgm-svn-r1135/pgm/examples/async.c')
-rw-r--r--3rdparty/openpgm-svn-r1135/pgm/examples/async.c441
1 files changed, 441 insertions, 0 deletions
diff --git a/3rdparty/openpgm-svn-r1135/pgm/examples/async.c b/3rdparty/openpgm-svn-r1135/pgm/examples/async.c
new file mode 100644
index 0000000..9ac15dd
--- /dev/null
+++ b/3rdparty/openpgm-svn-r1135/pgm/examples/async.c
@@ -0,0 +1,441 @@
+/* vim:ts=8:sts=8:sw=4:noai:noexpandtab
+ *
+ * Asynchronous queue for receiving packets in a separate managed thread.
+ *
+ * Copyright (c) 2006-2010 Miru Limited.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#ifndef _WIN32
+# include <fcntl.h>
+# include <unistd.h>
+# include <pthread.h>
+#else
+# include <process.h>
+#endif
+#include <pgm/pgm.h>
+
+#include "async.h"
+
+
+/* locals */
+
+struct async_event_t {
+ struct async_event_t *next, *prev;
+ size_t len;
+ struct pgm_sockaddr_t addr;
+#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L)
+ char data[];
+#elif defined(__cplusplus)
+ char data[1];
+#else
+ char data[0];
+#endif
+};
+
+
+static void on_data (async_t*const restrict, const void*restrict, const size_t, const struct pgm_sockaddr_t*restrict, const socklen_t);
+
+
+/* queued data is stored as async_event_t objects
+ */
+
+static inline
+struct async_event_t*
+async_event_alloc (
+ size_t len
+ )
+{
+ struct async_event_t* event;
+ event = (struct async_event_t*)calloc (1, len + sizeof(struct async_event_t));
+ event->len = len;
+ return event;
+}
+
+static inline
+void
+async_event_unref (
+ struct async_event_t* const event
+ )
+{
+ free (event);
+}
+
+/* async_t implements a queue
+ */
+
+static inline
+void
+async_push_event (
+ async_t* restrict async,
+ struct async_event_t* restrict event
+ )
+{
+ event->next = async->head;
+ if (async->head)
+ async->head->prev = event;
+ else
+ async->tail = event;
+ async->head = event;
+ async->length++;
+}
+
+static inline
+struct async_event_t*
+async_pop_event (
+ async_t* async
+ )
+{
+ if (async->tail)
+ {
+ struct async_event_t *event = async->tail;
+
+ async->tail = event->prev;
+ if (async->tail)
+ {
+ async->tail->next = NULL;
+ event->prev = NULL;
+ }
+ else
+ async->head = NULL;
+ async->length--;
+
+ return event;
+ }
+
+ return NULL;
+}
+
+/* asynchronous receiver thread, sits in a loop processing incoming packets
+ */
+
+static
+#ifndef _WIN32
+void*
+#else
+unsigned
+__stdcall
+#endif
+receiver_routine (
+ void* arg
+ )
+{
+ assert (NULL != arg);
+ async_t* async = (async_t*)arg;
+ assert (NULL != async->sock);
+#ifndef _WIN32
+ int fds;
+ fd_set readfds;
+#else
+ int n_handles = 3, recv_sock, pending_sock;
+ HANDLE waitHandles[ 3 ];
+ DWORD dwTimeout, dwEvents;
+ WSAEVENT recvEvent, pendingEvent;
+ socklen_t socklen = sizeof(int);
+
+ recvEvent = WSACreateEvent ();
+ pgm_getsockopt (async->sock, IPPROTO_PGM, PGM_RECV_SOCK, &recv_sock, &socklen);
+ WSAEventSelect (recv_sock, recvEvent, FD_READ);
+ pendingEvent = WSACreateEvent ();
+ pgm_getsockopt (async->sock, IPPROTO_PGM, PGM_PENDING_SOCK, &pending_sock, &socklen);
+ WSAEventSelect (pending_sock, pendingEvent, FD_READ);
+
+ waitHandles[0] = async->destroy_event;
+ waitHandles[1] = recvEvent;
+ waitHandles[2] = pendingEvent;
+#endif /* !_WIN32 */
+
+/* dispatch loop */
+ do {
+ struct timeval tv;
+ char buffer[4096];
+ size_t len;
+ struct pgm_sockaddr_t from;
+ socklen_t fromlen = sizeof (from);
+ const int status = pgm_recvfrom (async->sock,
+ buffer,
+ sizeof(buffer),
+ 0,
+ &len,
+ &from,
+ &fromlen,
+ NULL);
+ switch (status) {
+ case PGM_IO_STATUS_NORMAL:
+ on_data (async, buffer, len, &from, fromlen);
+ break;
+ case PGM_IO_STATUS_TIMER_PENDING:
+ {
+ socklen_t optlen = sizeof (tv);
+ pgm_getsockopt (async->sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
+ }
+ goto block;
+ case PGM_IO_STATUS_RATE_LIMITED:
+ {
+ socklen_t optlen = sizeof (tv);
+ pgm_getsockopt (async->sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
+ }
+ case PGM_IO_STATUS_WOULD_BLOCK:
+/* select for next event */
+block:
+#ifndef _WIN32
+ fds = async->destroy_pipe[0] + 1;
+ FD_ZERO(&readfds);
+ FD_SET(async->destroy_pipe[0], &readfds);
+ pgm_select_info (async->sock, &readfds, NULL, &fds);
+ fds = select (fds, &readfds, NULL, NULL, PGM_IO_STATUS_WOULD_BLOCK == status ? NULL : &tv);
+#else
+ dwTimeout = PGM_IO_STATUS_WOULD_BLOCK == status ? INFINITE : (DWORD)((tv.tv_sec * 1000) + (tv.
+tv_usec / 1000));
+ dwEvents = WaitForMultipleObjects (n_handles, waitHandles, FALSE, dwTimeout);
+ switch (dwEvents) {
+ case WAIT_OBJECT_0+1: WSAResetEvent (recvEvent); break;
+ case WAIT_OBJECT_0+2: WSAResetEvent (pendingEvent); break;
+ default: break;
+ }
+#endif /* !_WIN32 */
+ break;
+
+ default:
+ if (PGM_IO_STATUS_ERROR == status)
+ break;
+ }
+ } while (!async->is_destroyed);
+
+/* cleanup */
+#ifndef _WIN32
+ return NULL;
+#else
+ WSACloseEvent (recvEvent);
+ WSACloseEvent (pendingEvent);
+ _endthread();
+ return 0;
+#endif /* !_WIN32 */
+}
+
+/* enqueue a new data event.
+ */
+
+static
+void
+on_data (
+ async_t*const restrict async,
+ const void* restrict data,
+ const size_t len,
+ const struct pgm_sockaddr_t* restrict from,
+ const socklen_t fromlen
+ )
+{
+ struct async_event_t* event = async_event_alloc (len);
+ memcpy (&event->addr, from, fromlen);
+ memcpy (&event->data, data, len);
+#ifndef _WIN32
+ pthread_mutex_lock (&async->pthread_mutex);
+ async_push_event (async, event);
+ if (1 == async->length) {
+ const char one = '1';
+ const size_t writelen = write (async->notify_pipe[1], &one, sizeof(one));
+ assert (sizeof(one) == writelen);
+ }
+ pthread_mutex_unlock (&async->pthread_mutex);
+#else
+ WaitForSingleObject (async->win32_mutex, INFINITE);
+ async_push_event (async, event);
+ if (1 == async->length) {
+ SetEvent (async->notify_event);
+ }
+ ReleaseMutex (async->win32_mutex);
+#endif /* _WIN32 */
+}
+
+/* create asynchronous thread handler from bound PGM sock.
+ *
+ * on success, 0 is returned. on error, -1 is returned, and errno set appropriately.
+ */
+
+int
+async_create (
+ async_t** restrict async,
+ pgm_sock_t* const restrict sock
+ )
+{
+ async_t* new_async;
+
+ if (NULL == async || NULL == sock) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ new_async = (async_t*)calloc (1, sizeof(async_t));
+ new_async->sock = sock;
+#ifndef _WIN32
+ int e;
+ e = pthread_mutex_init (&new_async->pthread_mutex, NULL);
+ if (0 != e) goto err_destroy;
+ e = pipe (new_async->notify_pipe);
+ const int flags = fcntl (new_async->notify_pipe[0], F_GETFL);
+ fcntl (new_async->notify_pipe[0], F_SETFL, flags | O_NONBLOCK);
+ if (0 != e) goto err_destroy;
+ e = pipe (new_async->destroy_pipe);
+ if (0 != e) goto err_destroy;
+ const int status = pthread_create (&new_async->thread, NULL, &receiver_routine, new_async);
+ if (0 != status) goto err_destroy;
+#else
+ new_async->win32_mutex = CreateMutex (NULL, FALSE, NULL);
+ new_async->notify_event = CreateEvent (NULL, TRUE, FALSE, TEXT("AsyncNotify"));
+ new_async->destroy_event = CreateEvent (NULL, TRUE, FALSE, TEXT("AsyncDestroy"));
+ new_async->thread = (HANDLE)_beginthreadex (NULL, 0, &receiver_routine, new_async, 0, NULL);
+ if (0 == new_async->thread) goto err_destroy;
+#endif /* _WIN32 */
+
+/* return new object */
+ *async = new_async;
+ return 0;
+
+err_destroy:
+#ifndef _WIN32
+ close (new_async->destroy_pipe[0]);
+ close (new_async->destroy_pipe[1]);
+ close (new_async->notify_pipe[0]);
+ close (new_async->notify_pipe[1]);
+ pthread_mutex_destroy (&new_async->pthread_mutex);
+#else
+ CloseHandle (new_async->destroy_event);
+ CloseHandle (new_async->notify_event);
+ CloseHandle (new_async->win32_mutex);
+#endif /* _WIN32 */
+ if (new_async)
+ free (new_async);
+ return -1;
+}
+
+/* Destroy asynchronous receiver, there must be no active queue consumer.
+ *
+ * on success, 0 is returned, on error -1 is returned and errno set appropriately.
+ */
+
+int
+async_destroy (
+ async_t* const async
+ )
+{
+ if (NULL == async || async->is_destroyed) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ async->is_destroyed = TRUE;
+#ifndef _WIN32
+ const char one = '1';
+ const size_t writelen = write (async->destroy_pipe[1], &one, sizeof(one));
+ assert (sizeof(one) == writelen);
+ pthread_join (async->thread, NULL);
+ close (async->destroy_pipe[0]);
+ close (async->destroy_pipe[1]);
+ close (async->notify_pipe[0]);
+ close (async->notify_pipe[1]);
+ pthread_mutex_destroy (&async->pthread_mutex);
+#else
+ SetEvent (async->destroy_event);
+ WaitForSingleObject (async->thread, INFINITE);
+ CloseHandle (async->thread);
+ CloseHandle (async->destroy_event);
+ CloseHandle (async->notify_event);
+ CloseHandle (async->win32_mutex);
+#endif /* !_WIN32 */
+ while (async->head) {
+ struct async_event_t *next = async->head->next;
+ async_event_unref (async->head);
+ async->head = next;
+ async->length--;
+ }
+ free (async);
+ return 0;
+}
+
+/* synchronous reading from the queue.
+ *
+ * returns GIOStatus with success, error, again, or eof.
+ */
+
+ssize_t
+async_recvfrom (
+ async_t* const restrict async,
+ void* restrict buf,
+ size_t len,
+ struct pgm_sockaddr_t* restrict from,
+ socklen_t* restrict fromlen
+ )
+{
+ struct async_event_t* event;
+
+ if (NULL == async || NULL == buf || async->is_destroyed) {
+ errno = EINVAL;
+ return -1;
+ }
+
+#ifndef _WIN32
+ pthread_mutex_lock (&async->pthread_mutex);
+ if (0 == async->length) {
+/* flush event pipe */
+ char tmp;
+ while (sizeof(tmp) == read (async->notify_pipe[0], &tmp, sizeof(tmp)));
+ pthread_mutex_unlock (&async->pthread_mutex);
+ errno = EAGAIN;
+ return -1;
+ }
+ event = async_pop_event (async);
+ pthread_mutex_unlock (&async->pthread_mutex);
+#else
+ WaitForSingleObject (async->win32_mutex, INFINITE);
+ if (0 == async->length) {
+/* clear event */
+ ResetEvent (async->notify_event);
+ ReleaseMutex (async->win32_mutex);
+ errno = EAGAIN;
+ return -1;
+ }
+ event = async_pop_event (async);
+ ReleaseMutex (async->win32_mutex);
+#endif /* _WIN32 */
+ assert (NULL != event);
+
+/* pass data back to callee */
+ const size_t event_len = MIN(event->len, len);
+ if (NULL != from && sizeof(struct pgm_sockaddr_t) == *fromlen) {
+ memcpy (from, &event->addr, *fromlen);
+ }
+ memcpy (buf, event->data, event_len);
+ async_event_unref (event);
+ return event_len;
+}
+
+ssize_t
+async_recv (
+ async_t* const restrict async,
+ void* restrict buf,
+ size_t len
+ )
+{
+ return async_recvfrom (async, buf, len, NULL, NULL);
+}
+
+/* eof */