diff options
12 files changed, 983 insertions, 44 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java index 07efc449..479e634d 100644 --- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java +++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java @@ -9,7 +9,6 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -21,7 +20,7 @@ import org.openslx.bwlp.thrift.iface.TInvalidTokenException; import org.openslx.dozmod.thrift.Session; import org.openslx.filetransfer.util.FileChunk; import org.openslx.thrifthelper.ThriftManager; -import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.PrioThreadFactory; import org.openslx.util.Util; @@ -29,10 +28,12 @@ public class AsyncHashGenerator extends Thread { private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class); - private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1, - Math.max(1, (int)Math.min(Runtime.getRuntime().availableProcessors() - 1, - Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3))), - 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), + private static final ThreadPoolExecutor HASH_WORK_POOL = new CascadedThreadPoolExecutor(1, + Math.max(1, (int)Math.min( + Runtime.getRuntime().availableProcessors() - 1, + Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3) + )), + 10, TimeUnit.SECONDS, 3, new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy()); private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java index e88ed876..a80a84f9 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java @@ -6,8 +6,8 @@ import java.security.NoSuchAlgorithmException; import java.sql.SQLException; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -41,8 +41,7 @@ import org.openslx.sat.thrift.version.Version; import org.openslx.thrifthelper.ThriftManager; import org.openslx.thrifthelper.ThriftManager.ErrorCallback; import org.openslx.util.AppUtil; -import org.openslx.util.GrowingThreadPoolExecutor; -import org.openslx.util.PrioThreadFactory; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -180,26 +179,34 @@ public class App { DeleteOldLectures.init(); DeleteOldUsers.init(); - // Shared executor for SSL thrift and HTTP thrift - ExecutorService es = new GrowingThreadPoolExecutor(3, 128, 2, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(4), new PrioThreadFactory("Web/Thrift")); - // Start Thrift Server Thread t; + ThreadPoolExecutor tpe; // Plain - t = new Thread(new BinaryListener(9090, false, es), "Thr-Plain:9090"); + tpe = new CascadedThreadPoolExecutor(1, 8, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>(), // Persistent connections, don't queue before growing + "Thr-Plain"); + t = new Thread(new BinaryListener(9090, false, tpe), "Thr-Plain:9090"); t.setDaemon(true); t.start(); // SSL - t = new Thread(new BinaryListener(9091, true, es), "Thr-SSL:9091"); + tpe = new CascadedThreadPoolExecutor(2, 8, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>(), // Persistent connections, don't queue before growing + "Thr-SSL"); + t = new Thread(new BinaryListener(9091, true, tpe), "Thr-SSL:9091"); t.start(); // Start RPC httpd - t = new Thread(new WebServer(9080), "RPC-httpd:9080"); + tpe = new CascadedThreadPoolExecutor(2, 16, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>(), // Semi-persistent connections, better don't queue either + "RPC-http"); + t = new Thread(new WebServer(9080, tpe), "RPC-httpd:9080"); t.setDaemon(true); t.start(); // Start JSON-Thrift httpd - t = new Thread(new JsonHttpListener(9081, es), "Thr-httpd:9081"); + tpe = new CascadedThreadPoolExecutor(6, 12, 1, TimeUnit.MINUTES, 3, "Thr-http"); + tpe.allowCoreThreadTimeOut(true); + t = new Thread(new JsonHttpListener(9081, tpe), "Thr-httpd:9081"); t.setDaemon(true); t.start(); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbLecture.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbLecture.java index abe30a98..5e418873 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbLecture.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbLecture.java @@ -515,10 +515,16 @@ public class DbLecture { // Handle user String userFields = ""; String userJoin = ""; + boolean isSuperUser = false; if (user != null) { - userFields = " b.candownloaddefault, b.caneditdefault, b.canadmindefault," - + " ip.candownload, ip.canedit, ip.canadmin,"; - userJoin = " LEFT JOIN imagepermission ip ON (b.imagebaseid = ip.imagebaseid AND ip.userid = :userid)"; + if (User.isSuperUser(user)) { + isSuperUser = true; + user = null; + } else { + userFields = " b.candownloaddefault, b.caneditdefault, b.canadmindefault, b.ownerid," + + " ip.candownload, ip.canedit, ip.canadmin,"; + userJoin = " LEFT JOIN imagepermission ip ON (b.imagebaseid = ip.imagebaseid AND ip.userid = :userid)"; + } } // Query try (MysqlConnection connection = Database.getConnection()) { @@ -557,11 +563,17 @@ public class DbLecture { int prio = 100; // Check permissions int allowEdit = 0; - if (user != null) { + if (isSuperUser) { + allowEdit = 3; + } else if (user != null) { boolean admin; boolean download; boolean edit; - if (rs.getString("canadmin") != null) { + if (user.userId.equals(rs.getString("ownerid"))) { + admin = true; + edit = true; + download = true; + } else if (rs.getString("canadmin") != null) { admin = rs.getBoolean("canadmin"); edit = rs.getBoolean("canedit"); download = rs.getBoolean("candownload"); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index a1fd82d7..7656232f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -19,6 +19,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; import org.openslx.bwlp.sat.database.models.LocalImageVersion; +import org.openslx.bwlp.sat.fileserv.cow.CowSessionManager; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; @@ -32,7 +33,7 @@ import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.filetransfer.util.FileChunk; import org.openslx.thrifthelper.Comparators; -import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.PrioThreadFactory; public class FileServer implements IncomingEvent { @@ -42,13 +43,13 @@ public class FileServer implements IncomingEvent { /** * Listener for incoming unencrypted connections */ - private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT); // TODO: Config + private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT); private final Listener sslListener; - private final ExecutorService transferPool = new GrowingThreadPoolExecutor(1, Constants.MAX_UPLOADS + private final ExecutorService transferPool = new CascadedThreadPoolExecutor(1, Constants.MAX_UPLOADS + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), - new PrioThreadFactory("CTransf", Thread.NORM_PRIORITY - 2)); + new PrioThreadFactory("CTransf", Thread.NORM_PRIORITY - 2), null); /** * All currently running uploads, indexed by token @@ -297,7 +298,8 @@ public class FileServer implements IncomingEvent { } } this.activeDownloads = d; - this.activeUploads = u; + // Account for CoW sessions too + this.activeUploads = u + CowSessionManager.getActiveCount(); } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java index 448bcb68..54a9655e 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -7,8 +7,8 @@ import java.sql.SQLException; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; @@ -32,7 +32,7 @@ import org.openslx.bwlp.thrift.iface.TNotFoundException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.thrifthelper.ThriftManager; -import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.PrioThreadFactory; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -44,10 +44,10 @@ public class SyncTransferHandler { private static final Logger LOGGER = LogManager.getLogger(SyncTransferHandler.class); - private static final GrowingThreadPoolExecutor transferPool = new GrowingThreadPoolExecutor(1, + private static final CascadedThreadPoolExecutor transferPool = new CascadedThreadPoolExecutor(1, Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(1), new PrioThreadFactory("MTransf", - Thread.NORM_PRIORITY - 3)); + new SynchronousQueue<Runnable>(), new PrioThreadFactory("MTransf", + Thread.NORM_PRIORITY - 3), null); /** * All currently running downloads from master, indexed by token, which is == versionId @@ -119,6 +119,7 @@ public class SyncTransferHandler { // static { + transferPool.allowCoreThreadTimeOut(true); QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56)); } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/Cow.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/Cow.java new file mode 100644 index 00000000..9c4b6d81 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/Cow.java @@ -0,0 +1,27 @@ +package org.openslx.bwlp.sat.fileserv.cow; + +public class Cow { + + public static int BLOCK_SIZE = 4096; + + /** Number of bytes of a L2 entry bit field */ + public static int BITFIELD_SIZE = 40; + + public static int BITFIELD_BITS = BITFIELD_SIZE * 8; + + /** + * Number of entries in L2 table, that each will have a 320bit mask for all + * the blocks in the L2 entry + */ + public static int L2_TABLE_ENTRIES = 1024; + + /** 1280KiB. Size of a cluster's data in bytes */ + public static int L2_CLUSTER_DATA_SIZE = BITFIELD_BITS * BLOCK_SIZE; + + /** + * 1280MiB. Net capacity of a full L1 entry referring to data clusters, i.e. + * what is addressable by an L2 table (=L1 entry) + */ + public static int FULL_L2_TABLE_DATA_SIZE = L2_TABLE_ENTRIES * L2_CLUSTER_DATA_SIZE; + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowFinalizer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowFinalizer.java new file mode 100644 index 00000000..67f38634 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowFinalizer.java @@ -0,0 +1,207 @@ +package org.openslx.bwlp.sat.fileserv.cow; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.sql.SQLException; +import java.util.UUID; +import java.util.concurrent.Semaphore; + +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.permissions.User; +import org.openslx.bwlp.sat.thrift.cache.OperatingSystemList; +import org.openslx.bwlp.sat.util.Configuration; +import org.openslx.bwlp.sat.util.FileSystem; +import org.openslx.bwlp.thrift.iface.ImageBaseWrite; +import org.openslx.bwlp.thrift.iface.ImageDetailsRead; +import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.filetransfer.util.ChunkList; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.filetransfer.util.HashChecker; +import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; +import org.openslx.filetransfer.util.HashChecker.HashResult; +import org.openslx.filetransfer.util.IncomingTransferBase; +import org.openslx.virtualization.configuration.VirtualizationConfiguration; +import org.openslx.virtualization.configuration.VirtualizationConfigurationException; + +/** + * After a successful CoW session, hash the new image and create the database + * entry. + */ +public class CowFinalizer { + + private static final Logger LOGGER = LogManager.getLogger(CowFinalizer.class); + + private ChunkList chunks; + + private boolean hashesDone; + + private final CowSessionData data; + + private String error; + + private int progressPercent; + + private CowSession cowSession; + + public CowFinalizer(CowSessionData data, CowSession cowSession) { + this.data = data; + this.cowSession = cowSession; + if (data.newFileSize() == -1) + throw new IllegalStateException("Initialized finalizer before new file size was set"); + worker.start(); + } + + private void setError(String msg) { + if (this.error != null) + return; + LOGGER.warn(msg); + this.error = msg; + if (cowSession != null) { + cowSession.finalizerFinished(); + cowSession = null; + } + } + + public int progressPercent() { + return this.progressPercent; + } + + public String getError() { + return this.error; + } + + private void calculateHashes() throws IOException { + HashChecker hc = IncomingTransferBase.getHashChecker(); + if (hc == null) { + LOGGER.info("No hash checker, skipping..."); + return; + } + final Semaphore pendingChunks; + try (RandomAccessFile file = new RandomAccessFile(data.temporaryImageFile, "r")) { + chunks = new ChunkList(data.temporaryImageFile.length(), null); + int chunkCount = chunks.getAll().size(); + pendingChunks = new Semaphore(0); + HashCheckCallback cb = new HashCheckCallback() { + @Override + public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { + pendingChunks.release(); + progressPercent = (pendingChunks.availablePermits() * 100) / (chunkCount + 1); + } + }; + for (FileChunk chunk : chunks.getAll()) { + byte[] buffer = new byte[chunk.range.getLength()]; + file.readFully(buffer); + hc.queue(chunk, buffer, cb, HashChecker.BLOCKING | HashChecker.CALC_CRC32 | HashChecker.CALC_SHA1 + | HashChecker.NO_SLOW_WARN); + } + pendingChunks.acquire(chunkCount); + progressPercent = 99; + hashesDone = true; + } catch (InterruptedException e) { + LOGGER.info("Interrupted while hashing"); + Thread.currentThread().interrupt(); + return; + } + } + + private final Thread worker = new Thread("cow-work") { + @Override + public void run() { + // Sanity check: destination should be a sub directory of the vmStorePath + String relPath = FileSystem.getRelativePath(data.destinationFile, + Configuration.getVmStoreBasePath()); + if (relPath == null) { + setError(data.destinationFile.getAbsolutePath() + " is not a subdir of " + + Configuration.getVmStoreBasePath().getAbsolutePath()); + return; + } + // First, generate hashes as this might take a while + try { + calculateHashes(); + } catch (IOException e) { + LOGGER.warn("Cannot calculate hashes of new image, continuing without", e); + } + if (hashesDone) { + // Put into proper place + try { + FileUtils.writeByteArrayToFile(new File(data.destinationFile + ".crc"), + chunks.getDnbd3Crc32List()); + } catch (IllegalStateException | IOException e) { + LOGGER.warn("Could not write dnbd3 crc list", e); + } + } + // Now move file to final destination + try { + Files.move(data.temporaryImageFile.toPath(), data.destinationFile.toPath(), + StandardCopyOption.ATOMIC_MOVE); + } catch (IOException e1) { + LOGGER.warn("Cannot rename", e1); + if (!data.temporaryImageFile.renameTo(data.destinationFile)) { + setError("Renaming temporary file to final name failed"); + return; + } + } + // Add DB entries + String uuid = UUID.randomUUID().toString(); + try { + boolean copy = true; + if ("EDIT".equals(data.sessionType)) { + try { + User.canEditBaseImageOrFail(data.owner, data.imageBaseId); + copy = false; + } catch (Exception e) { + LOGGER.info("Permission check for EDIT failed, falling back to COPY"); + } + } + String baseId = data.imageBaseId; + if (copy) { + String imgName; + ImageBaseWrite ibw = new ImageBaseWrite(); + ImageDetailsRead id = null; + try { + id = DbImage.getImageDetails(null, data.imageBaseId); + imgName = id.imageName; + } catch (Exception e) { + imgName = "[UNKNOWN VM]"; + } + ibw.setImageName("Copy of " + imgName); + if (id != null) { + ibw.setDefaultPermissions(id.defaultPermissions); + ibw.setDescription(ibw.imageName); + ibw.setOsId(id.osId); + ibw.setVirtId(id.virtId); + } else { + try { + VirtualizationConfiguration conf = VirtualizationConfiguration.getInstance( + OperatingSystemList.get(), data.machineDescription, data.machineDescription.length); + ibw.setVirtId(conf.getVirtualizer().getId()); + } catch (IOException | VirtualizationConfigurationException e) { + LOGGER.error("Can neither get virtID from existing image, nor guess from description", e); + } + } + baseId = DbImage.createImage(data.owner, ibw.imageName); + DbImage.updateImageMetadata(null, baseId, ibw); + } + DbImage.createImageVersion(baseId, uuid, data.owner, data.newFileSize(), + relPath, new ImageVersionWrite(data.restricted), hashesDone ? chunks : null, + data.machineDescription); + } catch (SQLException e) { + setError("Cannot write new version to database"); + return; + } + LOGGER.info("Processing CoW session done."); + progressPercent = 100; + if (cowSession != null) { + cowSession.finalizerFinished(); + cowSession = null; + } + } + }; + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java new file mode 100644 index 00000000..c40358e3 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java @@ -0,0 +1,488 @@ +package org.openslx.bwlp.sat.fileserv.cow; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.openslx.bwlp.sat.permissions.User; +import org.openslx.bwlp.sat.util.Configuration; +import org.openslx.bwlp.sat.util.Formatter; +import org.openslx.bwlp.thrift.iface.TAuthorizationException; +import org.openslx.bwlp.thrift.iface.TInvocationException; +import org.openslx.bwlp.thrift.iface.TNotFoundException; +import org.openslx.bwlp.thrift.iface.UserInfo; +import org.openslx.util.Json; +import org.openslx.util.Util; + +public class CowSession { + + // DO NOT RENAME THESE! + // Used as string literal in JSON status, expected like this on client + static enum Status { + COPYING, // Still busy copying original file, new chunks are only accepted in already copied range + WAITING_FOR_UPLOAD_DONE, // Copying done, just waiting for new chunks until client signals that it's done + WAITING_FOR_COPYING_DONE, // Copying not done, but dnbd3-fuse already told us it's done, wait for copying + UPLOAD_DONE, // dnbd3-fuse signaled merge request, upload is thus done and no new chunks are accepted + PROCESSING, // Hashing, renaming, creating DB entry + ERROR, + COMPLETELY_DONE; + } + + private static final Logger LOGGER = LogManager.getLogger(CowSession.class); + + /** + * How long to keep the session around after it was completed or had an + * error. This is so the client can still query the status. + */ + private static final long FINISHED_TIMEOUT_MS = 300_000; + + /** + * After being inactive for a while, the session doesn't count as active, + * but isn't abandoned either. + */ + private static final long ACTIVE_TIMEOUT_MS = 900_000; + + /** No update for 12 hours, assume session died */ + private static int ABANDONED_TIMEOUT_MS = 12 * 3600_000; + + private static int COPY_BLOCK_SIZE = 1024 * 1024; // 1 MiB + + public long lastActivity = Util.tickCount(); + + private final long sourceFileSize; + private RandomAccessFile sourceFile; + private RandomAccessFile destinationFile; + private Thread copyThread; + private Status status = Status.COPYING; + + private final CowSessionData data; + + private long estimatedSpeedKbs = 0; + + /** + * Use for operations on destinationFile. + * Not needed for sourceFile for now, as it's only touched in initialization + * once, and then in the copy thread. + */ + private final Object destFileLock = new Object(); + + /** Catch-all lock for: status, copyThread */ + private final Object varLock = new Object(); + + /** + * Progress of copying the original image to a new file, which happens in + * the background + */ + private long fileCopyPosition = 0; + + /** Error message to supply to client */ + private String errorMessage; + + private CowFinalizer finalizer; + + public CowSession(String sourceFileName, byte[] machineDescription, String imageBaseId, String vmName, + boolean restricted, UserInfo owner, String sessionType) throws RuntimeException { + if ("EDIT".equals(sessionType)) { + try { + User.canEditBaseImageOrFail(owner, imageBaseId); + } catch (TInvocationException | TNotFoundException | TAuthorizationException e) { + LOGGER.warn(Formatter.userFullName(owner) + " requested EDIT CoW session, but no permission", e); + throw new RuntimeException("EDIT permission denied"); + } + } + RandomAccessFile srcFile; + long sfl = -1; + try { + srcFile = new RandomAccessFile(Configuration.getVmStoreBasePath() + "/" + sourceFileName, "r"); + } catch (IOException e) { + LOGGER.error("Cannot open source file for reading", e); + throw new RuntimeException("Cannot open source file for reading"); + } + try { + sfl = srcFile.length(); + } catch (IOException e) { + LOGGER.error("Cannot determine size of source file", e); + throw new RuntimeException("Cannot determine size of source file"); + } + // Create filenames + File tmpFile = null; + do { + tmpFile = Formatter.getTempImageName(); + } while (tmpFile.exists()); + tmpFile.getParentFile().mkdirs(); + String ext = sourceFileName.replaceFirst("^.*\\.", ""); + File finalFile = new File(tmpFile.getParent(), + Formatter.vmName(System.currentTimeMillis(), owner, vmName, + ext)).getAbsoluteFile(); + RandomAccessFile df = null; + try { + df = new RandomAccessFile(tmpFile, "rw"); + } catch (Exception e) { + Util.safeClose(srcFile); + LOGGER.error("Cannot open destination file for writing", e); + throw new RuntimeException("Cannot open destination file for writing"); + } + this.sourceFileSize = sfl; + this.sourceFile = srcFile; + this.destinationFile = df; + this.data = new CowSessionData(imageBaseId, restricted, tmpFile, finalFile, owner, + machineDescription, sessionType); + LOGGER.info("Started new " + sessionType + " CoW session for " + Formatter.userFullName(owner)); + startCopying(); + } + + /** + * Whether this session is considered abandoned. This uses a very long + * timeout to be safe, since returning true here means we can discard all + * data for this session. + */ + public boolean timedout() { + if (status == Status.COMPLETELY_DONE || status == Status.ERROR) + return Util.tickCount() > lastActivity + FINISHED_TIMEOUT_MS; + return Util.tickCount() > lastActivity + ABANDONED_TIMEOUT_MS; + } + + public boolean isActive() { + if (status == Status.COMPLETELY_DONE || status == Status.ERROR) + return false; + return Util.tickCount() < lastActivity + ACTIVE_TIMEOUT_MS; + } + + public String getError() { + return errorMessage; + } + + /** + * Get current status of transfer as JSON. Will be an array of objects with key 'title' + * and optional keys 'error' and 'percent'. + * @return + */ + public String getStatusJson() { + JsonStatus ret = new JsonStatus(); + if (status == Status.COPYING || status == Status.WAITING_FOR_COPYING_DONE) { + ret.tasks.add(new ProgressItem("Copying source file", (int) (fileCopyPosition * 100 / sourceFileSize), + errorMessage)); + } else { + ret.tasks.add(new ProgressItem("Copying source file", 100, null)); + } + if (finalizer != null) { + ret.tasks.add(new ProgressItem("Hashing modified image", finalizer.progressPercent(), + finalizer.getError())); + } + ret.state = this.status.name(); + return Json.serialize(ret); + } + + /** + * Set new status of this cow session, but take care that we can't change + * back e.g. from ERROR to WORKING. + */ + private void setStatus(Status s) { + if (this.status == Status.ERROR || this.status == Status.COMPLETELY_DONE) + return; + if (s != Status.ERROR) { + boolean illegal = false; + switch (status) { + case ERROR: + case COMPLETELY_DONE: + break; + case COPYING: + illegal = (s != Status.WAITING_FOR_UPLOAD_DONE && s != Status.WAITING_FOR_COPYING_DONE); + break; + case WAITING_FOR_UPLOAD_DONE: + illegal = (s != Status.UPLOAD_DONE); + break; + case WAITING_FOR_COPYING_DONE: + illegal = (s != Status.UPLOAD_DONE); + break; + case UPLOAD_DONE: + illegal = (s != Status.PROCESSING); + break; + case PROCESSING: + illegal = (s != Status.COMPLETELY_DONE); + break; + } + if (illegal) { + LOGGER.log(Level.ERROR, "Illegal state change: " + status + " -> " + s, + new RuntimeException()); + } + } + this.status = s; + } + + private boolean transition(Status from, Status to) { + synchronized (varLock) { + if (this.status != from) + return false; + setStatus(to); + } + return true; + } + + private void setError(String message, Exception e) { + LOGGER.log(Level.ERROR, message, e); + if (errorMessage == null) { + this.errorMessage = message; + setStatus(Status.ERROR); + } + } + + /** + * Abort an ongoing CoW session. Deletes the destination file, and causes + * the copy thread to exit, if it is still running. + */ + public void abort() { + synchronized (varLock) { + if (status == Status.ERROR || status == Status.COMPLETELY_DONE) + return; + setError("Session aborted by user", null); + } + synchronized (destFileLock) { + Util.safeClose(destinationFile); + destinationFile = null; + } + LOGGER.info("Deleting " + data.temporaryImageFile.getPath()); + try { + Files.delete(data.temporaryImageFile.toPath()); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Cannot delete CoW file for aborted session", e); + } + } + + /** + * dnbd3-fuse notifies us that the upload finished. + */ + public void uploadFinished(long newFileSize) throws IllegalTransitionException { + synchronized (varLock) { + if (!transition(Status.WAITING_FOR_UPLOAD_DONE, Status.UPLOAD_DONE) + && !transition(Status.COPYING, Status.WAITING_FOR_COPYING_DONE)) + throw new IllegalTransitionException("Cannot mark upload as finished, unexpected state " + status); + } + LOGGER.info("Upload of clusters for '" + data.destinationFile + "' finished, final size: " + newFileSize); + data.setFinalSize(newFileSize); + } + + /** + * Remote client called for finishing up the session, e.g. keep the modified + * image and add the according database entries. + */ + public void requestFinalization() throws IllegalTransitionException { + synchronized (varLock) { + if (!transition(Status.UPLOAD_DONE, Status.PROCESSING)) + throw new IllegalTransitionException("Cannot finish before upload is complete"); + Util.safeClose(destinationFile); + destinationFile = null; + LOGGER.info("Finalization of '" + data.destinationFile + "' requested"); + this.finalizer = new CowFinalizer(data, this); + lastActivity = Util.tickCount(); + } + } + + void finalizerFinished() { + if (finalizer == null) + return; + if (finalizer.getError() == null) { + setStatus(Status.COMPLETELY_DONE); + } else { + setStatus(Status.ERROR); + } + } + + /** + * Add data from a modified cluster to the image. + * + * @param clusterIndex absolute index of cluster in file, i.e. + * file_offset / cluster_size + * @param rawData raw submitted data - bitmask plus changed blocks in chunk + * @return 0 on success, -1 on error, or a desired throttle in seconds to + * forward to client + */ + public int addCluster(long clusterIndex, byte[] rawData) { + if (rawData.length < Cow.BITFIELD_SIZE) { + LOGGER.warn("Short cluster upload: Not enough payload for bitfield" + + " (" + rawData.length + " bytes)"); + return -1; + } + int readPos = Cow.BITFIELD_SIZE; + synchronized (destFileLock) { + if (destinationFile == null) { + setError("Destination file already closed", null); + return -1; + } + for (int byt = 0; byt < Cow.BITFIELD_SIZE; ++byt) { + int bytVal = rawData[byt] & 0xff; + for (int bit = 0; bit < 8; ++bit) { + if ((bytVal & (1 << bit)) == 0) + continue; + // Block changed + if (readPos + Cow.BLOCK_SIZE > rawData.length) { + LOGGER.warn("Not enough data payload for COW block (short of " + + (readPos + Cow.BLOCK_SIZE - rawData.length) + " bytes)"); + return -1; + } + int ret = writeBlock(clusterIndex * Cow.BITFIELD_BITS + byt * 8 + bit, rawData, readPos); + if (ret != 0) { + return ret; + } + readPos += Cow.BLOCK_SIZE; + } + } + } + lastActivity = Util.tickCount(); + return 0; + } + + /** + * Write a block (4k) out to the image as it changed. This does not acquire + * "lock", do so in the caller! + * + * @param blockIndex absolute index in image file + * @param readArrayStart start pointer in rawData for the 4k block in + * question + * @param rawData modified blocks, in order of bitmask + * @return 0 on success, -1 on error, or a desired throttle in seconds to + * forward to client + */ + private int writeBlock(long blockIndex, byte[] rawData, int readArrayStart) { + if (status == Status.ERROR) + return -1; + + long filePos = blockIndex * Cow.BLOCK_SIZE; + long missing = filePos + Cow.BLOCK_SIZE - fileCopyPosition; + if (status == Status.COPYING && missing > 0) { + int delay = 30; + if (estimatedSpeedKbs > 0) { + delay = (int) ((missing / 1000) / estimatedSpeedKbs); + } + if (delay < 1) { + delay = 1; + } + LOGGER.info("Throttling client - Missing: " + Util.formatBytes(missing) + ", Speed: " + + Util.formatBytes(estimatedSpeedKbs * 1000) + "/s, Wait: " + delay); + return delay; + } + try { + destinationFile.seek(filePos); + destinationFile.write(rawData, readArrayStart, Cow.BLOCK_SIZE); + } catch (IOException e) { + setError("Cannot write changed cluster to destination file, aborting CoW session", e); + Util.safeClose(destinationFile); + destinationFile = null; + return -1; + } + return 0; + } + + /** + * Copy file from source to dest in the background + */ + private void startCopying() { + Thread t = new Thread("CoW-Copy") { + @Override + public void run() { + long lastTime = Util.tickCount(); + long byteCount = 0; + long[] history = new long[10]; + int historySlot = 0; + try { + byte[] chunk = new byte[COPY_BLOCK_SIZE]; + sourceFile.seek(0); + while (fileCopyPosition < sourceFileSize) { + int num = sourceFile.read(chunk); + if (num == -1) { + setError("Unexpected end of file while copying CoW file" + + " (" + fileCopyPosition + "/" + sourceFileSize + ")", null); + return; + } + synchronized (destFileLock) { + // Might be null if we aborted, or an error occurred + if (destinationFile == null) + return; + destinationFile.seek(fileCopyPosition); + destinationFile.write(chunk, 0, num); + } + fileCopyPosition += num; + byteCount += num; + if (byteCount > COPY_BLOCK_SIZE * 5) { + long now = Util.tickCount(); + long delta = now - lastTime; + if (delta > 1000) { + history[historySlot++ % history.length] = byteCount / delta; + byteCount = 0; + lastTime = now; + updateSpeedEstimate(history); + } + } + } + synchronized (varLock) { + if (!transition(Status.WAITING_FOR_COPYING_DONE, Status.UPLOAD_DONE) + && !transition(Status.COPYING, Status.WAITING_FOR_UPLOAD_DONE)) { + LOGGER.warn("Unexpected state after copying finished: " + status); + } + } + } catch (IOException e) { + setError("Error copying original file to new destination", e); + } finally { + Util.safeClose(sourceFile); + sourceFile = null; + } + synchronized (varLock) { + copyThread = null; + } + } + + private void updateSpeedEstimate(long[] history) { + long sum = 0; + for (int i = 0; i < history.length; ++i) { + // Some slots might still be 0 in the beginning, so for the first 10+ seconds + // we underestimate. But that shouldn't matter since background upload on the + // client only starts 60 seconds in. + sum += history[i]; + } + // This is effectively KB/s, as we divided by milliseconds, not seconds + estimatedSpeedKbs = sum / history.length; + } + }; + synchronized (varLock) { + copyThread = t; + copyThread.setDaemon(true); + copyThread.start(); + } + } + + public static class IllegalTransitionException extends Exception { + + private static final long serialVersionUID = -6792998975872772519L; + + public IllegalTransitionException(String message) { + super(message); + } + } + + @SuppressWarnings("unused") + private static class ProgressItem { + public final String title; + public final int percent; + public final String error; + + public ProgressItem(String title, int percent, String error) { + this.title = title; + this.percent = percent; + this.error = error; + } + } + + private static class JsonStatus + { + @SuppressWarnings("unused") + String state; + List<ProgressItem> tasks = new ArrayList<>(); + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionData.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionData.java new file mode 100644 index 00000000..58a12e44 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionData.java @@ -0,0 +1,47 @@ +package org.openslx.bwlp.sat.fileserv.cow; + +import java.io.File; + +import org.openslx.bwlp.thrift.iface.UserInfo; + +public class CowSessionData { + + public final String imageBaseId; + + public final boolean restricted; + + public final File temporaryImageFile; + + public final File destinationFile; + + public final UserInfo owner; + + public final byte[] machineDescription; + + private long newFileSize = -1; + + public final String sessionType; + + public CowSessionData(String imageBaseId, boolean restricted, File tmpFile, File destFile, + UserInfo owner, byte[] machineDescription, String sessionType) { + this.imageBaseId = imageBaseId; + this.restricted = restricted; + this.temporaryImageFile = tmpFile; + this.destinationFile = destFile; + this.owner = owner; + this.machineDescription = machineDescription; + this.sessionType = sessionType; + + } + + public void setFinalSize(long newFileSize) { + if (this.newFileSize != -1) + throw new IllegalStateException("Cannot set new filesize twice!"); + this.newFileSize = newFileSize; + } + + public long newFileSize() { + return this.newFileSize; + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java new file mode 100644 index 00000000..fddd6604 --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java @@ -0,0 +1,105 @@ +package org.openslx.bwlp.sat.fileserv.cow; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.openslx.bwlp.sat.database.mappers.DbLecture.LaunchData; +import org.openslx.bwlp.sat.thrift.SessionManager; +import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.thrift.iface.UserInfo; +import org.openslx.util.QuickTimer; +import org.openslx.util.QuickTimer.Task; + +public class CowSessionManager { + + private static final ReadWriteLock SESSION_LOCK = new ReentrantReadWriteLock(); + + private static final Map<String, CowSession> SESSIONS = new HashMap<>(); + + static { + QuickTimer.scheduleAtFixedDelay(new Task() { + @Override + public void fire() { + Lock lock = SESSION_LOCK.writeLock(); + lock.lock(); + try { + for (Iterator<CowSession> iterator = SESSIONS.values().iterator(); iterator.hasNext();) { + CowSession s = iterator.next(); + if (s.timedout()) { + s.abort(); + iterator.remove(); + } + } + } finally { + lock.unlock(); + } + } + }, 3600_577, 3601_010); + } + + public static CowSession get(String id) { + Lock lock = SESSION_LOCK.readLock(); + lock.lock(); + try { + return SESSIONS.get(id); + } finally { + lock.unlock(); + } + } + + private static boolean add(String id, CowSession session) { + Lock lock = SESSION_LOCK.writeLock(); + lock.lock(); + try { + if (SESSIONS.containsKey(id)) + return false; + SESSIONS.put(id, session); + return true; + } finally { + lock.unlock(); + } + } + + public static int getActiveCount() { + int cnt = 0; + Lock lock = SESSION_LOCK.readLock(); + lock.lock(); + try { + for (CowSession s : SESSIONS.values()) { + if (s.isActive()) { + cnt++; + } + } + } finally { + lock.unlock(); + } + return cnt; + } + + /** + * Create a new cow session for the given image. Returns an according session id. + * On error, an exception is thrown, this will never return null. + */ + public static String create(String cowid, LaunchData ld, String sessionType) throws RuntimeException { + UserInfo user = SessionManager.get(cowid); + if (user == null) + throw new RuntimeException("Unknown user"); + SessionManager.remove(cowid); + if (getActiveCount() > Constants.MAX_UPLOADS) + throw new RuntimeException("Too many active CoW sessions"); + // + String sessionId = UUID.randomUUID().toString(); + CowSession session = new CowSession(ld.imagePath, ld.configuration, ld.imageBaseId, ld.vmName, ld.restricted, user, sessionType); + if (session.getError() != null) { + throw new RuntimeException(session.getError()); + } + add(sessionId, session); + return sessionId; + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/BashVars.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/BashVars.java new file mode 100644 index 00000000..e3f3996b --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/BashVars.java @@ -0,0 +1,46 @@ +package org.openslx.bwlp.sat.util; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Class for generating a bash file declaring + * a list of variables, properly escaping. + */ +public class BashVars { + + private final Map<String, String> vars = new HashMap<>(); + + public void addVar(String variable, String value) { + vars.put(variable, value); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (Entry<String, String> kvp : vars.entrySet()) { + sb.append(kvp.getKey()); + sb.append('='); + sb.append(escape(kvp.getValue())); + sb.append('\n'); + } + return sb.toString(); + } + + /** + * Escape a string and put it in quotes. + */ + private String escape(String value) { + boolean hasAp = value.contains("'"); + if (hasAp && !value.contains("!!") && !value.contains("$")) { + if (value.contains("\"")) + return '"' + value.replace("\"", "\\\"") + '"'; + return '"' + value + '"'; + } + if (hasAp) + return "'" + value.replace("'", "'\\''") + "'"; + return "'" + value + "'"; + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java index eed9651c..fcb1ee8e 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java @@ -9,7 +9,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,9 +35,8 @@ import org.openslx.bwlp.thrift.iface.NetShare; import org.openslx.bwlp.thrift.iface.NetShareAuth; import org.openslx.bwlp.thrift.iface.TNotFoundException; import org.openslx.bwlp.thrift.iface.UserInfo; -import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.Json; -import org.openslx.util.PrioThreadFactory; import org.openslx.util.TarArchiveUtil.TarArchiveWriter; import org.openslx.util.Util; import org.simpleframework.xml.Serializer; @@ -49,13 +48,13 @@ public class WebServer extends NanoHTTPD { private static final Logger LOGGER = LogManager.getLogger(WebServer.class); - private static final ThreadPoolExecutor tpe = new GrowingThreadPoolExecutor(1, 8, 1, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(16), new PrioThreadFactory("Web")); + private static final ThreadPoolExecutor LECTURE_START_TPE = new CascadedThreadPoolExecutor(2, 8, + 1, TimeUnit.MINUTES, 1, "tgz-gen"); private static final Serializer serializer = new Persister(); - public WebServer(int port) throws IOException { - super(Configuration.getWebServerBindAddressLocal(), port, 16, 2); + public WebServer(int port, ExecutorService tpe) throws IOException { + super(Configuration.getWebServerBindAddressLocal(), port, tpe); super.maxRequestSize = 65535; } @@ -227,7 +226,6 @@ public class WebServer extends NanoHTTPD { } if (finalSize < 0 || finalSize > limit) return badRequest("Illegal final file size"); - LOGGER.info("Got upload finished for CoW session"); try { session.uploadFinished(finalSize); } catch (Exception e) { @@ -244,7 +242,6 @@ public class WebServer extends NanoHTTPD { CowSession session = CowSessionManager.get(sessionId); if (session == null) return notFound("Invalid session ID"); - LOGGER.info("User requested finalization for CoW session"); try { session.requestFinalization(); } catch (Exception e) { @@ -262,7 +259,6 @@ public class WebServer extends NanoHTTPD { CowSession session = CowSessionManager.get(sessionId); if (session == null) return notFound("Invalid session ID"); - LOGGER.info("User sent abort request for CoW session"); session.abort(); return new NanoHTTPD.Response(NanoHTTPD.Response.Status.OK, "text/plain; charset=utf-8", "OK"); } @@ -311,7 +307,7 @@ public class WebServer extends NanoHTTPD { return internalServerError("Cannot create COW session: " + e.getMessage()); } // Meta is required, everything else is optional - tpe.execute(new Runnable() { + LECTURE_START_TPE.execute(new Runnable() { @Override public void run() { try { |