summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Simon Rettberg 2024-05-22 16:51:06 +0200
committer Simon Rettberg 2024-05-22 16:51:06 +0200
commit 18e9d5521c17b5d2f9876088950f10884855a21d (patch)
tree 946b7308a6e11a254a51d15da60c936b5ca33fe4
parent [server] CoW: Fix calculating copy speed, don't return delay=0 (diff)
download tutor-module-18e9d5521c17b5d2f9876088950f10884855a21d.tar.gz
tutor-module-18e9d5521c17b5d2f9876088950f10884855a21d.tar.xz
tutor-module-18e9d5521c17b5d2f9876088950f10884855a21d.zip
[*] Switch to new CascadedThreadPool
-rw-r--r-- dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java 13
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java 31
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java 12
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java 11
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java 32
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java 19
-rw-r--r-- dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java 15
7 files changed, 93 insertions, 40 deletions
diff --git a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
index 07efc449..479e634d 100644
--- a/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
+++ b/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/AsyncHashGenerator.java
@@ -9,7 +9,6 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -21,7 +20,7 @@ import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.dozmod.thrift.Session;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.ThriftManager;
-import org.openslx.util.GrowingThreadPoolExecutor;
+import org.openslx.util.CascadedThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
import org.openslx.util.Util;
@@ -29,10 +28,12 @@ public class AsyncHashGenerator extends Thread {
private static final Logger LOGGER = LogManager.getLogger(AsyncHashGenerator.class);
- private static final ThreadPoolExecutor HASH_WORK_POOL = new GrowingThreadPoolExecutor(1,
- Math.max(1, (int)Math.min(Runtime.getRuntime().availableProcessors() - 1,
- Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3))),
- 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1),
+ private static final ThreadPoolExecutor HASH_WORK_POOL = new CascadedThreadPoolExecutor(1,
+ Math.max(1, (int)Math.min(
+ Runtime.getRuntime().availableProcessors() - 1,
+ Runtime.getRuntime().maxMemory() / (FileChunk.CHUNK_SIZE * 3)
+ )),
+ 10, TimeUnit.SECONDS, 3,
new PrioThreadFactory("HashGen"), new ThreadPoolExecutor.CallerRunsPolicy());
private static final ThreadLocal<MessageDigest> SHA1_DIGESTER = new ThreadLocal<MessageDigest>() {
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java
index e88ed876..a80a84f9 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java
@@ -6,8 +6,8 @@ import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
@@ -41,8 +41,7 @@ import org.openslx.sat.thrift.version.Version;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.thrifthelper.ThriftManager.ErrorCallback;
import org.openslx.util.AppUtil;
-import org.openslx.util.GrowingThreadPoolExecutor;
-import org.openslx.util.PrioThreadFactory;
+import org.openslx.util.CascadedThreadPoolExecutor;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
@@ -180,26 +179,34 @@ public class App {
DeleteOldLectures.init();
DeleteOldUsers.init();
- // Shared executor for SSL thrift and HTTP thrift
- ExecutorService es = new GrowingThreadPoolExecutor(3, 128, 2, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>(4), new PrioThreadFactory("Web/Thrift"));
-
// Start Thrift Server
Thread t;
+ ThreadPoolExecutor tpe;
// Plain
- t = new Thread(new BinaryListener(9090, false, es), "Thr-Plain:9090");
+ tpe = new CascadedThreadPoolExecutor(1, 8, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(), // Persistent connections, don't queue before growing
+ "Thr-Plain");
+ t = new Thread(new BinaryListener(9090, false, tpe), "Thr-Plain:9090");
t.setDaemon(true);
t.start();
// SSL
- t = new Thread(new BinaryListener(9091, true, es), "Thr-SSL:9091");
+ tpe = new CascadedThreadPoolExecutor(2, 8, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(), // Persistent connections, don't queue before growing
+ "Thr-SSL");
+ t = new Thread(new BinaryListener(9091, true, tpe), "Thr-SSL:9091");
t.start();
// Start RPC httpd
- t = new Thread(new WebServer(9080), "RPC-httpd:9080");
+ tpe = new CascadedThreadPoolExecutor(2, 16, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(), // Semi-persistent connections, better don't queue either
+ "RPC-http");
+ t = new Thread(new WebServer(9080, tpe), "RPC-httpd:9080");
t.setDaemon(true);
t.start();
// Start JSON-Thrift httpd
- t = new Thread(new JsonHttpListener(9081, es), "Thr-httpd:9081");
+ tpe = new CascadedThreadPoolExecutor(6, 12, 1, TimeUnit.MINUTES, 3, "Thr-http");
+ tpe.allowCoreThreadTimeOut(true);
+ t = new Thread(new JsonHttpListener(9081, tpe), "Thr-httpd:9081");
t.setDaemon(true);
t.start();
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
index a1fd82d7..7656232f 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
@@ -19,6 +19,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
import org.openslx.bwlp.sat.database.models.LocalImageVersion;
+import org.openslx.bwlp.sat.fileserv.cow.CowSessionManager;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.sat.util.Formatter;
@@ -32,7 +33,7 @@ import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.thrifthelper.Comparators;
-import org.openslx.util.GrowingThreadPoolExecutor;
+import org.openslx.util.CascadedThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
public class FileServer implements IncomingEvent {
@@ -42,13 +43,13 @@ public class FileServer implements IncomingEvent {
/**
* Listener for incoming unencrypted connections
*/
- private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT); // TODO: Config
+ private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT);
private final Listener sslListener;
- private final ExecutorService transferPool = new GrowingThreadPoolExecutor(1, Constants.MAX_UPLOADS
+ private final ExecutorService transferPool = new CascadedThreadPoolExecutor(1, Constants.MAX_UPLOADS
+ Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(),
- new PrioThreadFactory("CTransf", Thread.NORM_PRIORITY - 2));
+ new PrioThreadFactory("CTransf", Thread.NORM_PRIORITY - 2), null);
/**
* All currently running uploads, indexed by token
@@ -297,7 +298,8 @@ public class FileServer implements IncomingEvent {
}
}
this.activeDownloads = d;
- this.activeUploads = u;
+ // Account for CoW sessions too
+ this.activeUploads = u + CowSessionManager.getActiveCount();
}
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
index 448bcb68..54a9655e 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
@@ -7,8 +7,8 @@ import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
@@ -32,7 +32,7 @@ import org.openslx.bwlp.thrift.iface.TNotFoundException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.thrifthelper.ThriftManager;
-import org.openslx.util.GrowingThreadPoolExecutor;
+import org.openslx.util.CascadedThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
@@ -44,10 +44,10 @@ public class SyncTransferHandler {
private static final Logger LOGGER = LogManager.getLogger(SyncTransferHandler.class);
- private static final GrowingThreadPoolExecutor transferPool = new GrowingThreadPoolExecutor(1,
+ private static final CascadedThreadPoolExecutor transferPool = new CascadedThreadPoolExecutor(1,
Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>(1), new PrioThreadFactory("MTransf",
- Thread.NORM_PRIORITY - 3));
+ new SynchronousQueue<Runnable>(), new PrioThreadFactory("MTransf",
+ Thread.NORM_PRIORITY - 3), null);
/**
* All currently running downloads from master, indexed by token, which is == versionId
@@ -119,6 +119,7 @@ public class SyncTransferHandler {
//
static {
+ transferPool.allowCoreThreadTimeOut(true);
QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56));
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java
index c24f4acb..c40358e3 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSession.java
@@ -36,7 +36,20 @@ public class CowSession {
private static final Logger LOGGER = LogManager.getLogger(CowSession.class);
- private static int TIMEOUT_MS = 12 * 3600_000; // No update for 12 hours, assume session died
+ /**
+ * How long to keep the session around after it was completed or had an
+ * error. This is so the client can still query the status.
+ */
+ private static final long FINISHED_TIMEOUT_MS = 300_000;
+
+ /**
+ * After being inactive for a while, the session doesn't count as active,
+ * but isn't abandoned either.
+ */
+ private static final long ACTIVE_TIMEOUT_MS = 900_000;
+
+ /** No update for 12 hours, assume session died */
+ private static int ABANDONED_TIMEOUT_MS = 12 * 3600_000;
private static int COPY_BLOCK_SIZE = 1024 * 1024; // 1 MiB
@@ -124,10 +137,21 @@ public class CowSession {
startCopying();
}
+ /**
+ * Whether this session is considered abandoned. This uses a very long
+ * timeout to be safe, since returning true here means we can discard all
+ * data for this session.
+ */
public boolean timedout() {
if (status == Status.COMPLETELY_DONE || status == Status.ERROR)
- return Util.tickCount() > lastActivity + 300_000;
- return Util.tickCount() > lastActivity + TIMEOUT_MS;
+ return Util.tickCount() > lastActivity + FINISHED_TIMEOUT_MS;
+ return Util.tickCount() > lastActivity + ABANDONED_TIMEOUT_MS;
+ }
+
+ public boolean isActive() {
+ if (status == Status.COMPLETELY_DONE || status == Status.ERROR)
+ return false;
+ return Util.tickCount() < lastActivity + ACTIVE_TIMEOUT_MS;
}
public String getError() {
@@ -359,7 +383,7 @@ public class CowSession {
* Copy file from source to dest in the background
*/
private void startCopying() {
- Thread t = new Thread() {
+ Thread t = new Thread("CoW-Copy") {
@Override
public void run() {
long lastTime = Util.tickCount();
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java
index 8413c04d..fddd6604 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/cow/CowSessionManager.java
@@ -10,6 +10,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.openslx.bwlp.sat.database.mappers.DbLecture.LaunchData;
import org.openslx.bwlp.sat.thrift.SessionManager;
+import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
@@ -63,6 +64,22 @@ public class CowSessionManager {
lock.unlock();
}
}
+
+ public static int getActiveCount() {
+ int cnt = 0;
+ Lock lock = SESSION_LOCK.readLock();
+ lock.lock();
+ try {
+ for (CowSession s : SESSIONS.values()) {
+ if (s.isActive()) {
+ cnt++;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return cnt;
+ }
/**
* Create a new cow session for the given image. Returns an according session id.
@@ -73,6 +90,8 @@ public class CowSessionManager {
if (user == null)
throw new RuntimeException("Unknown user");
SessionManager.remove(cowid);
+ if (getActiveCount() > Constants.MAX_UPLOADS)
+ throw new RuntimeException("Too many active CoW sessions");
//
String sessionId = UUID.randomUUID().toString();
CowSession session = new CowSession(ld.imagePath, ld.configuration, ld.imageBaseId, ld.vmName, ld.restricted, user, sessionType);
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java
index 29314db4..fcb1ee8e 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebServer.java
@@ -9,7 +9,7 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -35,9 +35,8 @@ import org.openslx.bwlp.thrift.iface.NetShare;
import org.openslx.bwlp.thrift.iface.NetShareAuth;
import org.openslx.bwlp.thrift.iface.TNotFoundException;
import org.openslx.bwlp.thrift.iface.UserInfo;
-import org.openslx.util.GrowingThreadPoolExecutor;
+import org.openslx.util.CascadedThreadPoolExecutor;
import org.openslx.util.Json;
-import org.openslx.util.PrioThreadFactory;
import org.openslx.util.TarArchiveUtil.TarArchiveWriter;
import org.openslx.util.Util;
import org.simpleframework.xml.Serializer;
@@ -49,13 +48,13 @@ public class WebServer extends NanoHTTPD {
private static final Logger LOGGER = LogManager.getLogger(WebServer.class);
- private static final ThreadPoolExecutor tpe = new GrowingThreadPoolExecutor(1, 8, 1, TimeUnit.MINUTES,
- new ArrayBlockingQueue<Runnable>(16), new PrioThreadFactory("Web"));
+ private static final ThreadPoolExecutor LECTURE_START_TPE = new CascadedThreadPoolExecutor(2, 8,
+ 1, TimeUnit.MINUTES, 1, "tgz-gen");
private static final Serializer serializer = new Persister();
- public WebServer(int port) throws IOException {
- super(Configuration.getWebServerBindAddressLocal(), port, 16, 2);
+ public WebServer(int port, ExecutorService tpe) throws IOException {
+ super(Configuration.getWebServerBindAddressLocal(), port, tpe);
super.maxRequestSize = 65535;
}
@@ -308,7 +307,7 @@ public class WebServer extends NanoHTTPD {
return internalServerError("Cannot create COW session: " + e.getMessage());
}
// Meta is required, everything else is optional
- tpe.execute(new Runnable() {
+ LECTURE_START_TPE.execute(new Runnable() {
@Override
public void run() {
try {