diff options
author | Simon Rettberg | 2015-07-23 17:44:59 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-07-23 17:44:59 +0200 |
commit | 756fb6041a9077bd5d14ce5dd93f3584780a98af (patch) | |
tree | 76f49fb317b7d4ef46cc892783301584fb7b0237 /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java | |
parent | [client] Changed date selection in lecture create page, set text fields edita... (diff) | |
download | tutor-module-756fb6041a9077bd5d14ce5dd93f3584780a98af.tar.gz tutor-module-756fb6041a9077bd5d14ce5dd93f3584780a98af.tar.xz tutor-module-756fb6041a9077bd5d14ce5dd93f3584780a98af.zip |
[*] Adapt to ChunkList changes; [server] Support multiple connections per upload
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java')
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java | 119 |
1 files changed, 92 insertions, 27 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java index 92c2b2d7..469fa8dc 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java @@ -5,7 +5,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.security.NoSuchAlgorithmException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; @@ -13,6 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; import org.openslx.bwlp.sat.util.Configuration; +import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; import org.openslx.bwlp.sat.util.Util; @@ -27,14 +30,28 @@ import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.WantRangeCallback; import org.openslx.filetransfer.util.ChunkList; import org.openslx.filetransfer.util.FileChunk; +import org.openslx.filetransfer.util.HashChecker; +import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; +import org.openslx.filetransfer.util.HashChecker.HashResult; + +public class ActiveUpload implements HashCheckCallback { -public class ActiveUpload { private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); /** + * How many concurrent connections per upload + */ + private static final int MAX_CONNECTIONS = Math.max(Constants.MAX_UPLOADS / 4, 1); + + /** + * Self reference for inner classes. + */ + private final ActiveUpload activeUpload = this; + + /** * This is an active upload, so on our end, we have a Downloader. */ - private Downloader download = null; + private List<Downloader> downloads = new ArrayList<>(); private final File destinationFile; @@ -80,7 +97,22 @@ public class ActiveUpload { */ private final AtomicBoolean versionWrittenToDb = new AtomicBoolean(); - // TODO: Use HashList for verification + /** + * Whether file is (still) writable. Used for the file transfer callbacks. + */ + private boolean fileWritable; + + private static final HashChecker hashChecker; + + static { + HashChecker hc; + try { + hc = new HashChecker("SHA1"); + } catch (NoSuchAlgorithmException e) { + hc = null; + } + hashChecker = hc; + } public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, long fileSize, List<ByteBuffer> sha1Sums, byte[] machineDescription) throws FileNotFoundException { @@ -122,23 +154,29 @@ public class ActiveUpload { * @return true if the connection is accepted, false if it should be * discarded */ - public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) { - if (download != null || chunks.isComplete() || state == TransferState.FINISHED - || state == TransferState.ERROR) + public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) { + if (chunks.isComplete() || state == TransferState.FINISHED || state == TransferState.ERROR) return false; - download = connection; + synchronized (downloads) { + if (downloads.size() >= MAX_CONNECTIONS) + return false; + downloads.add(connection); + } try { pool.execute(new Runnable() { @Override public void run() { CbHandler cbh = new CbHandler(); - if (!download.download(cbh, cbh)) { + if (!connection.download(cbh, cbh)) { if (cbh.currentChunk != null) { // If the download failed and we have a current chunk, put it back into // the queue, so it will be handled again later... chunks.markFailed(cbh.currentChunk); } - LOGGER.warn("Download of " + destinationFile.getAbsolutePath()); + LOGGER.warn("Download of " + destinationFile.getAbsolutePath() + " failed"); + } + if (chunks.isComplete()) { + finishUpload(); } } }); @@ -159,7 +197,7 @@ public class ActiveUpload { * @param data * @return */ - private boolean writeFileData(long fileOffset, int dataLength, byte[] data) { + private void writeFileData(long fileOffset, int dataLength, byte[] data) { if (state != TransferState.WORKING) throw new IllegalStateException("Cannot write to file if state != RUNNING"); synchronized (outFile) { @@ -169,10 +207,9 @@ public class ActiveUpload { } catch (IOException e) { LOGGER.error("Cannot write to '" + destinationFile + "'. Disk full, network storage error, bad permissions, ...?", e); - return false; + fileWritable = false; } } - return true; } /** @@ -230,8 +267,10 @@ public class ActiveUpload { } public synchronized void cancel() { - if (download != null) { - download.cancel(); + synchronized (downloads) { + for (Downloader download : downloads) { + download.cancel(); + } } if (state != TransferState.FINISHED) { state = TransferState.ERROR; @@ -267,31 +306,42 @@ public class ActiveUpload { /** * The current chunk being transfered. */ - public FileChunk currentChunk = null; + private FileChunk currentChunk = null; + private byte[] buffer = new byte[FileChunk.CHUNK_SIZE]; @Override public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { - // TODO: Maybe cache in RAM and write full CHUNK_SIZE blocks at a time? - // Would probably help with slower storage, especially if it's using - // rotating disks and we're running multiple uploads. - // Also we wouldn't have to re-read a block form disk for sha1 checking. - return writeFileData(fileOffset, dataLength, data); + if (currentChunk == null) + throw new IllegalStateException("dataReceived without current chunk"); + if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength)) + throw new IllegalStateException("dataReceived with file data out of range"); + System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength); + return fileWritable; } @Override public FileRange get() { if (currentChunk != null) { - // TODO: A chunk was requested before, check hash and requeue if not matching - // This needs to be async (own thread) so will be a little complicated - // For now, we just mark it as complete - chunks.markSuccessful(currentChunk); - LOGGER.info("There was a previous chunk!"); + if (currentChunk.hasSha1Sum() && hashChecker != null) { + try { + hashChecker.queue(currentChunk, buffer, activeUpload); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } else { + chunks.markSuccessful(currentChunk); + } } // Get next missing chunk - currentChunk = chunks.getMissing(); + try { + currentChunk = chunks.getMissing(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } LOGGER.info("Next missing chunk: " + currentChunk); if (currentChunk == null) { - finishUpload(); return null; // No more chunks, returning null tells the Downloader we're done. } return currentChunk.range; @@ -306,6 +356,21 @@ public class ActiveUpload { return uploadId; } + @Override + public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { + switch (result) { + case FAILURE: + LOGGER.warn("Hash check of chunk " + chunk.toString() + " failed. Assuming valid."); + // Fall through + case VALID: + writeFileData(chunk.range.startOffset, chunk.range.getLength(), data); + chunks.markSuccessful(chunk); + break; + case INVALID: + chunks.markFailed(chunk); + } + } + // TODO: Clean up old stale uploads } |