From 868fec1f8eca7c344fc9ac057b7418331299d9ce Mon Sep 17 00:00:00 2001 From: Lars Müller Date: Sat, 1 Mar 2008 18:30:38 +0000 Subject: Import dnbd* from the former openslx-contrib repo as of revision 92. openslx-contrib is currently read only and will get removed in some days. git-svn-id: http://svn.openslx.org/svn/openslx/contrib/dnbd/trunk@1592 95ad53e4-c205-0410-b2fa-d234c58c8868 --- server/query.c | 349 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 349 insertions(+) create mode 100644 server/query.c (limited to 'server/query.c') diff --git a/server/query.c b/server/query.c new file mode 100644 index 0000000..59d1864 --- /dev/null +++ b/server/query.c @@ -0,0 +1,349 @@ +/* + * query.c - request/reply handling for the server + * Copyright (C) 2006 Thorsten Zitterell + */ + +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#define DNBD_USERSPACE 1 +#include "../common/dnbd-cliserv.h" + +#include "query.h" + +/* number of threads used to service requests */ +#define NUM_HANDLER_THREADS 1 /* default */ +#define MAX_BLOCK_SIZE 4096 + +struct query_thread { + query_info_t *query_info; + int id; + pthread_t p_thread; +}; + +struct query_thread query_thread[NUM_HANDLER_THREADS]; + +/* recursive global mutex for our program. */ +pthread_mutex_t query_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; +/* mutex to avoid concurrent file access */ +pthread_mutex_t handler_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* global condition variable for our program. */ +pthread_cond_t got_query = PTHREAD_COND_INITIALIZER; + +int num_queries = 0; /* number of pending requests, initially none */ +int max_queries = 100; /* this value should be high enough */ + +query_t *queries = NULL; /* head of linked list of requests. */ +int last_query = 0; /* initial position in circular buffer */ +int next_query = 0; + + +void query_handle(struct query_info *query_info, query_t * query); + +/* + * function query_add_loop(): add incoming requests to circular buffer + */ +void *query_add_loop(void *data) +{ + int rc; + query_t *query; + query_info_t *query_info = (query_info_t *) data; + + int tmp_query; + + while (1) { + + rc = pthread_mutex_lock(&query_mutex); + tmp_query = (next_query + 1) % max_queries; + rc = pthread_mutex_unlock(&query_mutex); + + if (tmp_query == last_query) + continue; + + query = &queries[next_query]; + + /* loop until a proper request arrives */ + while (!net_rx(query_info->net_info, &query->request)) {} + + rc = pthread_mutex_lock(&query_mutex); + + next_query = tmp_query; + + /* increase total number of pending requests */ + num_queries++; + + rc = pthread_mutex_unlock(&query_mutex); + + /* signal that there's a new request to handle */ + rc = pthread_cond_signal(&got_query); + } +} + +/* + * function: query_get(): fetch request from circular buffer + * returns: pointer to request + */ +query_t *query_get(pthread_mutex_t * p_mutex) +{ + int rc; + query_t *query; /* pointer to request */ + + rc = pthread_mutex_lock(p_mutex); + + if (last_query == next_query) + return NULL; + + query = &queries[last_query]; + + last_query = (last_query + 1) % max_queries; + num_queries--; + + rc = pthread_mutex_unlock(p_mutex); + /* return the request to the caller */ + return query; +} + +/* + * function query_handle(): handle a single request. + */ +void query_handle(struct query_info *query_info, query_t * query) +{ + int i, rc; + dnbd_request_t *dnbd_request; + dnbd_request_t *dnbd_old_request; + dnbd_reply_t *dnbd_reply = NULL; + struct dnbd_reply_init *dnbd_reply_init; + int tmp_query; + int recent = 0; + time_t timestamp; + + dnbd_request = (dnbd_request_t *) & query->request.data; + + query->reply.len = 0; + + /* convert data from network to host byte order */ + dnbd_request->magic = ntohl(dnbd_request->magic); + dnbd_request->time = ntohs(dnbd_request->time); + dnbd_request->id = ntohs(dnbd_request->id); + dnbd_request->cmd = ntohs(dnbd_request->cmd); + dnbd_request->pos = ntohll(dnbd_request->pos); + dnbd_request->len = ntohs(dnbd_request->len); + + if (dnbd_request->magic != DNBD_MAGIC) + return; + + /* we ususally only respond to a client */ + if (!(dnbd_request->cmd & DNBD_CMD_CLI)) + return; + + /* does the client ask for our id? */ + if (dnbd_request->id && (dnbd_request->id != query_info->id)) + return; + + switch (dnbd_request->cmd & DNBD_CMD_MASK) { + /* handle init request */ + case DNBD_CMD_INIT: + /* handle heartbeat request */ + case DNBD_CMD_HB: + dnbd_reply_init = + (struct dnbd_reply_init *) query->reply.data; + dnbd_reply_init->magic = htonl(DNBD_MAGIC); + + dnbd_reply_init->capacity = + htonll(filer_getcapacity(query_info->filer_info)); + + dnbd_reply_init->cmd = + htons((dnbd_request->cmd + & ~DNBD_CMD_CLI) | DNBD_CMD_SRV); + + dnbd_reply_init->blksize = htons(MAX_BLOCK_SIZE); + dnbd_reply_init->id = htons(query_info->id); + + query->reply.len = sizeof(struct dnbd_reply_init); + + net_tx(query_info->net_info, &query->reply); + break; + /* handle read request */ + case DNBD_CMD_READ: + timestamp = time(NULL); + + /* burst avoidance */ + rc = pthread_mutex_lock(&query_mutex); + for (i = 2; i < max_queries; i++) { + + tmp_query = + (last_query + (max_queries - i)) % max_queries; + + if (tmp_query == last_query) + break; + + /* check only up to one second */ + if (!tmp_query + || queries[tmp_query].time - timestamp > 1) { + break; + } + dnbd_old_request = + (dnbd_request_t *) & queries[tmp_query]. + request.data; + + /* someone requested the same block before? */ + if (dnbd_request->pos == dnbd_old_request->pos) { + /* was it the same client, then retransmit + as the packet was probably lost, otherwise + drop the request */ + if (!((query->request.clientlen == + queries[tmp_query].request.clientlen) + && + (!memcmp + (&query->request.client, + &queries[tmp_query].request.client, + query->request.clientlen)))) { + recent = 1; + break; + } + else + break; + } + } + rc = pthread_mutex_unlock(&query_mutex); + + if (recent) + break; + + /* size of request block too high? */ + if (dnbd_request->len > MAX_BLOCK_SIZE) + break; + + /* create a DNBD reply packet */ + dnbd_reply = (dnbd_reply_t *) query->reply.data; + + dnbd_reply->magic = htonl(DNBD_MAGIC); + dnbd_reply->time = htons(dnbd_request->time); + dnbd_reply->id = htons(query_info->id); + dnbd_reply->pos = htonll(dnbd_request->pos); + + dnbd_reply->cmd = + htons((dnbd_request->cmd + & ~DNBD_CMD_CLI) | DNBD_CMD_SRV); + + /* read from underlying device/file */ + pthread_mutex_lock(&handler_mutex); + filer_readblock(query_info->filer_info, + (void *) dnbd_reply + + sizeof(struct dnbd_reply), + dnbd_request->len, dnbd_request->pos); + + pthread_mutex_unlock(&handler_mutex); + + query->reply.len = + dnbd_request->len + sizeof(dnbd_reply_t); + + query->time = time(NULL); + + /* send reply */ + net_tx(query_info->net_info, &query->reply); + break; + } + + +} + +/* + * function query_handle_loop(): get queries and handle them in a loop + */ +void *query_handle_loop(void *data) +{ + int rc; + query_t *query; /* pointer to a request */ + int thread_id = *((int *) data); /* thread id */ + + printf("Starting thread '%d'\n", thread_id); + fflush(stdout); + + rc = pthread_mutex_lock(&query_mutex); + + /* do forever.... */ + while (1) { + + if (num_queries > 0) { + /* a request is pending */ + query = query_get(&query_mutex); + + /* got a request? */ + if (query) { + + rc = pthread_mutex_unlock(&query_mutex); + /* handle request */ + query_handle(query_thread[thread_id]. + query_info, query); + + rc = pthread_mutex_lock(&query_mutex); + } + } else { + /* wait for a request to arrive */ + rc = pthread_cond_wait(&got_query, &query_mutex); + } + } +} + +/* + * function query_init(): initialize request handling + * returns: pointer to data structure query_info (see header file) + */ +query_info_t *query_init(net_info_t * net_info, filer_info_t * filer_info, + int id, int threads) +{ + int i; + query_info_t *query_info = NULL; + + query_info = (query_info_t *) malloc(sizeof(query_info_t)); + if (!query_info) + return NULL; + + /* fill query_info structure */ + query_info->net_info = net_info; + query_info->filer_info = filer_info; + query_info->id = id; + + if (!(queries = (query_t *) malloc(sizeof(query_t) * max_queries))) { + free(query_info); + return NULL; + } + + last_query = 0; + next_query = 0; + + /* reserve memory for circular buffer */ + for (i = 0; i < max_queries; i++) { + queries[i].reply.data = + malloc(MAX_BLOCK_SIZE + sizeof(dnbd_reply_t)); + } + + /* create the request-handling threads */ + for (i = 0; i < threads; i++) { + + query_thread[i].id = i; + query_thread[i].query_info = query_info; + + pthread_create(&query_thread[i].p_thread, NULL, + query_handle_loop, + (void *) &query_thread[i].id); + } + + /* create thread for receiving network requests */ + pthread_create(&query_info->p_thread, NULL, + query_add_loop, (void *) query_info); + + return query_info; +} -- cgit v1.2.3-55-g7522