diff options
author | Simon Rettberg | 2024-05-22 16:51:06 +0200 |
---|---|---|
committer | Simon Rettberg | 2024-05-22 16:51:06 +0200 |
commit | 18e9d5521c17b5d2f9876088950f10884855a21d (patch) | |
tree | 946b7308a6e11a254a51d15da60c936b5ca33fe4 /dozentenmodulserver | |
parent | [server] CoW: Fix calculating copy speed, don't return delay=0 (diff) | |
download | tutor-module-18e9d5521c17b5d2f9876088950f10884855a21d.tar.gz tutor-module-18e9d5521c17b5d2f9876088950f10884855a21d.tar.xz tutor-module-18e9d5521c17b5d2f9876088950f10884855a21d.zip |
[*] Switch to new CascadedThreadPool
Diffstat (limited to 'dozentenmodulserver')
6 files changed, 86 insertions, 34 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java index e88ed876..a80a84f9 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java @@ -6,8 +6,8 @@ import java.security.NoSuchAlgorithmException; import java.sql.SQLException; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -41,8 +41,7 @@ import org.openslx.sat.thrift.version.Version; import org.openslx.thrifthelper.ThriftManager; import org.openslx.thrifthelper.ThriftManager.ErrorCallback; import org.openslx.util.AppUtil; -import org.openslx.util.GrowingThreadPoolExecutor; -import org.openslx.util.PrioThreadFactory; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -180,26 +179,34 @@ public class App { DeleteOldLectures.init(); DeleteOldUsers.init(); - // Shared executor for SSL thrift and HTTP thrift - ExecutorService es = new GrowingThreadPoolExecutor(3, 128, 2, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(4), new PrioThreadFactory("Web/Thrift")); - // Start Thrift Server Thread t; + ThreadPoolExecutor tpe; // Plain - t = new Thread(new BinaryListener(9090, false, es), "Thr-Plain:9090"); + tpe = new CascadedThreadPoolExecutor(1, 8, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>(), // Persistent connections, don't queue before growing + "Thr-Plain"); + t = new Thread(new BinaryListener(9090, false, tpe), "Thr-Plain:9090"); t.setDaemon(true); t.start(); // SSL - t = new Thread(new BinaryListener(9091, true, es), "Thr-SSL:9091"); + tpe = new CascadedThreadPoolExecutor(2, 8, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>(), // Persistent connections, don't queue before growing + "Thr-SSL"); + t = new Thread(new BinaryListener(9091, true, tpe), "Thr-SSL:9091"); t.start(); // Start RPC httpd - t = new Thread(new WebServer(9080), "RPC-httpd:9080"); + tpe = new CascadedThreadPoolExecutor(2, 16, 1, TimeUnit.MINUTES, + new SynchronousQueue<Runnable>(), // Semi-persistent connections, better don't queue either + "RPC-http"); + t = new Thread(new WebServer(9080, tpe), "RPC-httpd:9080"); t.setDaemon(true); t.start(); // Start JSON-Thrift httpd - t = new Thread(new JsonHttpListener(9081, es), "Thr-httpd:9081"); + tpe = new CascadedThreadPoolExecutor(6, 12, 1, TimeUnit.MINUTES, 3, "Thr-http"); + tpe.allowCoreThreadTimeOut(true); + t = new Thread(new JsonHttpListener(9081, tpe), "Thr-httpd:9081"); t.setDaemon(true); t.start(); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index a1fd82d7..7656232f 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -19,6 +19,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; import org.openslx.bwlp.sat.database.models.LocalImageVersion; +import org.openslx.bwlp.sat.fileserv.cow.CowSessionManager; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; @@ -32,7 +33,7 @@ import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.filetransfer.util.FileChunk; import org.openslx.thrifthelper.Comparators; -import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.PrioThreadFactory; public class FileServer implements IncomingEvent { @@ -42,13 +43,13 @@ public class FileServer implements IncomingEvent { /** * Listener for incoming unencrypted connections */ - private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT); // TODO: Config + private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT); private final Listener sslListener; - private final ExecutorService transferPool = new GrowingThreadPoolExecutor(1, Constants.MAX_UPLOADS + private final ExecutorService transferPool = new CascadedThreadPoolExecutor(1, Constants.MAX_UPLOADS + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), - new PrioThreadFactory("CTransf", Thread.NORM_PRIORITY - 2)); + new PrioThreadFactory("CTransf", Thread.NORM_PRIORITY - 2), null); /** * All currently running uploads, indexed by token @@ -297,7 +298,8 @@ public class FileServer implements IncomingEvent { } } this.activeDownloads = d; - this.activeUploads = u; + // Account for CoW sessions too + this.activeUploads = u + CowSessionManager.getActiveCount(); } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java index 448bcb68..54a9655e 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -7,8 +7,8 @@ import java.sql.SQLException; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.LogManager; @@ -32,7 +32,7 @@ import org.openslx.bwlp.thrift.iface.TNotFoundException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.thrifthelper.ThriftManager; -import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.PrioThreadFactory; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -44,10 +44,10 @@ public class SyncTransferHandler { private static final Logger LOGGER = LogManager.getLogger(SyncTransferHandler.class); - private static final GrowingThreadPoolExecutor transferPool = new GrowingThreadPoolExecutor(1, + private static final CascadedThreadPoolExecutor transferPool = new CascadedThreadPoolExecutor(1, Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(1), new PrioThreadFactory("MTransf", - Thread.NORM_PRIORITY - 3)); + new SynchronousQueue<Runnable>(), new PrioThreadFactory("MTransf", + Thread.NORM_PRIORITY - 3), null); /** * All currently running downloads from master, indexed by token, which is == versionId @@ -119,6 +119,7 @@ public class SyncTransferHandler { // static { + transferPool.allowCoreThreadTimeOut(true); QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56)); } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java index c24f4acb..c40358e3 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java @@ -36,7 +36,20 @@ public class CowSession { private static final Logger LOGGER = LogManager.getLogger(CowSession.class); - private static int TIMEOUT_MS = 12 * 3600_000; // No update for 12 hours, assume session died + /** + * How long to keep the session around after it was completed or had an + * error. This is so the client can still query the status. + */ + private static final long FINISHED_TIMEOUT_MS = 300_000; + + /** + * After being inactive for a while, the session doesn't count as active, + * but isn't abandoned either. + */ + private static final long ACTIVE_TIMEOUT_MS = 900_000; + + /** No update for 12 hours, assume session died */ + private static int ABANDONED_TIMEOUT_MS = 12 * 3600_000; private static int COPY_BLOCK_SIZE = 1024 * 1024; // 1 MiB @@ -124,10 +137,21 @@ public class CowSession { startCopying(); } + /** + * Whether this session is considered abandoned. This uses a very long + * timeout to be safe, since returning true here means we can discard all + * data for this session. + */ public boolean timedout() { if (status == Status.COMPLETELY_DONE || status == Status.ERROR) - return Util.tickCount() > lastActivity + 300_000; - return Util.tickCount() > lastActivity + TIMEOUT_MS; + return Util.tickCount() > lastActivity + FINISHED_TIMEOUT_MS; + return Util.tickCount() > lastActivity + ABANDONED_TIMEOUT_MS; + } + + public boolean isActive() { + if (status == Status.COMPLETELY_DONE || status == Status.ERROR) + return false; + return Util.tickCount() < lastActivity + ACTIVE_TIMEOUT_MS; } public String getError() { @@ -359,7 +383,7 @@ public class CowSession { * Copy file from source to dest in the background */ private void startCopying() { - Thread t = new Thread() { + Thread t = new Thread("CoW-Copy") { @Override public void run() { long lastTime = Util.tickCount(); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java index 8413c04d..fddd6604 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java @@ -10,6 +10,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.openslx.bwlp.sat.database.mappers.DbLecture.LaunchData; import org.openslx.bwlp.sat.thrift.SessionManager; +import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -63,6 +64,22 @@ public class CowSessionManager { lock.unlock(); } } + + public static int getActiveCount() { + int cnt = 0; + Lock lock = SESSION_LOCK.readLock(); + lock.lock(); + try { + for (CowSession s : SESSIONS.values()) { + if (s.isActive()) { + cnt++; + } + } + } finally { + lock.unlock(); + } + return cnt; + } /** * Create a new cow session for the given image. Returns an according session id. @@ -73,6 +90,8 @@ public class CowSessionManager { if (user == null) throw new RuntimeException("Unknown user"); SessionManager.remove(cowid); + if (getActiveCount() > Constants.MAX_UPLOADS) + throw new RuntimeException("Too many active CoW sessions"); // String sessionId = UUID.randomUUID().toString(); CowSession session = new CowSession(ld.imagePath, ld.configuration, ld.imageBaseId, ld.vmName, ld.restricted, user, sessionType); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java index 29314db4..fcb1ee8e 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java @@ -9,7 +9,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -35,9 +35,8 @@ import org.openslx.bwlp.thrift.iface.NetShare; import org.openslx.bwlp.thrift.iface.NetShareAuth; import org.openslx.bwlp.thrift.iface.TNotFoundException; import org.openslx.bwlp.thrift.iface.UserInfo; -import org.openslx.util.GrowingThreadPoolExecutor; +import org.openslx.util.CascadedThreadPoolExecutor; import org.openslx.util.Json; -import org.openslx.util.PrioThreadFactory; import org.openslx.util.TarArchiveUtil.TarArchiveWriter; import org.openslx.util.Util; import org.simpleframework.xml.Serializer; @@ -49,13 +48,13 @@ public class WebServer extends NanoHTTPD { private static final Logger LOGGER = LogManager.getLogger(WebServer.class); - private static final ThreadPoolExecutor tpe = new GrowingThreadPoolExecutor(1, 8, 1, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(16), new PrioThreadFactory("Web")); + private static final ThreadPoolExecutor LECTURE_START_TPE = new CascadedThreadPoolExecutor(2, 8, + 1, TimeUnit.MINUTES, 1, "tgz-gen"); private static final Serializer serializer = new Persister(); - public WebServer(int port) throws IOException { - super(Configuration.getWebServerBindAddressLocal(), port, 16, 2); + public WebServer(int port, ExecutorService tpe) throws IOException { + super(Configuration.getWebServerBindAddressLocal(), port, tpe); super.maxRequestSize = 65535; } @@ -308,7 +307,7 @@ public class WebServer extends NanoHTTPD { return internalServerError("Cannot create COW session: " + e.getMessage()); } // Meta is required, everything else is optional - tpe.execute(new Runnable() { + LECTURE_START_TPE.execute(new Runnable() { @Override public void run() { try { |