summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
diff options
context:
space:
mode:
authorSimon Rettberg2015-07-23 17:44:59 +0200
committerSimon Rettberg2015-07-23 17:44:59 +0200
commit756fb6041a9077bd5d14ce5dd93f3584780a98af (patch)
tree76f49fb317b7d4ef46cc892783301584fb7b0237 /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
parent[client] Changed date selection in lecture create page, set text fields edita... (diff)
downloadtutor-module-756fb6041a9077bd5d14ce5dd93f3584780a98af.tar.gz
tutor-module-756fb6041a9077bd5d14ce5dd93f3584780a98af.tar.xz
tutor-module-756fb6041a9077bd5d14ce5dd93f3584780a98af.zip
[*] Adapt to ChunkList changes; [server] Support multiple connections per upload
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java119
1 files changed, 92 insertions, 27 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
index 92c2b2d7..469fa8dc 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
@@ -5,7 +5,9 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -13,6 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
import org.openslx.bwlp.sat.util.Configuration;
+import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.Util;
@@ -27,14 +30,28 @@ import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.filetransfer.util.ChunkList;
import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.filetransfer.util.HashChecker;
+import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
+import org.openslx.filetransfer.util.HashChecker.HashResult;
+
+public class ActiveUpload implements HashCheckCallback {
-public class ActiveUpload {
private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);
/**
+ * How many concurrent connections per upload
+ */
+ private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_UPLOADS / 4, 1);
+
+ /**
+ * Self reference for inner classes.
+ */
+ private final ActiveUpload activeUpload = this;
+
+ /**
* This is an active upload, so on our end, we have a Downloader.
*/
- private Downloader download = null;
+ private List<Downloader> downloads = new ArrayList<>();
private final File destinationFile;
@@ -80,7 +97,22 @@ public class ActiveUpload {
*/
private final AtomicBoolean versionWrittenToDb = new AtomicBoolean();
- // TODO: Use HashList for verification
+ /**
+ * Whether file is (still) writable. Used for the file transfer callbacks.
+ */
+ private boolean fileWritable;
+
+ private static final HashChecker hashChecker;
+
+ static {
+ HashChecker hc;
+ try {
+ hc = new HashChecker("SHA1");
+ } catch (NoSuchAlgorithmException e) {
+ hc = null;
+ }
+ hashChecker = hc;
+ }
public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile,
long fileSize, List<ByteBuffer> sha1Sums, byte[] machineDescription) throws FileNotFoundException {
@@ -122,23 +154,29 @@ public class ActiveUpload {
* @return true if the connection is accepted, false if it should be
* discarded
*/
- public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) {
- if (download != null || chunks.isComplete() || state == TransferState.FINISHED
- || state == TransferState.ERROR)
+ public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) {
+ if (chunks.isComplete() || state == TransferState.FINISHED || state == TransferState.ERROR)
return false;
- download = connection;
+ synchronized (downloads) {
+ if (downloads.size() >= MAX_CONNECTIONS)
+ return false;
+ downloads.add(connection);
+ }
try {
pool.execute(new Runnable() {
@Override
public void run() {
CbHandler cbh = new CbHandler();
- if (!download.download(cbh, cbh)) {
+ if (!connection.download(cbh, cbh)) {
if (cbh.currentChunk != null) {
// If the download failed and we have a current chunk, put it back into
// the queue, so it will be handled again later...
chunks.markFailed(cbh.currentChunk);
}
- LOGGER.warn("Download of " + destinationFile.getAbsolutePath());
+ LOGGER.warn("Download of " + destinationFile.getAbsolutePath() + " failed");
+ }
+ if (chunks.isComplete()) {
+ finishUpload();
}
}
});
@@ -159,7 +197,7 @@ public class ActiveUpload {
* @param data
* @return
*/
- private boolean writeFileData(long fileOffset, int dataLength, byte[] data) {
+ private void writeFileData(long fileOffset, int dataLength, byte[] data) {
if (state != TransferState.WORKING)
throw new IllegalStateException("Cannot write to file if state != RUNNING");
synchronized (outFile) {
@@ -169,10 +207,9 @@ public class ActiveUpload {
} catch (IOException e) {
LOGGER.error("Cannot write to '" + destinationFile
+ "'. Disk full, network storage error, bad permissions, ...?", e);
- return false;
+ fileWritable = false;
}
}
- return true;
}
/**
@@ -230,8 +267,10 @@ public class ActiveUpload {
}
public synchronized void cancel() {
- if (download != null) {
- download.cancel();
+ synchronized (downloads) {
+ for (Downloader download : downloads) {
+ download.cancel();
+ }
}
if (state != TransferState.FINISHED) {
state = TransferState.ERROR;
@@ -267,31 +306,42 @@ public class ActiveUpload {
/**
* The current chunk being transfered.
*/
- public FileChunk currentChunk = null;
+ private FileChunk currentChunk = null;
+ private byte[] buffer = new byte[FileChunk.CHUNK_SIZE];
@Override
public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
- // TODO: Maybe cache in RAM and write full CHUNK_SIZE blocks at a time?
- // Would probably help with slower storage, especially if it's using
- // rotating disks and we're running multiple uploads.
- // Also we wouldn't have to re-read a block form disk for sha1 checking.
- return writeFileData(fileOffset, dataLength, data);
+ if (currentChunk == null)
+ throw new IllegalStateException("dataReceived without current chunk");
+ if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength))
+ throw new IllegalStateException("dataReceived with file data out of range");
+ System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength);
+ return fileWritable;
}
@Override
public FileRange get() {
if (currentChunk != null) {
- // TODO: A chunk was requested before, check hash and requeue if not matching
- // This needs to be async (own thread) so will be a little complicated
- // For now, we just mark it as complete
- chunks.markSuccessful(currentChunk);
- LOGGER.info("There was a previous chunk!");
+ if (currentChunk.hasSha1Sum() && hashChecker != null) {
+ try {
+ hashChecker.queue(currentChunk, buffer, activeUpload);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
+ } else {
+ chunks.markSuccessful(currentChunk);
+ }
}
// Get next missing chunk
- currentChunk = chunks.getMissing();
+ try {
+ currentChunk = chunks.getMissing();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return null;
+ }
LOGGER.info("Next missing chunk: " + currentChunk);
if (currentChunk == null) {
- finishUpload();
return null; // No more chunks, returning null tells the Downloader we're done.
}
return currentChunk.range;
@@ -306,6 +356,21 @@ public class ActiveUpload {
return uploadId;
}
+ @Override
+ public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) {
+ switch (result) {
+ case FAILURE:
+ LOGGER.warn("Hash check of chunk " + chunk.toString() + " failed. Assuming valid.");
+ // Fall through
+ case VALID:
+ writeFileData(chunk.range.startOffset, chunk.range.getLength(), data);
+ chunks.markSuccessful(chunk);
+ break;
+ case INVALID:
+ chunks.markFailed(chunk);
+ }
+ }
+
// TODO: Clean up old stale uploads
}