/* * query.c - request/reply handling for the server * Copyright (C) 2006 Thorsten Zitterell */ #include #include #include #include #include #include #include #include #include #include #define DNBD_USERSPACE 1 #include "../common/dnbd-cliserv.h" #include "query.h" /* number of threads used to service requests */ #define NUM_HANDLER_THREADS 1 /* default */ #define MAX_BLOCK_SIZE 4096 struct query_thread { query_info_t *query_info; int id; pthread_t p_thread; }; struct query_thread query_thread[NUM_HANDLER_THREADS]; /* recursive global mutex for our program. */ pthread_mutex_t query_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; /* mutex to avoid concurrent file access */ pthread_mutex_t handler_mutex = PTHREAD_MUTEX_INITIALIZER; /* global condition variable for our program. */ pthread_cond_t got_query = PTHREAD_COND_INITIALIZER; int num_queries = 0; /* number of pending requests, initially none */ int max_queries = 100; /* this value should be high enough */ query_t *queries = NULL; /* head of linked list of requests. */ int last_query = 0; /* initial position in circular buffer */ int next_query = 0; void query_handle(struct query_info *query_info, query_t * query); /* * function query_add_loop(): add incoming requests to circular buffer */ void *query_add_loop(void *data) { int rc; query_t *query; query_info_t *query_info = (query_info_t *) data; int tmp_query; while (1) { rc = pthread_mutex_lock(&query_mutex); tmp_query = (next_query + 1) % max_queries; rc = pthread_mutex_unlock(&query_mutex); if (tmp_query == last_query) continue; query = &queries[next_query]; /* loop until a proper request arrives */ while (!net_rx(query_info->net_info, &query->request)) {} rc = pthread_mutex_lock(&query_mutex); next_query = tmp_query; /* increase total number of pending requests */ num_queries++; rc = pthread_mutex_unlock(&query_mutex); /* signal that there's a new request to handle */ rc = pthread_cond_signal(&got_query); } } /* * function: query_get(): fetch request from circular buffer * returns: pointer to request */ query_t *query_get(pthread_mutex_t * p_mutex) { int rc; query_t *query; /* pointer to request */ rc = pthread_mutex_lock(p_mutex); if (last_query == next_query) return NULL; query = &queries[last_query]; last_query = (last_query + 1) % max_queries; num_queries--; rc = pthread_mutex_unlock(p_mutex); /* return the request to the caller */ return query; } /* * function query_handle(): handle a single request. */ void query_handle(struct query_info *query_info, query_t * query) { int i, rc; dnbd_request_t *dnbd_request; dnbd_request_t *dnbd_old_request; dnbd_reply_t *dnbd_reply = NULL; struct dnbd_reply_init *dnbd_reply_init; int tmp_query; int recent = 0; time_t timestamp; dnbd_request = (dnbd_request_t *) & query->request.data; query->reply.len = 0; /* convert data from network to host byte order */ dnbd_request->magic = ntohl(dnbd_request->magic); dnbd_request->time = ntohs(dnbd_request->time); dnbd_request->id = ntohs(dnbd_request->id); dnbd_request->cmd = ntohs(dnbd_request->cmd); dnbd_request->pos = ntohll(dnbd_request->pos); dnbd_request->len = ntohs(dnbd_request->len); if (dnbd_request->magic != DNBD_MAGIC) return; /* we ususally only respond to a client */ if (!(dnbd_request->cmd & DNBD_CMD_CLI)) return; /* does the client ask for our id? */ if (dnbd_request->id && (dnbd_request->id != query_info->id)) return; switch (dnbd_request->cmd & DNBD_CMD_MASK) { /* handle init request */ case DNBD_CMD_INIT: /* handle heartbeat request */ case DNBD_CMD_HB: dnbd_reply_init = (struct dnbd_reply_init *) query->reply.data; dnbd_reply_init->magic = htonl(DNBD_MAGIC); dnbd_reply_init->capacity = htonll(filer_getcapacity(query_info->filer_info)); dnbd_reply_init->cmd = htons((dnbd_request->cmd & ~DNBD_CMD_CLI) | DNBD_CMD_SRV); dnbd_reply_init->blksize = htons(MAX_BLOCK_SIZE); dnbd_reply_init->id = htons(query_info->id); query->reply.len = sizeof(struct dnbd_reply_init); net_tx(query_info->net_info, &query->reply); break; /* handle read request */ case DNBD_CMD_READ: timestamp = time(NULL); /* burst avoidance */ rc = pthread_mutex_lock(&query_mutex); for (i = 2; i < max_queries; i++) { tmp_query = (last_query + (max_queries - i)) % max_queries; if (tmp_query == last_query) break; /* check only up to one second */ if (!tmp_query || queries[tmp_query].time - timestamp > 1) { break; } dnbd_old_request = (dnbd_request_t *) & queries[tmp_query]. request.data; /* someone requested the same block before? */ if (dnbd_request->pos == dnbd_old_request->pos) { /* was it the same client, then retransmit as the packet was probably lost, otherwise drop the request */ if (!((query->request.clientlen == queries[tmp_query].request.clientlen) && (!memcmp (&query->request.client, &queries[tmp_query].request.client, query->request.clientlen)))) { recent = 1; break; } else break; } } rc = pthread_mutex_unlock(&query_mutex); if (recent) break; /* size of request block too high? */ if (dnbd_request->len > MAX_BLOCK_SIZE) break; /* create a DNBD reply packet */ dnbd_reply = (dnbd_reply_t *) query->reply.data; dnbd_reply->magic = htonl(DNBD_MAGIC); dnbd_reply->time = htons(dnbd_request->time); dnbd_reply->id = htons(query_info->id); dnbd_reply->pos = htonll(dnbd_request->pos); dnbd_reply->cmd = htons((dnbd_request->cmd & ~DNBD_CMD_CLI) | DNBD_CMD_SRV); /* read from underlying device/file */ pthread_mutex_lock(&handler_mutex); filer_readblock(query_info->filer_info, (void *) dnbd_reply + sizeof(struct dnbd_reply), dnbd_request->len, dnbd_request->pos); pthread_mutex_unlock(&handler_mutex); query->reply.len = dnbd_request->len + sizeof(dnbd_reply_t); query->time = time(NULL); /* send reply */ net_tx(query_info->net_info, &query->reply); break; } } /* * function query_handle_loop(): get queries and handle them in a loop */ void *query_handle_loop(void *data) { int rc; query_t *query; /* pointer to a request */ int thread_id = *((int *) data); /* thread id */ printf("Starting thread '%d'\n", thread_id); fflush(stdout); rc = pthread_mutex_lock(&query_mutex); /* do forever.... */ while (1) { if (num_queries > 0) { /* a request is pending */ query = query_get(&query_mutex); /* got a request? */ if (query) { rc = pthread_mutex_unlock(&query_mutex); /* handle request */ query_handle(query_thread[thread_id]. query_info, query); rc = pthread_mutex_lock(&query_mutex); } } else { /* wait for a request to arrive */ rc = pthread_cond_wait(&got_query, &query_mutex); } } } /* * function query_init(): initialize request handling * returns: pointer to data structure query_info (see header file) */ query_info_t *query_init(net_info_t * net_info, filer_info_t * filer_info, int id, int threads) { int i; query_info_t *query_info = NULL; query_info = (query_info_t *) malloc(sizeof(query_info_t)); if (!query_info) return NULL; /* fill query_info structure */ query_info->net_info = net_info; query_info->filer_info = filer_info; query_info->id = id; if (!(queries = (query_t *) malloc(sizeof(query_t) * max_queries))) { free(query_info); return NULL; } last_query = 0; next_query = 0; /* reserve memory for circular buffer */ for (i = 0; i < max_queries; i++) { queries[i].reply.data = malloc(MAX_BLOCK_SIZE + sizeof(dnbd_reply_t)); } /* create the request-handling threads */ for (i = 0; i < threads; i++) { query_thread[i].id = i; query_thread[i].query_info = query_info; pthread_create(&query_thread[i].p_thread, NULL, query_handle_loop, (void *) &query_thread[i].id); } /* create thread for receiving network requests */ pthread_create(&query_info->p_thread, NULL, query_add_loop, (void *) query_info); return query_info; }