summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver
diff options
context:
space:
mode:
authorSimon Rettberg2018-04-12 14:07:19 +0200
committerSimon Rettberg2018-04-12 14:07:19 +0200
commite2584eb682b68da480ebe0e2c678a4fb8b219884 (patch)
treefb34376527b62dad69cb0f881d3b6199dd636919 /dozentenmodulserver
parent[server] Fix comment (diff)
downloadtutor-module-e2584eb682b68da480ebe0e2c678a4fb8b219884.tar.gz
tutor-module-e2584eb682b68da480ebe0e2c678a4fb8b219884.tar.xz
tutor-module-e2584eb682b68da480ebe0e2c678a4fb8b219884.zip
[server] RPC/Maintenance job for image checking
Diffstat (limited to 'dozentenmodulserver')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java244
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java15
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java6
3 files changed, 229 insertions, 36 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java
index 0003e0dd..6e7cf319 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java
@@ -1,68 +1,256 @@
package org.openslx.bwlp.sat.maintenance;
import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
+import org.openslx.bwlp.sat.database.mappers.DbImageBlock;
+import org.openslx.bwlp.sat.database.models.ImageVersionMeta;
import org.openslx.bwlp.sat.database.models.LocalImageVersion;
import org.openslx.bwlp.sat.util.FileSystem;
import org.openslx.bwlp.thrift.iface.TNotFoundException;
+import org.openslx.filetransfer.util.ChunkStatus;
+import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.filetransfer.util.HashChecker;
+import org.openslx.filetransfer.util.HashChecker.HashCheckCallback;
+import org.openslx.filetransfer.util.HashChecker.HashResult;
+import org.openslx.filetransfer.util.StandaloneFileChunk;
+import org.openslx.util.ThriftUtil;
+import org.openslx.util.TimeoutHashMap;
import org.openslx.util.Util;
public class ImageValidCheck implements Runnable {
+ public enum CheckResult {
+ NULL_POINTER_EXCEPTION,
+ ALREADY_IN_PROGRESS,
+ TOO_MANY_QUEUED_JOBS,
+ SUBMITTED,
+ REJECTED_BY_SCHEDULER,
+ WORKING,
+ DONE,
+ FILE_NOT_FOUND,
+ FILE_ACCESS_ERROR,
+ OTHER_ERROR,
+ }
+
private static final Logger LOGGER = Logger.getLogger(ImageValidCheck.class);
+
+ private static final int MAX_CONCURRENT_CHECKS = 1;
+
+ private static Queue<ImageValidCheck> queue = new LinkedList<>();
+ private static Map<String, ImageValidCheck> inProgress = new HashMap<>();
+ private static TimeoutHashMap<String, ImageValidCheck> done = new TimeoutHashMap<>(
+ TimeUnit.MINUTES.toMillis(60));
private final String versionId;
+ private final boolean integrity;
+
+ // TODO: Set appropriately in various places; make it possible to query from RPC
+ private CheckResult result = CheckResult.DONE;
+
+ // Hash checking
+
+ private static final HashChecker hashChecker;
+
+ static {
+ long maxMem = Runtime.getRuntime().maxMemory() / (1024 * 1024);
+ int hashQueueLen;
+ if (maxMem < 1200) {
+ hashQueueLen = 1;
+ } else {
+ hashQueueLen = 2;
+ }
+ HashChecker hc;
+ try {
+ hc = new HashChecker("SHA-1", hashQueueLen);
+ } catch (NoSuchAlgorithmException e) {
+ hc = null;
+ }
+ hashChecker = hc;
+ }
- public static void check(String versionId) {
+ // End hash checking
+
+ public static CheckResult check(String versionId, boolean integrity) {
if (versionId == null)
- return;
- Maintenance.trySubmit(new ImageValidCheck(versionId));
+ return CheckResult.NULL_POINTER_EXCEPTION;
+ synchronized (inProgress) {
+ if (inProgress.containsKey(versionId))
+ return CheckResult.ALREADY_IN_PROGRESS;
+ if (inProgress.size() >= MAX_CONCURRENT_CHECKS) {
+ if (queue.size() > 1000) {
+ return CheckResult.TOO_MANY_QUEUED_JOBS;
+ }
+ queue.add(new ImageValidCheck(versionId, integrity));
+ return CheckResult.SUBMITTED;
+ }
+ ImageValidCheck check = new ImageValidCheck(versionId, integrity);
+ if (Maintenance.trySubmit(check)) {
+ inProgress.put(versionId, check);
+ return CheckResult.SUBMITTED;
+ }
+ }
+ return CheckResult.REJECTED_BY_SCHEDULER;
}
- private ImageValidCheck(String versionId) {
+ public static void checkForWork() {
+ synchronized (inProgress) {
+ while (inProgress.size() < MAX_CONCURRENT_CHECKS && !queue.isEmpty()) {
+ ImageValidCheck check = queue.poll();
+ if (check == null)
+ break;
+ if (inProgress.containsKey(check.versionId))
+ continue; // Already checking this version, try next in queue
+ if (Maintenance.trySubmit(check)) {
+ inProgress.put(check.versionId, check);
+ } else {
+ if (!queue.offer(check)) {
+ LOGGER.warn("Dropped queued check for image version " + check.versionId);
+ }
+ // Scheduler didn't accept job - don't try remaining queue
+ break;
+ }
+ }
+ }
+ }
+
+ private ImageValidCheck(String versionId, boolean integrity) {
this.versionId = versionId;
+ this.integrity = integrity;
}
@Override
public void run() {
- if (!FileSystem.waitForStorage()) {
- LOGGER.warn("Will not check " + versionId + ": Storage not online");
- return;
- }
- LocalImageVersion imageVersion;
try {
- imageVersion = DbImage.getLocalImageData(versionId);
- } catch (SQLException e) {
- return;
- } catch (TNotFoundException e) {
- LOGGER.warn("Cannot check validity of image version - not found: " + versionId);
- return;
+ if (!FileSystem.waitForStorage()) {
+ LOGGER.warn("Will not check " + versionId + ": Storage not online");
+ return;
+ }
+ LocalImageVersion imageVersion;
+ try {
+ imageVersion = DbImage.getLocalImageData(versionId);
+ } catch (SQLException e) {
+ return;
+ } catch (TNotFoundException e) {
+ LOGGER.warn("Cannot check validity of image version - not found: " + versionId);
+ return;
+ }
+ boolean valid = checkValid(imageVersion);
+ if (valid && integrity) {
+ try {
+ valid = checkBlockHashes(imageVersion);
+ } catch (IOException e) {
+ result = CheckResult.FILE_ACCESS_ERROR;
+ valid = false;
+ } catch (Exception e) {
+ result = CheckResult.OTHER_ERROR;
+ }
+ }
+ if (imageVersion.isValid == valid)
+ return; // nothing changed
+ // Update
+ try {
+ DbImage.markValid(valid, false, imageVersion);
+ } catch (SQLException e) {
+ }
+ } finally {
+ synchronized (inProgress) {
+ inProgress.remove(this.versionId);
+ }
+ checkForWork();
}
- boolean valid = checkValid(imageVersion);
- // TODO: We could check the checksums too to be extra safe
- /*
+ }
+
+ /**
+ * Do a complete hash check of the given image file.
+ */
+ private boolean checkBlockHashes(final LocalImageVersion imageVersion) throws IOException,
+ InterruptedException {
ImageVersionMeta versionDetails;
try {
versionDetails = DbImage.getVersionDetails(versionId);
} catch (TNotFoundException e) {
- LOGGER.warn("Cannot check validity of image version - not found: " + versionId);
- return;
+ LOGGER.warn("Cannot check hash of image version - not found: " + versionId);
+ return false;
} catch (SQLException e) {
- return;
+ return false;
}
- */
- if (imageVersion.isValid == valid)
- return; // nothing changed
- // Update
- try {
- DbImage.markValid(valid, false, imageVersion);
- } catch (SQLException e) {
+ // TODO
+ if (versionDetails.sha1sums == null || versionDetails.sha1sums.isEmpty()) {
+ LOGGER.info("Image does not have block hashes -- assuming ok");
+ return true;
+ }
+ int numChecked = 0;
+ final Semaphore sem = new Semaphore(0);
+ final AtomicBoolean fileOk = new AtomicBoolean(true);
+ File path = FileSystem.composeAbsoluteImagePath(imageVersion);
+ try (RandomAccessFile raf = new RandomAccessFile(path, "r")) {
+ long startOffset = 0;
+ for (ByteBuffer hash : versionDetails.sha1sums) {
+ if (hash == null) {
+ startOffset += FileChunk.CHUNK_SIZE;
+ continue;
+ }
+ long endOffset = startOffset + FileChunk.CHUNK_SIZE;
+ if (endOffset > imageVersion.fileSize) {
+ endOffset = imageVersion.fileSize;
+ }
+ StandaloneFileChunk chunk = new StandaloneFileChunk(startOffset, endOffset,
+ ThriftUtil.unwrapByteBuffer(hash));
+ byte[] buffer = new byte[(int) (endOffset - startOffset)];
+ raf.seek(startOffset);
+ raf.readFully(buffer);
+ hashChecker.queue(chunk, buffer, new HashCheckCallback() {
+ @Override
+ public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) {
+ if (result == HashResult.FAILURE) {
+ // Hashing failed, cannot tell whether OK or not :(
+ } else {
+ if (result == HashResult.INVALID) {
+ fileOk.set(false);
+ ((StandaloneFileChunk) chunk).overrideStatus(ChunkStatus.MISSING);
+ } else {
+ // >:(
+ ((StandaloneFileChunk) chunk).overrideStatus(ChunkStatus.COMPLETE);
+ }
+ try {
+ // We don't know what the state was in DB before, so just fire updates
+ DbImageBlock.asyncUpdate(imageVersion.imageVersionId, chunk);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ sem.release();
+ }
+ }, true);
+ numChecked += 1;
+ startOffset += FileChunk.CHUNK_SIZE;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
}
+ // Wait until the last callback fired
+ sem.acquire(numChecked);
+ return fileOk.get();
}
+ /**
+ * "Inexpensive" validity checks. File exists, readable, size ok, etc.
+ */
private boolean checkValid(LocalImageVersion imageVersion) {
if (imageVersion == null)
return false;
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java
index 9e44ef9c..e459852b 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java
@@ -1,5 +1,7 @@
package org.openslx.bwlp.sat.maintenance;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -9,17 +11,20 @@ public class Maintenance extends Thread {
private static final Logger LOGGER = Logger.getLogger(Maintenance.class);
- private static Maintenance worker = null;
+ private static Set<Maintenance> workers = new HashSet<>();
private static BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(50);
private Maintenance() {
+ super();
+ setPriority((Thread.MIN_PRIORITY + Thread.NORM_PRIORITY) / 2);
}
private synchronized static void ensureRunning() {
- if (worker == null || !worker.isAlive()) {
- worker = new Maintenance();
+ if (workers.isEmpty() || (queue.size() > 5 && workers.size() < 3)) {
+ Maintenance worker = new Maintenance();
worker.start();
+ workers.add(worker);
}
}
@@ -43,9 +48,7 @@ public class Maintenance extends Thread {
}
} catch (InterruptedException e) {
LOGGER.warn("Maintenance Thread was interrupted!", e);
- if (!queue.isEmpty()) {
- ensureRunning();
- }
+ Thread.currentThread().interrupt();
}
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java
index 5fecf748..8018e86a 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java
@@ -21,6 +21,7 @@ import org.openslx.bwlp.sat.mail.SmtpMailer;
import org.openslx.bwlp.sat.mail.SmtpMailer.EncryptionMode;
import org.openslx.bwlp.sat.maintenance.DeleteOldImages;
import org.openslx.bwlp.sat.maintenance.ImageValidCheck;
+import org.openslx.bwlp.sat.maintenance.ImageValidCheck.CheckResult;
import org.openslx.util.Util;
import fi.iki.elonen.NanoHTTPD;
@@ -51,11 +52,12 @@ public class WebRpc {
private static Response checkImage(Map<String, String> params) {
String versionId = params.get("versionid");
+ boolean checkHashes = Boolean.valueOf(params.get("hash"));
if (versionId == null)
return WebServer.badRequest("Missing versionid param");
- ImageValidCheck.check(versionId);
+ CheckResult res = ImageValidCheck.check(versionId, checkHashes);
return new NanoHTTPD.Response(NanoHTTPD.Response.Status.OK, "text/plain; charset=utf-8",
- "Image queued for checking");
+ res.name());
}
/**