From e2584eb682b68da480ebe0e2c678a4fb8b219884 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Thu, 12 Apr 2018 14:07:19 +0200 Subject: [server] RPC/Maintenance job for image checking --- .../bwlp/sat/maintenance/ImageValidCheck.java | 244 ++++++++++++++++++--- .../openslx/bwlp/sat/maintenance/Maintenance.java | 15 +- .../main/java/org/openslx/bwlp/sat/web/WebRpc.java | 6 +- 3 files changed, 229 insertions(+), 36 deletions(-) (limited to 'dozentenmodulserver') diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java index 0003e0dd..6e7cf319 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/ImageValidCheck.java @@ -1,68 +1,256 @@ package org.openslx.bwlp.sat.maintenance; import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.security.NoSuchAlgorithmException; import java.sql.SQLException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.database.mappers.DbImageBlock; +import org.openslx.bwlp.sat.database.models.ImageVersionMeta; import org.openslx.bwlp.sat.database.models.LocalImageVersion; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.thrift.iface.TNotFoundException; +import org.openslx.filetransfer.util.ChunkStatus; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.filetransfer.util.HashChecker; +import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; +import org.openslx.filetransfer.util.HashChecker.HashResult; +import org.openslx.filetransfer.util.StandaloneFileChunk; +import org.openslx.util.ThriftUtil; +import org.openslx.util.TimeoutHashMap; import org.openslx.util.Util; public class ImageValidCheck implements Runnable { + public enum CheckResult { + NULL_POINTER_EXCEPTION, + ALREADY_IN_PROGRESS, + TOO_MANY_QUEUED_JOBS, + SUBMITTED, + REJECTED_BY_SCHEDULER, + WORKING, + DONE, + FILE_NOT_FOUND, + FILE_ACCESS_ERROR, + OTHER_ERROR, + } + private static final Logger LOGGER = Logger.getLogger(ImageValidCheck.class); + + private static final int MAX_CONCURRENT_CHECKS = 1; + + private static Queue queue = new LinkedList<>(); + private static Map inProgress = new HashMap<>(); + private static TimeoutHashMap done = new TimeoutHashMap<>( + TimeUnit.MINUTES.toMillis(60)); private final String versionId; + private final boolean integrity; + + // TODO: Set appropriately in various places; make it possible to query from RPC + private CheckResult result = CheckResult.DONE; + + // Hash checking + + private static final HashChecker hashChecker; + + static { + long maxMem = Runtime.getRuntime().maxMemory() / (1024 * 1024); + int hashQueueLen; + if (maxMem < 1200) { + hashQueueLen = 1; + } else { + hashQueueLen = 2; + } + HashChecker hc; + try { + hc = new HashChecker("SHA-1", hashQueueLen); + } catch (NoSuchAlgorithmException e) { + hc = null; + } + hashChecker = hc; + } - public static void check(String versionId) { + // End hash checking + + public static CheckResult check(String versionId, boolean integrity) { if (versionId == null) - return; - Maintenance.trySubmit(new ImageValidCheck(versionId)); + return CheckResult.NULL_POINTER_EXCEPTION; + synchronized (inProgress) { + if (inProgress.containsKey(versionId)) + return CheckResult.ALREADY_IN_PROGRESS; + if (inProgress.size() >= MAX_CONCURRENT_CHECKS) { + if (queue.size() > 1000) { + return CheckResult.TOO_MANY_QUEUED_JOBS; + } + queue.add(new ImageValidCheck(versionId, integrity)); + return CheckResult.SUBMITTED; + } + ImageValidCheck check = new ImageValidCheck(versionId, integrity); + if (Maintenance.trySubmit(check)) { + inProgress.put(versionId, check); + return CheckResult.SUBMITTED; + } + } + return CheckResult.REJECTED_BY_SCHEDULER; } - private ImageValidCheck(String versionId) { + public static void checkForWork() { + synchronized (inProgress) { + while (inProgress.size() < MAX_CONCURRENT_CHECKS && !queue.isEmpty()) { + ImageValidCheck check = queue.poll(); + if (check == null) + break; + if (inProgress.containsKey(check.versionId)) + continue; // Already checking this version, try next in queue + if (Maintenance.trySubmit(check)) { + inProgress.put(check.versionId, check); + } else { + if (!queue.offer(check)) { + LOGGER.warn("Dropped queued check for image version " + check.versionId); + } + // Scheduler didn't accept job - don't try remaining queue + break; + } + } + } + } + + private ImageValidCheck(String versionId, boolean integrity) { this.versionId = versionId; + this.integrity = integrity; } @Override public void run() { - if (!FileSystem.waitForStorage()) { - LOGGER.warn("Will not check " + versionId + ": Storage not online"); - return; - } - LocalImageVersion imageVersion; try { - imageVersion = DbImage.getLocalImageData(versionId); - } catch (SQLException e) { - return; - } catch (TNotFoundException e) { - LOGGER.warn("Cannot check validity of image version - not found: " + versionId); - return; + if (!FileSystem.waitForStorage()) { + LOGGER.warn("Will not check " + versionId + ": Storage not online"); + return; + } + LocalImageVersion imageVersion; + try { + imageVersion = DbImage.getLocalImageData(versionId); + } catch (SQLException e) { + return; + } catch (TNotFoundException e) { + LOGGER.warn("Cannot check validity of image version - not found: " + versionId); + return; + } + boolean valid = checkValid(imageVersion); + if (valid && integrity) { + try { + valid = checkBlockHashes(imageVersion); + } catch (IOException e) { + result = CheckResult.FILE_ACCESS_ERROR; + valid = false; + } catch (Exception e) { + result = CheckResult.OTHER_ERROR; + } + } + if (imageVersion.isValid == valid) + return; // nothing changed + // Update + try { + DbImage.markValid(valid, false, imageVersion); + } catch (SQLException e) { + } + } finally { + synchronized (inProgress) { + inProgress.remove(this.versionId); + } + checkForWork(); } - boolean valid = checkValid(imageVersion); - // TODO: We could check the checksums too to be extra safe - /* + } + + /** + * Do a complete hash check of the given image file. + */ + private boolean checkBlockHashes(final LocalImageVersion imageVersion) throws IOException, + InterruptedException { ImageVersionMeta versionDetails; try { versionDetails = DbImage.getVersionDetails(versionId); } catch (TNotFoundException e) { - LOGGER.warn("Cannot check validity of image version - not found: " + versionId); - return; + LOGGER.warn("Cannot check hash of image version - not found: " + versionId); + return false; } catch (SQLException e) { - return; + return false; } - */ - if (imageVersion.isValid == valid) - return; // nothing changed - // Update - try { - DbImage.markValid(valid, false, imageVersion); - } catch (SQLException e) { + // TODO + if (versionDetails.sha1sums == null || versionDetails.sha1sums.isEmpty()) { + LOGGER.info("Image does not have block hashes -- assuming ok"); + return true; + } + int numChecked = 0; + final Semaphore sem = new Semaphore(0); + final AtomicBoolean fileOk = new AtomicBoolean(true); + File path = FileSystem.composeAbsoluteImagePath(imageVersion); + try (RandomAccessFile raf = new RandomAccessFile(path, "r")) { + long startOffset = 0; + for (ByteBuffer hash : versionDetails.sha1sums) { + if (hash == null) { + startOffset += FileChunk.CHUNK_SIZE; + continue; + } + long endOffset = startOffset + FileChunk.CHUNK_SIZE; + if (endOffset > imageVersion.fileSize) { + endOffset = imageVersion.fileSize; + } + StandaloneFileChunk chunk = new StandaloneFileChunk(startOffset, endOffset, + ThriftUtil.unwrapByteBuffer(hash)); + byte[] buffer = new byte[(int) (endOffset - startOffset)]; + raf.seek(startOffset); + raf.readFully(buffer); + hashChecker.queue(chunk, buffer, new HashCheckCallback() { + @Override + public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { + if (result == HashResult.FAILURE) { + // Hashing failed, cannot tell whether OK or not :( + } else { + if (result == HashResult.INVALID) { + fileOk.set(false); + ((StandaloneFileChunk) chunk).overrideStatus(ChunkStatus.MISSING); + } else { + // >:( + ((StandaloneFileChunk) chunk).overrideStatus(ChunkStatus.COMPLETE); + } + try { + // We don't know what the state was in DB before, so just fire updates + DbImageBlock.asyncUpdate(imageVersion.imageVersionId, chunk); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + sem.release(); + } + }, true); + numChecked += 1; + startOffset += FileChunk.CHUNK_SIZE; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw e; } + // Wait until the last callback fired + sem.acquire(numChecked); + return fileOk.get(); } + /** + * "Inexpensive" validity checks. File exists, readable, size ok, etc. + */ private boolean checkValid(LocalImageVersion imageVersion) { if (imageVersion == null) return false; diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java index 9e44ef9c..e459852b 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/maintenance/Maintenance.java @@ -1,5 +1,7 @@ package org.openslx.bwlp.sat.maintenance; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -9,17 +11,20 @@ public class Maintenance extends Thread { private static final Logger LOGGER = Logger.getLogger(Maintenance.class); - private static Maintenance worker = null; + private static Set workers = new HashSet<>(); private static BlockingQueue queue = new LinkedBlockingQueue<>(50); private Maintenance() { + super(); + setPriority((Thread.MIN_PRIORITY + Thread.NORM_PRIORITY) / 2); } private synchronized static void ensureRunning() { - if (worker == null || !worker.isAlive()) { - worker = new Maintenance(); + if (workers.isEmpty() || (queue.size() > 5 && workers.size() < 3)) { + Maintenance worker = new Maintenance(); worker.start(); + workers.add(worker); } } @@ -43,9 +48,7 @@ public class Maintenance extends Thread { } } catch (InterruptedException e) { LOGGER.warn("Maintenance Thread was interrupted!", e); - if (!queue.isEmpty()) { - ensureRunning(); - } + Thread.currentThread().interrupt(); } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java index 5fecf748..8018e86a 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/web/WebRpc.java @@ -21,6 +21,7 @@ import org.openslx.bwlp.sat.mail.SmtpMailer; import org.openslx.bwlp.sat.mail.SmtpMailer.EncryptionMode; import org.openslx.bwlp.sat.maintenance.DeleteOldImages; import org.openslx.bwlp.sat.maintenance.ImageValidCheck; +import org.openslx.bwlp.sat.maintenance.ImageValidCheck.CheckResult; import org.openslx.util.Util; import fi.iki.elonen.NanoHTTPD; @@ -51,11 +52,12 @@ public class WebRpc { private static Response checkImage(Map params) { String versionId = params.get("versionid"); + boolean checkHashes = Boolean.valueOf(params.get("hash")); if (versionId == null) return WebServer.badRequest("Missing versionid param"); - ImageValidCheck.check(versionId); + CheckResult res = ImageValidCheck.check(versionId, checkHashes); return new NanoHTTPD.Response(NanoHTTPD.Response.Status.OK, "text/plain; charset=utf-8", - "Image queued for checking"); + res.name()); } /** -- cgit v1.2.3-55-g7522