diff options
author | Simon Rettberg | 2015-07-16 17:41:53 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-07-16 17:41:53 +0200 |
commit | 6045823f6139c0a06bbe8fa3e8de56ba87b68a2c (patch) | |
tree | ef66a014289312b3c3f1a8a0a332a332e51a35a4 /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java | |
parent | [client] upload workflow working aside from the periodic status query of the ... (diff) | |
download | tutor-module-6045823f6139c0a06bbe8fa3e8de56ba87b68a2c.tar.gz tutor-module-6045823f6139c0a06bbe8fa3e8de56ba87b68a2c.tar.xz tutor-module-6045823f6139c0a06bbe8fa3e8de56ba87b68a2c.zip |
[server] Finish implementing uploads (no hash checking yet)
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java')
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java | 114 |
1 files changed, 90 insertions, 24 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java index ad2949aa..dbeb24ae 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java @@ -5,15 +5,21 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.sql.SQLException; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger; +import org.openslx.bwlp.sat.database.mappers.DbImage; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; +import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.bwlp.thrift.iface.TransferState; +import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; @@ -46,16 +52,41 @@ public class ActiveUpload { */ private final ImageDetailsRead image; + /** + * Flags to set for this new image version. Optional field. + */ + private ImageVersionWrite versionSettings = null; + + /** + * TransferState of this upload + */ + private TransferState state = TransferState.IDLE; + + /** + * ID of this upload - will become this version id on success. + */ + private final String uploadId; + // TODO: Use HashList for verification - public ActiveUpload(UserInfo owner, ImageDetailsRead image, File destinationFile, long fileSize, - List<ByteBuffer> sha1Sums) throws FileNotFoundException { + public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, + long fileSize, List<ByteBuffer> sha1Sums) throws FileNotFoundException { this.destinationFile = destinationFile; this.outFile = new RandomAccessFile(destinationFile, "rw"); this.chunks = new ChunkList(fileSize, sha1Sums); this.owner = owner; this.image = image; this.fileSize = fileSize; + this.uploadId = uploadId; + } + + /** + * Set meta data for this image version. + * + * @param data + */ + public synchronized void setVersionData(ImageVersionWrite data) { + versionSettings = data; } /** @@ -67,20 +98,30 @@ public class ActiveUpload { * discarded */ public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) { - if (download != null || chunks.isComplete()) + if (download != null || chunks.isComplete() || state == TransferState.FINISHED + || state == TransferState.ERROR) return false; download = connection; - pool.execute(new Runnable() { - @Override - public void run() { - CbHandler cbh = new CbHandler(); - if (!download.download(cbh, cbh) && cbh.currentChunk != null) { - // If the download failed and we have a current chunk, put it back into - // the queue, so it will be handled again later... - chunks.markFailed(cbh.currentChunk); + try { + pool.execute(new Runnable() { + @Override + public void run() { + CbHandler cbh = new CbHandler(); + if (!download.download(cbh, cbh)) { + if (cbh.currentChunk != null) { + // If the download failed and we have a current chunk, put it back into + // the queue, so it will be handled again later... + chunks.markFailed(cbh.currentChunk); + } + LOGGER.warn("Download of " + destinationFile.getAbsolutePath()); + } } - } - }); + }); + state = TransferState.WORKING; + } catch (Exception e) { + LOGGER.warn("threadpool rejected the incoming file transfer", e); + return false; + } return true; } @@ -94,6 +135,8 @@ public class ActiveUpload { * @return */ private boolean writeFileData(long fileOffset, int dataLength, byte[] data) { + if (state != TransferState.WORKING) + throw new IllegalStateException("Cannot write to file if state != RUNNING"); synchronized (outFile) { try { outFile.seek(fileOffset); @@ -107,7 +150,16 @@ public class ActiveUpload { return true; } - private void finishUpload() { + /** + * Called when the upload finished. + */ + private synchronized void finishUpload() { + if (state != TransferState.WORKING) + return; + synchronized (outFile) { + Util.safeClose(outFile); + state = TransferState.FINISHED; + } File file = destinationFile; // Ready to go. First step: Rename temp file to something usable File destination = new File(file.getParent(), Formatter.vmName(owner, image.imageName)); @@ -116,7 +168,8 @@ public class ActiveUpload { if (relPath == null) { LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " + Configuration.getVmStoreBasePath().getAbsolutePath()); - // TODO: Update state to failed... + state = TransferState.ERROR; + return; } // Execute rename @@ -133,17 +186,22 @@ public class ActiveUpload { LOGGER.warn( "Could not rename '" + file.getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'", renameException); - // TODO: Update state.... + state = TransferState.ERROR; + return; } // Now insert meta data into DB - - final String imageVersionId = UUID.randomUUID().toString(); - - // TODO: SQL magic, update state + try { + DbImage.createImageVersion(image.imageBaseId, uploadId, owner, fileSize, relPath, + versionSettings, chunks); + } catch (SQLException e) { + LOGGER.error("Error finishing upload: Inserting version to DB failed", e); + state = TransferState.ERROR; + return; + } } - public void cancel() { + public synchronized void cancel() { // TODO Auto-generated method stub } @@ -157,8 +215,8 @@ public class ActiveUpload { return this.owner; } - public boolean isComplete() { - return chunks.isComplete() && destinationFile.length() == this.fileSize; + public synchronized boolean isComplete() { + return state == TransferState.FINISHED; } public File getDestinationFile() { @@ -193,15 +251,23 @@ public class ActiveUpload { if (currentChunk != null) { // TODO: A chunk was requested before, check hash and requeue if not matching // This needs to be async (own thread) so will be a little complicated + LOGGER.info("There was a previous chunk!"); } // Get next missing chunk currentChunk = chunks.getMissing(); - if (currentChunk == null) + LOGGER.info("Next missing chunk: " + currentChunk); + if (currentChunk == null) { + finishUpload(); return null; // No more chunks, returning null tells the Downloader we're done. + } return currentChunk.range; } } + public TransferStatus getStatus() { + return new TransferStatus(chunks.getStatusArray(), state); + } + // TODO: Clean up old stale uploads } |