summaryrefslogblamecommitdiffstats
path: root/server/query.c
blob: 59d1864b3b69b8dd655ac17d87b5d0411e068d4d (plain) (tree)




























































































































































































































































































































































                                                                                
/*
 * query.c - request/reply handling for the server
 * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de>
 */
 
#include <stdio.h>		
#include <pthread.h>		
#include <stdlib.h>	
#include <string.h>

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <linux/types.h>
#include <unistd.h>
#include <time.h>

#define DNBD_USERSPACE		1
#include "../common/dnbd-cliserv.h"

#include "query.h"

/* number of threads used to service requests */
#define NUM_HANDLER_THREADS	1		/* default */
#define MAX_BLOCK_SIZE		4096

struct query_thread {
	query_info_t *query_info;
	int id;
	pthread_t p_thread;
};

struct query_thread query_thread[NUM_HANDLER_THREADS];

/* recursive global mutex for our program. */
pthread_mutex_t query_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
/* mutex to avoid concurrent file access */
pthread_mutex_t handler_mutex = PTHREAD_MUTEX_INITIALIZER;

/* global condition variable for our program. */
pthread_cond_t got_query = PTHREAD_COND_INITIALIZER;

int num_queries = 0;		/* number of pending requests, initially none */
int max_queries = 100;		/* this value should be high enough */

query_t *queries = NULL;	/* head of linked list of requests. */
int last_query = 0;		/* initial position in circular buffer */
int next_query = 0;		


void query_handle(struct query_info *query_info, query_t * query);

/* 
 * function query_add_loop(): add incoming requests to circular buffer 
 */
void *query_add_loop(void *data)
{
	int rc;
	query_t *query;
	query_info_t *query_info = (query_info_t *) data;

	int tmp_query;

	while (1) {

		rc = pthread_mutex_lock(&query_mutex);
		tmp_query = (next_query + 1) % max_queries;
		rc = pthread_mutex_unlock(&query_mutex);

		if (tmp_query == last_query)
			continue;

		query = &queries[next_query];

		/* loop until a proper request arrives */
		while (!net_rx(query_info->net_info, &query->request)) {}

		rc = pthread_mutex_lock(&query_mutex);

		next_query = tmp_query;

		/* increase total number of pending requests */
		num_queries++;

		rc = pthread_mutex_unlock(&query_mutex);

		/* signal that there's a new request to handle */
		rc = pthread_cond_signal(&got_query);
	}
}

/*
 * function: query_get(): fetch request from circular buffer
 * returns: pointer to request
 */
query_t *query_get(pthread_mutex_t * p_mutex)
{
	int rc;		
	query_t *query;		/* pointer to request */

	rc = pthread_mutex_lock(p_mutex);

	if (last_query == next_query)
		return NULL;

	query = &queries[last_query];

	last_query = (last_query + 1) % max_queries;
	num_queries--;

	rc = pthread_mutex_unlock(p_mutex);
	/* return the request to the caller */
	return query;
}

/*
 * function query_handle(): handle a single request.
 */
void query_handle(struct query_info *query_info, query_t * query)
{
	int i, rc;
	dnbd_request_t *dnbd_request;
	dnbd_request_t *dnbd_old_request;
	dnbd_reply_t *dnbd_reply = NULL;
	struct dnbd_reply_init *dnbd_reply_init;
	int tmp_query;
	int recent = 0;
	time_t timestamp;

	dnbd_request = (dnbd_request_t *) & query->request.data;

	query->reply.len = 0;

	/* convert data from network to host byte order */
	dnbd_request->magic = ntohl(dnbd_request->magic);
	dnbd_request->time = ntohs(dnbd_request->time);
	dnbd_request->id = ntohs(dnbd_request->id);
	dnbd_request->cmd = ntohs(dnbd_request->cmd);
	dnbd_request->pos = ntohll(dnbd_request->pos);
	dnbd_request->len = ntohs(dnbd_request->len);

	if (dnbd_request->magic != DNBD_MAGIC)
		return;

	/* we ususally only respond to a client */
	if (!(dnbd_request->cmd & DNBD_CMD_CLI))
		return;

	/* does the client ask for our id? */
	if (dnbd_request->id && (dnbd_request->id != query_info->id))
		return;

	switch (dnbd_request->cmd & DNBD_CMD_MASK) {
	/* handle init request */
	case DNBD_CMD_INIT:
	/* handle heartbeat request */
	case DNBD_CMD_HB:
		dnbd_reply_init =
		    (struct dnbd_reply_init *) query->reply.data;
		dnbd_reply_init->magic = htonl(DNBD_MAGIC);

		dnbd_reply_init->capacity =
		    htonll(filer_getcapacity(query_info->filer_info));

		dnbd_reply_init->cmd =
		    htons((dnbd_request->cmd
			   & ~DNBD_CMD_CLI) | DNBD_CMD_SRV);

		dnbd_reply_init->blksize = htons(MAX_BLOCK_SIZE);
		dnbd_reply_init->id = htons(query_info->id);

		query->reply.len = sizeof(struct dnbd_reply_init);

		net_tx(query_info->net_info, &query->reply);
		break;
	/* handle read request */
	case DNBD_CMD_READ:
		timestamp = time(NULL);
	
		/* burst avoidance */
		rc = pthread_mutex_lock(&query_mutex);
		for (i = 2; i < max_queries; i++) {

			tmp_query =
			    (last_query + (max_queries - i)) % max_queries;
				
			if (tmp_query == last_query)
				break;

			/* check only up to one second */
			if (!tmp_query
			    || queries[tmp_query].time - timestamp > 1) {
				break;
			}
			dnbd_old_request =
			    (dnbd_request_t *) & queries[tmp_query].
			    request.data;

			/* someone requested the same block before? */
			if (dnbd_request->pos == dnbd_old_request->pos) {
				/* was it the same client, then retransmit
				as the packet was probably lost, otherwise
				drop the request */
				if (!((query->request.clientlen ==
				     queries[tmp_query].request.clientlen)
				    &&
				    (!memcmp
				     (&query->request.client,
				      &queries[tmp_query].request.client,
				      query->request.clientlen)))) {
					recent = 1;
					break;
				}
				else
					break;
			}
		} 
		rc = pthread_mutex_unlock(&query_mutex);

		if (recent)
			break;

		/* size of request block too high? */
		if (dnbd_request->len > MAX_BLOCK_SIZE)
			break;

		/* create a DNBD reply packet */
		dnbd_reply = (dnbd_reply_t *) query->reply.data;

		dnbd_reply->magic = htonl(DNBD_MAGIC);
		dnbd_reply->time = htons(dnbd_request->time);
		dnbd_reply->id = htons(query_info->id);
		dnbd_reply->pos = htonll(dnbd_request->pos);

		dnbd_reply->cmd =
		    htons((dnbd_request->cmd
			   & ~DNBD_CMD_CLI) | DNBD_CMD_SRV);

		/* read from underlying device/file */
		pthread_mutex_lock(&handler_mutex);
		filer_readblock(query_info->filer_info,
				(void *) dnbd_reply +
				sizeof(struct dnbd_reply),
				dnbd_request->len, dnbd_request->pos);

		pthread_mutex_unlock(&handler_mutex);

		query->reply.len =
		    dnbd_request->len + sizeof(dnbd_reply_t);

		query->time = time(NULL);

		/* send reply */
		net_tx(query_info->net_info, &query->reply);
		break;
	}


}

/*
 * function query_handle_loop(): get queries and handle them in a loop
 */
void *query_handle_loop(void *data)
{
	int rc;			
	query_t *query;				/* pointer to a request */
	int thread_id = *((int *) data);	/* thread id */

	printf("Starting thread '%d'\n", thread_id);
	fflush(stdout);

	rc = pthread_mutex_lock(&query_mutex);

	/* do forever.... */
	while (1) {

		if (num_queries > 0) {	
			/* a request is pending */
			query = query_get(&query_mutex);

			/* got a request? */
			if (query) {	
				
				rc = pthread_mutex_unlock(&query_mutex);
				/* handle request */
				query_handle(query_thread[thread_id].
					     query_info, query);

				rc = pthread_mutex_lock(&query_mutex);
			}
		} else {
			/* wait for a request to arrive */
			rc = pthread_cond_wait(&got_query, &query_mutex);
		}
	}
}

/*
 * function query_init(): initialize request handling
 * returns: pointer to data structure query_info (see header file)
 */
query_info_t *query_init(net_info_t * net_info, filer_info_t * filer_info,
			 int id, int threads)
{
	int i;
	query_info_t *query_info = NULL;

	query_info = (query_info_t *) malloc(sizeof(query_info_t));
	if (!query_info)
		return NULL;

	/* fill query_info structure */
	query_info->net_info = net_info;
	query_info->filer_info = filer_info;
	query_info->id = id;

	if (!(queries = (query_t *) malloc(sizeof(query_t) * max_queries))) {
		free(query_info);
		return NULL;
	}

	last_query = 0;
	next_query = 0;

	/* reserve memory for circular buffer */
	for (i = 0; i < max_queries; i++) {
		queries[i].reply.data =
		    malloc(MAX_BLOCK_SIZE + sizeof(dnbd_reply_t));
	}

	/* create the request-handling threads */
	for (i = 0; i < threads; i++) {

		query_thread[i].id = i;
		query_thread[i].query_info = query_info;

		pthread_create(&query_thread[i].p_thread, NULL,
			       query_handle_loop,
			       (void *) &query_thread[i].id);
	}

	/* create thread for receiving network requests */
	pthread_create(&query_info->p_thread, NULL,
		       query_add_loop, (void *) query_info);

	return query_info;
}