summaryrefslogtreecommitdiffstats
path: root/server/query.c
diff options
context:
space:
mode:
authorLars Müller2008-03-01 19:30:38 +0100
committerLars Müller2008-03-01 19:30:38 +0100
commit868fec1f8eca7c344fc9ac057b7418331299d9ce (patch)
treec07cdf11db710dc495c3c7a513cc0f8fd68d6626 /server/query.c
downloaddnbd-868fec1f8eca7c344fc9ac057b7418331299d9ce.tar.gz
dnbd-868fec1f8eca7c344fc9ac057b7418331299d9ce.tar.xz
dnbd-868fec1f8eca7c344fc9ac057b7418331299d9ce.zip
Import dnbd* from the former openslx-contrib repo as of revision 92.
openslx-contrib is currently read only and will get removed in some days. git-svn-id: http://svn.openslx.org/svn/openslx/contrib/dnbd/trunk@1592 95ad53e4-c205-0410-b2fa-d234c58c8868
Diffstat (limited to 'server/query.c')
-rw-r--r--server/query.c349
1 files changed, 349 insertions, 0 deletions
diff --git a/server/query.c b/server/query.c
new file mode 100644
index 0000000..59d1864
--- /dev/null
+++ b/server/query.c
@@ -0,0 +1,349 @@
+/*
+ * query.c - request/reply handling for the server
+ * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de>
+ */
+
+#include <stdio.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <linux/types.h>
+#include <unistd.h>
+#include <time.h>
+
+#define DNBD_USERSPACE 1
+#include "../common/dnbd-cliserv.h"
+
+#include "query.h"
+
+/* number of threads used to service requests */
+#define NUM_HANDLER_THREADS 1 /* default */
+#define MAX_BLOCK_SIZE 4096
+
+struct query_thread {
+ query_info_t *query_info;
+ int id;
+ pthread_t p_thread;
+};
+
+struct query_thread query_thread[NUM_HANDLER_THREADS];
+
+/* recursive global mutex for our program. */
+pthread_mutex_t query_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
+/* mutex to avoid concurrent file access */
+pthread_mutex_t handler_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+/* global condition variable for our program. */
+pthread_cond_t got_query = PTHREAD_COND_INITIALIZER;
+
+int num_queries = 0; /* number of pending requests, initially none */
+int max_queries = 100; /* this value should be high enough */
+
+query_t *queries = NULL; /* head of linked list of requests. */
+int last_query = 0; /* initial position in circular buffer */
+int next_query = 0;
+
+
+void query_handle(struct query_info *query_info, query_t * query);
+
+/*
+ * function query_add_loop(): add incoming requests to circular buffer
+ */
+void *query_add_loop(void *data)
+{
+ int rc;
+ query_t *query;
+ query_info_t *query_info = (query_info_t *) data;
+
+ int tmp_query;
+
+ while (1) {
+
+ rc = pthread_mutex_lock(&query_mutex);
+ tmp_query = (next_query + 1) % max_queries;
+ rc = pthread_mutex_unlock(&query_mutex);
+
+ if (tmp_query == last_query)
+ continue;
+
+ query = &queries[next_query];
+
+ /* loop until a proper request arrives */
+ while (!net_rx(query_info->net_info, &query->request)) {}
+
+ rc = pthread_mutex_lock(&query_mutex);
+
+ next_query = tmp_query;
+
+ /* increase total number of pending requests */
+ num_queries++;
+
+ rc = pthread_mutex_unlock(&query_mutex);
+
+ /* signal that there's a new request to handle */
+ rc = pthread_cond_signal(&got_query);
+ }
+}
+
+/*
+ * function: query_get(): fetch request from circular buffer
+ * returns: pointer to request
+ */
+query_t *query_get(pthread_mutex_t * p_mutex)
+{
+ int rc;
+ query_t *query; /* pointer to request */
+
+ rc = pthread_mutex_lock(p_mutex);
+
+ if (last_query == next_query)
+ return NULL;
+
+ query = &queries[last_query];
+
+ last_query = (last_query + 1) % max_queries;
+ num_queries--;
+
+ rc = pthread_mutex_unlock(p_mutex);
+ /* return the request to the caller */
+ return query;
+}
+
+/*
+ * function query_handle(): handle a single request.
+ */
+void query_handle(struct query_info *query_info, query_t * query)
+{
+ int i, rc;
+ dnbd_request_t *dnbd_request;
+ dnbd_request_t *dnbd_old_request;
+ dnbd_reply_t *dnbd_reply = NULL;
+ struct dnbd_reply_init *dnbd_reply_init;
+ int tmp_query;
+ int recent = 0;
+ time_t timestamp;
+
+ dnbd_request = (dnbd_request_t *) & query->request.data;
+
+ query->reply.len = 0;
+
+ /* convert data from network to host byte order */
+ dnbd_request->magic = ntohl(dnbd_request->magic);
+ dnbd_request->time = ntohs(dnbd_request->time);
+ dnbd_request->id = ntohs(dnbd_request->id);
+ dnbd_request->cmd = ntohs(dnbd_request->cmd);
+ dnbd_request->pos = ntohll(dnbd_request->pos);
+ dnbd_request->len = ntohs(dnbd_request->len);
+
+ if (dnbd_request->magic != DNBD_MAGIC)
+ return;
+
+ /* we ususally only respond to a client */
+ if (!(dnbd_request->cmd & DNBD_CMD_CLI))
+ return;
+
+ /* does the client ask for our id? */
+ if (dnbd_request->id && (dnbd_request->id != query_info->id))
+ return;
+
+ switch (dnbd_request->cmd & DNBD_CMD_MASK) {
+ /* handle init request */
+ case DNBD_CMD_INIT:
+ /* handle heartbeat request */
+ case DNBD_CMD_HB:
+ dnbd_reply_init =
+ (struct dnbd_reply_init *) query->reply.data;
+ dnbd_reply_init->magic = htonl(DNBD_MAGIC);
+
+ dnbd_reply_init->capacity =
+ htonll(filer_getcapacity(query_info->filer_info));
+
+ dnbd_reply_init->cmd =
+ htons((dnbd_request->cmd
+ & ~DNBD_CMD_CLI) | DNBD_CMD_SRV);
+
+ dnbd_reply_init->blksize = htons(MAX_BLOCK_SIZE);
+ dnbd_reply_init->id = htons(query_info->id);
+
+ query->reply.len = sizeof(struct dnbd_reply_init);
+
+ net_tx(query_info->net_info, &query->reply);
+ break;
+ /* handle read request */
+ case DNBD_CMD_READ:
+ timestamp = time(NULL);
+
+ /* burst avoidance */
+ rc = pthread_mutex_lock(&query_mutex);
+ for (i = 2; i < max_queries; i++) {
+
+ tmp_query =
+ (last_query + (max_queries - i)) % max_queries;
+
+ if (tmp_query == last_query)
+ break;
+
+ /* check only up to one second */
+ if (!tmp_query
+ || queries[tmp_query].time - timestamp > 1) {
+ break;
+ }
+ dnbd_old_request =
+ (dnbd_request_t *) & queries[tmp_query].
+ request.data;
+
+ /* someone requested the same block before? */
+ if (dnbd_request->pos == dnbd_old_request->pos) {
+ /* was it the same client, then retransmit
+ as the packet was probably lost, otherwise
+ drop the request */
+ if (!((query->request.clientlen ==
+ queries[tmp_query].request.clientlen)
+ &&
+ (!memcmp
+ (&query->request.client,
+ &queries[tmp_query].request.client,
+ query->request.clientlen)))) {
+ recent = 1;
+ break;
+ }
+ else
+ break;
+ }
+ }
+ rc = pthread_mutex_unlock(&query_mutex);
+
+ if (recent)
+ break;
+
+ /* size of request block too high? */
+ if (dnbd_request->len > MAX_BLOCK_SIZE)
+ break;
+
+ /* create a DNBD reply packet */
+ dnbd_reply = (dnbd_reply_t *) query->reply.data;
+
+ dnbd_reply->magic = htonl(DNBD_MAGIC);
+ dnbd_reply->time = htons(dnbd_request->time);
+ dnbd_reply->id = htons(query_info->id);
+ dnbd_reply->pos = htonll(dnbd_request->pos);
+
+ dnbd_reply->cmd =
+ htons((dnbd_request->cmd
+ & ~DNBD_CMD_CLI) | DNBD_CMD_SRV);
+
+ /* read from underlying device/file */
+ pthread_mutex_lock(&handler_mutex);
+ filer_readblock(query_info->filer_info,
+ (void *) dnbd_reply +
+ sizeof(struct dnbd_reply),
+ dnbd_request->len, dnbd_request->pos);
+
+ pthread_mutex_unlock(&handler_mutex);
+
+ query->reply.len =
+ dnbd_request->len + sizeof(dnbd_reply_t);
+
+ query->time = time(NULL);
+
+ /* send reply */
+ net_tx(query_info->net_info, &query->reply);
+ break;
+ }
+
+
+}
+
+/*
+ * function query_handle_loop(): get queries and handle them in a loop
+ */
+void *query_handle_loop(void *data)
+{
+ int rc;
+ query_t *query; /* pointer to a request */
+ int thread_id = *((int *) data); /* thread id */
+
+ printf("Starting thread '%d'\n", thread_id);
+ fflush(stdout);
+
+ rc = pthread_mutex_lock(&query_mutex);
+
+ /* do forever.... */
+ while (1) {
+
+ if (num_queries > 0) {
+ /* a request is pending */
+ query = query_get(&query_mutex);
+
+ /* got a request? */
+ if (query) {
+
+ rc = pthread_mutex_unlock(&query_mutex);
+ /* handle request */
+ query_handle(query_thread[thread_id].
+ query_info, query);
+
+ rc = pthread_mutex_lock(&query_mutex);
+ }
+ } else {
+ /* wait for a request to arrive */
+ rc = pthread_cond_wait(&got_query, &query_mutex);
+ }
+ }
+}
+
+/*
+ * function query_init(): initialize request handling
+ * returns: pointer to data structure query_info (see header file)
+ */
+query_info_t *query_init(net_info_t * net_info, filer_info_t * filer_info,
+ int id, int threads)
+{
+ int i;
+ query_info_t *query_info = NULL;
+
+ query_info = (query_info_t *) malloc(sizeof(query_info_t));
+ if (!query_info)
+ return NULL;
+
+ /* fill query_info structure */
+ query_info->net_info = net_info;
+ query_info->filer_info = filer_info;
+ query_info->id = id;
+
+ if (!(queries = (query_t *) malloc(sizeof(query_t) * max_queries))) {
+ free(query_info);
+ return NULL;
+ }
+
+ last_query = 0;
+ next_query = 0;
+
+ /* reserve memory for circular buffer */
+ for (i = 0; i < max_queries; i++) {
+ queries[i].reply.data =
+ malloc(MAX_BLOCK_SIZE + sizeof(dnbd_reply_t));
+ }
+
+ /* create the request-handling threads */
+ for (i = 0; i < threads; i++) {
+
+ query_thread[i].id = i;
+ query_thread[i].query_info = query_info;
+
+ pthread_create(&query_thread[i].p_thread, NULL,
+ query_handle_loop,
+ (void *) &query_thread[i].id);
+ }
+
+ /* create thread for receiving network requests */
+ pthread_create(&query_info->p_thread, NULL,
+ query_add_loop, (void *) query_info);
+
+ return query_info;
+}