summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/fileserv
diff options
context:
space:
mode:
authorSimon Rettberg2015-06-02 19:53:31 +0200
committerSimon Rettberg2015-06-02 19:53:31 +0200
commit1bc83891c68ee269727e81a13cc70da698bcc7a7 (patch)
treeb052a72ad7d65864068752f71c5ed2b49a171276 /dozentenmodulserver/src/main/java/fileserv
parent[server] Started work on the internal file server (diff)
downloadtutor-module-1bc83891c68ee269727e81a13cc70da698bcc7a7.tar.gz
tutor-module-1bc83891c68ee269727e81a13cc70da698bcc7a7.tar.xz
tutor-module-1bc83891c68ee269727e81a13cc70da698bcc7a7.zip
[server] Compiling again, still lots of stubs
Diffstat (limited to 'dozentenmodulserver/src/main/java/fileserv')
-rw-r--r--dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java54
-rw-r--r--dozentenmodulserver/src/main/java/fileserv/ChunkList.java78
-rw-r--r--dozentenmodulserver/src/main/java/fileserv/FileChunk.java28
-rw-r--r--dozentenmodulserver/src/main/java/fileserv/FileServer.java64
4 files changed, 208 insertions, 16 deletions
diff --git a/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java
index cf73a413..256f8b8d 100644
--- a/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java
+++ b/dozentenmodulserver/src/main/java/fileserv/ActiveUpload.java
@@ -1,10 +1,11 @@
package fileserv;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
@@ -12,6 +13,7 @@ import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
+import org.openslx.imagemaster.thrift.iface.UserInfo;
public class ActiveUpload {
private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);
@@ -21,19 +23,28 @@ public class ActiveUpload {
*/
private Downloader download = null;
- private final String destinationFile;
+ private final File destinationFile;
private final RandomAccessFile outFile;
- private ConcurrentLinkedQueue<FileChunk> chunks = new ConcurrentLinkedQueue<>();
+ private final ChunkList chunks;
- // TODO: Hashlist for verification
+ private final long fileSize;
- public ActiveUpload(String destinationFile, long fileSize, List<byte[]> sha1Sums)
+ /**
+ * User owning this uploaded file.
+ */
+ private final UserInfo owner;
+
+ // TODO: Use HashList for verification
+
+ public ActiveUpload(UserInfo owner, File destinationFile, long fileSize, List<ByteBuffer> sha1Sums)
throws FileNotFoundException {
this.destinationFile = destinationFile;
- outFile = new RandomAccessFile(destinationFile, "rw");
- FileChunk.createChunkList(chunks, fileSize, sha1Sums);
+ this.outFile = new RandomAccessFile(destinationFile, "rw");
+ this.chunks = new ChunkList(fileSize, sha1Sums);
+ this.owner = owner;
+ this.fileSize = fileSize;
}
/**
@@ -45,7 +56,7 @@ public class ActiveUpload {
* discarded
*/
public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) {
- if (download != null)
+ if (download != null || chunks.isComplete())
return false;
download = connection;
pool.execute(new Runnable() {
@@ -55,7 +66,7 @@ public class ActiveUpload {
if (!download.download(cbh, cbh) && cbh.currentChunk != null) {
// If the download failed and we have a current chunk, put it back into
// the queue, so it will be handled again later...
- chunks.add(cbh.currentChunk);
+ chunks.markFailed(cbh.currentChunk);
}
}
});
@@ -86,6 +97,27 @@ public class ActiveUpload {
}
/**
+ * Get user owning this upload. Can be null in special cases.
+ *
+ * @return instance of UserInfo for the according user.
+ */
+ public UserInfo getOwner() {
+ return this.owner;
+ }
+
+ public boolean isComplete() {
+ return chunks.isComplete() && destinationFile.length() == this.fileSize;
+ }
+
+ public File getDestinationFile() {
+ return this.destinationFile;
+ }
+
+ public long getSize() {
+ return this.fileSize;
+ }
+
+ /**
* Callback class for an instance of the Downloader, which supplies
* the Downloader with wanted file ranges, and handles incoming data.
*/
@@ -111,11 +143,13 @@ public class ActiveUpload {
// This needs to be async (own thread) so will be a little complicated
}
// Get next missing chunk
- currentChunk = chunks.poll();
+ currentChunk = chunks.getMissing();
if (currentChunk == null)
return null; // No more chunks, returning null tells the Downloader we're done.
return currentChunk.range;
}
}
+ // TODO: Clean up old stale uploads
+
}
diff --git a/dozentenmodulserver/src/main/java/fileserv/ChunkList.java b/dozentenmodulserver/src/main/java/fileserv/ChunkList.java
new file mode 100644
index 00000000..95b3e1fa
--- /dev/null
+++ b/dozentenmodulserver/src/main/java/fileserv/ChunkList.java
@@ -0,0 +1,78 @@
+package fileserv;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+
+public class ChunkList {
+
+ private static final Logger LOGGER = Logger.getLogger(ChunkList.class);
+
+ /**
+ * Chunks that are missing from the file
+ */
+ private final List<FileChunk> missingChunks = new LinkedList<>();
+
+ /**
+ * Chunks that are currently being uploaded or hash-checked
+ */
+ private final List<FileChunk> pendingChunks = new LinkedList<>();
+
+ // Do we need to keep valid chunks, or chunks that failed too many times?
+
+ public ChunkList(long fileSize, List<ByteBuffer> sha1Sums) {
+ FileChunk.createChunkList(missingChunks, fileSize, sha1Sums);
+ }
+
+ /**
+ * Get a missing chunk, marking it pending.
+ *
+ * @return chunk marked as missing
+ */
+ public synchronized FileChunk getMissing() {
+ if (missingChunks.isEmpty())
+ return null;
+ FileChunk c = missingChunks.remove(0);
+ pendingChunks.add(c);
+ return c;
+ }
+
+ /**
+ * Mark a chunk currently transferring as successfully transfered.
+ *
+ * @param c The chunk in question
+ */
+ public synchronized void markSuccessful(FileChunk c) {
+ if (!pendingChunks.remove(c)) {
+ LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString()
+ + ", but chunk is not marked as currently transferring!");
+ return;
+ }
+ }
+
+ /**
+ * Mark a chunk currently transferring or being hash checked as failed
+ * transfer. This increases its fail count and re-adds it to the list of
+ * missing chunks.
+ *
+ * @param c The chunk in question
+ * @return Number of times transfer of this chunk failed
+ */
+ public synchronized int markFailed(FileChunk c) {
+ if (!pendingChunks.remove(c)) {
+ LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString()
+ + ", but chunk is not marked as currently transferring!");
+ return -1;
+ }
+ // Add as first element so it will be re-transmitted immediately
+ missingChunks.add(0, c);
+ return c.incFailed();
+ }
+
+ public synchronized boolean isComplete() {
+ return missingChunks.isEmpty() && pendingChunks.isEmpty();
+ }
+
+}
diff --git a/dozentenmodulserver/src/main/java/fileserv/FileChunk.java b/dozentenmodulserver/src/main/java/fileserv/FileChunk.java
index 2fee6378..1a95d27c 100644
--- a/dozentenmodulserver/src/main/java/fileserv/FileChunk.java
+++ b/dozentenmodulserver/src/main/java/fileserv/FileChunk.java
@@ -1,39 +1,55 @@
package fileserv;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import org.openslx.filetransfer.FileRange;
public class FileChunk {
- public static final int CHUNK_SIZE = 16 * 1024 * 1024;
+
+ public static final int CHUNK_SIZE_MIB = 16;
+ public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * (1024 * 1024);
public final FileRange range;
public final byte[] sha1sum;
+ private int failCount = 0;
public FileChunk(long startOffset, long endOffset, byte[] sha1sum) {
this.range = new FileRange(startOffset, endOffset);
this.sha1sum = sha1sum;
}
+ /**
+ * Signal that transferring this chunk seems to have failed (checksum
+ * mismatch).
+ *
+ * @return Number of times the transfer failed now
+ */
+ public synchronized int incFailed() {
+ return ++failCount;
+ }
+
+ //
+
public static int fileSizeToChunkCount(long fileSize) {
return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE);
}
- public static void createChunkList(Collection<FileChunk> list, long fileSize, List<byte[]> sha1sums) {
+ public static void createChunkList(Collection<FileChunk> list, long fileSize, List<ByteBuffer> sha1Sums) {
if (fileSize < 0)
throw new IllegalArgumentException("fileSize cannot be negative");
long chunkCount = fileSizeToChunkCount(fileSize);
- if (sha1sums != null) {
- if (sha1sums.size() != chunkCount)
+ if (sha1Sums != null) {
+ if (sha1Sums.size() != chunkCount)
throw new IllegalArgumentException(
"Passed a sha1sum list, but hash count in list doesn't match expected chunk count");
long offset = 0;
- for (byte[] sha1sum : sha1sums) { // Do this as we don't know how efficient List.get(index) is...
+ for (ByteBuffer sha1sum : sha1Sums) { // Do this as we don't know how efficient List.get(index) is...
long end = offset + CHUNK_SIZE;
if (end > fileSize)
end = fileSize;
- list.add(new FileChunk(offset, end, sha1sum));
+ list.add(new FileChunk(offset, end, sha1sum.array()));
offset = end;
}
return;
diff --git a/dozentenmodulserver/src/main/java/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/fileserv/FileServer.java
index 446b982e..8322e2e9 100644
--- a/dozentenmodulserver/src/main/java/fileserv/FileServer.java
+++ b/dozentenmodulserver/src/main/java/fileserv/FileServer.java
@@ -1,13 +1,24 @@
package fileserv;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
+import org.openslx.imagemaster.thrift.iface.UserInfo;
+import org.openslx.sat.thrift.iface.TUploadRejectedException;
+
+import util.Constants;
+import util.Formatter;
public class FileServer implements IncomingEvent {
@@ -21,6 +32,15 @@ public class FileServer implements IncomingEvent {
*/
private Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>();
+ private static final FileServer globalInstance = new FileServer();
+
+ private FileServer() {
+ }
+
+ public static FileServer instance() {
+ return globalInstance;
+ }
+
public boolean start() {
boolean ret = plainListener.start();
// TODO: Start SSL listener too
@@ -39,4 +59,48 @@ public class FileServer implements IncomingEvent {
}
+ /**
+ * Get an upload instance by given token.
+ *
+ * @param uploadToken
+ * @return
+ */
+ public ActiveUpload getUploadByToken(String uploadToken) {
+ return uploads.get(uploadToken);
+ }
+
+ public String createNewUserUpload(UserInfo owner, long fileSize, List<ByteBuffer> sha1Sums)
+ throws TUploadRejectedException, FileNotFoundException {
+ Iterator<ActiveUpload> it = uploads.values().iterator();
+ int activeUploads = 0;
+ while (it.hasNext()) {
+ ActiveUpload upload = it.next();
+ if (upload.isComplete()) {
+ // TODO: Check age (short timeout) and remove
+ continue;
+ } else {
+ // Check age (long timeout) and remove
+ }
+ activeUploads++;
+ }
+ if (activeUploads > Constants.MAX_UPLOADS)
+ throw new TUploadRejectedException("Server busy. Too many running uploads.");
+ File destinationFile = null;
+ do {
+ destinationFile = Formatter.getTempImageName();
+ } while (destinationFile.exists());
+ ActiveUpload upload = new ActiveUpload(owner, destinationFile, fileSize, sha1Sums);
+ String key = UUID.randomUUID().toString();
+ uploads.put(key, upload);
+ return key;
+ }
+
+ public int getPlainPort() {
+ return plainListener.getPort();
+ }
+
+ public int getSslPort() {
+ return 0; // TODO
+ }
+
}