diff options
author | Simon Rettberg | 2015-07-16 17:41:53 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-07-16 17:41:53 +0200 |
commit | 6045823f6139c0a06bbe8fa3e8de56ba87b68a2c (patch) | |
tree | ef66a014289312b3c3f1a8a0a332a332e51a35a4 | |
parent | [client] upload workflow working aside from the periodic status query of the ... (diff) | |
download | tutor-module-6045823f6139c0a06bbe8fa3e8de56ba87b68a2c.tar.gz tutor-module-6045823f6139c0a06bbe8fa3e8de56ba87b68a2c.tar.xz tutor-module-6045823f6139c0a06bbe8fa3e8de56ba87b68a2c.zip |
[server] Finish implementing uploads (no hash checking yet)
10 files changed, 241 insertions, 41 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java index 570cf1fa..5ceb72cc 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java @@ -22,6 +22,7 @@ import org.openslx.bwlp.thrift.iface.NetDirection; import org.openslx.bwlp.thrift.iface.NetRule; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.thrifthelper.ThriftManager; +import org.openslx.thrifthelper.ThriftManager.ErrorCallback; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -52,10 +53,26 @@ public class App { System.exit(1); } + ThriftManager.setErrorCallback(new ErrorCallback() { + + @Override + public boolean thriftError(int failCount, String method, Throwable t) { + if (failCount > 2) { + LOGGER.warn("Thrift Client error for " + method + ", FAIL."); + return false; + } + LOGGER.info("Thrift Client error for " + method + ", retrying..."); + try { + Thread.sleep(failCount * 250); + } catch (InterruptedException e) { + } + return true; + } + }); + ThriftManager.setMasterServerAddress("bwlp-masterserver.ruf.uni-freiburg.de"); // Load useful things from master server - //LOGGER.info(ThriftManager.getMasterClient().getUserFromToken("9ECAC1AFC02FF295292362BD165847AE")); OrganizationList.get(); OperatingSystemList.get(); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/MysqlStatement.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/MysqlStatement.java index 620ee2e4..e7573cb1 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/MysqlStatement.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/MysqlStatement.java @@ -139,6 +139,22 @@ public class MysqlStatement implements Closeable { } /** + * Sets a parameter. + * + * @param name parameter name + * @param value parameter value + * @throws SQLException if an error occurred + * @throws IllegalArgumentException if the parameter does not exist + * @see PreparedStatement#setBoolean(int, boolean) + */ + public void setBinary(String name, byte[] value) throws SQLException { + List<Integer> indexes = getIndexes(name); + for (Integer index : indexes) { + statement.setBytes(index, value); + } + } + + /** * Executes the statement. * * @return true if the first result is a {@link ResultSet} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java index c46fc690..6894c85f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java @@ -11,6 +11,7 @@ import org.openslx.bwlp.sat.database.Database; import org.openslx.bwlp.sat.database.MysqlConnection; import org.openslx.bwlp.sat.database.MysqlStatement; import org.openslx.bwlp.sat.database.Paginator; +import org.openslx.bwlp.sat.fileserv.ChunkList; import org.openslx.bwlp.thrift.iface.ImageBaseWrite; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePermissions; @@ -373,4 +374,35 @@ public class DbImage { } } + public static void createImageVersion(String imageBaseId, String imageVersionId, UserInfo owner, long fileSize, String filePath, ImageVersionWrite versionSettings, ChunkList chunks) throws SQLException { + try (MysqlConnection connection = Database.getConnection()) { + final long nowSecs = System.currentTimeMillis() / 1000; + MysqlStatement stmt = connection.prepareStatement("INSERT INTO imageversion" + + " (imageversionid, imagebaseid, createtime, expiretime, filesize, filepath, uploaderid," + + " isenabled, isrestricted, isvalid, isprocessed, mastersha1, virtualizerconfig)" + + " VALUES " + + " (:imageversionid, :imagebaseid, :createtime, :expiretime, :filesize, :filepath," + + " :uploaderid, :isenabled, :isrestricted, :isvalid, :isprocessed, :mastersha1, :virtualizerconfig)"); + stmt.setString("imageversionid", imageVersionId); + stmt.setString("imagebaseid", imageBaseId); + stmt.setLong("createtime", nowSecs); + stmt.setLong("expiretime", nowSecs + 86400 * 365); // TODO: Config! + stmt.setLong("filesize", fileSize); + stmt.setString("filepath", filePath); + stmt.setString("uploaderid", owner.userId); + stmt.setBoolean("isenabled", versionSettings == null ? false : versionSettings.isEnabled); + stmt.setBoolean("isrestricted", versionSettings == null ? false : versionSettings.isRestricted); + stmt.setBoolean("isvalid", true); // TODO + stmt.setBoolean("isprocessed", false); + stmt.setBinary("mastersha1", null); // TODO + stmt.setString("virtualizerconfig", null); // TODO + stmt.executeUpdate(); + // TODO: Write chunk hashes to DB + connection.commit(); + } catch (SQLException e) { + LOGGER.error("Query failed in DbImage.createImageVersion()", e); + throw e; + } + } + } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java index ad2949aa..dbeb24ae 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ActiveUpload.java @@ -5,15 +5,21 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.sql.SQLException; import java.util.List; import java.util.UUID; import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger; +import org.openslx.bwlp.sat.database.mappers.DbImage; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; +import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.bwlp.thrift.iface.TransferState; +import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; @@ -46,16 +52,41 @@ public class ActiveUpload { */ private final ImageDetailsRead image; + /** + * Flags to set for this new image version. Optional field. + */ + private ImageVersionWrite versionSettings = null; + + /** + * TransferState of this upload + */ + private TransferState state = TransferState.IDLE; + + /** + * ID of this upload - will become this version id on success. + */ + private final String uploadId; + // TODO: Use HashList for verification - public ActiveUpload(UserInfo owner, ImageDetailsRead image, File destinationFile, long fileSize, - List<ByteBuffer> sha1Sums) throws FileNotFoundException { + public ActiveUpload(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, + long fileSize, List<ByteBuffer> sha1Sums) throws FileNotFoundException { this.destinationFile = destinationFile; this.outFile = new RandomAccessFile(destinationFile, "rw"); this.chunks = new ChunkList(fileSize, sha1Sums); this.owner = owner; this.image = image; this.fileSize = fileSize; + this.uploadId = uploadId; + } + + /** + * Set meta data for this image version. + * + * @param data + */ + public synchronized void setVersionData(ImageVersionWrite data) { + versionSettings = data; } /** @@ -67,20 +98,30 @@ public class ActiveUpload { * discarded */ public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) { - if (download != null || chunks.isComplete()) + if (download != null || chunks.isComplete() || state == TransferState.FINISHED + || state == TransferState.ERROR) return false; download = connection; - pool.execute(new Runnable() { - @Override - public void run() { - CbHandler cbh = new CbHandler(); - if (!download.download(cbh, cbh) && cbh.currentChunk != null) { - // If the download failed and we have a current chunk, put it back into - // the queue, so it will be handled again later... - chunks.markFailed(cbh.currentChunk); + try { + pool.execute(new Runnable() { + @Override + public void run() { + CbHandler cbh = new CbHandler(); + if (!download.download(cbh, cbh)) { + if (cbh.currentChunk != null) { + // If the download failed and we have a current chunk, put it back into + // the queue, so it will be handled again later... + chunks.markFailed(cbh.currentChunk); + } + LOGGER.warn("Download of " + destinationFile.getAbsolutePath()); + } } - } - }); + }); + state = TransferState.WORKING; + } catch (Exception e) { + LOGGER.warn("threadpool rejected the incoming file transfer", e); + return false; + } return true; } @@ -94,6 +135,8 @@ public class ActiveUpload { * @return */ private boolean writeFileData(long fileOffset, int dataLength, byte[] data) { + if (state != TransferState.WORKING) + throw new IllegalStateException("Cannot write to file if state != RUNNING"); synchronized (outFile) { try { outFile.seek(fileOffset); @@ -107,7 +150,16 @@ public class ActiveUpload { return true; } - private void finishUpload() { + /** + * Called when the upload finished. + */ + private synchronized void finishUpload() { + if (state != TransferState.WORKING) + return; + synchronized (outFile) { + Util.safeClose(outFile); + state = TransferState.FINISHED; + } File file = destinationFile; // Ready to go. First step: Rename temp file to something usable File destination = new File(file.getParent(), Formatter.vmName(owner, image.imageName)); @@ -116,7 +168,8 @@ public class ActiveUpload { if (relPath == null) { LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " + Configuration.getVmStoreBasePath().getAbsolutePath()); - // TODO: Update state to failed... + state = TransferState.ERROR; + return; } // Execute rename @@ -133,17 +186,22 @@ public class ActiveUpload { LOGGER.warn( "Could not rename '" + file.getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'", renameException); - // TODO: Update state.... + state = TransferState.ERROR; + return; } // Now insert meta data into DB - - final String imageVersionId = UUID.randomUUID().toString(); - - // TODO: SQL magic, update state + try { + DbImage.createImageVersion(image.imageBaseId, uploadId, owner, fileSize, relPath, + versionSettings, chunks); + } catch (SQLException e) { + LOGGER.error("Error finishing upload: Inserting version to DB failed", e); + state = TransferState.ERROR; + return; + } } - public void cancel() { + public synchronized void cancel() { // TODO Auto-generated method stub } @@ -157,8 +215,8 @@ public class ActiveUpload { return this.owner; } - public boolean isComplete() { - return chunks.isComplete() && destinationFile.length() == this.fileSize; + public synchronized boolean isComplete() { + return state == TransferState.FINISHED; } public File getDestinationFile() { @@ -193,15 +251,23 @@ public class ActiveUpload { if (currentChunk != null) { // TODO: A chunk was requested before, check hash and requeue if not matching // This needs to be async (own thread) so will be a little complicated + LOGGER.info("There was a previous chunk!"); } // Get next missing chunk currentChunk = chunks.getMissing(); - if (currentChunk == null) + LOGGER.info("Next missing chunk: " + currentChunk); + if (currentChunk == null) { + finishUpload(); return null; // No more chunks, returning null tells the Downloader we're done. + } return currentChunk.range; } } + public TransferStatus getStatus() { + return new TransferStatus(chunks.getStatusArray(), state); + } + // TODO: Clean up old stale uploads } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java index b07193c5..385a6484 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/ChunkList.java @@ -1,6 +1,7 @@ package org.openslx.bwlp.sat.fileserv; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -20,10 +21,16 @@ public class ChunkList { */ private final List<FileChunk> pendingChunks = new LinkedList<>(); + private final List<FileChunk> completeChunks = new ArrayList<>(100); + + // 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying + private final ByteBuffer statusArray; + // Do we need to keep valid chunks, or chunks that failed too many times? public ChunkList(long fileSize, List<ByteBuffer> sha1Sums) { FileChunk.createChunkList(missingChunks, fileSize, sha1Sums); + statusArray = ByteBuffer.allocate(missingChunks.size()); } /** @@ -40,6 +47,33 @@ public class ChunkList { } /** + * Get the block status as byte representation. + */ + public synchronized ByteBuffer getStatusArray() { + byte[] array = statusArray.array(); + //Arrays.fill(array, (byte)0); + for (FileChunk c : missingChunks) { + array[c.getChunkIndex()] = 1; + } + for (FileChunk c : pendingChunks) { + array[c.getChunkIndex()] = 2; + } + for (FileChunk c : completeChunks) { + array[c.getChunkIndex()] = 0; + } + return statusArray; + } + + /** + * Get completed chunks as list + * + * @return List containing all successfully transfered chunks + */ + public synchronized List<FileChunk> getCompleted() { + return new ArrayList<>(completeChunks); + } + + /** * Mark a chunk currently transferring as successfully transfered. * * @param c The chunk in question @@ -50,6 +84,7 @@ public class ChunkList { + ", but chunk is not marked as currently transferring!"); return; } + completeChunks.add(c); } /** diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java index ffa033a5..b322e65d 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileChunk.java @@ -29,6 +29,15 @@ public class FileChunk { public synchronized int incFailed() { return ++failCount; } + + public int getChunkIndex() { + return (int)(range.startOffset / CHUNK_SIZE); + } + + @Override + public String toString() { + return "[Chunk " + getChunkIndex() + " (" + range.startOffset + "-" + range.endOffset + "), fails: " + failCount + "]"; + } // @@ -39,6 +48,8 @@ public class FileChunk { public static void createChunkList(Collection<FileChunk> list, long fileSize, List<ByteBuffer> sha1Sums) { if (fileSize < 0) throw new IllegalArgumentException("fileSize cannot be negative"); + if (!list.isEmpty()) + throw new IllegalArgumentException("Passed list is not empty"); long chunkCount = fileSizeToChunkCount(fileSize); if (sha1Sums != null) { if (sha1Sums.size() != chunkCount) diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index 44746fe2..3859322f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -8,11 +8,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.filetransfer.Downloader; @@ -27,12 +31,14 @@ public class FileServer implements IncomingEvent { /** * Listener for incoming unencrypted connections */ - private Listener plainListener = new Listener(this, null, 9092); // TODO: Config + private final Listener plainListener = new Listener(this, null, 9092); // TODO: Config + + private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(2, Constants.MAX_UPLOADS + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1)); /** * All currently running uploads, indexed by token */ - private Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>(); + private final Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>(); private static final FileServer globalInstance = new FileServer(); @@ -57,8 +63,14 @@ public class FileServer implements IncomingEvent { @Override public void incomingUploadRequest(Downloader downloader) throws IOException { - // TODO Auto-generated method stub - + String token = downloader.getToken(); + LOGGER.info("Incoming filetransfer with token " + token); + ActiveUpload upload = uploads.get(token); + if (upload == null) { + LOGGER.warn("Unknown token " + token); + return; + } + upload.addConnection(downloader, transferPool); } /** @@ -71,7 +83,7 @@ public class FileServer implements IncomingEvent { return uploads.get(uploadToken); } - public String createNewUserUpload(UserInfo owner, long fileSize, List<ByteBuffer> sha1Sums) + public String createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, List<ByteBuffer> sha1Sums) throws TTransferRejectedException { Iterator<ActiveUpload> it = uploads.values().iterator(); int activeUploads = 0; @@ -92,15 +104,16 @@ public class FileServer implements IncomingEvent { destinationFile = Formatter.getTempImageName(); } while (destinationFile.exists()); destinationFile.getParentFile().mkdirs(); - // TODO: Pass image + + String key = UUID.randomUUID().toString(); ActiveUpload upload; try { - upload = new ActiveUpload(owner, null, destinationFile, fileSize, sha1Sums); + upload = new ActiveUpload(key, owner, image, destinationFile, fileSize, sha1Sums); } catch (FileNotFoundException e) { LOGGER.error("Could not open destination file for writing", e); throw new TTransferRejectedException("Destination file not writable!"); } - String key = UUID.randomUUID().toString(); + uploads.put(key, upload); return key; } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java index 7f9f9bbc..45373ae4 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java @@ -72,7 +72,13 @@ public class ServerHandler implements SatelliteServer.Iface { TInternalServerError, TNotFoundException { UserInfo user = SessionManager.getOrFail(userToken); User.canEditBaseImageOrFail(user, imageBaseId); - String transferId = fileServer.createNewUserUpload(user, fileSize, blockHashes); + ImageDetailsRead image; + try { + image = DbImage.getImageDetails(user, imageBaseId); + } catch (SQLException e) { + throw new TInternalServerError(); + } + String transferId = fileServer.createNewUserUpload(user, image, fileSize, blockHashes); return new TransferInformation(transferId, fileServer.getPlainPort(), fileServer.getSslPort()); } @@ -86,8 +92,10 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public TransferStatus queryUploadStatus(String uploadToken) throws TInvalidTokenException { - // TODO Auto-generated method stub - return null; + ActiveUpload upload = fileServer.getUploadByToken(uploadToken); + if (upload == null) + throw new TInvalidTokenException(); + return upload.getStatus(); } @Override @@ -143,9 +151,8 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public List<ImageSummaryRead> getImageList(String userToken, List<String> tagSearch, int page) throws TAuthorizationException, TInternalServerError { - //UserInfo user = SessionManager.getOrFail(userToken); - //User.canListImagesOrFail(user); - UserInfo user = new UserInfo("bla", "bla", "bla", "bla", "bla"); + UserInfo user = SessionManager.getOrFail(userToken); + User.canListImagesOrFail(user); try { return DbImage.getAllVisible(user, tagSearch, page); } catch (SQLException e) { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/SessionManager.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/SessionManager.java index 936b7a06..c8384459 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/SessionManager.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/SessionManager.java @@ -110,9 +110,10 @@ public class SessionManager { private static UserInfo getInternal(String token) throws TAuthorizationException, TInternalServerError { Entry e = tokenManager.get(token); - LOGGER.info("Cache miss for token " + token + ", asking master"); - if (e == null) + if (e == null) { + LOGGER.info("Cache miss for token " + token + ", asking master"); return getRemote(token); + } // User session already cached final long now = System.currentTimeMillis(); if (e.isTooOld(now)) { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java index 6c2dc31b..0d6328b4 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Constants.java @@ -5,6 +5,7 @@ import org.openslx.bwlp.sat.fileserv.FileChunk; public class Constants { public static final String INCOMPLETE_UPLOAD_SUFFIX = ".part"; public static final int MAX_UPLOADS; + public static final int MAX_DOWNLOADS; static { long maxMem = Runtime.getRuntime().maxMemory(); @@ -17,5 +18,6 @@ public class Constants { // Now maxMem is the amount of memory in MiB MAX_UPLOADS = (int) Math.max(1, (maxMem - 64) / (FileChunk.CHUNK_SIZE_MIB + 1)); + MAX_DOWNLOADS = MAX_UPLOADS * 2; } } |