diff options
-rw-r--r-- | COPYING | 340 | ||||
-rw-r--r-- | CREDITS | 10 | ||||
-rw-r--r-- | Makefile | 18 | ||||
-rw-r--r-- | README | 153 | ||||
-rw-r--r-- | client/Makefile | 15 | ||||
-rw-r--r-- | client/client.c | 549 | ||||
-rw-r--r-- | client/client.h | 7 | ||||
-rw-r--r-- | common/dnbd-cliserv.h | 103 | ||||
-rw-r--r-- | common/dnbd.h | 18 | ||||
-rw-r--r-- | kernel/Makefile | 18 | ||||
-rw-r--r-- | kernel/cache.c | 389 | ||||
-rw-r--r-- | kernel/cache.h | 49 | ||||
-rw-r--r-- | kernel/dnbd.h | 57 | ||||
-rw-r--r-- | kernel/main.c | 1264 | ||||
-rw-r--r-- | kernel/net.c | 248 | ||||
-rw-r--r-- | kernel/net.h | 73 | ||||
-rw-r--r-- | kernel/queue.c | 126 | ||||
-rw-r--r-- | kernel/queue.h | 29 | ||||
-rw-r--r-- | server/Makefile | 16 | ||||
-rw-r--r-- | server/filer.c | 120 | ||||
-rw-r--r-- | server/filer.h | 19 | ||||
-rw-r--r-- | server/net.c | 147 | ||||
-rw-r--r-- | server/net.h | 51 | ||||
-rw-r--r-- | server/query.c | 349 | ||||
-rw-r--r-- | server/query.h | 42 | ||||
-rw-r--r-- | server/server.c | 216 | ||||
-rw-r--r-- | server/server.h | 21 |
27 files changed, 4447 insertions, 0 deletions
@@ -0,0 +1,340 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc. + 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Library General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) year name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + <signature of Ty Coon>, 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Library General +Public License instead of this License. @@ -0,0 +1,10 @@ +My thank is addressed to the developers of the + +* network block device (NBD) - http://nbd.sourceforge.net/ + - Pavel Machek + - Steven Whitehouse + +* another network block device (ANBD) - http://www.aros.net/~ldl/anbd/ + - Louis D. Langholtz + +for their valuable work. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..fc279e2 --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +SUBDIRS = kernel client server +SRCDIR = . +TO_TAR = ./{Makefile,README,COPYING,CREDITS,{kernel,client,server}/{*.c,*.h,Makefile},common/*.h} + +all-recursive: + for dir in $(SUBDIRS); do cd $$dir; $(MAKE) all; cd ..; done + +all-clean: + for dir in $(SUBDIRS); do cd $$dir; $(MAKE) clean; cd ..; done + +all: all-recursive + +clean: all-clean + +tar: + cd $(SRCDIR) + dirname=`basename $(PWD)`; cd ..; \ + tar czfv $$dirname.tar.gz $$dirname/$(TO_TAR) @@ -0,0 +1,153 @@ +INTRODUCTION +============ + +DNBD (Distributed Network Block Device) is a read-only and caching network +block device and supports following main features: + +* replication of servers for robustness +* multicast communication and caching of network traffic for scalability + +These characteristics make it suitable especially for use in wireless networks, +e.g. for diskless clients or to share multimedia files in such an environment. +The servers can export a file or block device equipped with a operating system, +movies, music, etc. Several clients can import the block device and access it +like a local hard disk. However, each block transfer over the network can be +cached by all clients: If several users on each client start to watch a movie +within a certain time interval, the movie data has to be transmitted only once +(depending on the cache size). The network is not burdened with unnecessary +traffic. +DNBD can be used together with cowloop [1] or unionfs [2] in order to get local +write semantics, e.g. for diskless clients. Especially in wireless environments +with limited bandwidth, caching can increase boot-up time enormously. + +COMPILATION +=========== + +DNBD was developed for kernel 2.6.13 and later releases. Kernel 2.4 is not +supported. The kernel sources and common tools (gcc, make, etc.) have to be +installed. + +Server and Client +-------------------- +Unpacking: +$ tar xzvf dnbd.tar.gz + +Compiling: +$ cd dnbd; make + +USAGE +===== + +Server +------ +To show available command line parameters, start the server without +arguments: + +$ ./server/dnbd-server +dnbd-server, version 0.9.0 +Usage: dnbd-server -m <address> -d <device/file> -i <number> + [-t <threads>] + +description: + -m|--mcast <multicast address> + -d|--device <block device or file> + -i|--id <unique identification number> + -t|--threads <number of threads> + +With the following command, the server will be started for the multicast +network with address 239.0.0.1 and export the given file or block device. +Its unique id is 1: + +root@server1 $ ./server/dnbd-server -m 239.0.0.1 -d <partition/file> -i 1 + +To start a server on another computer, the used file or block device must have +the same content and size as on the first server. However, the id has to be +changed: + +root@server2 $ ./server/dnbd-server -m 239.0.0.1 -d <partition/file> -i 2 + +If DNBD is used for wired networks and on multi-processor machines, the +number of threads should be increased to the number of CPUs. + +To access the exported file or block device, another computer is used as +client. + +Client +------ + +The kernel module has to be loaded, before the client application can be used: + +root@client1 $ insmod ./kernel/dnbd.ko + +There should be an entry in syslog after successful loading. With no command +line arguments the client gives available options: + +root@client1 $ ./client/dnbd-client +dnbd-client, version 0.9.0 +Usage: dnbd-client -d device -b <address> [-c <file>] + or dnbd-client -d device -u + or dnbd-client -d device -c <file> + +description: + -d|--device <device> + -b|--bind <multicast-address> + -u|--unbind + -c|--cache <file> + +We will now import the block device of the server, e.g.: + +root@client1 $ ./client/dnbd-client -d /dev/dnbd0 -b 239.0.0.1 + +The client should tell you that it found a server with id "1". If you exported +a CDROM with a movie, you can watch it on the client over the network, e.g. +with mplayer (usually after mounting). + +If someone else wants to watch the movie on a different client, you should +enable caching either during operation + +root@client1 $ ./client/dnbd-client -d /dev/dnbd0 -c <cachefile> + +or at the beginning + +root@client2 $ ./client/dnbd-client -d /dev/dnbd0 -b 239.0.0.1 -c <cachefile> + +To create a cache with, e.g. 32M use + +root@client1$ dd if=/dev/zero of=/tmp/cachefile bs=1M count=32 + +Cache statistics are shown with + +root@client1$ cat /proc/driver/dnbd/dnbd0 + +The block device has to be unbound before the module can be unloaded: + +root@client1 $ ./client/dnbd-client -d /dev/dnbd0 -u +root@client1 $ rmmod dnbd + +FILES +===== + +Client +------ +./client + client.c # client application + +Server +------ +./server + net.c # network routines + query.c # server request handling + filer.c # file/device I/O + server.c # server application (main file) + +Kernel module +------------- +./kernel + net.c # server management + queue.c # queue handling for requests + cache.c # cache implementation (red-black trees) + main.c # module and block device (un)registration, threads + + +[1] http://www.atconsultancy.nl/cowloop/ +[2] http://www.fsl.cs.sunysb.edu/project-unionfs.html diff --git a/client/Makefile b/client/Makefile new file mode 100644 index 0000000..ed11db3 --- /dev/null +++ b/client/Makefile @@ -0,0 +1,15 @@ +CLIENT_BIN = dnbd-client +CLIENT_SRC = client.c + +BINS = $(CLIENT_BIN) + +CFLAGS = -Wall -D_GNU_SOURCE -D_LARGEFILE64_SOURCE -D_FILE_OFFSET_BITS=64 -O2 + +$(CLIENT_BIN): + $(CC) $(CFLAGS) -o $@ $(CLIENT_SRC) + +all: $(BINS) + +.PHONY: +clean: + -$(RM) *.o $(BINS) *~ diff --git a/client/client.c b/client/client.c new file mode 100644 index 0000000..790606f --- /dev/null +++ b/client/client.c @@ -0,0 +1,549 @@ +/* + * client.c - controlling application for block device driver + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <getopt.h> +#include <errno.h> +#include <time.h> + +/* network includes */ +#include <netdb.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/poll.h> + + +/* file operations */ +#include <sys/time.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include <unistd.h> +#include <sys/ioctl.h> + +#include <linux/kdev_t.h> + +#define DNBD_USERSPACE 1 +#include "../common/dnbd-cliserv.h" +#include "client.h" + +/* device driver setup information */ +struct client_s { + const char *mnetname; /* multicast address */ + const char *cachefile; /* name of cache file */ + struct sockaddr_in mca_adr; /* multicast address */ + int mca_len; /* and its byte length */ + const char *devicename; /* name of the device */ + int dnbd; /* file descriptor of dnbd device */ + int port; /* used port for multicast */ + int sock; /* socket descriptor */ + uint64_t capacity; /* capacity of device */ + uint16_t blksize; /* blocksize of device */ +}; + +typedef struct client_s client_t; + +/* beeing more verbose, if necessary */ +static int verbose = 0; + +/* structure of request to server */ +struct dnbd_request request; + +/* + * function daemonize(): forks our process that it can run in background + * returns: 1 on success, otherwise 0; + */ +int daemonize(void) +{ + pid_t pid; + + pid = fork(); + + if (pid > 0) { + exit(0); + } + if (pid < 0) { + fprintf(stderr, "fork() failed!\n"); + return 0; + } + return 1; +} + +/* + * function open_dnbd(): open the block device and do some prechecking + * returns: file descriptor of block device + */ +int open_dnbd(client_t * client) +{ + int dnbd; + struct stat statbuf; + + /* try to open the network block device */ + if ((dnbd = open(client->devicename, O_RDONLY)) < 0) { + if (errno == ENXIO) + fprintf(stderr, + "ERROR: make sure dnbd module is loaded!\n"); + fprintf(stderr, + "ERROR: Dnbd devide could not be opened!\n"); + return -EINVAL; + } + + /* error, if we cannot get status of file */ + if (fstat(dnbd, &statbuf) == -1) { + fprintf(stderr, "ERROR: Can not stat dnbd!\n"); + return -EINVAL; + } + + /* error, if it is not a block device */ + if (!(S_ISBLK(statbuf.st_mode))) + return -EINVAL; + + return dnbd; +} + +/* + * function open_net(): configures network parameters + * returns: socket descriptor of multicast net (int) + */ +int open_net(client_t * client) +{ + int sock; + const int ttl = 64; /* a TTL of 64 for multicast should be enough */ + struct ip_mreq mreq; + u_char loop = 0; /* multicast looping is disabled */ + + /* zero multicast address and convert to appropriate type */ + memset(&client->mca_adr, 0, sizeof(client->mca_adr)); + if (inet_aton(client->mnetname, &client->mca_adr.sin_addr) < 0) { + fprintf(stderr, "ERROR: Wrong multicast address \"%s\"!", + client->mnetname); + return -EINVAL; + } + + /* configure multicast socket */ + client->mca_adr.sin_family = AF_INET; + client->mca_adr.sin_port = htons(client->port); + if ((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + fprintf(stderr, "ERROR: Socket creation failed!\n"); + return -EINVAL; + } + + /* bind socket */ + if (bind + (sock, (struct sockaddr *) &client->mca_adr, + sizeof(client->mca_adr)) < 0) { + fprintf(stderr, "ERROR: Socket bind failed!\n"); + return -EINVAL; + } + + /* setup multicast, join multicast group, set TTL and disable looping */ + if (inet_aton(client->mnetname, &mreq.imr_multiaddr) < 0) { + fprintf(stderr, "ERROR: Wrong multicast address \"%s\"!", + client->mnetname); + return -EINVAL; + } + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt + (sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, + sizeof(mreq)) < 0) { + fprintf(stderr, "ERROR: Adding multicast membership\n"); + return -1; + } + if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, + &ttl, sizeof(ttl)) < 0) { + fprintf(stderr, "ERROR: Setting TTL to %i\n",ttl); + return -1; + } + setsockopt(sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, + sizeof(loop)); + + + return sock; +} + +/* + * function do_handshake(): send init requests to the network and wait for + * server replies + * returns: 0 on success, otherwise POSIX error code + */ +int do_handshake(client_t * client) +{ + int result; + struct pollfd read_fds[1]; + struct dnbd_request request; + int replylen; + struct dnbd_reply_init reply_init; + time_t starttime, stoptime; + int cycle = 1; + int servers = 0; + + client->capacity = 0; + client->blksize = 0; + + /* for network socket polls */ + read_fds[0].fd = client->sock; + read_fds[0].events = POLLIN; + read_fds[0].revents = 0; + + /* request comes from a client and is addressed to all servers */ + request.magic = htonl(DNBD_MAGIC); + request.cmd = htons(DNBD_CMD_INIT | DNBD_CMD_CLI); + request.id = htons(0); /* ask all servers */ + + /* send requests (in 1 second intervals) */ + /* and wait DNBD_TIMEOUT seconds for replies */ + printf("Searching for servers...\n"); + starttime = time(NULL); + (void) sendto(client->sock, &request, sizeof(request), + 0, (struct sockaddr *) &client->mca_adr, + sizeof(client->mca_adr)); + + + while (cycle) { + + stoptime = time(NULL); + + /* timeout after DNBD_TIMEOUT seconds */ + if ((stoptime - starttime) > DNBD_TIMEOUT) + break; + + /* wait for reply or send a request each second */ + if ((result = poll(read_fds, 1, 1000)) != 1) { + (void) sendto(client->sock, &request, + sizeof(request), 0, + (struct sockaddr *) &client->mca_adr, + sizeof(client->mca_adr)); + continue; + } + + /* handle reply */ + replylen = + recv(client->sock, &reply_init, + sizeof(struct dnbd_reply_init), MSG_WAITALL); + + /* check for integrity */ + if (replylen != sizeof(struct dnbd_reply_init)) + continue; + reply_init.magic = ntohl(reply_init.magic); + if (reply_init.magic != DNBD_MAGIC) { + continue; + } + reply_init.cmd = ntohs(reply_init.cmd); + if (!((reply_init.cmd & DNBD_CMD_SRV) && + ((reply_init.cmd & DNBD_CMD_MASK) == DNBD_CMD_INIT))) + continue; + + /* copy parameters of block device from reply */ + reply_init.id = ntohs(reply_init.id); + reply_init.capacity = ntohll(reply_init.capacity); + reply_init.blksize = ntohs(reply_init.blksize); + + /* add server to block device servers */ + if ((result = + ioctl(client->dnbd, DNBD_SET_SERVERID, + reply_init.id)) < 0) { + if (errno == EEXIST) + continue; + else + fprintf(stderr, + "ERROR: ioctl DNBD_SET_SERVERID failed!\n"); + return -EINVAL; + } else { + printf("* Added server with id %i\n", + reply_init.id); + + client->capacity = reply_init.capacity; + client->blksize = reply_init.blksize; + servers++; + } + + } + + /* check, if servers have been found */ + if (servers) { + printf("Capacity of device is %llu, blksize is %i\n", + client->capacity, client->blksize); + result = 0; + } else { + if (ioctl(client->dnbd, DNBD_DISCONNECT) < 0) { + fprintf(stderr, + "ERROR: ioctl DNBD_DISCONNECT failed!\n"); + } + fprintf(stderr, "No servers found!\n"); + result = -ECONNABORTED; + } + + return result; +} + +/* + * function do_bind(): open block devicename, bind network, do handshake, + * set block device configuration (size, blocksize), + * start session, fork and go to background + * returns: 1 when session finished, otherwise POSIX error code + */ +int do_bind(client_t * client) +{ + struct dnbd_file cfile; + + /* open block device */ + if ((client->dnbd = open_dnbd(client)) < 0) + return -EINVAL; + printf("DNBD device successfully set.\n"); + + /* bind network */ + if ((client->sock = open_net(client)) < 0) + return -EINVAL; + fprintf(stdout, "Socket successfully opened.\n"); + + /* configure block device */ + if (ioctl(client->dnbd, DNBD_SET_SOCK, client->sock) < 0) { + close(client->sock); + fprintf(stderr, "ERROR: ioctl DNBD_SET_SOCKET failed!\n"); + return -EINVAL; + } + if (ioctl(client->dnbd, DNBD_SET_GROUPNET, &client->mca_adr) < 0) { + fprintf(stderr, + "ERROR: ioctl DNBD_SET_GROUPNET failed!\n"); + return -EINVAL; + } + fprintf(stdout, "Multicast address successfully set to %s.\n", + inet_ntoa(client->mca_adr.sin_addr)); + + /* start handshake */ + if (do_handshake(client) < 0) + return -EINVAL; + + /* set block size and capacity of device */ + if (ioctl(client->dnbd, DNBD_SET_BLKSIZE, client->blksize) < 0) { + fprintf(stderr, "ERROR: ioctl DNBD_SET_BLKSIZE failed!\n"); + return -EINVAL; + } + if (ioctl(client->dnbd, DNBD_SET_CAPACITY, &client->capacity) < 0) { + fprintf(stderr, "ERROR: ioctl DNBD_SET_SIZE failed!\n"); + return -EINVAL; + } + + /* activate cache, if necessary */ + if (client->cachefile) { + cfile.name = client->cachefile; + cfile.len = strlen(client->cachefile); + if (ioctl(client->dnbd, DNBD_SET_CACHE, &cfile) < 0) { + fprintf(stderr, + "ERROR: ioctl DNBD_SET_CACHE failed!\n"); + return -EINVAL; + } + printf("Cachefile successfully set.\n"); + } + + /* go to background */ + if (!daemonize()) + return -ECHILD; + if (ioctl(client->dnbd, DNBD_DO_IT) < 0) { + fprintf(stderr, + "ERROR: ioctl DNBD_DO_IT terminated unexpected!\n"); + } else { + fprintf(stdout, "dnbd terminated.\n"); + } + + return 1; +} + +/* + * function do_unbind(): end session + * returns: 1 on success, otherwise POSIX error code + */ +int do_unbind(client_t * client) +{ + /* open block device */ + if ((client->dnbd = open_dnbd(client)) < 0) + return -EINVAL; + fprintf(stdout, "dnbd device successfully opened.\n"); + + /* send disconnect */ + if (ioctl(client->dnbd, DNBD_DISCONNECT) < 0) { + fprintf(stderr, "ERROR: ioctl DNBD_DISCONNECT failed!\n"); + return -EINVAL; + } + + return 1; +} + +/* + * function: do_setcache(): set cache when block device is already active + * returns: 1 on success, otherwise POSIX error code + */ +int do_setcache(client_t * client) +{ + struct dnbd_file cfile; + if ((client->dnbd = open_dnbd(client)) < 0) + return -EINVAL; + fprintf(stdout, "dnbd device successfully opened.\n"); + + if (client->cachefile) { + cfile.name = client->cachefile; + cfile.len = strlen(client->cachefile); + if (ioctl(client->dnbd, DNBD_SET_CACHE, &cfile) < 0) { + fprintf(stderr, + "ERROR: ioctl DNBD_SET_CACHE failed!\n"); + return -EINVAL; + } + printf("Cachefile successfully set.\n"); + } + + return 1; +} + +/* + * function: client_help + */ +void client_help(void) +{ + fprintf(stderr, "dnbd-client, version %s\n", DNBD_VERSION); + fprintf(stderr, + "Usage: dnbd-client -d device -b <address> [-c <file>]\n"); + fprintf(stderr, " or dnbd-client -d device -u\n"); + fprintf(stderr, " or dnbd-client -d device -c <file>\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "description:\n"); + fprintf(stderr, " -d|--device <device>\n"); + fprintf(stderr, " -b|--bind <multicast-address>\n"); + fprintf(stderr, " -u|--unbind \n"); + fprintf(stderr, " -c|--cache <file>\n"); + fprintf(stderr, "\n"); +} + +/* + * function client_shutdown() + */ +void client_shutdown(client_t * client) +{ + if (client->dnbd > 0) + close(client->dnbd); +} + +/* + * function parseopts(): parse command line options + * returns: command identifier or error <= 0 + * -1: error + * 0: not defined + * 1: bind a block device + * 2: unbind a block devicename + * 3: set cache file + */ +int parseopts(int argc, char **argv, client_t * client) +{ + int cmd = 0, err = 0; + + memset(client, 0, sizeof(client_t)); + client->port = DNBD_PORT; + /* return value for getopt */ + int c; + /* start option parsing */ + while (1) { + static struct option long_options[] = { + {"verbose", no_argument, 0, 'v'}, + {"bind", required_argument, 0, 'b'}, + {"unbind", no_argument, 0, 'u'}, + {"cache", required_argument, 0, 'c'}, + {"device", required_argument, 0, 'd'}, + {0, 0, 0, 0} + }; + /* option index for getopt_long */ + int option_index = 0; + opterr = 0; + c = getopt_long(argc, argv, "b:ud:c:v", + long_options, &option_index); + /* at end of options? */ + if (c == -1) + break; + /* + cmd = (cmd ? -1 : xx) is used to set cmd when it was + unset (0) before. Otherwise save error value + */ + switch (c) { + case 'v': + verbose++; + break; + case 'b': + cmd = (cmd ? -1 : 1); /* bind */ + client->mnetname = optarg; + break; + case 'u': + cmd = (cmd ? -1 : 2); /* unbind */ + break; + case 'd': + cmd = (client->devicename ? -1 : cmd); + client->devicename = optarg; + break; + case 'c': + cmd = (client->cachefile ? -1 : cmd); + client->cachefile = optarg; + break; + case '?': + fprintf(stderr, "ERROR: wrong parameters\n"); + default: + cmd = -1; + } + + } + + /* no/wrong command given? */ + if (cmd <= 0) { + /* set cache file, when not (un)bind */ + if (client->cachefile) + cmd = 3; + else + err = -EINVAL; + } + + if (cmd > 0 && !client->devicename) { + fprintf(stderr, "ERROR: no device given!\n"); + err = -EINVAL; + } + + if (err < 0) { + fprintf(stderr, "\n"); + client_help(); + return -err; + } + + + return (cmd); +} + +/* + * function main(): calls option parser, + * executes subrotine (bind/unbind/set cache) + * returns: 0 on success, otherwise POSIX error code + */ + +int main(int argc, char **argv) +{ + client_t client; + int cmd, err = 0; + cmd = parseopts(argc, argv, &client); + if (cmd < 0) + return 1; + /* call function for command */ + switch (cmd) { + case 1: + err = do_bind(&client); + break; + case 2: + err = do_unbind(&client); + break; + case 3: + err = do_setcache(&client); + break; + } + + client_shutdown(&client); + return err; +} diff --git a/client/client.h b/client/client.h new file mode 100644 index 0000000..952af1b --- /dev/null +++ b/client/client.h @@ -0,0 +1,7 @@ +#ifndef LINUX_DNBD_CLIENT_H +#define LINUX_DNBD_CLIENT_H 1 + +/* timeout in seconds to wait for servers, not yet changeable from user space */ +#define DNBD_TIMEOUT 3 + +#endif diff --git a/common/dnbd-cliserv.h b/common/dnbd-cliserv.h new file mode 100644 index 0000000..cf96767 --- /dev/null +++ b/common/dnbd-cliserv.h @@ -0,0 +1,103 @@ +#ifndef LINUX_DNBD_CLISERV_H +#define LINUX_DNBD_CLISERV_H 1 + +#ifndef MODULE +#include <stdint.h> +#endif + +#ifdef DNBD_USERSPACE +#include <bits/types.h> +#include <netinet/in.h> +#include <endian.h> +#include <byteswap.h> + +/* host byte order <-> network byte order */ +#if __BYTE_ORDER == __BIG_ENDIAN +#define ntohll(x) (x) +#else +#define ntohll(x) bswap_64(x) +#endif + +#else +#include <linux/in.h> +#endif + +#include <linux/types.h> + +/* some constants */ +#define DNBD_VERSION "0.9.0" +#define DNBD_PORT 5001 +#define DNBD_MAGIC 0x19051979 +#define DNBD_MAJOR 0 +#define DNBD_UIDLEN 20 + +/* states */ +#define DNBD_STATE_LOADED 1<<0 +#define DNBD_STATE_CONFIGURED 1<<1 +#define DNBD_STATE_RUNNING 1<<2 + +/* define ioctls */ +#define DNBD_IOCTL_TYPE 0xac +#define DNBD_SET_SOCK _IO( DNBD_IOCTL_TYPE, 0) +#define DNBD_SET_GROUPNET _IO( DNBD_IOCTL_TYPE, 1) +#define DNBD_SET_BLKSIZE _IO( DNBD_IOCTL_TYPE, 2) +#define DNBD_SET_CAPACITY _IO( DNBD_IOCTL_TYPE, 3) +#define DNBD_SET_SERVERID _IO( DNBD_IOCTL_TYPE, 4) +#define DNBD_SET_CACHE _IO( DNBD_IOCTL_TYPE, 5) +#define DNBD_DO_IT _IO( DNBD_IOCTL_TYPE, 6) +#define DNBD_DISCONNECT _IO( DNBD_IOCTL_TYPE, 7) + +/* define communication between server and client */ +#define DNBD_CMD_MASK 0x07 +#define DNBD_CMD_INIT 0x01 +#define DNBD_CMD_READ 0x02 +#define DNBD_CMD_HB 0x03 + +#define DNBD_CMD_CLI 0x08 +#define DNBD_CMD_SRV 0x10 + +#define DNBD_TMR_OUT 0x0a + +/* do not allign variables to 32bit etc.*/ +#pragma pack(1) +struct dnbd_request { + uint32_t magic; + uint16_t id; + uint16_t cmd; + uint64_t pos; + uint16_t time; + uint16_t len; +}; +#pragma pack() + +#pragma pack(1) +struct dnbd_reply { + uint32_t magic; + uint16_t id; + uint16_t cmd; + uint64_t pos; + uint16_t time; +}; +#pragma pack() + +#pragma pack(1) +struct dnbd_reply_init { + uint32_t magic; + uint16_t id; + uint16_t cmd; + uint64_t capacity; + uint16_t time; + uint16_t blksize; +}; +#pragma pack() + +typedef struct dnbd_reply dnbd_reply_t; +typedef struct dnbd_reply_init dnbd_reply_init_t; +typedef struct dnbd_request dnbd_request_t; + +struct dnbd_file { + const char *name; + int len; +}; + +#endif /* LINUX_DNBD_CLISERV_H */ diff --git a/common/dnbd.h b/common/dnbd.h new file mode 100644 index 0000000..eb8de33 --- /dev/null +++ b/common/dnbd.h @@ -0,0 +1,18 @@ +#define DNBD_MAJOR 43 +#define MAX_DNBD 32 + +struct dnbd_device { + int flags; + int harderror; /* Code of hard error */ + struct socket * sock; + struct file * file; /* If == NULL, device is not ready, yet */ + int magic; + spinlock_t queue_lock; + struct list_head queue_head;/* Requests are added here... */ + struct semaphore tx_lock; + struct gendisk *disk; + int blksize; + u64 bytesize; +}; + +typedef struct dnbd_device dnbd_device_t; diff --git a/kernel/Makefile b/kernel/Makefile new file mode 100644 index 0000000..fbff4e6 --- /dev/null +++ b/kernel/Makefile @@ -0,0 +1,18 @@ +KDIR := /lib/modules/$(shell uname -r)/build + +# change a line according to your needs, +# if you want to build for another kernel +#KDIR := /lib/modules/2.6.12-usermode-r1-bs5/build +#KDIR := /lib/modules/2.6.13-15.7-smp/build + +PWD := $(shell pwd) + + +obj-m += dnbd.o +dnbd-objs := queue.o cache.o net.o main.o + +all: + $(MAKE) -C $(KDIR) SUBDIRS=$(PWD) modules + +clean: + -$(RM) *.o dnbd.ko *~ diff --git a/kernel/cache.c b/kernel/cache.c new file mode 100644 index 0000000..e72b2ff --- /dev/null +++ b/kernel/cache.c @@ -0,0 +1,389 @@ +/* + * cache.c - block cache with red-black trees + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <linux/module.h> +#include <linux/types.h> +#include <linux/highmem.h> +#include <linux/fs.h> +#include <linux/file.h> +/* use red-black library of kernel */ +#include <linux/rbtree.h> +#include <asm/uaccess.h> + +#include "../common/dnbd-cliserv.h" +#include "cache.h" + +/* empty node */ +#define rb_entry_cn(node) rb_entry((node), struct cache_node, rb_node) + +/* dummy operations of no cache is used */ +int dnbd_cache_dummy_search(dnbd_cache_t * cache, struct request *req) +{ + return 0; +} + +int dnbd_cache_dummy_insert(dnbd_cache_t * cache, sector_t block, + void *buf) +{ + return 0; +} + +void dnbd_cache_dummy_clean(dnbd_cache_t * cache) +{ + return; +} + +int dnbd_cache_search(dnbd_cache_t * cache, struct request *req) +{ + /* hold segment as we copy from user space */ + mm_segment_t old_fs = get_fs(); + size_t blksize; + void *kaddr; + + int i; + struct bio *bio; + struct bio_vec *bvec; + + int result = 0, rbytes; + struct rb_node *n; + struct cache_node *cn; + loff_t offset; + char *buf; + + down(&cache->sema); + n = cache->root.rb_node; + blksize = cache->blksize; + + /* search for block */ + while (n) { + cn = rb_entry_cn(n); + + if (req->sector < cn->rb_key) + n = n->rb_left; + else if (req->sector > cn->rb_key) + n = n->rb_right; + else + goto found; + } + /* block is not cached */ + cache->miss++; + goto out; + + /* cached block was found */ + found: + cache->hits++; + offset = cn->rb_data * blksize; + rq_for_each_bio(bio, req) { + bio_for_each_segment(bvec, bio, i) { + if (bvec->bv_len > blksize) { + printk(KERN_WARNING + "bvec->bv_len greater than cache block size\n"); + goto out; + } + /* copy cached block from cache file */ + set_fs(get_ds()); + buf = req->buffer; + kaddr = kmap(bvec->bv_page); + rbytes = + vfs_read(cache->filp, kaddr + bvec->bv_offset, + bvec->bv_len, &offset); + kunmap(bvec->bv_page); + set_fs(old_fs); + + /* error on reading? */ + if (rbytes != bio_iovec(req->bio)->bv_len) { + printk + ("dnbd: ERROR reading from cache!\n"); + result = 0; + goto out; + } + + result += rbytes; + if (result == blksize) + goto out; + } + } + + out: + up(&cache->sema); + + /* return number of copied sectors */ + return result >> 9; +} + +int dnbd_cache_insert(dnbd_cache_t * cache, sector_t sector, void *buf) +{ + mm_segment_t old_fs = get_fs(); + int result = 0; + struct rb_node **p; + struct rb_node *parent = NULL; + cache_node_t *__cn, *cn; + sector_t act_block; + + loff_t offset; + + down(&cache->sema); + p = &cache->root.rb_node; + + /* red-black tree search and replacement in O(log n) */ + + /* check if node was already inserted to cache and, + if necessary, do LRU replacement */ + + while (*p) { + parent = *p; + __cn = rb_entry_cn(parent); + if (sector < __cn->rb_key) + p = &(*p)->rb_left; + else if (sector > __cn->rb_key) + p = &(*p)->rb_right; + else { + /* the sector was already added to cache */ + + /* LRU replacement policy */ + if (__cn->prev) + __cn->prev->next = __cn->next; + else + /* __cn is head - do nothing */ + goto no_lru; + + if (__cn->next) + __cn->next->prev = __cn->prev; + else + /* __cn is tail - so set new one */ + cache->tail = + (__cn->prev ? __cn->prev : __cn); + + /* insert new node to head: + head of list has no predecessor, + set previous node to NULL and next + node to old head and set new head */ + + __cn->prev = NULL; + __cn->next = cache->head; + + if (cache->head) + cache->head->prev = __cn; + + cache->head = __cn; + + cache->lru++; + no_lru: + result = 1; + goto out; + } + } + + /* check if cache is full */ + if (cache->used_blocks == cache->max_blocks) { + /* + remove LRU node (cn), but keep reserved + data structure in memory + */ + cn = cache->tail; + cache->tail->prev->next = NULL; + cache->tail = cache->tail->prev; + + /* + Node (which is tail of LRU list) will be erased from tree + which is then rebalanced. + Re-finding a parent for the a node is mandatory. + */ + act_block = cn->rb_data; + rb_erase(&cn->rb_node, &cache->root); + p = &cache->root.rb_node; + while (*p) { + parent = *p; + __cn = rb_entry_cn(parent); + if (sector < __cn->rb_key) + p = &(*p)->rb_left; + else + p = &(*p)->rb_right; + } + } else { + /* cache is not full, so reserve memory for red-black tree node */ + if (! + (cn = + (cache_node_t *) kmalloc(sizeof(cache_node_t), + GFP_KERNEL))) { + result = -ENOMEM; + goto out; + } + act_block = cache->used_blocks; + /* cn = &cache->nodes[act_block]; */ + cache->used_blocks++; + } + + /* write block to cache file */ + offset = act_block * cache->blksize; + set_fs(get_ds()); + result = vfs_write(cache->filp, buf, cache->blksize, &offset); + set_fs(old_fs); + + if (result != cache->blksize) { + printk("dnbd: ERROR writing to cache!\n"); + cache->used_blocks--; + kfree(cn); + goto out; + } + + /* cn (current node) points to an empty node, now */ + cn->rb_key = sector; + cn->rb_data = act_block; + + /* current node (which will become the new head) has no predecessor */ + cn->prev = NULL; + cn->next = cache->head; + + /* adapt head element - if it exists */ + if (cache->head) + cache->head->prev = cn; + + /* set new head */ + cache->head = cn; + + /* set new tail */ + if (!cache->tail) + cache->tail = cn; + + /* call kernel helpers for red-black trees */ + rb_link_node(&cn->rb_node, parent, p); + rb_insert_color(&cn->rb_node, &cache->root); + + out: + up(&cache->sema); + return result; +} + +int dnbd_cache_init(dnbd_cache_t * cache) +{ + int result = -EINVAL; + + /* initialize cache */ + cache->active = 0; + + /* set dummy function, if no cache is used */ + cache->insert = &dnbd_cache_dummy_insert; + cache->search = &dnbd_cache_dummy_search; + cache->clean = &dnbd_cache_dummy_clean; + + cache->root.rb_node = NULL; + + cache->max_blocks = 0; + cache->used_blocks = 0; + cache->blksize = 0; + + cache->hits = 0; + cache->miss = 0; + cache->lru = 0; + + cache->filp = NULL; + cache->fname = NULL; + + cache->head = NULL; + cache->tail = NULL; + spin_lock_init(&cache->lock); + init_MUTEX(&cache->sema); + + result = 0; + return result; +} + +void dnbd_cache_clean(dnbd_cache_t * cache) +{ + cache_node_t *node; + cache_node_t *tmp; + int cnt = 0; + + spin_lock(&cache->lock); + node = cache->head; + + cache->head = NULL; + cache->tail = NULL; + + if (cache->fname) + kfree(cache->fname); + + /* free memory of all nodes; start with head */ + while (node) { + tmp = node; + node = node->next; + kfree(tmp); + cnt++; + } + printk(KERN_INFO "dnbd: freed %i cache nodes\n", cnt); + + cache->active = 0; + spin_unlock(&cache->lock); + +} + +int dnbd_cache_set(dnbd_cache_t * cache, struct dnbd_file __user * arg, + size_t blksize) +{ + int result = -EINVAL; + struct dnbd_file cfile; + struct kstat stat; + loff_t div1; + size_t div2; + + if (cache->active) { + printk(KERN_ERR "dnbd: cachefile is already set!\n"); + return -EFAULT; + } + + /* open, verify and set cache file */ + if (copy_from_user(&cfile, arg, sizeof(cfile))) + return -EFAULT; + + if (!(cache->fname = (char *) kmalloc(cfile.len + 1, GFP_KERNEL))) + return -ENOMEM; + + if (copy_from_user + (cache->fname, (void __user *) cfile.name, cfile.len)) { + result = -EFAULT; + goto out_free; + } + *(cache->fname + cfile.len) = 0; + + printk(KERN_INFO "dnbd: setting cachefile to %s\n", cache->fname); + + cache->filp = filp_open(cache->fname, O_RDWR | O_LARGEFILE, 0); + + if (cache->filp == NULL || IS_ERR(cache->filp)) { + printk(KERN_ERR "dnbd: ERROR opening cache file!\n"); + result = -EINVAL; + goto out_free; + } + + generic_fillattr(cache->filp->f_dentry->d_inode, &stat); + + div1 = stat.size; + div2 = blksize; + do_div(div1, div2); + + printk(KERN_INFO + "dnbd: cachefile size %llu KB using %llu blocks a %i bytes for caching.\n", + stat.size >> 10, div1, blksize); + + cache->max_blocks = div1; + cache->blksize = blksize; + + /* activate cache and adapt function for insert, search and clean up */ + cache->active = 1; + cache->insert = &dnbd_cache_insert; + cache->search = &dnbd_cache_search; + cache->clean = &dnbd_cache_clean; + + result = 0; + goto out; + + out_free: + kfree(cache->fname); + out: + if (result < 0 && cache->filp) + fput(cache->filp); + return result; +} diff --git a/kernel/cache.h b/kernel/cache.h new file mode 100644 index 0000000..1cc2b49 --- /dev/null +++ b/kernel/cache.h @@ -0,0 +1,49 @@ +#ifndef LINUX_DNBD_CACHE_H +#define LINUX_DNBD_CACHE_H 1 + +#include <linux/rbtree.h> +#include <linux/blkdev.h> + +#include "../common/dnbd-cliserv.h" + +/* node for red-black tree */ +struct cache_node { + struct rb_node rb_node; + sector_t rb_key; + sector_t rb_data; + /* previous and next node used for LRU */ + struct cache_node *prev; + struct cache_node *next; +}; + +typedef struct cache_node cache_node_t; + +/* cache characteristics */ +struct dnbd_cache { + int active; /* !0 when cache active */ + char *fname; /* cache file name */ + int fd; /* cache file descriptor */ + struct file *filp; /* cache file structure */ + struct rb_root root; /* root node of cache */ + sector_t max_blocks; /* maximum of cached blocks */ + sector_t used_blocks; /* current used blocks */ + size_t blksize; + struct cache_node *head; /* head of LRU list */ + struct cache_node *tail; /* tail of LRU list */ + spinlock_t lock; + struct semaphore sema; + int (*insert) (struct dnbd_cache * cache, sector_t sector, void *buf); + int (*search) (struct dnbd_cache * cache, struct request *req); + void (*clean) (struct dnbd_cache * cache); + long hits; /* statistics */ + long miss; + long lru; + +}; + +typedef struct dnbd_cache dnbd_cache_t; + +int dnbd_cache_init(dnbd_cache_t * cache); +int dnbd_cache_set(dnbd_cache_t * dcache, struct dnbd_file __user * cachefile, size_t blksize); + +#endif diff --git a/kernel/dnbd.h b/kernel/dnbd.h new file mode 100644 index 0000000..8d232b0 --- /dev/null +++ b/kernel/dnbd.h @@ -0,0 +1,57 @@ +#ifndef LINUX_DNBD_H +#define LINUX_DNBD_H 1 + +#include <linux/completion.h> +#include <linux/in.h> +#include <asm/semaphore.h> +#include <linux/blkdev.h> +#include <linux/rbtree.h> +#include <linux/jiffies.h> +#include <linux/timer.h> +#include <linux/version.h> + +#include "queue.h" +#include "cache.h" +#include "net.h" + +#define MAX_DNBD 16 + +/* needed for rx_loop, tx_loop, ss_loop */ +struct dnbd_thread { + struct task_struct *task; + struct completion startup; + struct completion finish; +}; + +typedef struct dnbd_thread dnbd_thread_t; + +struct dnbd_device { + int magic; + int state; + struct socket *sock; /* network socket */ + struct sockaddr_in mcast; + struct file *file; + spinlock_t thread_lock; /* locks */ + spinlock_t queue_lock; + spinlock_t timer_lock; + struct semaphore semalock; + struct gendisk *disk; /* general disk interface */ + int blksize; + u64 bytesize; + atomic_t refcnt; /* reference counter for module */ + dnbd_thread_t rx_thread; + dnbd_thread_t tx_thread; + dnbd_thread_t ss_thread; + atomic_t num_io_threads; + wait_queue_head_t io_waiters; + dnbd_queue_t rx_queue; /* queue for outstanding request */ + dnbd_queue_t tx_queue; /* queue for requests to be sent */ + struct dnbd_cache cache; + struct dnbd_servers servers; /* pointer to servers */ + struct timer_list timer; +}; + +typedef struct dnbd_device dnbd_device_t; + + +#endif /* LINUX_DNBD_H */ diff --git a/kernel/main.c b/kernel/main.c new file mode 100644 index 0000000..22a6032 --- /dev/null +++ b/kernel/main.c @@ -0,0 +1,1264 @@ + /* + * main.c - central part of the dnbd device + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + * + * see CREDITS for licence + * + * parts and ideas based on + * + * - ANBD (another network block device) + * Copyright (C) 2003 Louis D. Langholtz <ld@aros.net>. + * + * - NBD (network block device) + * Copytight 1979-2000 Pavel Machek <pavel@ucw.cz> + * Parts copyright 2001 Steven Whitehouse <steve@chygwyn.com> + * + */ + +#include <linux/module.h> +#include <linux/proc_fs.h> +#include <linux/blkdev.h> +#include <linux/init.h> +#include <linux/sched.h> +#include <linux/preempt.h> +#include <linux/fs.h> /* everything... */ +#include <linux/bio.h> + +#include <linux/errno.h> /* error codes */ +#include <linux/devfs_fs_kernel.h> +#include <asm/uaccess.h> +#include <linux/file.h> + +/* network stuff */ +#include <linux/net.h> +#include <linux/in.h> +#include <net/sock.h> +#include <linux/skbuff.h> +#include <linux/udp.h> + +#include <linux/types.h> /* size_t */ + +#include "../common/dnbd-cliserv.h" +#include "dnbd.h" +#include "queue.h" +#include "cache.h" +#include "net.h" + +#define LO_MAGIC 0x68797548 +#define DEVICE_TO_MINOR(dnbd) ((int)((dnbd)-dnbd_dev)) + +int dnbd_major = DNBD_MAJOR; + +/* private structures */ +typedef int (*thread_fn_t) (void *); + +/* function */ +static int dnbd_rx_loop(void *data); +static int dnbd_tx_loop(void *data); + +static struct dnbd_device dnbd_dev[MAX_DNBD]; +static struct proc_dir_entry *dnbd_proc_dir = NULL; + +/* inform kernel that some sectors of a request have been transferred */ +static int dnbd_end_request(dnbd_device_t * dnbd, struct request *req, + int success, int size) +{ + unsigned long flags; + request_queue_t *q = req->q; + + int result = 0; + + spin_lock_irqsave(q->queue_lock, flags); + if (!(result = end_that_request_first(req, success, size))) { +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,16) + end_that_request_last(req,success); +#else + end_that_request_last(req); +#endif + + } + spin_unlock_irqrestore(q->queue_lock, flags); + return result; /* 0, if request is completed */ +} + +/* empty a request queue */ +void dnbd_clear_queue(dnbd_device_t * dnbd, dnbd_queue_t * q) +{ + struct request *req; + do { + req = dnbd_deq_request(q); + if (req) { + dnbd_end_request(dnbd, req, 0, req->nr_sectors); + } + } while (req); +} + +/* empty all queues: tx_queue, rx_queue */ +void dnbd_clear_queues(dnbd_device_t * dnbd) +{ + spin_lock_irq(&dnbd->thread_lock); + + if (dnbd->rx_thread.task) { + printk(KERN_ERR + "dnbd_clear_queues: rx_thread still active!\n"); + } else { + dnbd_clear_queue(dnbd, &dnbd->rx_queue); + } + + if (dnbd->tx_thread.task) { + printk(KERN_ERR + "dnbd_clear_queues: tx_thread still active!\n"); + } else { + dnbd_clear_queue(dnbd, &dnbd->tx_queue); + } + + spin_unlock_irq(&dnbd->thread_lock); +} + +/* send a request via network */ +static int sock_xmit(dnbd_device_t * dnbd, int send, void *buf, int size, + int flags) +{ + int result = 0; + struct msghdr msg; + struct kvec iov; + unsigned long irqflags; + sigset_t oldset; + + /* do not allow signals, except of SIGKILL */ + spin_lock_irqsave(¤t->sighand->siglock, irqflags); + oldset = current->blocked; + sigfillset(¤t->blocked); + sigdelsetmask(¤t->blocked, sigmask(SIGKILL)); + recalc_sigpending(); + spin_unlock_irqrestore(¤t->sighand->siglock, irqflags); + + /* prepare data structures and call kernel send routine */ + do { + dnbd->sock->sk->sk_allocation = GFP_NOIO; + iov.iov_base = buf; + iov.iov_len = size; + if (send) { + msg.msg_name = &dnbd->mcast; + msg.msg_namelen = sizeof(dnbd->mcast); + } else { + msg.msg_name = NULL; + msg.msg_namelen = 0; + } + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = MSG_NOSIGNAL; + + if (send) { + result = + kernel_sendmsg(dnbd->sock, &msg, &iov, 1, + size); + } else { + result = + kernel_recvmsg(dnbd->sock, &msg, &iov, 1, size, + 0); + } + + if (result <= 0) + break; + + size -= result; + buf += result; + + } while (0); + + /* set signal mask to original state */ + spin_lock_irqsave(¤t->sighand->siglock, irqflags); + current->blocked = oldset; + recalc_sigpending(); + spin_unlock_irqrestore(¤t->sighand->siglock, irqflags); + + return result; +} + +/* copy sectors to cache */ +static void dnbd_xfer_to_cache(dnbd_device_t * dnbd, struct sk_buff *skb, + int offset, int remain, sector_t sector) +{ + mm_segment_t oldfs = get_fs(); + int result; + size_t blksize = dnbd->cache.blksize; + char block_buf[blksize]; + struct iovec iov; + + if (!dnbd->cache.active) + return; + + set_fs(get_ds()); + while (remain >= blksize) { + iov.iov_base = &block_buf; + iov.iov_len = blksize; + /* copy data from socket buffer */ + if ((result = + skb_copy_datagram_iovec(skb, offset, &iov, + blksize)) < 0) { + printk(KERN_WARNING + "dnbd: error copy packet to iovec!\n"); + } + /* and insert to cache */ + dnbd->cache.insert(&dnbd->cache, sector, &block_buf); + remain -= blksize; + offset += blksize; + sector += blksize / (1 << 9); + } + set_fs(oldfs); +} + +/* process incoming network packets */ +static int inline dnbd_recv_reply(dnbd_device_t * dnbd) +{ + mm_segment_t oldfs = get_fs(); + int i; + unsigned int nsect = 0; + int err; + struct sk_buff *skb; + struct iovec iov; + int remain, offset, tocopy; + dnbd_reply_t *reply; + struct request *req = NULL; + struct bio *bio; + struct bio_vec *bvec; + int tt; + void *kaddr; + + /* sleep until packet arrives */ + skb = skb_recv_datagram(dnbd->sock->sk, 0, 0, &err); + + if (!skb) + goto out_nofree; + + /* + some NICs can verify checksums themselves and then is + unnecessary for us + */ + offset = sizeof(struct udphdr); + if (skb->ip_summed != CHECKSUM_UNNECESSARY && (unsigned short) + csum_fold(skb_checksum(skb, 0, skb->len, skb->csum))) { + printk(KERN_ERR "dnbd: udp checksum error!\n"); + goto out; + } + reply = (dnbd_reply_t *) (skb->data + offset); + + /* transform values from network to host byte order */ + reply->magic = ntohl(reply->magic); + reply->id = ntohs(reply->id); + reply->time = ntohs(reply->time); + reply->cmd = ntohs(reply->cmd); + reply->pos = be64_to_cpu(reply->pos); + + if (reply->magic != DNBD_MAGIC) { + printk(KERN_ERR "dnbd: wrong magic in reply!\n"); + goto out; + } + + /* calculate RTT */ + tt = jiffies & 0xffff; + tt -= reply->time; + if (tt < 0) + tt += 1 << 16; + + /* check reply command */ + if (reply->cmd & DNBD_CMD_SRV) { + switch (reply->cmd & DNBD_CMD_MASK) { + case DNBD_CMD_READ: + break; + case DNBD_CMD_HB: + if (!dnbd_set_serverid(&dnbd->servers, reply->id)) + printk(KERN_INFO + "dnbd: (re)activate server #%i\n", + reply->id); + /* update times */ + dnbd_rx_update(dnbd->servers, reply->id); + dnbd_rtt_server(&dnbd->servers, reply->id, tt); + default: + goto out; + } + } else + goto out; + + /* update times */ + dnbd_rx_update(dnbd->servers, reply->id); + + /* try to find outstanding request */ + req = dnbd_deq_request_handle(&dnbd->rx_queue, reply->pos); + + offset += sizeof(struct dnbd_reply); + remain = skb->len - offset; + + /* we know this request? No? Let's cache it ... */ + if (!req) { + if ((reply->cmd & DNBD_CMD_SRV) + && (reply->cmd & DNBD_CMD_READ)) + dnbd_xfer_to_cache(dnbd, skb, offset, remain, + reply->pos >> 9); + if (!req) + goto out; + } + + /* the reply fits to an outstanding request */ + dnbd_rtt_server(&dnbd->servers, reply->id, tt); + + nsect = 0; + err = 0; + /* copy network data to BIOs */ + rq_for_each_bio(bio, req) { + bio_for_each_segment(bvec, bio, i) { + tocopy = bvec->bv_len; + if (tocopy > remain) + goto nobytesleft; + kaddr = kmap(bvec->bv_page); + iov.iov_base = kaddr + bvec->bv_offset; + iov.iov_len = tocopy; + set_fs(KERNEL_DS); + err = + skb_copy_datagram_iovec(skb, offset, &iov, + tocopy); + set_fs(oldfs); + kunmap(bvec->bv_page); + + if (err) { + printk(KERN_ERR "dnbd: ERROR copy data\n"); + goto nobytesleft; + } + + offset += tocopy; + remain -= tocopy; + nsect += bvec->bv_len >> 9; + } + } + nobytesleft: + /* end request partially or fully */ + if (dnbd_end_request(dnbd, req, 1, nsect)) { + dnbd_enq_request(&dnbd->tx_queue, req, 1); + } + out: + /* free reserved memory of packet */ + skb_free_datagram(dnbd->sock->sk, skb); + out_nofree: + + return nsect; +} + +static int dnbd_send_request(dnbd_device_t * dnbd, struct request *req) +{ + int result = 0; + dnbd_request_t request; + unsigned long size = req->current_nr_sectors << 9; + int id; + + /* find nearest server */ + id = dnbd_next_server(&dnbd->servers); + + /* fill structure for a DNBD request */ + request.magic = cpu_to_be32(DNBD_MAGIC); + request.id = cpu_to_be16((u16) id); + request.time = cpu_to_be16(jiffies & 0xffff); + request.cmd = cpu_to_be16(DNBD_CMD_READ | DNBD_CMD_CLI); + request.pos = cpu_to_be64((u64) req->sector << 9); + request.len = cpu_to_be16(size); + + /* send DNBD request */ + result = sock_xmit(dnbd, 1, &request, sizeof(request), 0); + /* set times */ + req->start_time = jiffies; + dnbd_tx_update(dnbd->servers, id); + + return result; +} + +/* same for heartbeats */ +static int dnbd_send_hb(dnbd_device_t * dnbd) +{ + int result = 0; + dnbd_request_t request; + + request.magic = cpu_to_be32(DNBD_MAGIC); + request.id = cpu_to_be16((u16) 0); + request.time = cpu_to_be16(jiffies & 0xffff); + request.cmd = cpu_to_be16(DNBD_CMD_HB | DNBD_CMD_CLI); + request.pos = 0; + request.len = 0; + + result = sock_xmit(dnbd, 1, &request, sizeof(request), 0); + + return result; +} + +/* helper function to start threads */ +static int dnbd_start_thread(dnbd_device_t * dnbd, + dnbd_thread_t * thread, thread_fn_t fn) +{ + int result = -EINVAL; + struct task_struct *task; + + spin_lock(&dnbd->thread_lock); + + task = thread->task; + if (!task) + thread->task = (struct task_struct *) -1; + + spin_unlock(&dnbd->thread_lock); + + if (task) + return -EBUSY; + + init_completion(&thread->startup); + init_completion(&thread->finish); + + result = kernel_thread(fn, dnbd, CLONE_FS | CLONE_FILES); + + if (result) + wait_for_completion(&thread->startup); + return result; +} + +/* helper function to stop threads */ +static int dnbd_stop_thread(dnbd_device_t * dnbd, + dnbd_thread_t * thread, int wait) +{ + pid_t signaled; + struct task_struct *task; + + signaled = 0; + spin_lock(&dnbd->thread_lock); + task = thread->task; + if (task) { + force_sig(SIGKILL, task); + signaled = task->pid; + } + spin_unlock(&dnbd->thread_lock); + if (signaled) { + if (wait) + wait_for_completion(&thread->finish); + return 1; + } + return 0; +} + +/* helper function for clean up */ +static void dnbd_end_io(dnbd_device_t * dnbd) +{ + dnbd_clear_queues(dnbd); + wake_up(&dnbd->io_waiters); +} + +/* rx_loop function */ +static int dnbd_rx_loop(void *data) +{ + int signr; + + dnbd_device_t *dnbd = (dnbd_device_t *) data; + + __module_get(THIS_MODULE); + printk("rx_loop: enter\n"); + atomic_inc(&dnbd->num_io_threads); + daemonize("dnbd_rx_loop"); + allow_signal(SIGKILL); + + spin_lock(&dnbd->thread_lock); + dnbd->rx_thread.task = current; + spin_unlock(&dnbd->thread_lock); + + complete(&dnbd->rx_thread.startup); + + /* loop until SIGKILL arrives */ + while ((signr = signal_pending(current)) == 0) { + dnbd_recv_reply(dnbd); + } + + spin_lock(&dnbd->thread_lock); + dnbd->rx_thread.task = NULL; + spin_unlock(&dnbd->thread_lock); + + dnbd_stop_thread(dnbd, &dnbd->rx_thread, 0); + complete(&dnbd->rx_thread.finish); + if (atomic_dec_and_test(&dnbd->num_io_threads)) + dnbd_end_io(dnbd); + + printk("rx_loop: leave\n"); + module_put(THIS_MODULE); + + return 0; +} + +static int dnbd_tx_loop(void *data) +{ + int signr; + dnbd_device_t *dnbd = (dnbd_device_t *) data; + struct request *req; + int result, cached; + + __module_get(THIS_MODULE); + printk("tx_loop: enter\n"); + atomic_inc(&dnbd->num_io_threads); + daemonize("dnbd_tx_loop"); + allow_signal(SIGKILL); + + spin_lock(&dnbd->thread_lock); + dnbd->tx_thread.task = current; + spin_unlock(&dnbd->thread_lock); + + complete(&dnbd->tx_thread.startup); + + /* loop until SIGKILL arrives */ + while ((signr = signal_pending(current)) == 0) { + req = dnbd_try_deq_request(&dnbd->tx_queue); + + if (!req) + continue; + + /* request already in cache? */ + cached = dnbd->cache.search(&dnbd->cache, req); + + if (cached) { + if (!end_that_request_first(req, 1, cached)) { +#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,16) + end_that_request_last(req,1); +#else + end_that_request_last(req); +#endif + } else { + dnbd_enq_request(&dnbd->tx_queue, req, 1); + } + continue; + } + + dnbd_enq_request(&dnbd->rx_queue, req, 0); + result = dnbd_send_request(dnbd, req); + } + + spin_lock(&dnbd->thread_lock); + dnbd->tx_thread.task = NULL; + spin_unlock(&dnbd->thread_lock); + + dnbd_stop_thread(dnbd, &dnbd->tx_thread, 0); + complete(&dnbd->tx_thread.finish); + if (atomic_dec_and_test(&dnbd->num_io_threads)) + dnbd_end_io(dnbd); + + printk("tx_loop: leave\n"); + module_put(THIS_MODULE); + return 0; +} + +/* rexmit function is called periodically by kernel timer */ +static void dnbd_rexmit(unsigned long arg) +{ + dnbd_device_t *dnbd = (dnbd_device_t *) arg; + unsigned long flags; + unsigned long timeout; + + int requeued; + + /* difference in jiffies for request timeout */ + int diff = dnbd->servers.asrtt >> SRTT_SHIFT; + + /* just in case, give boundaries for request timeouts */ + if (diff < dnbd->servers.timeout_min) + diff = dnbd->servers.timeout_min; + if (diff > dnbd->servers.timeout_max) + diff = dnbd->servers.timeout_max; + + timeout = jiffies - (diff << TIMEOUT_SHIFT); + + requeued = + dnbd_requeue_requests(&dnbd->tx_queue, &dnbd->rx_queue, + timeout); + + /* set timer again in ASRTT jiffies for better granularity */ + if (dnbd->state & DNBD_STATE_RUNNING) { + spin_lock_irqsave(&dnbd->timer_lock, flags); + dnbd->timer.expires = jiffies + diff; + add_timer(&dnbd->timer); + spin_unlock_irqrestore(&dnbd->timer_lock, flags); + } +} + +/* session loop takes care of statistics */ +static int dnbd_ss_loop(void *data) +{ + dnbd_device_t *dnbd = (dnbd_device_t *) data; + int signr; + + __module_get(THIS_MODULE); + printk("ss_loop: enter\n"); + atomic_inc(&dnbd->num_io_threads); + daemonize("dnbd_ss_loop"); + allow_signal(SIGKILL); + + spin_lock(&dnbd->thread_lock); + dnbd->ss_thread.task = current; + spin_unlock(&dnbd->thread_lock); + + complete(&dnbd->ss_thread.startup); + + while ((signr = signal_pending(current)) == 0) { + set_current_state(TASK_INTERRUPTIBLE); + schedule_timeout(HZ * 4); /* fixme: give user space option */ + set_current_state(TASK_RUNNING); + dnbd_servers_weight(&dnbd->servers); + dnbd_send_hb(dnbd); + } + + spin_lock(&dnbd->thread_lock); + dnbd->ss_thread.task = NULL; + spin_unlock(&dnbd->thread_lock); + + dnbd_stop_thread(dnbd, &dnbd->ss_thread, 0); + complete(&dnbd->ss_thread.finish); + if (atomic_dec_and_test(&dnbd->num_io_threads)) + dnbd_end_io(dnbd); + + printk("ss_loop: leave\n"); + module_put(THIS_MODULE); + return 0; + +} + +/* waits until a thread has exited */ +static int dnbd_wait_threads_finished(dnbd_device_t * dnbd) +{ + int signaled = 0; + struct task_struct *tsk = current; + DECLARE_WAITQUEUE(wait, tsk); + + if (atomic_read(&dnbd->num_io_threads) > 0) { + add_wait_queue(&dnbd->io_waiters, &wait); + set_current_state(TASK_INTERRUPTIBLE); + if (atomic_read(&dnbd->num_io_threads) > 0) + schedule(); + set_current_state(TASK_RUNNING); + if (signal_pending(current)) + signaled = 1; + remove_wait_queue(&dnbd->io_waiters, &wait); + } + return signaled ? 0 : 1; +} + +/* activate threads (rx_loop, tx_loop, ss_loop) */ +static int dnbd_activate_threads(dnbd_device_t * dnbd) +{ + int result = -EINVAL; + + printk(KERN_NOTICE "dnbd: activating threads...\n"); + result = dnbd_start_thread(dnbd, &dnbd->rx_thread, dnbd_rx_loop); + if (result < 0) + return result; + + result = dnbd_start_thread(dnbd, &dnbd->tx_thread, dnbd_tx_loop); + if (result < 0) { + dnbd_stop_thread(dnbd, &dnbd->rx_thread, 1); + return result; + } + result = dnbd_start_thread(dnbd, &dnbd->ss_thread, dnbd_ss_loop); + if (result < 0) { + dnbd_stop_thread(dnbd, &dnbd->rx_thread, 1); + dnbd_stop_thread(dnbd, &dnbd->tx_thread, 1); + return result; + } + return 0; +} + +/* deactivate threads (rx_loop, tx_loop, ss_loop) */ +static int dnbd_deactivate_threads(dnbd_device_t * dnbd) +{ + + printk(KERN_NOTICE "dnbd: deactivating threads...\n"); + dnbd_stop_thread(dnbd, &dnbd->tx_thread, 1); + dnbd_stop_thread(dnbd, &dnbd->rx_thread, 1); + dnbd_stop_thread(dnbd, &dnbd->ss_thread, 1); + return 0; +} + +/* start threads and activate timer for retransmits */ +static int dnbd_startup(dnbd_device_t * dnbd) +{ + int result = -EINVAL; + result = dnbd_activate_threads(dnbd); + + if (result < 0) { + printk(KERN_NOTICE + "dnbd_startup: ERROR activating threads!\n"); + + goto out; + } + + dnbd->state = DNBD_STATE_RUNNING; + + dnbd->timer.data = (unsigned long) dnbd; + dnbd->timer.function = dnbd_rexmit; + dnbd->timer.expires = jiffies; + add_timer(&dnbd->timer); + out: + return result; +} + +/* disable timer and shutdown threads */ +static int dnbd_shutdown(dnbd_device_t * dnbd) +{ + int result = -EINVAL; + del_timer(&dnbd->timer); + result = dnbd_deactivate_threads(dnbd); + if (result < 0) + printk(KERN_NOTICE + "dnbd_shutdown: ERROR deactivating threads!\n"); + else + dnbd->state &= ~DNBD_STATE_RUNNING; + + return result; +} + +/* startup with semaphore */ +static int dnbd_start(dnbd_device_t * dnbd) +{ + int result; + + down(&dnbd->semalock); + result = dnbd_startup(dnbd); + up(&dnbd->semalock); + return result; +} + +/* shutdown with semaphore */ +static int dnbd_stop(dnbd_device_t * dnbd) +{ + int result; + down(&dnbd->semalock); + result = dnbd_shutdown(dnbd); + up(&dnbd->semalock); + return result; +} + +/* function called by the kernel to make DNBD process a request */ +static void dnbd_do_request(request_queue_t * q) +{ + dnbd_device_t *dnbd = NULL; + int minor; + + struct request *req; + + /* as long as there are requests... */ + while ((req = elv_next_request(q)) != NULL) { + + /* dequeue request from kernel queue */ + blkdev_dequeue_request(req); + if (!blk_fs_request(req)) { + printk(KERN_NOTICE "Skip non-CMD request\n"); + goto error_out; + } + + dnbd = req->rq_disk->private_data; + if (!dnbd) { + printk(KERN_ERR "dnbd: no private data\n"); + } + + minor = DEVICE_TO_MINOR(dnbd); + + if (!(dnbd->state & DNBD_STATE_RUNNING)) + goto error_out; + + if (rq_data_dir(req) != READ) { + goto error_out; + } + + /* + enqueue request to tx_queue, where it will be fetched + by the tx_loop + */ + spin_unlock_irq(q->queue_lock); + dnbd_enq_request(&dnbd->tx_queue, req, 1); + spin_lock_irq(q->queue_lock); + + continue; + + error_out: + spin_unlock_irq(q->queue_lock); + dnbd_end_request(dnbd, req, 0, req->nr_sectors); + spin_lock_irq(q->queue_lock); + } + return; +} + +/* called from ioctl to set socket */ +static int dnbd_set_sock(dnbd_device_t * dnbd, unsigned long arg) +{ + int result = -EINVAL; + struct file *file = NULL; + struct inode *inode = NULL; + struct socket *sock = NULL; + + if (dnbd->sock || dnbd->file) { + result = -EBUSY; + goto out; + } + + file = fget(arg); + if (!file) { + result = -EBADF; + goto out; + } + + inode = file->f_dentry->d_inode; + if (!S_ISSOCK(inode->i_mode)) { + result = -ENOTSOCK; + goto out; + } + + if (!(sock = SOCKET_I(inode))) { + result = -ENOTSOCK; + goto out; + } + + if (sock->type != SOCK_DGRAM) { + result = -EPROTONOSUPPORT; + goto out; + } + + atomic_inc(&dnbd->refcnt); + dnbd->file = file; + dnbd->sock = sock; + + result = 0; + + out: + if (result < 0 && file) + result = -EINVAL; + + return result; +} + +/* release socket */ +static int dnbd_clear_sock(dnbd_device_t * dnbd) +{ + int result = -EINVAL; + struct file *file = NULL; + struct socket *sock = NULL; + + if (!dnbd) + goto out; + + spin_lock(&dnbd->thread_lock); + sock = dnbd->sock; + file = dnbd->file; + dnbd->sock = NULL; + dnbd->file = NULL; + spin_unlock(&dnbd->thread_lock); + + if (!sock) { + result = -ENOTCONN; + goto out; + } + + if (!file) { + result = -EINVAL; + goto out; + } + /* + * space for operations when socket has to be cleared, + * which is done from user space (client/client.c) + */ + + atomic_dec(&dnbd->refcnt); + result = 0; + + out: + if (file) { + fput(file); + } + return result; + +} + +/* function is invoked from user space to start session */ +static int dnbd_do_it(dnbd_device_t * dnbd) +{ + int result = 0; + + if (!try_module_get(THIS_MODULE)) { + printk(KERN_ERR + "dnbd_do_it: try_module_get not worked!\n"); + goto out; + } + + result = dnbd_start(dnbd); + + if (result < 0) + goto out; + + /* + * will return when session ends (disconnect), which is + * invoked from user space + */ + dnbd_wait_threads_finished(dnbd); + + dnbd_stop(dnbd); + + + module_put(THIS_MODULE); + + out: + return result; +} + +static int dnbd_disconnect(dnbd_device_t * dnbd) +{ + int result = -EINVAL; + + if (!dnbd->sock) { + result = -ENOTCONN; + goto out; + } + + /* end session and stop threads */ + dnbd_shutdown(dnbd); + + /* wait until threads exited */ + dnbd_wait_threads_finished(dnbd); + + /* clean up */ + dnbd_clear_sock(dnbd); + dnbd->cache.clean(&dnbd->cache); + dnbd_clean_servers(&dnbd->servers); + + result = 0; + out: + return result; + +} + +/* handle ioctl calls from user space */ +static int dnbd_ioctl(struct inode *inode, struct file *file, + unsigned int cmd, unsigned long arg) +{ + int result = -EINVAL; + dnbd_device_t *dnbd; + int minor; + + if (!capable(CAP_SYS_ADMIN)) + return -EPERM; + if (!inode) + return -EINVAL; + + dnbd = inode->i_bdev->bd_disk->private_data; + minor = DEVICE_TO_MINOR(dnbd); + + if (minor >= MAX_DNBD) + return -ENODEV; + + /* different locking behavior needed for ioctl calls */ + switch (cmd) { + case DNBD_DO_IT: + return dnbd_do_it(dnbd); + case DNBD_DISCONNECT: + return dnbd_disconnect(dnbd); + } + + down(&dnbd->semalock); + switch (cmd) { + case DNBD_SET_SOCK: + result = dnbd_set_sock(dnbd, arg); + break; + case DNBD_SET_GROUPNET: + result = + copy_from_user(&dnbd->mcast, (void *) arg, + sizeof(dnbd->mcast)) ? -EFAULT : 0; + break; + case DNBD_SET_BLKSIZE: + dnbd->blksize = arg; + printk(KERN_INFO "dnbd: setting blksize to %i\n", + dnbd->blksize); + dnbd->bytesize &= ~(dnbd->blksize - 1); + inode->i_bdev->bd_inode->i_size = dnbd->bytesize; + set_blocksize(inode->i_bdev, dnbd->blksize); + set_capacity(dnbd->disk, dnbd->bytesize >> 9); + result = 0; + break; + case DNBD_SET_CAPACITY: + result = + copy_from_user(&dnbd->bytesize, (void *) arg, + sizeof(dnbd->bytesize)) ? -EFAULT : 0; + if (result) + break; + dnbd->bytesize = dnbd->bytesize & ~(dnbd->blksize - 1); + inode->i_bdev->bd_inode->i_size = dnbd->bytesize; + set_blocksize(inode->i_bdev, dnbd->blksize); + set_capacity(dnbd->disk, dnbd->bytesize >> 9); + result = 0; + break; + case DNBD_SET_CACHE: + result = + dnbd_cache_set(&dnbd->cache, + (struct dnbd_file __user *) arg, + inode->i_bdev->bd_block_size); + break; + case DNBD_SET_SERVERID: + result = dnbd_set_serverid(&dnbd->servers, arg); + break; + default: + result = -EINVAL; + } + up(&dnbd->semalock); + + return result; +} + +static int dnbd_open(struct inode *inode, struct file *file) +{ + int result = -EINVAL; + dnbd_device_t *dnbd; + int minor; + + if (!inode) + return -EINVAL; + + dnbd = inode->i_bdev->bd_disk->private_data; + minor = DEVICE_TO_MINOR(dnbd); + + if (minor >= MAX_DNBD) + return -ENODEV; + + result = 0; + down(&dnbd->semalock); + + /* open only read-only */ + if ((file->f_mode & FMODE_WRITE)) { + result = -EROFS; + goto out; + } + + /* increment reference counter */ + atomic_inc(&dnbd->refcnt); + out: + up(&dnbd->semalock); + return result; +} + +static int dnbd_release(struct inode *inode, struct file *file) +{ + dnbd_device_t *dnbd; + int minor; + + if (!inode) + return -EINVAL; + + dnbd = inode->i_bdev->bd_disk->private_data; + minor = DEVICE_TO_MINOR(dnbd); + + if (minor >= MAX_DNBD) + return -ENODEV; + + down(&dnbd->semalock); + + /* decrement reference counter */ + atomic_dec(&dnbd->refcnt); + + up(&dnbd->semalock); + return 0; +} + +static struct block_device_operations dnbd_fops = { + .ioctl = dnbd_ioctl, + .open = dnbd_open, +/* .owner = THIS_MODULE, */ + .release = dnbd_release, +}; + +/* reader function for proc interface */ +static int +dnbd_read_proc(char *buf, char **start, off_t offset, + int count, int *eof, void *data) +{ + int i, len = 0; + dnbd_device_t *dnbd; + + i = (int) data; + dnbd = &dnbd_dev[i]; + + + spin_lock(&dnbd->thread_lock); + + len += + snprintf(buf + len, count - len, + "Cache:\n hits %li\n miss %li\n lru replaced %li\n", + dnbd->cache.hits, dnbd->cache.miss, dnbd->cache.lru); + + len += snprintf(buf + len, count - len, "Servers:\n"); + + len += dnbd_show_servers(&dnbd->servers, buf + len, count - len); + + spin_unlock(&dnbd->thread_lock); + + *eof = 1; + return len; +} + +/* register network block device */ +static int __init dnbd_init(void) +{ + int err = -ENOMEM; + int i = 0; + char name[] = "dnbdxx"; + + if (!(dnbd_proc_dir = proc_mkdir("driver/dnbd", NULL))) { + printk(KERN_ERR + "dnbd: can't create dir /proc/driver/dnbd\n"); + goto out; + } + + for (i = 0; (i < MAX_DNBD && i < 100); i++) { + sprintf(name, "dnbd%i", i); + if (!create_proc_read_entry + (name, 0, dnbd_proc_dir, dnbd_read_proc, (void *) i)) { + printk(KERN_ERR + "dnbd: can't create /proc/driver/dnbd\n"); + goto out; + } + } + + + for (i = 0; i < MAX_DNBD; i++) { + /* + * get pre initialized structure for block device minor + */ + struct gendisk *disk = alloc_disk(1); + if (!disk) { + printk(KERN_CRIT "dnbd: alloc_disk failed\n"); + goto out; + } + dnbd_dev[i].disk = disk; + /* + * initizialisation of request queue + * dnbd_do_request() is our function to handle the requests + */ + disk->queue = + blk_init_queue(dnbd_do_request, + &dnbd_dev[i].thread_lock); + + if (!disk->queue) { + printk(KERN_CRIT "dnbd: blk_init_queue failed\n"); + put_disk(disk); + goto out; + } + + /* read ahead */ + disk->queue->backing_dev_info.ra_pages = 8; + + } + + /* unregister_blkdev(DNBD_MAJOR, "dnbd"); */ + if ((dnbd_major = register_blkdev(DNBD_MAJOR, "dnbd")) < 0) { + printk(KERN_CRIT "dnbd: register_blkdev failed\n"); + err = -EIO; + goto out; + } + + printk(KERN_INFO "dnbd: module loaded with major %i\n", + dnbd_major); + + devfs_mk_dir("dnbd"); + for (i = 0; i < MAX_DNBD; i++) { + struct gendisk *disk = dnbd_dev[i].disk; + dnbd_dev[i].state = DNBD_STATE_LOADED; + init_MUTEX(&dnbd_dev[i].semalock); + init_timer(&dnbd_dev[i].timer); + + spin_lock_init(&dnbd_dev[i].thread_lock); + spin_lock_init(&dnbd_dev[i].queue_lock); + spin_lock_init(&dnbd_dev[i].timer_lock); + + /* initialize up rx&tx queue */ + dnbd_dev[i].rx_thread.task = NULL; + dnbd_dev[i].tx_thread.task = NULL; + atomic_set(&dnbd_dev[i].num_io_threads, 0); + init_waitqueue_head(&dnbd_dev[i].io_waiters); + spin_lock_init(&dnbd_dev[i].rx_queue.lock); + INIT_LIST_HEAD(&dnbd_dev[i].rx_queue.head); + init_waitqueue_head(&dnbd_dev[i].rx_queue.waiters); + spin_lock_init(&dnbd_dev[i].tx_queue.lock); + INIT_LIST_HEAD(&dnbd_dev[i].tx_queue.head); + init_waitqueue_head(&dnbd_dev[i].tx_queue.waiters); + + /* initialize device characteristics */ + dnbd_dev[i].file = NULL; + dnbd_dev[i].magic = LO_MAGIC; + dnbd_dev[i].blksize = 1 << 9; + dnbd_dev[i].bytesize = 0; + disk->major = dnbd_major; + disk->first_minor = i; + disk->fops = &dnbd_fops; + disk->private_data = &dnbd_dev[i]; + disk->flags |= GENHD_FL_SUPPRESS_PARTITION_INFO; + sprintf(disk->disk_name, "dnbd%d", i); + sprintf(disk->devfs_name, "dnbd/%d", i); + set_capacity(disk, 0); + + /* initialize cache */ + dnbd_cache_init(&dnbd_dev[i].cache); + + /* initialize servers */ + dnbd_servers_init(&dnbd_dev[i].servers); + + /* register disk to kernel */ + add_disk(disk); + } + + return 0; + + out: + printk(KERN_CRIT "dnbd: could not initialize dnbd!\n"); + while (i--) { + blk_cleanup_queue(dnbd_dev[i].disk->queue); + put_disk(dnbd_dev[i].disk); + } + return err; +} + +/* unregister network block device */ +static void __exit dnbd_exit(void) +{ + int i; + char name[] = "dnbdxx"; + struct gendisk *disk; + + /* force disconnects */ + for (i = 0; i < MAX_DNBD; i++) { + if (!dnbd_disconnect(&dnbd_dev[i])) { + printk(KERN_INFO "dnbd%i: disconnected.\n", i); + } + } + + /* remove disks */ + for (i = 0; i < MAX_DNBD; i++) { + dnbd_rem_servers(&dnbd_dev[i].servers); + + disk = dnbd_dev[i].disk; + if (disk) { + del_gendisk(disk); + blk_cleanup_queue(disk->queue); + put_disk(disk); + } + } + devfs_remove("dnbd"); + unregister_blkdev(dnbd_major, "dnbd"); + + for (i = 0; (i < MAX_DNBD && i < 100); i++) { + sprintf(name, "dnbd%i", i); + remove_proc_entry(name, dnbd_proc_dir); + } + + remove_proc_entry("driver/dnbd", NULL); + + printk(KERN_INFO "dnbd: unregistered device.\n"); + +} + +module_init(dnbd_init); +module_exit(dnbd_exit); + +MODULE_DESCRIPTION("Distributed Network Block Device"); +MODULE_LICENSE("GPL"); diff --git a/kernel/net.c b/kernel/net.c new file mode 100644 index 0000000..e5c396b --- /dev/null +++ b/kernel/net.c @@ -0,0 +1,248 @@ +/* + * net.c - network stuff for DNBD + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <linux/errno.h> +#include <linux/slab.h> +#include <linux/random.h> + +#include "net.h" + +/* return pointer to server structure */ +dnbd_server_t *dnbd_get_server(dnbd_servers_t * servers, int id) +{ + if ((0 < id) && (id <= SERVERS_MAX)) + return &servers->serverlist[id - 1]; + else + return NULL; +} + +/* add a new server */ +int dnbd_set_serverid(dnbd_servers_t * servers, int id) +{ + int result = -EINVAL; + + dnbd_server_t *server; + + if (!(server = dnbd_get_server(servers, id))) + goto out; + + switch (server->state) { + case SERVER_INACTIVE: + break; + case SERVER_ACTIVE: + result = -EEXIST; + goto out; + case SERVER_STALLED: + server->state = SERVER_ACTIVE; + result = 0; + goto out; + } + + server->state = SERVER_ACTIVE; + server->id = id; + server->srtt = servers->timeout_min; + server->weight = 0; + server->last_rx = jiffies; + server->last_tx = jiffies; + + servers->count++; + result = 0; + out: + return result; +} + +/* return server according to their weights (= probability) */ +int dnbd_next_server(dnbd_servers_t * servers) +{ + int i; + char rnd; + dnbd_server_t *server = NULL; + int id = 0; + int weightsum = 0; + + /* get random byte from kernel */ + get_random_bytes(&rnd, 1); + + for (i = 0; i < SERVERS_MAX; i++) { + server = &servers->serverlist[i]; + if ((server->state == SERVER_ACTIVE) + && ((weightsum += server->weight) > (unsigned char) rnd)) { + id = server->id; + break; + } + } + + /* alternatively, use server with highest weight */ +/* for (i = 0; i < SERVERS_MAX; i++) { + server = &servers->serverlist[i]; + if ((server->state == SERVER_ACTIVE) + && (server->weight > weight)) + id = server->id; + }*/ + + return id; +} + +/* remove a server */ +void dnbd_rem_servers(dnbd_servers_t * servers) +{ + if (!servers->serverlist) + return; + + kfree(servers->serverlist); + servers->serverlist = NULL; +} + +/* remove all servers */ +void dnbd_clean_servers(dnbd_servers_t * servers) +{ + int i; + for (i = 0; i < SERVERS_MAX; i++) { + servers->serverlist[i].state = 0; + } + +} + +/* update round trip time of a server */ +void dnbd_rtt_server(dnbd_servers_t * servers, int id, int rtt) +{ + dnbd_server_t *server; + + if (!(server = dnbd_get_server(servers, id))) + goto out; + + if (rtt > servers->timeout_max) + rtt = TIMEOUT_MAX; + else if (rtt < servers->timeout_min) + rtt = TIMEOUT_MIN; + + down(&servers->sema); + server->srtt = ((SRTT_BETA * server->srtt + + (((SRTT_BETA_BASE - SRTT_BETA) * rtt) << SRTT_SHIFT)) + / SRTT_BETA_BASE); + up(&servers->sema); + + out: + return; +} + +/* recalculate server weights */ +void dnbd_servers_weight(dnbd_servers_t * servers) +{ + int i; + int num_servers = 0; + long weightsum = 0; + long prod = 0; + long asrtt = 0; + int srtt = 0; + dnbd_server_t *server; + + /* + * float arithmetics in kernel would be nice... + */ + down(&servers->sema); + + for (i = 0; i < SERVERS_MAX; i++) { + server = &servers->serverlist[i]; + + if (server->state == SERVER_ACTIVE) { + if (server->last_tx > + server->last_rx + servers->timeout_stalled) { + printk(KERN_ERR + "dnbd: disable server #%i\n", + i + 1); + server->state = SERVER_STALLED; + continue; + } + srtt = (server->srtt ? server->srtt : 1); + weightsum += WEIGHT_FACTOR / srtt; + asrtt += srtt; + num_servers++; + } + } + + if (!num_servers) + goto out; + + servers->asrtt = asrtt / num_servers; + + for (i = 0; i < SERVERS_MAX; i++) { + server = &servers->serverlist[i]; + + if (server->state == SERVER_ACTIVE) { + srtt = (server->srtt ? server->srtt : 1); + prod = srtt * weightsum; + + if (prod > 0) + server->weight = WEIGHT_NORMAL * WEIGHT_FACTOR / prod; + else + server->weight = WEIGHT_NORMAL / num_servers; + } + } + out: + up(&servers->sema); + +} + +/* fill buffer with server statistics in human readable form for /proc */ +int dnbd_show_servers(dnbd_servers_t * servers, void *buf, int size) +{ + int i, n = 0; + dnbd_server_t *server; + + n += snprintf(buf + n, size - n, + " timeout_min: %i jiffies\n timeout_max: %i jiffies\n", + servers->timeout_min, servers->timeout_max); + + n += snprintf(buf + n, size - n, "Average SRTT: %i\n", + servers->asrtt >> SRTT_SHIFT); + + for (i = 0; i < SERVERS_MAX; i++) { + server = &servers->serverlist[i]; + + switch (server->state) { + case SERVER_INACTIVE: + continue; + case SERVER_STALLED: + n += snprintf(buf + n, size - n, + " id: %i (stalled)\n", server->id); + continue; + default: + n += snprintf(buf + n, size - n, " id: %i\n", + server->id); + } + n += snprintf(buf + n, size - n, + " srtt: %i\n", server->srtt >> SRTT_SHIFT); + n += snprintf(buf + n, size - n, + " weight: %i (of %i)\n", server->weight,WEIGHT_NORMAL); + } + + return n; +} + +/* initialize servers */ +int dnbd_servers_init(dnbd_servers_t * servers) +{ + int i; + + spin_lock_init(&servers->lock); + init_MUTEX(&servers->sema); + + if (!(servers->serverlist = + (dnbd_server_t *) kmalloc(SERVERS_MAX * + sizeof(dnbd_server_t), + GFP_KERNEL))) + return -EINVAL; + + for (i = 0; i < SERVERS_MAX; i++) { + servers->serverlist[i].state = 0; + } + + servers->count = 0; + servers->timeout_min = TIMEOUT_MIN; + servers->timeout_max = TIMEOUT_MAX; + servers->timeout_stalled = TIMEOUT_STALLED; + return 0; +} diff --git a/kernel/net.h b/kernel/net.h new file mode 100644 index 0000000..cdd7996 --- /dev/null +++ b/kernel/net.h @@ -0,0 +1,73 @@ +#ifndef LINUX_DNBD_NET_H +#define LINUX_DNBD_NET_H 1 + +#include <linux/spinlock.h> +#include <asm/semaphore.h> +#include <linux/list.h> +#include <linux/param.h> +#include <linux/jiffies.h> + +#define SERVERS_MAX 8 +#define SERVER_STALLED -1 +#define SERVER_INACTIVE 0 +#define SERVER_ACTIVE 1 + +/* limits and other constants for SRTT calculations */ +#define TIMEOUT_MIN 1 +#define TIMEOUT_MAX HZ / 4 +#define TIMEOUT_STALLED 5 * HZ +#define TIMEOUT_SHIFT 2 + +/* beta is 99% (990/1000) */ +#define SRTT_BETA 990 +#define SRTT_BETA_BASE 1000 +#define SRTT_SHIFT 10 + +/* normalize weights to 255 as there is no float arithmetic in kernel */ +#define WEIGHT_NORMAL ((1<<8)-1) +#define WEIGHT_FACTOR (1<<20) + +#define dnbd_rx_update(servers, id) \ +if ((id > 0) && (id <= SERVERS_MAX)) servers.serverlist[id-1].last_rx = jiffies; + +#define dnbd_tx_update(servers, id) \ +if ((id > 0) && (id <= SERVERS_MAX)) servers.serverlist[id-1].last_tx = jiffies; + +/* characteristics of a server */ +struct dnbd_server { + int id; + int state; + int srtt; + int weight; + unsigned long last_rx; /* in jiffies */ + unsigned long last_tx; /* in jiffies */ +}; + +typedef struct dnbd_server dnbd_server_t; + +/* common server information and helper variables */ +struct dnbd_servers { + struct dnbd_server *serverlist; + struct dnbd_server *server; + spinlock_t lock; + struct semaphore sema; + int timeout_min; + int timeout_max; + int timeout_stalled; + int asrtt; + int count; +}; + +typedef struct dnbd_servers dnbd_servers_t; + +/* functions */ +int dnbd_set_serverid(dnbd_servers_t * servers, int id); +int dnbd_next_server(dnbd_servers_t * servers); +void dnbd_rem_servers(dnbd_servers_t * servers); +void dnbd_rtt_server(dnbd_servers_t * servers, int id, int rtt); +int dnbd_servers_init(dnbd_servers_t *servers); +void dnbd_servers_weight(dnbd_servers_t * servers); +int dnbd_show_servers(dnbd_servers_t * servers, void *buf, int size); +void dnbd_clean_servers(dnbd_servers_t * servers); + +#endif diff --git a/kernel/queue.c b/kernel/queue.c new file mode 100644 index 0000000..30a0112 --- /dev/null +++ b/kernel/queue.c @@ -0,0 +1,126 @@ +/* + * queue.c - queues for requests to be submitted (tx_queue) + * and outstanding requests (rx_queue) + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <linux/blkdev.h> +#include <linux/init.h> +#include <linux/sched.h> +#include <linux/fs.h> + +#include <linux/spinlock.h> + +#include <linux/in.h> + +#include "dnbd.h" +#include "queue.h" + +/* enqueue to a queue */ +void dnbd_enq_request(dnbd_queue_t * q, struct request *req, int wakeup) +{ + unsigned long flags; + spin_lock_irqsave(&q->lock, flags); + list_add(&req->queuelist, &q->head); + spin_unlock_irqrestore(&q->lock,flags); + if (wakeup) + wake_up(&q->waiters); +} + +/* dequeue from a queue with position */ +struct request *dnbd_deq_request_handle(dnbd_queue_t * q, uint64_t pos) +{ + struct request *req = NULL; + struct list_head *tmp; + unsigned long flags; + + spin_lock_irqsave(&q->lock,flags); + list_for_each(tmp, &q->head) { + req = blkdev_entry_to_request(tmp); + if (((u64) req->sector) << 9 == pos) { + list_del_init(&req->queuelist); + goto out; + } + + } + req = NULL; + out: + spin_unlock_irqrestore(&q->lock,flags); + return req; +} + +/* dequeue from queue */ +struct request *dnbd_deq_request(dnbd_queue_t * q) +{ + struct request *req = NULL; + unsigned long flags; + + spin_lock_irqsave(&q->lock, flags); + if (!list_empty(&q->head)) { + req = blkdev_entry_to_request(q->head.prev); + list_del_init(&req->queuelist); + } + spin_unlock_irqrestore(&q->lock, flags); + return req; +} + +/* sleep until request can be dequeued */ +struct request *dnbd_try_deq_request(dnbd_queue_t * q) +{ + struct request *req; + + + req = dnbd_deq_request(q); + if (!req) { + struct task_struct *tsk = current; + + DECLARE_WAITQUEUE(wait, tsk); + add_wait_queue(&q->waiters, &wait); + + for (;;) { + + set_current_state(TASK_INTERRUPTIBLE); + req = dnbd_deq_request(q); + + if (req || signal_pending(current)) + break; + + schedule(); + } + + set_current_state(TASK_RUNNING); + remove_wait_queue(&q->waiters, &wait); + } + + return req; +} + +/* requeue requests with timeout */ +int dnbd_requeue_requests(dnbd_queue_t * to, dnbd_queue_t * from, + unsigned long timeout) +{ + struct request *req = NULL; + struct list_head *tmp, *keep; + int requeued = 0; + unsigned long flags; + + spin_lock_irqsave(&from->lock,flags); + + list_for_each_safe(tmp, keep, &from->head) { + req = blkdev_entry_to_request(tmp); + if (req->start_time < timeout) { + requeued++; + list_del_init(&req->queuelist); + + spin_lock_irqsave(&to->lock,flags); + list_add(&req->queuelist, &to->head); + spin_unlock_irqrestore(&to->lock,flags); + } + } + + spin_unlock_irqrestore(&from->lock,flags); + + wake_up(&to->waiters); + + return requeued; +} diff --git a/kernel/queue.h b/kernel/queue.h new file mode 100644 index 0000000..f349637 --- /dev/null +++ b/kernel/queue.h @@ -0,0 +1,29 @@ +#ifndef LINUX_DNBD_QUEUE_H +#define LINUX_DNBD_QUEUE_H 1 + +#include <linux/spinlock.h> +#include <asm/semaphore.h> +#include <linux/list.h> +#include <linux/wait.h> + +/* queue structure used for rx_queue and tx_queue */ +struct dnbd_queue { + spinlock_t lock; + struct semaphore sema; + struct list_head head; + wait_queue_head_t waiters; +}; + +typedef struct dnbd_queue dnbd_queue_t; + +/* functions */ +void dnbd_enq_request(dnbd_queue_t * q, struct request *req, int wakeup); +struct request *dnbd_deq_request(dnbd_queue_t * q); +struct request *dnbd_deq_request_handle(dnbd_queue_t * q, uint64_t pos); +struct request *dnbd_try_deq_request(dnbd_queue_t * q); +void dnbd_mark_old_requests(dnbd_queue_t * q); +int dnbd_requeue_requests(dnbd_queue_t * to, dnbd_queue_t * from, unsigned long timeout); +void dnbd_error_old_requests(dnbd_queue_t * q); + + +#endif /* LINUX_QUEUE_H */ diff --git a/server/Makefile b/server/Makefile new file mode 100644 index 0000000..3d10466 --- /dev/null +++ b/server/Makefile @@ -0,0 +1,16 @@ +SERVER_BIN = dnbd-server +SERVER_SRC = filer.c net.c query.c server.c + +BINS = $(SERVER_BIN) + +CFLAGS = -Wall -D_GNU_SOURCE -D_LARGEFILE64_SOURCE -D_FILE_OFFSET_BITS=64 -O2 +LDFLAGS = -lpthread + +$(SERVER_BIN): + $(CC) $(CFLAGS) -o $@ $(SERVER_SRC) $(LDFLAGS) + +all: $(BINS) + +.PHONY: +clean: + -$(RM) *.o $(BINS) *~ diff --git a/server/filer.c b/server/filer.c new file mode 100644 index 0000000..b8412b7 --- /dev/null +++ b/server/filer.c @@ -0,0 +1,120 @@ +/* + * filer.c - open, seeks in and reads from a file + * + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <fcntl.h> +#include <string.h> +#include <stdio.h> +#include <errno.h> +#include <pthread.h> + +#include "filer.h" + +/* + * function filer_getcapacity() + * returns: size/capacity of file/device + */ +unsigned long long filer_getcapacity(filer_info_t * filer_info) +{ + return filer_info->size; +} + +/* + * function filer_seekblock(): seek to position in file/block device + * returns: 1 on success, otherwise 0 + */ +static inline int filer_seekblock(filer_info_t * filer_info, off_t newpos) +{ + if (lseek(filer_info->fd, newpos, SEEK_SET) == (off_t) -1) { + return 0; + } + filer_info->pos = newpos; + return 1; +} + +/* + * function filer_readblock(): read bytes at specific position + * returns: 1 on success, otherwise 0 + */ +inline int filer_readblock(filer_info_t * filer_info, void *buf, size_t size, + off_t pos) +{ + size_t remain = size; + int result = 0; + int numblocks = 0; + + if (!filer_seekblock(filer_info, pos)) goto leave; + + while (remain > 0) { + if ((numblocks = read(filer_info->fd, buf, remain)) <= 0) { + if (errno == EINTR) + continue; + goto leave; + } + + if (numblocks == 0) { + goto leave; + } + remain -= numblocks; + buf += numblocks; + } + result = 1; + leave: + filer_info->pos += (size - remain); + return result; +} + +/* + * function filer_init(): open file to be served + * returns: data structure with file information + */ +filer_info_t *filer_init(const char *filename) +{ + filer_info_t *filer_info; + struct stat64 stbuf; + + filer_info = (filer_info_t *) malloc(sizeof(filer_info_t)); + if (!filer_info) + return NULL; + + filer_info->filename = strdup(filename); + if ((filer_info->fd = open(filename, O_RDONLY | O_LARGEFILE)) < 0) { + fprintf(stderr, "ERROR: Cannot open filename \"%s\"\n", + filename); + goto out_free; + } + + stbuf.st_size = 0; + + if (fstat64(filer_info->fd, &stbuf) < 0) { + fprintf(stderr, "ERROR: Cannot stat file \"%s\"\n", + filename); + goto out_free; + } + + /* get file/device size */ + if ((filer_info->size = stbuf.st_size) == 0) { + filer_info->size = lseek64(filer_info->fd, (off_t) 0, SEEK_END); + } + + if (filer_info->size == 0) { + fprintf(stderr, "ERROR: File/device has zero size\n"); + goto out_free; + } + + goto out; + + out_free: + if (filer_info) + free(filer_info); + + filer_info = NULL; + out: + return filer_info; +} diff --git a/server/filer.h b/server/filer.h new file mode 100644 index 0000000..6e39bfa --- /dev/null +++ b/server/filer.h @@ -0,0 +1,19 @@ +#ifndef LINUX_DNBD_FILER_H +#define LINUX_DNBD_FILER_H 1 + +/* information of served file/block device */ +struct filer_info { + const char *filename; + int fd; + unsigned long long size; + off_t pos; +}; + +typedef struct filer_info filer_info_t; + +/* functions */ +unsigned long long filer_getcapacity(filer_info_t * filer); +int inline filer_readblock(filer_info_t * filer_info, void *buf, size_t size, off_t pos); +filer_info_t *filer_init(const char *filename); + +#endif diff --git a/server/net.c b/server/net.c new file mode 100644 index 0000000..02db9aa --- /dev/null +++ b/server/net.c @@ -0,0 +1,147 @@ +/* + * net.c - network stuff for the server + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <pthread.h> + +#include <string.h> +#include <stdlib.h> +#include <stdio.h> +#include <unistd.h> + +#define DNBD_USERSPACE 1 +#include "../common/dnbd-cliserv.h" + +#include "net.h" + +struct listener_s { + pthread_t tid; + net_request_t *request; +}; + +typedef struct listener_s listener_t; +listener_t listener; + +/* + * function net_tx(): send a server reply + */ +void net_tx(net_info_t * net_info, net_reply_t * reply) +{ + if (sendto + (net_info->sock, reply->data, reply->len, 0, + (struct sockaddr *) &net_info->groupnet, + sizeof(net_info->groupnet)) < 0) + fprintf(stderr, "net_tx: mcast sendproblem\n"); + +} + +/* + * function net_rx(): receive a client request + * returns: 1 on correct size of reply, otherwise 0 + */ +int net_rx(net_info_t * net_info, net_request_t * request) +{ + ssize_t n; + + request->clientlen = sizeof(request->client); + + n = recvfrom(net_info->sock, &request->data, + sizeof(request->data), 0, + &request->client, &request->clientlen); + + /* sizeof of request must be size of a DNBD request */ + return (n == sizeof(request->data) ? 1 : 0); +} + +/* + * function net_init(): initialize network for multicast + * returns: structure with network related information + */ +net_info_t *net_init(const char *mnet) +{ + struct ip_mreq mreq; + const int ttl = 64; /* TTL of 64 should be enough */ + u_char loop = 0; + + net_info_t *net_info = NULL; + + net_info = (net_info_t *) malloc(sizeof(net_info_t)); + if (!net_info) + return NULL; + + memset(net_info, 0, sizeof(net_info_t)); + + /* network setup */ + net_info->server.sin_family = AF_INET; + net_info->server.sin_port = htons(DNBD_PORT); + net_info->sock = socket(PF_INET, SOCK_DGRAM, 0); + + if (!inet_aton(mnet, &net_info->server.sin_addr)) { + fprintf(stderr, + "ERROR: multicast group %s is not a valid address!\n", + mnet); + goto out_free; + } + + if (bind + (net_info->sock, (struct sockaddr *) &net_info->server, + sizeof(net_info->server)) < 0) { + fprintf(stderr, "ERROR: binding socket!\n"); + goto out_free; + } + + if (!inet_aton(mnet, &net_info->groupnet.sin_addr)) { + fprintf(stderr, + "ERROR: multicast group %s is not a valid address!\n", + mnet); + goto out_free; + } + + /* multicast setup */ + net_info->groupnet.sin_family = AF_INET; + net_info->groupnet.sin_port = htons(DNBD_PORT); + + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + memcpy(&mreq.imr_multiaddr, &net_info->groupnet.sin_addr, + sizeof(struct in_addr)); + + if (setsockopt + (net_info->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, + sizeof(mreq)) < 0) { + fprintf(stderr, + "ERROR: cannot add multicast membership!\n"); + goto out_free; + } + + if (setsockopt(net_info->sock, IPPROTO_IP, IP_MULTICAST_TTL, + &ttl, sizeof(ttl)) < 0) { + fprintf(stderr, "ERROR: Setting TTL to 2\n"); + goto out_free; + } + + /* no looping, please */ + if (setsockopt + (net_info->sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, + sizeof(loop)) < 0) { + fprintf(stderr, + "ERROR: cannot disable multicast looping!\n"); + goto out_free; + } + + goto out; + + out_free: + fprintf(stderr, + "hint: check kernel multicast support, multicast routing\n"); + if (net_info) + free(net_info); + + net_info = NULL; + out: + return net_info; +} diff --git a/server/net.h b/server/net.h new file mode 100644 index 0000000..d0f12c6 --- /dev/null +++ b/server/net.h @@ -0,0 +1,51 @@ +#ifndef LINUX_DNBD_NET_H +#define LINUX_DNBD_NET_H 1 + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> + +/* network information */ +struct net_info { + int sock; + struct sockaddr_in server; + struct sockaddr_in groupnet; +}; +typedef struct net_info net_info_t; + +/* structure for received network packet */ +struct net_request { + struct sockaddr_in client; + socklen_t clientlen; + dnbd_request_t data; + size_t len; +}; +typedef struct net_request net_request_t; + +/* structure for network packets to be sent */ +struct net_reply { + void *data; + size_t len; +}; +typedef struct net_reply net_reply_t; + + +/* struct net_info_s net_info; */ + +net_info_t * net_init(); + +/* functions */ +void net_tx(net_info_t *net, net_reply_t *reply); +int net_rx(net_info_t * net, net_request_t *request); + +/* network to host byte order */ +#include <endian.h> +#if __BYTE_ORDER == __BIG_ENDIAN +#define ntohll(x) (x) +#else +#define ntohll(x) bswap_64(x) +#endif + + + +#endif diff --git a/server/query.c b/server/query.c new file mode 100644 index 0000000..59d1864 --- /dev/null +++ b/server/query.c @@ -0,0 +1,349 @@ +/* + * query.c - request/reply handling for the server + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <stdio.h> +#include <pthread.h> +#include <stdlib.h> +#include <string.h> + +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <linux/types.h> +#include <unistd.h> +#include <time.h> + +#define DNBD_USERSPACE 1 +#include "../common/dnbd-cliserv.h" + +#include "query.h" + +/* number of threads used to service requests */ +#define NUM_HANDLER_THREADS 1 /* default */ +#define MAX_BLOCK_SIZE 4096 + +struct query_thread { + query_info_t *query_info; + int id; + pthread_t p_thread; +}; + +struct query_thread query_thread[NUM_HANDLER_THREADS]; + +/* recursive global mutex for our program. */ +pthread_mutex_t query_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; +/* mutex to avoid concurrent file access */ +pthread_mutex_t handler_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* global condition variable for our program. */ +pthread_cond_t got_query = PTHREAD_COND_INITIALIZER; + +int num_queries = 0; /* number of pending requests, initially none */ +int max_queries = 100; /* this value should be high enough */ + +query_t *queries = NULL; /* head of linked list of requests. */ +int last_query = 0; /* initial position in circular buffer */ +int next_query = 0; + + +void query_handle(struct query_info *query_info, query_t * query); + +/* + * function query_add_loop(): add incoming requests to circular buffer + */ +void *query_add_loop(void *data) +{ + int rc; + query_t *query; + query_info_t *query_info = (query_info_t *) data; + + int tmp_query; + + while (1) { + + rc = pthread_mutex_lock(&query_mutex); + tmp_query = (next_query + 1) % max_queries; + rc = pthread_mutex_unlock(&query_mutex); + + if (tmp_query == last_query) + continue; + + query = &queries[next_query]; + + /* loop until a proper request arrives */ + while (!net_rx(query_info->net_info, &query->request)) {} + + rc = pthread_mutex_lock(&query_mutex); + + next_query = tmp_query; + + /* increase total number of pending requests */ + num_queries++; + + rc = pthread_mutex_unlock(&query_mutex); + + /* signal that there's a new request to handle */ + rc = pthread_cond_signal(&got_query); + } +} + +/* + * function: query_get(): fetch request from circular buffer + * returns: pointer to request + */ +query_t *query_get(pthread_mutex_t * p_mutex) +{ + int rc; + query_t *query; /* pointer to request */ + + rc = pthread_mutex_lock(p_mutex); + + if (last_query == next_query) + return NULL; + + query = &queries[last_query]; + + last_query = (last_query + 1) % max_queries; + num_queries--; + + rc = pthread_mutex_unlock(p_mutex); + /* return the request to the caller */ + return query; +} + +/* + * function query_handle(): handle a single request. + */ +void query_handle(struct query_info *query_info, query_t * query) +{ + int i, rc; + dnbd_request_t *dnbd_request; + dnbd_request_t *dnbd_old_request; + dnbd_reply_t *dnbd_reply = NULL; + struct dnbd_reply_init *dnbd_reply_init; + int tmp_query; + int recent = 0; + time_t timestamp; + + dnbd_request = (dnbd_request_t *) & query->request.data; + + query->reply.len = 0; + + /* convert data from network to host byte order */ + dnbd_request->magic = ntohl(dnbd_request->magic); + dnbd_request->time = ntohs(dnbd_request->time); + dnbd_request->id = ntohs(dnbd_request->id); + dnbd_request->cmd = ntohs(dnbd_request->cmd); + dnbd_request->pos = ntohll(dnbd_request->pos); + dnbd_request->len = ntohs(dnbd_request->len); + + if (dnbd_request->magic != DNBD_MAGIC) + return; + + /* we ususally only respond to a client */ + if (!(dnbd_request->cmd & DNBD_CMD_CLI)) + return; + + /* does the client ask for our id? */ + if (dnbd_request->id && (dnbd_request->id != query_info->id)) + return; + + switch (dnbd_request->cmd & DNBD_CMD_MASK) { + /* handle init request */ + case DNBD_CMD_INIT: + /* handle heartbeat request */ + case DNBD_CMD_HB: + dnbd_reply_init = + (struct dnbd_reply_init *) query->reply.data; + dnbd_reply_init->magic = htonl(DNBD_MAGIC); + + dnbd_reply_init->capacity = + htonll(filer_getcapacity(query_info->filer_info)); + + dnbd_reply_init->cmd = + htons((dnbd_request->cmd + & ~DNBD_CMD_CLI) | DNBD_CMD_SRV); + + dnbd_reply_init->blksize = htons(MAX_BLOCK_SIZE); + dnbd_reply_init->id = htons(query_info->id); + + query->reply.len = sizeof(struct dnbd_reply_init); + + net_tx(query_info->net_info, &query->reply); + break; + /* handle read request */ + case DNBD_CMD_READ: + timestamp = time(NULL); + + /* burst avoidance */ + rc = pthread_mutex_lock(&query_mutex); + for (i = 2; i < max_queries; i++) { + + tmp_query = + (last_query + (max_queries - i)) % max_queries; + + if (tmp_query == last_query) + break; + + /* check only up to one second */ + if (!tmp_query + || queries[tmp_query].time - timestamp > 1) { + break; + } + dnbd_old_request = + (dnbd_request_t *) & queries[tmp_query]. + request.data; + + /* someone requested the same block before? */ + if (dnbd_request->pos == dnbd_old_request->pos) { + /* was it the same client, then retransmit + as the packet was probably lost, otherwise + drop the request */ + if (!((query->request.clientlen == + queries[tmp_query].request.clientlen) + && + (!memcmp + (&query->request.client, + &queries[tmp_query].request.client, + query->request.clientlen)))) { + recent = 1; + break; + } + else + break; + } + } + rc = pthread_mutex_unlock(&query_mutex); + + if (recent) + break; + + /* size of request block too high? */ + if (dnbd_request->len > MAX_BLOCK_SIZE) + break; + + /* create a DNBD reply packet */ + dnbd_reply = (dnbd_reply_t *) query->reply.data; + + dnbd_reply->magic = htonl(DNBD_MAGIC); + dnbd_reply->time = htons(dnbd_request->time); + dnbd_reply->id = htons(query_info->id); + dnbd_reply->pos = htonll(dnbd_request->pos); + + dnbd_reply->cmd = + htons((dnbd_request->cmd + & ~DNBD_CMD_CLI) | DNBD_CMD_SRV); + + /* read from underlying device/file */ + pthread_mutex_lock(&handler_mutex); + filer_readblock(query_info->filer_info, + (void *) dnbd_reply + + sizeof(struct dnbd_reply), + dnbd_request->len, dnbd_request->pos); + + pthread_mutex_unlock(&handler_mutex); + + query->reply.len = + dnbd_request->len + sizeof(dnbd_reply_t); + + query->time = time(NULL); + + /* send reply */ + net_tx(query_info->net_info, &query->reply); + break; + } + + +} + +/* + * function query_handle_loop(): get queries and handle them in a loop + */ +void *query_handle_loop(void *data) +{ + int rc; + query_t *query; /* pointer to a request */ + int thread_id = *((int *) data); /* thread id */ + + printf("Starting thread '%d'\n", thread_id); + fflush(stdout); + + rc = pthread_mutex_lock(&query_mutex); + + /* do forever.... */ + while (1) { + + if (num_queries > 0) { + /* a request is pending */ + query = query_get(&query_mutex); + + /* got a request? */ + if (query) { + + rc = pthread_mutex_unlock(&query_mutex); + /* handle request */ + query_handle(query_thread[thread_id]. + query_info, query); + + rc = pthread_mutex_lock(&query_mutex); + } + } else { + /* wait for a request to arrive */ + rc = pthread_cond_wait(&got_query, &query_mutex); + } + } +} + +/* + * function query_init(): initialize request handling + * returns: pointer to data structure query_info (see header file) + */ +query_info_t *query_init(net_info_t * net_info, filer_info_t * filer_info, + int id, int threads) +{ + int i; + query_info_t *query_info = NULL; + + query_info = (query_info_t *) malloc(sizeof(query_info_t)); + if (!query_info) + return NULL; + + /* fill query_info structure */ + query_info->net_info = net_info; + query_info->filer_info = filer_info; + query_info->id = id; + + if (!(queries = (query_t *) malloc(sizeof(query_t) * max_queries))) { + free(query_info); + return NULL; + } + + last_query = 0; + next_query = 0; + + /* reserve memory for circular buffer */ + for (i = 0; i < max_queries; i++) { + queries[i].reply.data = + malloc(MAX_BLOCK_SIZE + sizeof(dnbd_reply_t)); + } + + /* create the request-handling threads */ + for (i = 0; i < threads; i++) { + + query_thread[i].id = i; + query_thread[i].query_info = query_info; + + pthread_create(&query_thread[i].p_thread, NULL, + query_handle_loop, + (void *) &query_thread[i].id); + } + + /* create thread for receiving network requests */ + pthread_create(&query_info->p_thread, NULL, + query_add_loop, (void *) query_info); + + return query_info; +} diff --git a/server/query.h b/server/query.h new file mode 100644 index 0000000..f4a9376 --- /dev/null +++ b/server/query.h @@ -0,0 +1,42 @@ +#ifndef LINUX_DNBD_REQUEST_H +#define LINUX_DNBD_REQUEST_H 1 + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <time.h> + +#include "net.h" +#include "filer.h" + +struct query_info { + pthread_t p_thread; + net_info_t *net_info; + filer_info_t *filer_info; + int id; +}; + +typedef struct query_info query_info_t; + +/* query information for requests and replies */ +struct query { + time_t time; + net_request_t request; + net_reply_t reply; +}; + +typedef struct query query_t; + +/* functions */ +query_info_t *query_init(net_info_t *, filer_info_t *, int id, int threads); + +/* host to network byte order */ +#include <endian.h> +#if __BYTE_ORDER == __BIG_ENDIAN +#define htonll(x) (x) +#else +#define htonll(x) bswap_64(x) +#endif + + +#endif diff --git a/server/server.c b/server/server.c new file mode 100644 index 0000000..b701e4c --- /dev/null +++ b/server/server.c @@ -0,0 +1,216 @@ +/* + * main.c - central part of the DNBD server application + * Copyright (C) 2006 Thorsten Zitterell <thorsten@zitterell.de> + */ + +#include <stdio.h> +#include <string.h> +#include <getopt.h> +#include <unistd.h> +#include <stdlib.h> + +/* network includes */ +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <signal.h> + +#define DNBD_USERSPACE 1 +#include "../common/dnbd-cliserv.h" + +#include "server.h" +#include "query.h" +#include "net.h" +#include "filer.h" + + +static int verbose = 0; +static int running = 1; + +/* + * function: handle_signal(): set global variable running to 0 if signal arrives + */ +void handle_signal(int signum) +{ + running = 0; +} + +void server_help(void) +{ + fprintf(stderr, "dnbd-server, version %s\n", DNBD_VERSION); + fprintf(stderr, + "Usage: dnbd-server -m <address> -d <device/file> -i <number>\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "description:\n"); + fprintf(stderr, " -m|--mcast <multicast-address>\n"); + fprintf(stderr, " -d|--device <block device or file>\n"); + fprintf(stderr, " -i|--id <unique identification number>\n"); + fprintf(stderr, " -t|--threads <number of threads>\n"); +} + +/* + * function: server_init(): parse command lines + */ +server_info_t *server_init(int argc, char **argv) +{ + /* cmd + * -1: error + * 0: not defined + * 1: serve + */ + int cmd = 0; + server_info_t *server_info = NULL; + + server_info = (server_info_t *) malloc(sizeof(server_info_t)); + if (!server_info) + return NULL; + + memset(server_info, 0, sizeof(server_info_t)); + + server_info->threads = 1; + + /* return value for getopt */ + int c; + + while (1) { + static struct option long_options[] = { + {"verbose", no_argument, 0, 'v'}, + {"mcast", required_argument, 0, 'm'}, + {"device", required_argument, 0, 'd'}, + {"threads", required_argument, 0, 't'}, + {"id", required_argument, 0, 'i'}, + {0, 0, 0, 0} + }; + /* option index for getopt_long */ + int option_index = 0; + + c = getopt_long(argc, argv, "vm:d:i:t:", + long_options, &option_index); + + /* at end of options? */ + if (c == -1) + break; + + /* + cmd = (cmd ? -1 : xx) is used to set cmd when it was + unset (0) before. Otherwise save error value + */ + switch (c) { + case 'v': + verbose++; + break; + case 'm': + server_info->mnet = optarg; /* multicast address */ + break; + case 'd': + cmd = (cmd ? -1 : 2); /* device/file */ + server_info->filename = optarg; + break; + case 'i': + if (sscanf(optarg, "%u",&server_info->id) != 1) { + fprintf(stderr,"ERROR: Id not a 16bit-integer (>0)\n"); + cmd = -1; + } + break; + case 't': + if (sscanf(optarg, "%u",&server_info->threads) != 1) { + fprintf(stderr,"ERROR: Number of threads is wrong (>0)\n"); + cmd = -1; + } + break; + + default: + cmd = -1; + } + + if (cmd < 0) break; + } + + /* no/wrong command given? */ + if (cmd <= 0) { + server_help(); + goto out_free; + } + + if (!server_info->mnet) { + fprintf(stderr, "ERROR: multicast group was not set!\n"); + goto out_free; + } + + if (!(server_info->id > 0)) { + fprintf(stderr, "ERROR: unique id not set or not valid!\n"); + goto out_free; + } + + if (!(server_info->threads > 0)) { + fprintf(stderr, "ERROR: number of threads is not valid!\n"); + goto out_free; + } + + + /* call function for command */ + goto out; + + out_free: + if (server_info) + free(server_info); + server_info = NULL; + out: + return server_info; +} + +/* + * function: main(): server startup + */ +int main(int argc, char **argv) +{ + + server_info_t *server_info; + + signal(SIGINT, handle_signal); + + /* parse and verify command line options */ + if (!(server_info = server_init(argc, argv))) { + fprintf(stderr, "ERROR: Parsing arguments!\n"); + goto out_server; + } + + /* initialize network configuration and start listener thread */ + if (!(server_info->net_info = net_init(server_info->mnet))) { + fprintf(stderr, "ERROR: Initializing net!\n"); + goto out_net; + } + + if (!(server_info->filer_info = filer_init(server_info->filename))) { + fprintf(stderr, "ERROR: Initializing filer!\n"); + goto out_filer; + } + + /* initialize threads to handle requests */ + if (! + (server_info->query_info = + query_init(server_info->net_info, server_info->filer_info, + server_info->id, server_info->threads))) { + fprintf(stderr, "ERROR: Initializing query!\n"); + goto out_query; + } + + while (running) + pause(); + + fprintf(stdout, "cleaning up...\n"); + out_query: + if (server_info->filer_info) + free(server_info->filer_info); + + out_filer: + if (server_info->net_info) + free(server_info->net_info); + + out_net: + if (server_info) { + free(server_info); + } + out_server: + return 0; +} diff --git a/server/server.h b/server/server.h new file mode 100644 index 0000000..3e36bc3 --- /dev/null +++ b/server/server.h @@ -0,0 +1,21 @@ +#ifndef LINUX_DNBD_SERVER_H +#define LINUX_DNBD_SERVER_H 1 + +#include "filer.h" +#include "net.h" +#include "query.h" + +/* server relevant information mainly given by command line */ +struct server_info { + const char *filename; + int id; + int threads; + const char *mnet; + filer_info_t *filer_info; + net_info_t *net_info; + query_info_t *query_info; +}; + +typedef struct server_info server_info_t; + +#endif |