diff options
| author | Simon Rettberg | 2015-06-02 19:53:31 +0200 |
|---|---|---|
| committer | Simon Rettberg | 2015-06-02 19:53:31 +0200 |
| commit | 1bc83891c68ee269727e81a13cc70da698bcc7a7 (patch) | |
| tree | b052a72ad7d65864068752f71c5ed2b49a171276 /dozentenmodulserver/src/main/java/fileserv | |
| parent | [server] Started work on the internal file server (diff) | |
| download | tutor-module-1bc83891c68ee269727e81a13cc70da698bcc7a7.tar.gz tutor-module-1bc83891c68ee269727e81a13cc70da698bcc7a7.tar.xz tutor-module-1bc83891c68ee269727e81a13cc70da698bcc7a7.zip | |
[server] Compiling again, still lots of stubs
Diffstat (limited to 'dozentenmodulserver/src/main/java/fileserv')
4 files changed, 208 insertions, 16 deletions
diff --git a/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java index cf73a413..256f8b8d 100644 --- a/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java +++ b/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java @@ -1,10 +1,11 @@ package fileserv; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger; @@ -12,6 +13,7 @@ import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.WantRangeCallback; +import org.openslx.imagemaster.thrift.iface.UserInfo; public class ActiveUpload { private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); @@ -21,19 +23,28 @@ public class ActiveUpload { */ private Downloader download = null; - private final String destinationFile; + private final File destinationFile; private final RandomAccessFile outFile; - private ConcurrentLinkedQueue<FileChunk> chunks = new ConcurrentLinkedQueue<>(); + private final ChunkList chunks; - // TODO: Hashlist for verification + private final long fileSize; - public ActiveUpload(String destinationFile, long fileSize, List<byte[]> sha1Sums) + /** + * User owning this uploaded file. + */ + private final UserInfo owner; + + // TODO: Use HashList for verification + + public ActiveUpload(UserInfo owner, File destinationFile, long fileSize, List<ByteBuffer> sha1Sums) throws FileNotFoundException { this.destinationFile = destinationFile; - outFile = new RandomAccessFile(destinationFile, "rw"); - FileChunk.createChunkList(chunks, fileSize, sha1Sums); + this.outFile = new RandomAccessFile(destinationFile, "rw"); + this.chunks = new ChunkList(fileSize, sha1Sums); + this.owner = owner; + this.fileSize = fileSize; } /** @@ -45,7 +56,7 @@ public class ActiveUpload { * discarded */ public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) { - if (download != null) + if (download != null || chunks.isComplete()) return false; download = connection; pool.execute(new Runnable() { @@ -55,7 +66,7 @@ public class ActiveUpload { if (!download.download(cbh, cbh) && cbh.currentChunk != null) { // If the download failed and we have a current chunk, put it back into // the queue, so it will be handled again later... - chunks.add(cbh.currentChunk); + chunks.markFailed(cbh.currentChunk); } } }); @@ -86,6 +97,27 @@ public class ActiveUpload { } /** + * Get user owning this upload. Can be null in special cases. + * + * @return instance of UserInfo for the according user. + */ + public UserInfo getOwner() { + return this.owner; + } + + public boolean isComplete() { + return chunks.isComplete() && destinationFile.length() == this.fileSize; + } + + public File getDestinationFile() { + return this.destinationFile; + } + + public long getSize() { + return this.fileSize; + } + + /** * Callback class for an instance of the Downloader, which supplies * the Downloader with wanted file ranges, and handles incoming data. */ @@ -111,11 +143,13 @@ public class ActiveUpload { // This needs to be async (own thread) so will be a little complicated } // Get next missing chunk - currentChunk = chunks.poll(); + currentChunk = chunks.getMissing(); if (currentChunk == null) return null; // No more chunks, returning null tells the Downloader we're done. return currentChunk.range; } } + // TODO: Clean up old stale uploads + } diff --git a/dozentenmodulserver/src/main/java/fileserv/ChunkList.java b/dozentenmodulserver/src/main/java/fileserv/ChunkList.java new file mode 100644 index 00000000..95b3e1fa --- /dev/null +++ b/dozentenmodulserver/src/main/java/fileserv/ChunkList.java @@ -0,0 +1,78 @@ +package fileserv; + +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + +import org.apache.log4j.Logger; + +public class ChunkList { + + private static final Logger LOGGER = Logger.getLogger(ChunkList.class); + + /** + * Chunks that are missing from the file + */ + private final List<FileChunk> missingChunks = new LinkedList<>(); + + /** + * Chunks that are currently being uploaded or hash-checked + */ + private final List<FileChunk> pendingChunks = new LinkedList<>(); + + // Do we need to keep valid chunks, or chunks that failed too many times? + + public ChunkList(long fileSize, List<ByteBuffer> sha1Sums) { + FileChunk.createChunkList(missingChunks, fileSize, sha1Sums); + } + + /** + * Get a missing chunk, marking it pending. + * + * @return chunk marked as missing + */ + public synchronized FileChunk getMissing() { + if (missingChunks.isEmpty()) + return null; + FileChunk c = missingChunks.remove(0); + pendingChunks.add(c); + return c; + } + + /** + * Mark a chunk currently transferring as successfully transfered. + * + * @param c The chunk in question + */ + public synchronized void markSuccessful(FileChunk c) { + if (!pendingChunks.remove(c)) { + LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString() + + ", but chunk is not marked as currently transferring!"); + return; + } + } + + /** + * Mark a chunk currently transferring or being hash checked as failed + * transfer. This increases its fail count and re-adds it to the list of + * missing chunks. + * + * @param c The chunk in question + * @return Number of times transfer of this chunk failed + */ + public synchronized int markFailed(FileChunk c) { + if (!pendingChunks.remove(c)) { + LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString() + + ", but chunk is not marked as currently transferring!"); + return -1; + } + // Add as first element so it will be re-transmitted immediately + missingChunks.add(0, c); + return c.incFailed(); + } + + public synchronized boolean isComplete() { + return missingChunks.isEmpty() && pendingChunks.isEmpty(); + } + +} diff --git a/dozentenmodulserver/src/main/java/fileserv/FileChunk.java b/dozentenmodulserver/src/main/java/fileserv/FileChunk.java index 2fee6378..1a95d27c 100644 --- a/dozentenmodulserver/src/main/java/fileserv/FileChunk.java +++ b/dozentenmodulserver/src/main/java/fileserv/FileChunk.java @@ -1,39 +1,55 @@ package fileserv; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import org.openslx.filetransfer.FileRange; public class FileChunk { - public static final int CHUNK_SIZE = 16 * 1024 * 1024; + + public static final int CHUNK_SIZE_MIB = 16; + public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * (1024 * 1024); public final FileRange range; public final byte[] sha1sum; + private int failCount = 0; public FileChunk(long startOffset, long endOffset, byte[] sha1sum) { this.range = new FileRange(startOffset, endOffset); this.sha1sum = sha1sum; } + /** + * Signal that transferring this chunk seems to have failed (checksum + * mismatch). + * + * @return Number of times the transfer failed now + */ + public synchronized int incFailed() { + return ++failCount; + } + + // + public static int fileSizeToChunkCount(long fileSize) { return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE); } - public static void createChunkList(Collection<FileChunk> list, long fileSize, List<byte[]> sha1sums) { + public static void createChunkList(Collection<FileChunk> list, long fileSize, List<ByteBuffer> sha1Sums) { if (fileSize < 0) throw new IllegalArgumentException("fileSize cannot be negative"); long chunkCount = fileSizeToChunkCount(fileSize); - if (sha1sums != null) { - if (sha1sums.size() != chunkCount) + if (sha1Sums != null) { + if (sha1Sums.size() != chunkCount) throw new IllegalArgumentException( "Passed a sha1sum list, but hash count in list doesn't match expected chunk count"); long offset = 0; - for (byte[] sha1sum : sha1sums) { // Do this as we don't know how efficient List.get(index) is... + for (ByteBuffer sha1sum : sha1Sums) { // Do this as we don't know how efficient List.get(index) is... long end = offset + CHUNK_SIZE; if (end > fileSize) end = fileSize; - list.add(new FileChunk(offset, end, sha1sum)); + list.add(new FileChunk(offset, end, sha1sum.array())); offset = end; } return; diff --git a/dozentenmodulserver/src/main/java/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/fileserv/FileServer.java index 446b982e..8322e2e9 100644 --- a/dozentenmodulserver/src/main/java/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/fileserv/FileServer.java @@ -1,13 +1,24 @@ package fileserv; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; +import org.openslx.imagemaster.thrift.iface.UserInfo; +import org.openslx.sat.thrift.iface.TUploadRejectedException; + +import util.Constants; +import util.Formatter; public class FileServer implements IncomingEvent { @@ -21,6 +32,15 @@ public class FileServer implements IncomingEvent { */ private Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>(); + private static final FileServer globalInstance = new FileServer(); + + private FileServer() { + } + + public static FileServer instance() { + return globalInstance; + } + public boolean start() { boolean ret = plainListener.start(); // TODO: Start SSL listener too @@ -39,4 +59,48 @@ public class FileServer implements IncomingEvent { } + /** + * Get an upload instance by given token. + * + * @param uploadToken + * @return + */ + public ActiveUpload getUploadByToken(String uploadToken) { + return uploads.get(uploadToken); + } + + public String createNewUserUpload(UserInfo owner, long fileSize, List<ByteBuffer> sha1Sums) + throws TUploadRejectedException, FileNotFoundException { + Iterator<ActiveUpload> it = uploads.values().iterator(); + int activeUploads = 0; + while (it.hasNext()) { + ActiveUpload upload = it.next(); + if (upload.isComplete()) { + // TODO: Check age (short timeout) and remove + continue; + } else { + // Check age (long timeout) and remove + } + activeUploads++; + } + if (activeUploads > Constants.MAX_UPLOADS) + throw new TUploadRejectedException("Server busy. Too many running uploads."); + File destinationFile = null; + do { + destinationFile = Formatter.getTempImageName(); + } while (destinationFile.exists()); + ActiveUpload upload = new ActiveUpload(owner, destinationFile, fileSize, sha1Sums); + String key = UUID.randomUUID().toString(); + uploads.put(key, upload); + return key; + } + + public int getPlainPort() { + return plainListener.getPort(); + } + + public int getSslPort() { + return 0; // TODO + } + } |
