From 8102e56cd9ebe064a1f4757b8d28c64661ab7cb3 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 29 May 2015 17:58:25 +0200 Subject: [server] Started work on the internal file server --- .../src/main/java/fileserv/ActiveUpload.java | 121 +++++++++++++++++++++ .../src/main/java/fileserv/FileChunk.java | 50 +++++++++ .../src/main/java/fileserv/FileServer.java | 42 +++++++ 3 files changed, 213 insertions(+) create mode 100644 dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java create mode 100644 dozentenmodulserver/src/main/java/fileserv/FileChunk.java create mode 100644 dozentenmodulserver/src/main/java/fileserv/FileServer.java (limited to 'dozentenmodulserver/src/main/java') diff --git a/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java new file mode 100644 index 00000000..cf73a413 --- /dev/null +++ b/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java @@ -0,0 +1,121 @@ +package fileserv; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import org.apache.log4j.Logger; +import org.openslx.filetransfer.DataReceivedCallback; +import org.openslx.filetransfer.Downloader; +import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.WantRangeCallback; + +public class ActiveUpload { + private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); + + /** + * This is an active upload, so on our end, we have a Downloader. + */ + private Downloader download = null; + + private final String destinationFile; + + private final RandomAccessFile outFile; + + private ConcurrentLinkedQueue chunks = new ConcurrentLinkedQueue<>(); + + // TODO: Hashlist for verification + + public ActiveUpload(String destinationFile, long fileSize, List sha1Sums) + throws FileNotFoundException { + this.destinationFile = destinationFile; + outFile = new RandomAccessFile(destinationFile, "rw"); + FileChunk.createChunkList(chunks, fileSize, sha1Sums); + } + + /** + * Add another connection for this file transfer. Currently only one + * connection is allowed, but this might change in the future. + * + * @param connection + * @return true if the connection is accepted, false if it should be + * discarded + */ + public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) { + if (download != null) + return false; + download = connection; + pool.execute(new Runnable() { + @Override + public void run() { + CbHandler cbh = new CbHandler(); + if (!download.download(cbh, cbh) && cbh.currentChunk != null) { + // If the download failed and we have a current chunk, put it back into + // the queue, so it will be handled again later... + chunks.add(cbh.currentChunk); + } + } + }); + return true; + } + + /** + * Write some data to the local file. Thread safe so we could + * have multiple concurrent connections later. + * + * @param fileOffset + * @param dataLength + * @param data + * @return + */ + private boolean writeFileData(long fileOffset, int dataLength, byte[] data) { + synchronized (outFile) { + try { + outFile.seek(fileOffset); + outFile.write(data, 0, dataLength); + } catch (IOException e) { + LOGGER.error("Cannot write to '" + destinationFile + + "'. Disk full, network storage error, bad permissions, ...?", e); + return false; + } + } + return true; + } + + /** + * Callback class for an instance of the Downloader, which supplies + * the Downloader with wanted file ranges, and handles incoming data. + */ + private class CbHandler implements WantRangeCallback, DataReceivedCallback { + /** + * The current chunk being transfered. + */ + public FileChunk currentChunk = null; + + @Override + public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { + // TODO: Maybe cache in RAM and write full CHUNK_SIZE blocks at a time? + // Would probably help with slower storage, especially if it's using + // rotating disks and we're running multiple uploads. + // Also we wouldn't have to re-read a block form disk for sha1 checking. + return writeFileData(fileOffset, dataLength, data); + } + + @Override + public FileRange get() { + if (currentChunk != null) { + // TODO: A chunk was requested before, check hash and requeue if not matching + // This needs to be async (own thread) so will be a little complicated + } + // Get next missing chunk + currentChunk = chunks.poll(); + if (currentChunk == null) + return null; // No more chunks, returning null tells the Downloader we're done. + return currentChunk.range; + } + } + +} diff --git a/dozentenmodulserver/src/main/java/fileserv/FileChunk.java b/dozentenmodulserver/src/main/java/fileserv/FileChunk.java new file mode 100644 index 00000000..2fee6378 --- /dev/null +++ b/dozentenmodulserver/src/main/java/fileserv/FileChunk.java @@ -0,0 +1,50 @@ +package fileserv; + +import java.util.Collection; +import java.util.List; + +import org.openslx.filetransfer.FileRange; + +public class FileChunk { + public static final int CHUNK_SIZE = 16 * 1024 * 1024; + + public final FileRange range; + public final byte[] sha1sum; + + public FileChunk(long startOffset, long endOffset, byte[] sha1sum) { + this.range = new FileRange(startOffset, endOffset); + this.sha1sum = sha1sum; + } + + public static int fileSizeToChunkCount(long fileSize) { + return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE); + } + + public static void createChunkList(Collection list, long fileSize, List sha1sums) { + if (fileSize < 0) + throw new IllegalArgumentException("fileSize cannot be negative"); + long chunkCount = fileSizeToChunkCount(fileSize); + if (sha1sums != null) { + if (sha1sums.size() != chunkCount) + throw new IllegalArgumentException( + "Passed a sha1sum list, but hash count in list doesn't match expected chunk count"); + long offset = 0; + for (byte[] sha1sum : sha1sums) { // Do this as we don't know how efficient List.get(index) is... + long end = offset + CHUNK_SIZE; + if (end > fileSize) + end = fileSize; + list.add(new FileChunk(offset, end, sha1sum)); + offset = end; + } + return; + } + long offset = 0; + while (offset < fileSize) { // ...otherwise we could share this code + long end = offset + CHUNK_SIZE; + if (end > fileSize) + end = fileSize; + list.add(new FileChunk(offset, end, null)); + offset = end; + } + } +} diff --git a/dozentenmodulserver/src/main/java/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/fileserv/FileServer.java new file mode 100644 index 00000000..446b982e --- /dev/null +++ b/dozentenmodulserver/src/main/java/fileserv/FileServer.java @@ -0,0 +1,42 @@ +package fileserv; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.openslx.filetransfer.Downloader; +import org.openslx.filetransfer.IncomingEvent; +import org.openslx.filetransfer.Listener; +import org.openslx.filetransfer.Uploader; + +public class FileServer implements IncomingEvent { + + /** + * Listener for incoming unencrypted connections + */ + private Listener plainListener = new Listener(this, null, 9092); // TODO: Config + + /** + * All currently running uploads, indexed by token + */ + private Map uploads = new ConcurrentHashMap<>(); + + public boolean start() { + boolean ret = plainListener.start(); + // TODO: Start SSL listener too + return ret; + } + + @Override + public void incomingDownloadRequest(Uploader uploader) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void incomingUploadRequest(Downloader downloader) throws IOException { + // TODO Auto-generated method stub + + } + +} -- cgit v1.2.3-55-g7522