summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2015-07-21 15:44:24 +0200
committerSimon Rettberg2015-07-21 15:44:24 +0200
commit75fb574803e7b290aaae7a9d3a379bcc2de9a614 (patch)
treee973fe12d81b7e4ee9b2a8ddcecb8ec14e0b7acc
parentMerge branch 'v1.1' of git.openslx.org:openslx-ng/tutor-module into v1.1 (diff)
downloadtutor-module-75fb574803e7b290aaae7a9d3a379bcc2de9a614.tar.gz
tutor-module-75fb574803e7b290aaae7a9d3a379bcc2de9a614.tar.xz
tutor-module-75fb574803e7b290aaae7a9d3a379bcc2de9a614.zip
[client] Continue refactoring Upload and Download Task
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java209
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java14
-rw-r--r--dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java5
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java2
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java2
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java113
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java77
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java1
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java2
9 files changed, 144 insertions, 281 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java
index 2acd40e4..db75c518 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/DownloadTask.java
@@ -1,143 +1,184 @@
package org.openslx.dozmod.filetransfer;
import java.io.File;
-import java.io.IOException;
+import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
+import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.dozmod.Config;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.Transfer;
import org.openslx.filetransfer.WantRangeCallback;
+import org.openslx.filetransfer.util.ChunkList;
+import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.util.Util;
/**
* Execute file download in a background thread and update the progress.
- *
- * @author www.codejava.net
- *
*/
-public class DownloadTask implements Runnable {
+public class DownloadTask extends TransferTask {
/**
* Logger instance for this class.
*/
private final static Logger LOGGER = Logger.getLogger(DownloadTask.class);
- private static final double UPDATE_INTERVAL_SECONDS = 0.6;
- private static final double UPDATE_INTERVAL_MS = UPDATE_INTERVAL_SECONDS * 1000;
- private static final double BYTES_PER_MIB = 1024 * 1024;
- private static final long CHUNK_SIZE = 16 * 1024 * 1024;
-
private final String host;
private final int port;
private final String downloadToken;
- private final String saveDir;
- private final long fileSize;
-
- public DownloadTask(String host, int port, String downloadToken, String saveDir, long fileSize) {
+ private final RandomAccessFile fileHandle;
+ private final ChunkList chunks;
+ private boolean fileWritable = true;
+ private AtomicInteger consecutiveInitFails = new AtomicInteger();
+
+ public DownloadTask(String host, int port, String downloadToken, File destinationFile, long fileSize,
+ List<ByteBuffer> sha1Sums) throws FileNotFoundException {
+ super(destinationFile, fileSize);
this.host = host;
this.port = port;
this.downloadToken = downloadToken;
- this.saveDir = saveDir;
- this.fileSize = fileSize;
+ this.fileHandle = new RandomAccessFile(destinationFile, "rw");
+ this.chunks = new ChunkList(fileSize, sha1Sums);
}
- class Callbacks implements WantRangeCallback, DataReceivedCallback {
- // initialize the counters needed for speed calculations
- private long currentRequestedOffset = -1;
- private long totalBytesRead = 0;
+ private class DownloadHandler implements WantRangeCallback, DataReceivedCallback {
+ private FileChunk current = null;
+ private byte[] buffer = null;
+ // progress counter
+ private long currentSpeed = 0;
+ private long currentBytes = 0;
private long lastUpdate = 0;
private long lastBytes = 0;
- private long currentBytes = 0;
- private final RandomAccessFile file;
-
- public Callbacks(RandomAccessFile file) {
- this.file = file;
- }
@Override
public FileRange get() {
- if (currentRequestedOffset == -1)
- currentRequestedOffset = 0;
- else
- currentRequestedOffset += CHUNK_SIZE;
- if (currentRequestedOffset >= fileSize)
- return null;
- long end = currentRequestedOffset + CHUNK_SIZE;
- if (end > fileSize)
- end = fileSize;
- return new FileRange(currentRequestedOffset, end);
+ handleCompletedChunk(current, buffer);
+ consecutiveInitFails.lazySet(0);
+ current = chunks.getMissing();
+ if (current != null) {
+ buffer = new byte[current.range.getLength()];
+ }
+ return current.range;
}
@Override
public boolean dataReceived(final long fileOffset, final int dataLength, final byte[] data) {
- try {
- file.seek(fileOffset);
- file.write(data, 0, dataLength);
- } catch (Exception e) {
- LOGGER.error("Could not write to file at offset " + fileOffset, e);
- return false;
- }
+ if (current == null)
+ throw new IllegalStateException("dataReceived without current chunk");
+ if (!current.range.contains(fileOffset, fileOffset + dataLength))
+ throw new IllegalStateException("dataReceived with file data out of range");
+ System.arraycopy(data, 0, buffer, (int) (fileOffset - current.range.startOffset), dataLength);
currentBytes += dataLength;
- totalBytesRead += dataLength;
final long now = System.currentTimeMillis();
if (lastUpdate + UPDATE_INTERVAL_MS < now) {
- final int percentCompleted = (int) ((totalBytesRead * 100) / fileSize);
- lastBytes = (lastBytes * 2 + currentBytes) / 3;
- final double speed = lastBytes / UPDATE_INTERVAL_SECONDS;
- LOGGER.debug(percentCompleted + "% complete (speed: " + speed/BYTES_PER_MIB + ", total: " + totalBytesRead + ")");
- lastUpdate = now;
+ synchronized (this) {
+ // Calculate updated speed
+ lastBytes = (lastBytes * 2 + currentBytes) / 3;
+ currentSpeed = (1000 * lastBytes) / (now - lastUpdate);
+ lastUpdate = now;
+ }
+ // Reset counters
currentBytes = 0;
}
- return true;
+ return fileWritable;
}
- }
+ private long getCurrentSpeed() {
+ synchronized (this) {
+ return currentSpeed;
+ }
+ }
- /**
- * Executed in background thread
- */
- @Override
- public void run() {
-
- boolean ret = false;
- // show filesize in the GUI
+ }
- Downloader download = null;
- RandomAccessFile file = null;
+ private void handleCompletedChunk(FileChunk chunk, byte[] buffer) {
+ if (chunk == null)
+ return;
+ // TODO: Hash check, async
+ chunks.markSuccessful(chunk);
try {
- download = new Downloader(host, port, Config.TRANSFER_TIMEOUT, null, downloadToken); // TODO: SSL
+ synchronized (fileHandle) {
+ fileHandle.seek(chunk.range.startOffset);
+ fileHandle.write(buffer, 0, chunk.range.getLength());
+ }
+ } catch (Exception e) {
+ LOGGER.error("Could not write to file at offset " + chunk.range.startOffset, e);
+ fileWritable = false;
+ }
+ }
+
+ private class DownloadThread extends TransferThread {
+ private Downloader downloader = null;
+ private DownloadHandler cb = new DownloadHandler();
+
+ @Override
+ public void run() {
try {
- file = new RandomAccessFile(new File(saveDir), "rw");
- } catch (Exception e2) {
- LOGGER.error("Could not open for writing: ", e2);
+ downloader = new Downloader(host, port, Config.TRANSFER_TIMEOUT, null, downloadToken);
+ } catch (Exception e) {
+ LOGGER.warn("Could not initialize new uploader", e);
+ connectFailed(this);
return;
+ } // TODO: SSL
+ connectSucceeded(this);
+
+ boolean ret = downloader.download(cb, cb);
+ if (!ret) {
+ consecutiveInitFails.incrementAndGet();
+ }
+ if (cb.current != null) {
+ chunks.markFailed(cb.current);
}
+ transferEnded(this, ret);
+ }
- Callbacks cb = new Callbacks(file);
+ @Override
+ protected Transfer getTransfer() {
+ return downloader;
+ }
- ret = download.download(cb, cb);
- } catch (IOException e) {
- LOGGER.error("Could not open connection: ", e);
- return;
- } finally {
- if (file != null) {
- try {
- file.close();
- } catch (Exception e) {
- }
- }
- if (download != null)
- download.cancel();
+ @Override
+ public long getCurrentSpeed() {
+ return cb.getCurrentSpeed();
}
- // if the download succeeded, set the progress to 100% manually again here to make
- // sure the GUI knows about it.
- if (ret) {
- LOGGER.debug("successul download.");
+ }
+
+ @Override
+ protected void cleanup() {
+ Util.safeClose(fileHandle);
+ }
+
+ @Override
+ protected TransferEvent getTransferEvent() {
+ final TransferState state;
+ final byte[] progress = chunks.getStatusArray().array();
+ final String error;
+ if (consecutiveInitFails.get() > 20) {
+ state = TransferState.ERROR;
+ error = "Cannot talk to server after 20 tries...";
+ } else {
+ state = TransferState.WORKING;
+ error = null;
+ }
+ long speed = 0;
+ long timeRemaining = 0;
+ synchronized (transfers) {
+ for (TransferThread thread : transfers) {
+ speed += thread.getCurrentSpeed();
+ }
}
+ return new TransferEvent(state, progress, speed, timeRemaining, error);
+ }
- return;
+ @Override
+ protected TransferThread createNewThread() {
+ return new DownloadThread();
}
}
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java
index ee6aeff5..3bb8fa94 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/TransferTask.java
@@ -60,11 +60,18 @@ public abstract class TransferTask implements Runnable {
Util.joinThread(t);
}
for (TransferThread t : connectingTransfers) {
- t.getTransfer().cancel();
+ if (t.getTransfer() != null) {
+ t.getTransfer().cancel();
+ }
t.interrupt();
Util.joinThread(t);
}
isRunning = false;
+ cleanup();
+ }
+
+ protected void cleanup() {
+ // By default, this does nothing
}
public boolean isRunning() {
@@ -111,7 +118,9 @@ public abstract class TransferTask implements Runnable {
t.getTransfer().cancel();
}
for (TransferThread t : connectingTransfers) {
- t.getTransfer().cancel();
+ if (t.getTransfer() != null) {
+ t.getTransfer().cancel();
+ }
}
isCancelled = true;
}
@@ -131,6 +140,7 @@ public abstract class TransferTask implements Runnable {
if (transfers.size() + connectingTransfers.size() < minConnectionCount) {
TransferThread thread = createNewThread();
if (thread != null) {
+ thread.setDaemon(true);
thread.start();
connectingTransfers.add(thread);
}
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
index 779344a3..b0655115 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/UploadTask.java
@@ -48,10 +48,10 @@ public class UploadTask extends TransferTask {
private class UploadThread extends TransferThread {
// private long totalBytesRead = 0;
private long currentSpeed = 0;
+ private Uploader uploader = null;
@Override
public void run() {
- final Uploader uploader;
try {
uploader = new Uploader(host, port, Config.TRANSFER_TIMEOUT, null, uploadToken);
} catch (Exception e) {
@@ -102,8 +102,7 @@ public class UploadTask extends TransferTask {
@Override
protected Transfer getTransfer() {
- // TODO Auto-generated method stub
- return null;
+ return uploader;
}
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
index 6894c85f..adf0f26c 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
@@ -11,7 +11,6 @@ import org.openslx.bwlp.sat.database.Database;
import org.openslx.bwlp.sat.database.MysqlConnection;
import org.openslx.bwlp.sat.database.MysqlStatement;
import org.openslx.bwlp.sat.database.Paginator;
-import org.openslx.bwlp.sat.fileserv.ChunkList;
import org.openslx.bwlp.thrift.iface.ImageBaseWrite;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePermissions;
@@ -21,6 +20,7 @@ import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
import org.openslx.bwlp.thrift.iface.ShareMode;
import org.openslx.bwlp.thrift.iface.TNotFoundException;
import org.openslx.bwlp.thrift.iface.UserInfo;
+import org.openslx.filetransfer.util.ChunkList;
public class DbImage {
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
index da15656d..08d0d30f 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java
@@ -24,6 +24,8 @@ import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
+import org.openslx.filetransfer.util.ChunkList;
+import org.openslx.filetransfer.util.FileChunk;
public class ActiveUpload {
private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java
deleted file mode 100644
index 385a6484..00000000
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java
+++ /dev/null
@@ -1,113 +0,0 @@
-package org.openslx.bwlp.sat.fileserv;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-
-public class ChunkList {
-
- private static final Logger LOGGER = Logger.getLogger(ChunkList.class);
-
- /**
- * Chunks that are missing from the file
- */
- private final List<FileChunk> missingChunks = new LinkedList<>();
-
- /**
- * Chunks that are currently being uploaded or hash-checked
- */
- private final List<FileChunk> pendingChunks = new LinkedList<>();
-
- private final List<FileChunk> completeChunks = new ArrayList<>(100);
-
- // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying
- private final ByteBuffer statusArray;
-
- // Do we need to keep valid chunks, or chunks that failed too many times?
-
- public ChunkList(long fileSize, List<ByteBuffer> sha1Sums) {
- FileChunk.createChunkList(missingChunks, fileSize, sha1Sums);
- statusArray = ByteBuffer.allocate(missingChunks.size());
- }
-
- /**
- * Get a missing chunk, marking it pending.
- *
- * @return chunk marked as missing
- */
- public synchronized FileChunk getMissing() {
- if (missingChunks.isEmpty())
- return null;
- FileChunk c = missingChunks.remove(0);
- pendingChunks.add(c);
- return c;
- }
-
- /**
- * Get the block status as byte representation.
- */
- public synchronized ByteBuffer getStatusArray() {
- byte[] array = statusArray.array();
- //Arrays.fill(array, (byte)0);
- for (FileChunk c : missingChunks) {
- array[c.getChunkIndex()] = 1;
- }
- for (FileChunk c : pendingChunks) {
- array[c.getChunkIndex()] = 2;
- }
- for (FileChunk c : completeChunks) {
- array[c.getChunkIndex()] = 0;
- }
- return statusArray;
- }
-
- /**
- * Get completed chunks as list
- *
- * @return List containing all successfully transfered chunks
- */
- public synchronized List<FileChunk> getCompleted() {
- return new ArrayList<>(completeChunks);
- }
-
- /**
- * Mark a chunk currently transferring as successfully transfered.
- *
- * @param c The chunk in question
- */
- public synchronized void markSuccessful(FileChunk c) {
- if (!pendingChunks.remove(c)) {
- LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString()
- + ", but chunk is not marked as currently transferring!");
- return;
- }
- completeChunks.add(c);
- }
-
- /**
- * Mark a chunk currently transferring or being hash checked as failed
- * transfer. This increases its fail count and re-adds it to the list of
- * missing chunks.
- *
- * @param c The chunk in question
- * @return Number of times transfer of this chunk failed
- */
- public synchronized int markFailed(FileChunk c) {
- if (!pendingChunks.remove(c)) {
- LOGGER.warn("Inconsistent state: markTransferred called for Chunk " + c.toString()
- + ", but chunk is not marked as currently transferring!");
- return -1;
- }
- // Add as first element so it will be re-transmitted immediately
- missingChunks.add(0, c);
- return c.incFailed();
- }
-
- public synchronized boolean isComplete() {
- return missingChunks.isEmpty() && pendingChunks.isEmpty();
- }
-
-}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java
deleted file mode 100644
index b322e65d..00000000
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.openslx.bwlp.sat.fileserv;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
-import org.openslx.filetransfer.FileRange;
-
-public class FileChunk {
-
- public static final int CHUNK_SIZE_MIB = 16;
- public static final int CHUNK_SIZE = CHUNK_SIZE_MIB * (1024 * 1024);
-
- public final FileRange range;
- public final byte[] sha1sum;
- private int failCount = 0;
-
- public FileChunk(long startOffset, long endOffset, byte[] sha1sum) {
- this.range = new FileRange(startOffset, endOffset);
- this.sha1sum = sha1sum;
- }
-
- /**
- * Signal that transferring this chunk seems to have failed (checksum
- * mismatch).
- *
- * @return Number of times the transfer failed now
- */
- public synchronized int incFailed() {
- return ++failCount;
- }
-
- public int getChunkIndex() {
- return (int)(range.startOffset / CHUNK_SIZE);
- }
-
- @Override
- public String toString() {
- return "[Chunk " + getChunkIndex() + " (" + range.startOffset + "-" + range.endOffset + "), fails: " + failCount + "]";
- }
-
- //
-
- public static int fileSizeToChunkCount(long fileSize) {
- return (int) ((fileSize + CHUNK_SIZE - 1) / CHUNK_SIZE);
- }
-
- public static void createChunkList(Collection<FileChunk> list, long fileSize, List<ByteBuffer> sha1Sums) {
- if (fileSize < 0)
- throw new IllegalArgumentException("fileSize cannot be negative");
- if (!list.isEmpty())
- throw new IllegalArgumentException("Passed list is not empty");
- long chunkCount = fileSizeToChunkCount(fileSize);
- if (sha1Sums != null) {
- if (sha1Sums.size() != chunkCount)
- throw new IllegalArgumentException(
- "Passed a sha1sum list, but hash count in list doesn't match expected chunk count");
- long offset = 0;
- for (ByteBuffer sha1sum : sha1Sums) { // Do this as we don't know how efficient List.get(index) is...
- long end = offset + CHUNK_SIZE;
- if (end > fileSize)
- end = fileSize;
- list.add(new FileChunk(offset, end, sha1sum.array()));
- offset = end;
- }
- return;
- }
- long offset = 0;
- while (offset < fileSize) { // ...otherwise we could share this code
- long end = offset + CHUNK_SIZE;
- if (end > fileSize)
- end = fileSize;
- list.add(new FileChunk(offset, end, null));
- offset = end;
- }
- }
-}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
index 93089b5a..3abc5f98 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
@@ -68,6 +68,7 @@ public class FileServer implements IncomingEvent {
ActiveUpload upload = uploads.get(token);
if (upload == null) {
LOGGER.warn("Unknown token " + token);
+ downloader.cancel();
return;
}
upload.addConnection(downloader, transferPool);
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java
index 8216fd86..ad5ea0e2 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java
@@ -1,6 +1,6 @@
package org.openslx.bwlp.sat.util;
-import org.openslx.bwlp.sat.fileserv.FileChunk;
+import org.openslx.filetransfer.util.FileChunk;
public class Constants {
public static final String INCOMPLETE_UPLOAD_SUFFIX = ".part";