diff options
author | Simon Rettberg | 2015-07-21 15:44:24 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-07-21 15:44:24 +0200 |
commit | 75fb574803e7b290aaae7a9d3a379bcc2de9a614 (patch) | |
tree | e973fe12d81b7e4ee9b2a8ddcecb8ec14e0b7acc | |
parent | Merge branch 'v1.1' of git.openslx.org:openslx-ng/tutor-module into v1.1 (diff) | |
download | tutor-module-75fb574803e7b290aaae7a9d3a379bcc2de9a614.tar.gz tutor-module-75fb574803e7b290aaae7a9d3a379bcc2de9a614.tar.xz tutor-module-75fb574803e7b290aaae7a9d3a379bcc2de9a614.zip |
[client] Continue refactoring Upload and Download Task
9 files changed, 144 insertions, 281 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java index 2acd40e4..db75c518 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java @@ -1,143 +1,184 @@ package org.openslx.dozmod.filetransfer; import java.io.File; -import java.io.IOException; +import java.io.FileNotFoundException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; +import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.dozmod.Config; import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.Transfer; import org.openslx.filetransfer.WantRangeCallback; +import org.openslx.filetransfer.util.ChunkList; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.util.Util; /** * Execute file download in a background thread and update the progress. - * - * @author www.codejava.net - * */ -public class DownloadTask implements Runnable { +public class DownloadTask extends TransferTask { /** * Logger instance for this class. */ private final static Logger LOGGER = Logger.getLogger(DownloadTask.class); - private static final double UPDATE_INTERVAL_SECONDS = 0.6; - private static final double UPDATE_INTERVAL_MS = UPDATE_INTERVAL_SECONDS * 1000; - private static final double BYTES_PER_MIB = 1024 * 1024; - private static final long CHUNK_SIZE = 16 * 1024 * 1024; - private final String host; private final int port; private final String downloadToken; - private final String saveDir; - private final long fileSize; - - public DownloadTask(String host, int port, String downloadToken, String saveDir, long fileSize) { + private final RandomAccessFile fileHandle; + private final ChunkList chunks; + private boolean fileWritable = true; + private AtomicInteger consecutiveInitFails = new AtomicInteger(); + + public DownloadTask(String host, int port, String downloadToken, File destinationFile, long fileSize, + List<ByteBuffer> sha1Sums) throws FileNotFoundException { + super(destinationFile, fileSize); this.host = host; this.port = port; this.downloadToken = downloadToken; - this.saveDir = saveDir; - this.fileSize = fileSize; + this.fileHandle = new RandomAccessFile(destinationFile, "rw"); + this.chunks = new ChunkList(fileSize, sha1Sums); } - class Callbacks implements WantRangeCallback, DataReceivedCallback { - // initialize the counters needed for speed calculations - private long currentRequestedOffset = -1; - private long totalBytesRead = 0; + private class DownloadHandler implements WantRangeCallback, DataReceivedCallback { + private FileChunk current = null; + private byte[] buffer = null; + // progress counter + private long currentSpeed = 0; + private long currentBytes = 0; private long lastUpdate = 0; private long lastBytes = 0; - private long currentBytes = 0; - private final RandomAccessFile file; - - public Callbacks(RandomAccessFile file) { - this.file = file; - } @Override public FileRange get() { - if (currentRequestedOffset == -1) - currentRequestedOffset = 0; - else - currentRequestedOffset += CHUNK_SIZE; - if (currentRequestedOffset >= fileSize) - return null; - long end = currentRequestedOffset + CHUNK_SIZE; - if (end > fileSize) - end = fileSize; - return new FileRange(currentRequestedOffset, end); + handleCompletedChunk(current, buffer); + consecutiveInitFails.lazySet(0); + current = chunks.getMissing(); + if (current != null) { + buffer = new byte[current.range.getLength()]; + } + return current.range; } @Override public boolean dataReceived(final long fileOffset, final int dataLength, final byte[] data) { - try { - file.seek(fileOffset); - file.write(data, 0, dataLength); - } catch (Exception e) { - LOGGER.error("Could not write to file at offset " + fileOffset, e); - return false; - } + if (current == null) + throw new IllegalStateException("dataReceived without current chunk"); + if (!current.range.contains(fileOffset, fileOffset + dataLength)) + throw new IllegalStateException("dataReceived with file data out of range"); + System.arraycopy(data, 0, buffer, (int) (fileOffset - current.range.startOffset), dataLength); currentBytes += dataLength; - totalBytesRead += dataLength; final long now = System.currentTimeMillis(); if (lastUpdate + UPDATE_INTERVAL_MS < now) { - final int percentCompleted = (int) ((totalBytesRead * 100) / fileSize); - lastBytes = (lastBytes * 2 + currentBytes) / 3; - final double speed = lastBytes / UPDATE_INTERVAL_SECONDS; - LOGGER.debug(percentCompleted + "% complete (speed: " + speed/BYTES_PER_MIB + ", total: " + totalBytesRead + ")"); - lastUpdate = now; + synchronized (this) { + // Calculate updated speed + lastBytes = (lastBytes * 2 + currentBytes) / 3; + currentSpeed = (1000 * lastBytes) / (now - lastUpdate); + lastUpdate = now; + } + // Reset counters currentBytes = 0; } - return true; + return fileWritable; } - } + private long getCurrentSpeed() { + synchronized (this) { + return currentSpeed; + } + } - /** - * Executed in background thread - */ - @Override - public void run() { - - boolean ret = false; - // show filesize in the GUI + } - Downloader download = null; - RandomAccessFile file = null; + private void handleCompletedChunk(FileChunk chunk, byte[] buffer) { + if (chunk == null) + return; + // TODO: Hash check, async + chunks.markSuccessful(chunk); try { - download = new Downloader(host, port, Config.TRANSFER_TIMEOUT, null, downloadToken); // TODO: SSL + synchronized (fileHandle) { + fileHandle.seek(chunk.range.startOffset); + fileHandle.write(buffer, 0, chunk.range.getLength()); + } + } catch (Exception e) { + LOGGER.error("Could not write to file at offset " + chunk.range.startOffset, e); + fileWritable = false; + } + } + + private class DownloadThread extends TransferThread { + private Downloader downloader = null; + private DownloadHandler cb = new DownloadHandler(); + + @Override + public void run() { try { - file = new RandomAccessFile(new File(saveDir), "rw"); - } catch (Exception e2) { - LOGGER.error("Could not open for writing: ", e2); + downloader = new Downloader(host, port, Config.TRANSFER_TIMEOUT, null, downloadToken); + } catch (Exception e) { + LOGGER.warn("Could not initialize new uploader", e); + connectFailed(this); return; + } // TODO: SSL + connectSucceeded(this); + + boolean ret = downloader.download(cb, cb); + if (!ret) { + consecutiveInitFails.incrementAndGet(); + } + if (cb.current != null) { + chunks.markFailed(cb.current); } + transferEnded(this, ret); + } - Callbacks cb = new Callbacks(file); + @Override + protected Transfer getTransfer() { + return downloader; + } - ret = download.download(cb, cb); - } catch (IOException e) { - LOGGER.error("Could not open connection: ", e); - return; - } finally { - if (file != null) { - try { - file.close(); - } catch (Exception e) { - } - } - if (download != null) - download.cancel(); + @Override + public long getCurrentSpeed() { + return cb.getCurrentSpeed(); } - // if the download succeeded, set the progress to 100% manually again here to make - // sure the GUI knows about it. - if (ret) { - LOGGER.debug("successul download."); + } + + @Override + protected void cleanup() { + Util.safeClose(fileHandle); + } + + @Override + protected TransferEvent getTransferEvent() { + final TransferState state; + final byte[] progress = chunks.getStatusArray().array(); + final String error; + if (consecutiveInitFails.get() > 20) { + state = TransferState.ERROR; + error = "Cannot talk to server after 20 tries..."; + } else { + state = TransferState.WORKING; + error = null; + } + long speed = 0; + long timeRemaining = 0; + synchronized (transfers) { + for (TransferThread thread : transfers) { + speed += thread.getCurrentSpeed(); + } } + return new TransferEvent(state, progress, speed, timeRemaining, error); + } - return; + @Override + protected TransferThread createNewThread() { + return new DownloadThread(); } } diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java index ee6aeff5..3bb8fa94 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java @@ -60,11 +60,18 @@ public abstract class TransferTask implements Runnable { Util.joinThread(t); } for (TransferThread t : connectingTransfers) { - t.getTransfer().cancel(); + if (t.getTransfer() != null) { + t.getTransfer().cancel(); + } t.interrupt(); Util.joinThread(t); } isRunning = false; + cleanup(); + } + + protected void cleanup() { + // By default, this does nothing } public boolean isRunning() { @@ -111,7 +118,9 @@ public abstract class TransferTask implements Runnable { t.getTransfer().cancel(); } for (TransferThread t : connectingTransfers) { - t.getTransfer().cancel(); + if (t.getTransfer() != null) { + t.getTransfer().cancel(); + } } isCancelled = true; } @@ -131,6 +140,7 @@ public abstract class TransferTask implements Runnable { if (transfers.size() + connectingTransfers.size() < minConnectionCount) { TransferThread thread = createNewThread(); if (thread != null) { + thread.setDaemon(true); thread.start(); connectingTransfers.add(thread); } diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java index 779344a3..b0655115 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java @@ -48,10 +48,10 @@ public class UploadTask extends TransferTask { private class UploadThread extends TransferThread { // private long totalBytesRead = 0; private long currentSpeed = 0; + private Uploader uploader = null; @Override public void run() { - final Uploader uploader; try { uploader = new Uploader(host, port, Config.TRANSFER_TIMEOUT, null, uploadToken); } catch (Exception e) { @@ -102,8 +102,7 @@ public class UploadTask extends TransferTask { @Override protected Transfer getTransfer() { - // TODO Auto-generated method stub - return null; + return uploader; } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java index 6894c85f..adf0f26c 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java @@ -11,7 +11,6 @@ import org.openslx.bwlp.sat.database.Database; import org.openslx.bwlp.sat.database.MysqlConnection; import org.openslx.bwlp.sat.database.MysqlStatement; import org.openslx.bwlp.sat.database.Paginator; -import org.openslx.bwlp.sat.fileserv.ChunkList; import org.openslx.bwlp.thrift.iface.ImageBaseWrite; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePermissions; @@ -21,6 +20,7 @@ import org.openslx.bwlp.thrift.iface.ImageVersionWrite; import org.openslx.bwlp.thrift.iface.ShareMode; import org.openslx.bwlp.thrift.iface.TNotFoundException; import org.openslx.bwlp.thrift.iface.UserInfo; +import org.openslx.filetransfer.util.ChunkList; public class DbImage { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java index da15656d..08d0d30f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java @@ -24,6 +24,8 @@ import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.WantRangeCallback; +import org.openslx.filetransfer.util.ChunkList; +import org.openslx.filetransfer.util.FileChunk; public class ActiveUpload { private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java deleted file mode 100644 index 385a6484..00000000 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java +++ /dev/null @@ -1,113 +0,0 @@ -package org.openslx.bwlp.sat.fileserv; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; - -import org.apache.log4j.Logger; - -public class ChunkList { - - private static final Logger LOGGER = Logger.getLogger(ChunkList.class); - - /** - * Chunks that are missing from the file - */ - private final List<FileChunk> missingChunks = new LinkedList<>(); - - /** - * Chunks that are currently being uploaded or hash-checked - */ - private final List<FileChunk> pendingChunks = new LinkedList<>(); - - private final List<FileChunk> completeChunks = new ArrayList<>(100); - - // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying - private final ByteBuffer statusArray; - - // Do we need to keep valid chunks, or chunks that failed too many times? - - public ChunkList(long fileSize, List<ByteBuffer> sha1Sums) { - FileChunk.createChunkList(missingChunks, fileSize, sha1Sums); - statusArray = ByteBuffer.allocate(missingChunks.size()); - } - - /** - * Get a missing chunk, marking it pending. - * - * @return chunk marked as missing - */ - public synchronized FileChunk getMissing() { - if (missingChunks.isEmpty()) - return null; - FileChunk c = missingChunks.remove(0); - pendingChunks.add(c); - return c; - } - - /** - * Get the block status as byte representation. - */ - public synchronized ByteBuffer getStatusArray() { - byte[] array = statusArray.array(); - //Arrays.fill(array, (byte)0); - for (FileChunk c : missingChunks) { - array[c.getChunkIndex()] = 1; - } - for (FileChunk c : pendingChunks) { - array[c.getChunkIndex()] = 2; - } - for (FileChunk c : completeChunks) { - array[c.getChunkIndex()] = 0; - } - return statusArray; - } - - /** - * Get completed chunks as list - * - * @return List containing all successfully transfered chunks - */ - public synchronized List<FileChunk> getCompleted() { - return new ArrayList<>(completeChunks); - } - - /** - * Mark a chunk currently transferring as successfully transfered. - * - * @param c The chunk in question - */ - public synchronized void markSuccessful(FileChunk c) { - if (!pendingChunks.remove(c)) { - LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString() - + ", but chunk is not marked as currently transferring!"); - return; - } - completeChunks.add(c); - } - - /** - * Mark a chunk currently transferring or being hash checked as failed - * transfer. This increases its fail count and re-adds it to the list of - * missing chunks. - * - * @param c The chunk in question - * @return Number of times transfer of this chunk failed - */ - public synchronized int markFailed(FileChunk c) { - if (!pendingChunks.remove(c)) { - LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString() - + ", but chunk is not marked as currently transferring!"); - return -1; - } - // Add as first element so it will be re-transmitted immediately - missingChunks.add(0, c); - return c.incFailed(); - } - - public synchronized boolean isComplete() { - return missingChunks.isEmpty() && pendingChunks.isEmpty(); - } - -} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java deleted file mode 100644 index b322e65d..00000000 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.openslx.bwlp.sat.fileserv; - -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; - -import org.openslx.filetransfer.FileRange; - -public class FileChunk { - - public static final int CHUNK_SIZE_MIB = 16; - public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * (1024 * 1024); - - public final FileRange range; - public final byte[] sha1sum; - private int failCount = 0; - - public FileChunk(long startOffset, long endOffset, byte[] sha1sum) { - this.range = new FileRange(startOffset, endOffset); - this.sha1sum = sha1sum; - } - - /** - * Signal that transferring this chunk seems to have failed (checksum - * mismatch). - * - * @return Number of times the transfer failed now - */ - public synchronized int incFailed() { - return ++failCount; - } - - public int getChunkIndex() { - return (int)(range.startOffset / CHUNK_SIZE); - } - - @Override - public String toString() { - return "[Chunk " + getChunkIndex() + " (" + range.startOffset + "-" + range.endOffset + "), fails: " + failCount + "]"; - } - - // - - public static int fileSizeToChunkCount(long fileSize) { - return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE); - } - - public static void createChunkList(Collection<FileChunk> list, long fileSize, List<ByteBuffer> sha1Sums) { - if (fileSize < 0) - throw new IllegalArgumentException("fileSize cannot be negative"); - if (!list.isEmpty()) - throw new IllegalArgumentException("Passed list is not empty"); - long chunkCount = fileSizeToChunkCount(fileSize); - if (sha1Sums != null) { - if (sha1Sums.size() != chunkCount) - throw new IllegalArgumentException( - "Passed a sha1sum list, but hash count in list doesn't match expected chunk count"); - long offset = 0; - for (ByteBuffer sha1sum : sha1Sums) { // Do this as we don't know how efficient List.get(index) is... - long end = offset + CHUNK_SIZE; - if (end > fileSize) - end = fileSize; - list.add(new FileChunk(offset, end, sha1sum.array())); - offset = end; - } - return; - } - long offset = 0; - while (offset < fileSize) { // ...otherwise we could share this code - long end = offset + CHUNK_SIZE; - if (end > fileSize) - end = fileSize; - list.add(new FileChunk(offset, end, null)); - offset = end; - } - } -} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index 93089b5a..3abc5f98 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -68,6 +68,7 @@ public class FileServer implements IncomingEvent { ActiveUpload upload = uploads.get(token); if (upload == null) { LOGGER.warn("Unknown token " + token); + downloader.cancel(); return; } upload.addConnection(downloader, transferPool); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java index 8216fd86..ad5ea0e2 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java @@ -1,6 +1,6 @@ package org.openslx.bwlp.sat.util; -import org.openslx.bwlp.sat.fileserv.FileChunk; +import org.openslx.filetransfer.util.FileChunk; public class Constants { public static final String INCOMPLETE_UPLOAD_SUFFIX = ".part"; |