diff options
author | Simon Rettberg | 2016-04-13 18:39:26 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-13 18:39:26 +0200 |
commit | 5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622 (patch) | |
tree | 5bdc5411cd9954577e5489d5e4271c800d826e9c | |
parent | [client] fix bad commit (diff) | |
download | tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.gz tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.xz tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.zip |
(WiP) Global image sync
14 files changed, 323 insertions, 520 deletions
diff --git a/dozentenmodulserver/src/main/java/fi/iki/elonen/NanoHTTPD.java b/dozentenmodulserver/src/main/java/fi/iki/elonen/NanoHTTPD.java index e4d265ae..f6312985 100644 --- a/dozentenmodulserver/src/main/java/fi/iki/elonen/NanoHTTPD.java +++ b/dozentenmodulserver/src/main/java/fi/iki/elonen/NanoHTTPD.java @@ -69,7 +69,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.log4j.Logger; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -import org.openslx.bwlp.sat.util.GrowingThreadPoolExecutor; +import org.openslx.util.GrowingThreadPoolExecutor; /** * A simple, tiny, nicely embeddable HTTP server in Java @@ -458,7 +458,7 @@ public abstract class NanoHTTPD implements Runnable { */ public static class DefaultAsyncRunner implements AsyncRunner { private ExecutorService pool = new GrowingThreadPoolExecutor(2, 16, 1, TimeUnit.MINUTES, - new ArrayBlockingQueue<Runnable>(16)); + new ArrayBlockingQueue<Runnable>(4)); @Override public void exec(Runnable code) { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java index 8e615ef1..ed8cbb7e 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/App.java @@ -60,7 +60,7 @@ public class App { LOGGER.fatal("Could not load configuration", e1); System.exit(1); } - + // Update database schema if applicable try { Updater.updateDatabase(); @@ -68,7 +68,7 @@ public class App { LOGGER.fatal("Updating/checking the database layout failed."); return; } - + if (Identity.loadCertificate() == null) { LOGGER.error("Could not set up TLS/SSL requirements, exiting"); System.exit(1); @@ -100,8 +100,7 @@ public class App { SSLContext ctx = null; if (Configuration.getMasterServerSsl()) { - ctx = SSLContext.getInstance("TLSv1.2"); - ctx.init(null, null, null); + ctx = Configuration.getMasterServerSslContext(); } ThriftManager.setMasterServerAddress(ctx, Configuration.getMasterServerAddress(), Configuration.getMasterServerPort(), 30000); @@ -114,12 +113,12 @@ public class App { } if (OrganizationList.get() == null || OrganizationList.get().isEmpty()) { LOGGER.fatal("Could not get initial organization list from master server." - + " Please make sure this server can connect to the internet."); + + " Please make sure this server can connect to the internet."); return; } if (OperatingSystemList.get() == null || OperatingSystemList.get().isEmpty()) { LOGGER.fatal("Could not get initial operating system list from master server." - + " Please make sure this server can connect to the internet."); + + " Please make sure this server can connect to the internet."); return; } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java index 55fa02b2..8aafe862 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java @@ -891,15 +891,35 @@ public class DbImage { } } + public static List<ByteBuffer> getBlockHashes(String imageVersionId) throws SQLException { + try (MysqlConnection connection = Database.getConnection()) { + return getBlockHashes(connection, imageVersionId); + } catch (SQLException e) { + LOGGER.error("Query failed in DbImage.getBlockHashes()", e); + throw e; + } + } + private static List<ByteBuffer> getBlockHashes(MysqlConnection connection, String imageVersionId) throws SQLException { - MysqlStatement stmt = connection.prepareStatement("SELECT blocksha1 FROM imageblock" + MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, blocksha1 FROM imageblock" + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC"); stmt.setString("imageversionid", imageVersionId); ResultSet rs = stmt.executeQuery(); List<ByteBuffer> list = new ArrayList<>(); + long expectedOffset = 0; while (rs.next()) { - list.add(ByteBuffer.wrap(rs.getBytes("blocksha1"))); + long currentOffset = rs.getLong("startbyte"); + if (currentOffset < expectedOffset) + continue; + while (currentOffset > expectedOffset) { + list.add(null); + expectedOffset += FileChunk.CHUNK_SIZE; + } + if (currentOffset == expectedOffset) { + list.add(ByteBuffer.wrap(rs.getBytes("blocksha1"))); + expectedOffset += FileChunk.CHUNK_SIZE; + } } return list; } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbUser.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbUser.java index 87a05d63..81e4721d 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbUser.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbUser.java @@ -67,6 +67,24 @@ public class DbUser { } } + public static UserInfo getOrNull(String userId) throws SQLException { + try (MysqlConnection connection = Database.getConnection()) { + MysqlStatement stmt = connection.prepareStatement("SELECT userid, firstname, lastname, email, organizationid," + + " lastlogin, canlogin, issuperuser, emailnotifications" + + " FROM user WHERE userid = :userid"); + stmt.setString("userid", userId); + ResultSet rs = stmt.executeQuery(); + if (rs.next()) { + return new UserInfo(rs.getString("userid"), rs.getString("firstname"), + rs.getString("lastname"), rs.getString("email"), rs.getString("organizationid")); + } + return null; + } catch (SQLException e) { + LOGGER.error("Query failed in DbUser.getAll()", e); + throw e; + } + } + /** * Get local-only information for given user. * diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index 844ffb74..a4e8de3c 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -8,9 +8,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; @@ -21,7 +21,6 @@ import org.openslx.bwlp.sat.database.models.LocalImageVersion; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; -import org.openslx.bwlp.sat.util.GrowingThreadPoolExecutor; import org.openslx.bwlp.sat.util.Identity; import org.openslx.bwlp.sat.util.PrioThreadFactory; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; @@ -32,6 +31,7 @@ import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.thrifthelper.Comparators; +import org.openslx.util.GrowingThreadPoolExecutor; public class FileServer implements IncomingEvent { @@ -45,7 +45,7 @@ public class FileServer implements IncomingEvent { private final Listener sslListener; private final ExecutorService transferPool = new GrowingThreadPoolExecutor(1, Constants.MAX_UPLOADS - + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1), + + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new PrioThreadFactory("ClientTransferPool", Thread.NORM_PRIORITY - 2)); /** diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java index 92f533e8..4ce8cc0a 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -3,11 +3,9 @@ package org.openslx.bwlp.sat.fileserv; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; -import java.security.NoSuchAlgorithmException; import java.sql.SQLException; -import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -15,12 +13,10 @@ import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; -import org.openslx.bwlp.sat.thrift.ThriftUtil; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; -import org.openslx.bwlp.sat.util.Util; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; @@ -28,43 +24,20 @@ import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.bwlp.thrift.iface.UserInfo; -import org.openslx.filetransfer.DataReceivedCallback; import org.openslx.filetransfer.Downloader; -import org.openslx.filetransfer.FileRange; -import org.openslx.filetransfer.WantRangeCallback; -import org.openslx.filetransfer.util.ChunkList; import org.openslx.filetransfer.util.FileChunk; -import org.openslx.filetransfer.util.HashChecker; -import org.openslx.filetransfer.util.HashChecker.HashCheckCallback; -import org.openslx.filetransfer.util.HashChecker.HashResult; +import org.openslx.filetransfer.util.IncomingTransferBase; +import org.openslx.util.ThriftUtil; import org.openslx.util.vm.DiskImage; import org.openslx.util.vm.DiskImage.UnknownImageFormatException; -public class IncomingDataTransfer extends AbstractTransfer implements HashCheckCallback { +public class IncomingDataTransfer extends IncomingTransferBase { private static final Logger LOGGER = Logger.getLogger(IncomingDataTransfer.class); private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * (2 + Constants.MAX_UPLOADS); /** - * Self reference for inner classes. - */ - private final IncomingDataTransfer activeUpload = this; - - /** - * Remote peer is uploading, so on our end, we have Downloaders - */ - private List<Downloader> downloads = new ArrayList<>(); - - private final File tmpFileName; - - private final RandomAccessFile tmpFileHandle; - - private final ChunkList chunks; - - private final long fileSize; - - /** * User owning this uploaded file. */ private final UserInfo owner; @@ -80,11 +53,6 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC private ImageVersionWrite versionSettings = null; /** - * TransferState of this upload - */ - private TransferState state = TransferState.IDLE; - - /** * Description of this VM - binary dump of e.g. the *.vmx file (VMware) */ private final byte[] machineDescription; @@ -96,44 +64,24 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC private final AtomicBoolean versionWrittenToDb = new AtomicBoolean(); /** - * Whether file is (still) writable. Used for the file transfer callbacks. - */ - private boolean fileWritable = true; - - /** * Set if this is a download from the master server */ private final TransferInformation masterTransferInfo; - private static final HashChecker hashChecker; - - static { - HashChecker hc; - try { - hc = new HashChecker("SHA-1", Constants.HASHCHECK_QUEUE_LEN); - } catch (NoSuchAlgorithmException e) { - hc = null; - } - hashChecker = hc; - } - public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image, File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) throws FileNotFoundException { - super(uploadId); - this.tmpFileName = destinationFile; - this.tmpFileHandle = new RandomAccessFile(destinationFile, "rw"); - this.chunks = new ChunkList(fileSize, hashChecker == null ? null : sha1Sums); + super(uploadId, destinationFile, fileSize, sha1Sums); this.owner = owner; this.image = image; - this.fileSize = fileSize; this.machineDescription = machineDescription; this.masterTransferInfo = null; } public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo) throws FileNotFoundException { - super(publishData.imageVersionId); + super(UUID.randomUUID().toString(), tmpFile, publishData.fileSize, + ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); ImageDetailsRead idr = new ImageDetailsRead(); idr.setCreateTime(publishData.createTime); idr.setDescription(publishData.description); @@ -147,13 +95,8 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC idr.setUpdaterId(publishData.user.userId); idr.setUpdateTime(publishData.createTime); idr.setVirtId(publishData.virtId); - this.tmpFileName = tmpFile; - this.tmpFileHandle = new RandomAccessFile(tmpFile, "rw"); - this.chunks = new ChunkList(publishData.fileSize, hashChecker == null ? null - : ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); this.owner = publishData.user; this.image = idr; - this.fileSize = publishData.fileSize; this.machineDescription = ThriftUtil.unwrapByteBuffer(transferInfo.machineDescription); this.masterTransferInfo = transferInfo; this.versionSettings = new ImageVersionWrite(false); @@ -167,10 +110,8 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC if (masterTransferInfo == null) return; synchronized (this) { - synchronized (downloads) { - if (downloads.size() >= 1) // TODO What to pick here? - return; - } + if (getActiveConnectionCount() >= 1) + return; Downloader downloader = null; if (masterTransferInfo.plainPort != 0) { try { @@ -221,114 +162,33 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC } /** - * Add another connection for this file transfer. - * - * @param connection - * @return true if the connection is accepted, false if it should be - * discarded - */ - public synchronized boolean addConnection(final Downloader connection, ExecutorService pool) { - if (state == TransferState.FINISHED || state == TransferState.ERROR) - return false; - synchronized (downloads) { - if (downloads.size() >= Constants.MAX_CONNECTIONS_PER_TRANSFER) - return false; - downloads.add(connection); - } - try { - pool.execute(new Runnable() { - @Override - public void run() { - CbHandler cbh = new CbHandler(connection); - if (!connection.download(cbh, cbh)) { - if (cbh.currentChunk != null) { - // If the download failed and we have a current chunk, put it back into - // the queue, so it will be handled again later... - chunks.markFailed(cbh.currentChunk); - } - LOGGER.warn("Download of " + tmpFileName.getAbsolutePath() + " failed"); - } - if (state != TransferState.FINISHED && state != TransferState.ERROR) { - lastActivityTime.set(System.currentTimeMillis()); - } - synchronized (downloads) { - downloads.remove(connection); - } - if (chunks.isComplete()) { - finishUpload(); - } - } - }); - } catch (Exception e) { - LOGGER.warn("threadpool rejected the incoming file transfer", e); - synchronized (downloads) { - downloads.remove(connection); - } - return false; - } - if (state == TransferState.IDLE) { - state = TransferState.WORKING; - } - return true; - } - - /** - * Write some data to the local file. Thread safe so we can - * have multiple concurrent connections. - * - * @param fileOffset - * @param dataLength - * @param data - * @return - */ - private void writeFileData(long fileOffset, int dataLength, byte[] data) { - synchronized (tmpFileHandle) { - if (state != TransferState.WORKING) - throw new IllegalStateException("Cannot write to file if state != WORKING"); - try { - tmpFileHandle.seek(fileOffset); - tmpFileHandle.write(data, 0, dataLength); - } catch (IOException e) { - LOGGER.error("Cannot write to '" + tmpFileName - + "'. Disk full, network storage error, bad permissions, ...?", e); - fileWritable = false; - } - } - if (!fileWritable) { - cancel(); - } - } - - /** * Called when the upload finished. */ - private synchronized void finishUpload() { - synchronized (tmpFileHandle) { - if (state != TransferState.WORKING) - return; - Util.safeClose(tmpFileHandle); - state = TransferState.FINISHED; - } + @Override + protected synchronized boolean finishIncomingTransfer() { + if (getState() != TransferState.FINISHED) + return false; potentialFinishTime.set(System.currentTimeMillis()); // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file. // Nothing more to do in that case. if (isRepairUpload()) - return; + return true; LOGGER.info("Finalizing uploaded image " + image.imageName); // Ready to go. First step: Rename temp file to something usable String ext = "img"; try { - ext = new DiskImage(tmpFileName).format.extension; + ext = new DiskImage(getTmpFileName()).format.extension; } catch (IOException | UnknownImageFormatException e1) { } - File destination = new File(tmpFileName.getParent(), Formatter.vmName(owner, image.imageName, ext)); + File destination = new File(getTmpFileName().getParent(), Formatter.vmName(owner, image.imageName, + ext)); // Sanity check: destination should be a sub directory of the vmStorePath String relPath = FileSystem.getRelativePath(destination, Configuration.getVmStoreBasePath()); if (relPath == null) { LOGGER.warn(destination.getAbsolutePath() + " is not a subdir of " + Configuration.getVmStoreBasePath().getAbsolutePath()); cancel(); - return; + return false; } if (relPath.length() > 200) { LOGGER.error("Generated file name is >200 chars. DB will not like it"); @@ -338,7 +198,7 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC boolean ret = false; Exception renameException = null; try { - ret = tmpFileName.renameTo(destination); + ret = getTmpFileName().renameTo(destination); } catch (Exception e) { ret = false; renameException = e; @@ -346,43 +206,34 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC if (!ret) { // Rename failed :-( LOGGER.warn( - "Could not rename '" + tmpFileName.getAbsolutePath() + "' to '" + "Could not rename '" + getTmpFileName().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'", renameException); cancel(); - return; + return false; } // Now insert meta data into DB try { synchronized (versionWrittenToDb) { - DbImage.createImageVersion(image.imageBaseId, getId(), owner, fileSize, relPath, - versionSettings, chunks, machineDescription); + DbImage.createImageVersion(image.imageBaseId, getId(), owner, getFileSize(), relPath, + versionSettings, getChunks(), machineDescription); versionWrittenToDb.set(true); } } catch (SQLException e) { LOGGER.error("Error finishing upload: Inserting version to DB failed", e); - state = TransferState.ERROR; // Also delete uploaded file, as there is no reference to it FileSystem.deleteAsync(destination); cancel(); - return; + return false; } + return true; } @Override public synchronized void cancel() { - if (state != TransferState.FINISHED && state != TransferState.ERROR) { - state = TransferState.ERROR; - } - synchronized (downloads) { - for (Downloader download : downloads) { - download.cancel(); - } - } - lastActivityTime.set(0); - Util.safeClose(tmpFileHandle); - if (!isRepairUpload() && tmpFileName.exists()) { - FileSystem.deleteAsync(tmpFileName); + super.cancel(); + if (!isRepairUpload() && getTmpFileName().exists()) { + FileSystem.deleteAsync(getTmpFileName()); } } @@ -399,203 +250,46 @@ public class IncomingDataTransfer extends AbstractTransfer implements HashCheckC return this.owner; } - public File getDestinationFile() { - return this.tmpFileName; - } - - public long getSize() { - return this.fileSize; - } - - /** - * Callback class for an instance of the Downloader, which supplies - * the Downloader with wanted file ranges, and handles incoming data. - */ - private class CbHandler implements WantRangeCallback, DataReceivedCallback { - /** - * The current chunk being transfered. - */ - private FileChunk currentChunk = null; - /** - * Current buffer to receive to - */ - private byte[] buffer = new byte[FileChunk.CHUNK_SIZE]; - /** - * Downloader object - */ - private final Downloader downloader; - - private CbHandler(Downloader downloader) { - this.downloader = downloader; - } - - @Override - public boolean dataReceived(long fileOffset, int dataLength, byte[] data) { - if (currentChunk == null) - throw new IllegalStateException("dataReceived without current chunk"); - if (!currentChunk.range.contains(fileOffset, fileOffset + dataLength)) - throw new IllegalStateException("dataReceived with file data out of range"); - System.arraycopy(data, 0, buffer, (int) (fileOffset - currentChunk.range.startOffset), dataLength); - return fileWritable; - } - - @Override - public FileRange get() { - if (currentChunk != null) { - if (hashChecker != null && currentChunk.getSha1Sum() != null) { - try { - hashChecker.queue(currentChunk, buffer, activeUpload); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return null; - } - try { - buffer = new byte[buffer.length]; - } catch (OutOfMemoryError e) { - // Usually catching OOM errors is a bad idea, but it's quite safe here as - // we know exactly where it happened, no hidden sub-calls through 20 objects. - // The most likely cause here is that the hash checker/disk cannot keep up - // writing out completed chunks, so we just sleep a bit and try again. If it still - // fails, we exit completely. - try { - Thread.sleep(6000); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - return null; - } - // Might raise OOM again, but THIS TIME I MEAN IT - try { - buffer = new byte[buffer.length]; - } catch (OutOfMemoryError e2) { - downloader.sendErrorCode("Out of RAM"); - cancel(); - } - } - } else { - // We have no hash checker or the hash for the current chunk is unknown - flush to disk - writeFileData(currentChunk.range.startOffset, currentChunk.range.getLength(), buffer); - chunks.markSuccessful(currentChunk); - } - } - // Get next missing chunk - try { - currentChunk = chunks.getMissing(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - cancel(); - return null; - } - if (currentChunk == null) { - return null; // No more chunks, returning null tells the Downloader we're done. - } - // Check remaining disk space and abort if it's too low - long space = FileSystem.getAvailableStorageBytes(); - if (space != -1 && space < MIN_FREE_SPACE_BYTES) { - downloader.sendErrorCode("Out of disk space"); - LOGGER.error("Out of space: Cancelling upload of " - + (image == null ? "image" : image.imageName) + " by " - + Formatter.userFullName(owner)); - cancel(); - return null; - } - return currentChunk.range; - } - } - public synchronized TransferStatus getStatus() { - return new TransferStatus(chunks.getStatusArray(), state); + return new TransferStatus(getChunks().getStatusArray(), getState()); } @Override - public void hashCheckDone(HashResult result, byte[] data, FileChunk chunk) { - if (state != TransferState.IDLE && state != TransferState.WORKING) - return; - switch (result) { - case FAILURE: - LOGGER.warn("Hash check of chunk " + chunk.toString() - + " could not be executed. Assuming valid :-("); - // Fall through - case VALID: - if (!chunk.isWrittenToDisk()) { - writeFileData(chunk.range.startOffset, chunk.range.getLength(), data); - } - chunks.markSuccessful(chunk); - if (chunks.isComplete()) { - finishUpload(); - } - break; - case INVALID: - LOGGER.warn("Hash check of chunk " + chunk.getChunkIndex() + " resulted in mismatch " - + chunk.getFailCount() + "x :-("); - chunks.markFailed(chunk); - break; - } + public boolean isActive() { + return getState() == TransferState.IDLE || getState() == TransferState.WORKING; } - private byte[] loadChunkFromFile(FileChunk chunk) { - synchronized (tmpFileHandle) { - if (state != TransferState.IDLE && state != TransferState.WORKING) - return null; - try { - tmpFileHandle.seek(chunk.range.startOffset); - byte[] buffer = new byte[chunk.range.getLength()]; - tmpFileHandle.readFully(buffer); - return buffer; - } catch (IOException e) { - LOGGER.error( - "Could not read chunk " + chunk.getChunkIndex() + " of File " - + tmpFileName.toString(), e); - return null; - } + @Override + protected void finalize() { + try { + super.finalize(); + } catch (Throwable t) { + } + try { + cancel(); + } catch (Throwable t) { } } - public void updateBlockHashList(List<byte[]> hashList) { - if (state != TransferState.IDLE && state != TransferState.WORKING) { - LOGGER.debug(this.getId() + ": Rejecting block hash list in state " + state); - return; - } - if (hashChecker == null || hashList == null) { - LOGGER.debug(this.getId() + ": Rejecting block hash list: No hasher"); - return; - } - chunks.updateSha1Sums(hashList); - FileChunk chunk; - while (null != (chunk = chunks.getUnhashedComplete())) { - byte[] data = loadChunkFromFile(chunk); - if (data == null) { - LOGGER.warn("Will mark unloadable chunk as valid :-("); - chunks.markSuccessful(chunk); - continue; - } - try { - hashChecker.queue(chunk, data, this); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } + @Override + protected boolean hasEnoughFreeSpace() { + return FileSystem.getAvailableStorageBytes() > MIN_FREE_SPACE_BYTES; } @Override - public boolean isActive() { - return state == TransferState.IDLE || state == TransferState.WORKING; + public TransferInformation getTransferInfo() { + return new TransferInformation(getId(), FileServer.instance().getPlainPort(), FileServer.instance() + .getSslPort()); } @Override - public int getActiveConnectionCount() { - return downloads.size(); + public String getRelativePath() { + return FileSystem.getRelativePath(getTmpFileName(), Configuration.getVmStoreBasePath()); } @Override - protected void finalize() { - try { - Util.safeClose(tmpFileHandle); - if (tmpFileName.exists()) { - FileSystem.deleteAsync(tmpFileName); - } - } catch (Throwable t) { - } + protected void chunkStatusChanged(FileChunk chunk) { + // TODO Update in DB in case this is a repair upload } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java index 31c219aa..581d7ba5 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java @@ -1,12 +1,18 @@ package org.openslx.bwlp.sat.fileserv; import java.io.File; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.ExecutorService; import org.apache.log4j.Logger; +import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Uploader; public class OutgoingDataTransfer extends AbstractTransfer { @@ -20,19 +26,75 @@ public class OutgoingDataTransfer extends AbstractTransfer { private final File sourceFile; + private final TransferInformation masterTransferInfo; + private boolean isCanceled = false; + /** + * For downloads by clients. + * + * @param uuid UUID of this transfer + * @param file file to send to client + */ public OutgoingDataTransfer(String uuid, File file) { super(uuid); this.sourceFile = file; + this.masterTransferInfo = null; + } + + /** + * For uploads to the master server. + * + * @param transferInfo TI received by master server when it granted the + * upload + * @param absFile file to send to master server + */ + public OutgoingDataTransfer(TransferInformation transferInfo, File absFile) { + super(UUID.randomUUID().toString()); + this.masterTransferInfo = transferInfo; + this.sourceFile = absFile; } /** * Called periodically if this is a transfer from the master server, so we * can make sure the transfer is running. */ - public void heartBeat(ExecutorService pool) { - // TODO + public synchronized void heartBeat(ExecutorService pool) { + if (isCanceled || masterTransferInfo == null) + return; + synchronized (uploads) { + if (uploads.size() >= 1) + return; + } + Uploader uploader = null; + Exception connectException = null; + if (masterTransferInfo.plainPort != 0) { + // Try plain + try { + uploader = new Uploader(Configuration.getMasterServerAddress(), masterTransferInfo.plainPort, + 10000, null, masterTransferInfo.token); + } catch (IOException e) { + uploader = null; + connectException = e; + } + } + if (uploader == null && masterTransferInfo.sslPort != 0 && Configuration.getMasterServerSsl()) { + // Try SSL + try { + uploader = new Uploader(Configuration.getMasterServerAddress(), masterTransferInfo.sslPort, + 10000, Configuration.getMasterServerSslContext(), masterTransferInfo.token); + } catch (KeyManagementException | NoSuchAlgorithmException | IOException e) { + connectException = e; + } + } + if (uploader == null) { + LOGGER.debug("Cannot connect to master server for uploading", connectException); + } else { + synchronized (uploads) { + uploads.add(uploader); + } + runConnectionInternal(uploader, pool); + } } /** @@ -43,7 +105,7 @@ public class OutgoingDataTransfer extends AbstractTransfer { * discarded */ public synchronized boolean addConnection(final Uploader connection, ExecutorService pool) { - if (isCanceled) + if (isCanceled || masterTransferInfo != null) return false; potentialFinishTime.set(0); synchronized (uploads) { @@ -51,16 +113,19 @@ public class OutgoingDataTransfer extends AbstractTransfer { return false; uploads.add(connection); } + return runConnectionInternal(connection, pool); + } + + private boolean runConnectionInternal(final Uploader connection, ExecutorService pool) { try { pool.execute(new Runnable() { @Override public void run() { - potentialFinishTime.set(0); boolean ret = connection.upload(sourceFile.getAbsolutePath()); synchronized (uploads) { uploads.remove(connection); } - if (ret && uploads.isEmpty()) { + if (ret && uploads.isEmpty() && potentialFinishTime.get() == 0) { potentialFinishTime.set(System.currentTimeMillis()); } lastActivityTime.set(System.currentTimeMillis()); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java index f7a9de85..27e11185 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -2,7 +2,10 @@ package org.openslx.bwlp.sat.fileserv; import java.io.File; import java.io.FileNotFoundException; +import java.nio.ByteBuffer; +import java.sql.SQLException; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -10,17 +13,23 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.thrift.TException; +import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.database.mappers.DbUser; +import org.openslx.bwlp.sat.database.models.LocalImageVersion; import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; -import org.openslx.bwlp.sat.util.GrowingThreadPoolExecutor; import org.openslx.bwlp.sat.util.PrioThreadFactory; +import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TAuthorizationException; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TNotFoundException; +import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.GrowingThreadPoolExecutor; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -50,20 +59,31 @@ public class SyncTransferHandler { private final Runnable worker = new Runnable() { @Override public void run() { - for (IncomingDataTransfer download : downloads.values()) { + final long now = System.currentTimeMillis(); + for (Iterator<IncomingDataTransfer> it = downloads.values().iterator(); it.hasNext();) { + IncomingDataTransfer download = it.next(); if (download.isActive()) download.heartBeat(transferPool); + if (download.isComplete(now)) { + LOGGER.info("Download from master server complete"); + it.remove(); + } } - for (OutgoingDataTransfer upload : uploads.values()) { + for (Iterator<OutgoingDataTransfer> it = uploads.values().iterator(); it.hasNext();) { + OutgoingDataTransfer upload = it.next(); if (upload.isActive()) upload.heartBeat(transferPool); + if (upload.isComplete(now)) { + LOGGER.info("Upload to master server complete"); + it.remove(); + } } } }; @Override public void fire() { - if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1) + if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2) return; transferPool.execute(worker); } @@ -75,18 +95,77 @@ public class SyncTransferHandler { QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56)); } - public synchronized static String requestImageDownload(ImagePublishData image) + public synchronized static String requestImageUpload(String userToken, LocalImageVersion img) + throws SQLException, TNotFoundException, TInvocationException, TAuthorizationException, + TTransferRejectedException { + TransferInformation transferInfo; + OutgoingDataTransfer existing = uploads.get(img.imageVersionId); + if (existing != null) + return existing.getId(); + File absFile = FileSystem.composeAbsoluteImagePath(img); + if (!absFile.isFile() || !absFile.canRead()) { + LOGGER.error("Cannot upload " + img.imageVersionId + ": file missing: " + + absFile.getAbsolutePath()); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Source file not readable"); + } + if (absFile.length() != img.fileSize) { + LOGGER.error("Cannot upload" + img.imageVersionId + ": wrong file size - expected " + + img.fileSize + ", got " + absFile.length()); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "File corrupted on satellite server"); + } + checkUploadCount(); + ImageDetailsRead details = DbImage.getImageDetails(null, img.imageBaseId); + List<ByteBuffer> blockHashes = DbImage.getBlockHashes(img.imageVersionId); + ImagePublishData publishData = new ImagePublishData(); + publishData.createTime = img.createTime; + publishData.description = details.description; + publishData.fileSize = img.fileSize; + publishData.imageBaseId = img.imageBaseId; + publishData.imageName = details.imageName; + publishData.imageVersionId = img.imageVersionId; + publishData.isTemplate = details.isTemplate; + publishData.osId = details.osId; + publishData.user = DbUser.getOrNull(img.uploaderId); + publishData.virtId = details.virtId; + try { + transferInfo = ThriftManager.getMasterClient().submitImage(userToken, publishData, blockHashes); + } catch (TAuthorizationException e) { + LOGGER.warn("Master server rejected our session on uploadImage", e); + throw e; + } catch (TInvocationException e) { + LOGGER.warn("Master server made a boo-boo on uploadImage", e); + throw e; + } catch (TTransferRejectedException e) { + LOGGER.warn("Master server rejected our upload request", e); + throw e; + } catch (TException e) { + LOGGER.warn("Unknown exception on uploadImage to master server", e); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Communication with master server failed"); + } + OutgoingDataTransfer transfer = new OutgoingDataTransfer(transferInfo, absFile); + uploads.put(transfer.getId(), transfer); + transfer.heartBeat(transferPool); + return transfer.getId(); + } + + public synchronized static String requestImageDownload(String userToken, ImagePublishData image) throws TInvocationException, TAuthorizationException, TNotFoundException { TransferInformation transferInfo; // Already replicating this one? - if (downloads.containsKey(image.imageVersionId)) - return image.imageVersionId; + IncomingDataTransfer existing = downloads.get(image.imageVersionId); + if (existing != null) + return existing.getId(); checkDownloadCount(); try { - transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId); + transferInfo = ThriftManager.getMasterClient().downloadImage(userToken, image.imageVersionId); } catch (TAuthorizationException e) { LOGGER.warn("Master server rejected our session on downloadImage", e); throw e; + } catch (TInvocationException e) { + LOGGER.warn("Master server made a boo-boo on downloadImage", e); + throw e; } catch (TNotFoundException e) { LOGGER.warn("Master server couldn't find image on downloadImage", e); throw e; @@ -131,6 +210,26 @@ public class SyncTransferHandler { } } + private static void checkUploadCount() throws TInvocationException { + Iterator<OutgoingDataTransfer> it = uploads.values().iterator(); + final long now = System.currentTimeMillis(); + int activeUploads = 0; + while (it.hasNext()) { + OutgoingDataTransfer download = it.next(); + if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) { + download.cancel(); + it.remove(); + continue; + } + activeUploads++; + } + if (activeUploads > Constants.MAX_MASTER_UPLOADS) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Server busy. Too many running uploads (" + activeUploads + "/" + + Constants.MAX_MASTER_UPLOADS + ")."); + } + } + /** * Get an upload instance by given token. * diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/mail/Mail.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/mail/Mail.java index 72271b57..1d232a57 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/mail/Mail.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/mail/Mail.java @@ -4,6 +4,7 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import org.apache.commons.codec.binary.Hex; import org.openslx.bwlp.thrift.iface.UserInfo; public class Mail { @@ -38,7 +39,7 @@ public class Mail { synchronized (md) { md.update(recipient.userId.getBytes(StandardCharsets.UTF_8)); md.update(message.getBytes(StandardCharsets.UTF_8)); - return new String(md.digest(), StandardCharsets.US_ASCII); + return Hex.encodeHexString(md.digest()); } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java index 0ea9921d..2d3d2b91 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java @@ -67,6 +67,7 @@ import org.openslx.bwlp.thrift.iface.Virtualizer; import org.openslx.bwlp.thrift.iface.WhoamiInfo; import org.openslx.sat.thrift.version.Version; import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.ThriftUtil; public class ServerHandler implements SatelliteServer.Iface { @@ -515,7 +516,7 @@ public class ServerHandler implements SatelliteServer.Iface { throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Could not write to local DB"); } - return SyncTransferHandler.requestImageDownload(imagePublishData); + return SyncTransferHandler.requestImageDownload(userToken, imagePublishData); } @Override @@ -579,6 +580,27 @@ public class ServerHandler implements SatelliteServer.Iface { @Override public LectureRead getLectureDetails(String userToken, String lectureId) throws TAuthorizationException, TNotFoundException, TInvocationException { + // + // TEST + // + /* + String imageVersionId = "e9de1941-b673-4711-b033-d8c37d1e2d3e"; + LocalImageVersion img; + try { + img = DbImage.getLocalImageData(imageVersionId); + SyncTransferHandler.requestImageUpload(userToken, img); + } catch (SQLException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + } catch (TTransferRejectedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + */ + // + // + // + // UserInfo user = SessionManager.getOrFail(userToken); User.canSeeLectureDetailsOrFail(user); try { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java deleted file mode 100644 index ee02f440..00000000 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ThriftUtil.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.openslx.bwlp.sat.thrift; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class ThriftUtil { - - public static List<byte[]> unwrapByteBufferList(List<ByteBuffer> blockHashes) { - if (blockHashes == null || blockHashes.isEmpty()) - return null; - List<byte[]> hashList = new ArrayList<>(blockHashes.size()); - for (ByteBuffer hash : blockHashes) { - byte[] buffer = new byte[hash.remaining()]; - hash.get(buffer); - hashList.add(buffer); - } - return hashList; - } - - public static byte[] unwrapByteBuffer(ByteBuffer buffer) { - byte[] byteArray = null; - if (buffer != null) { - byteArray = new byte[buffer.remaining()]; - buffer.get(byteArray); - } - return byteArray; - } - -} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java index 5dd9b8ec..f2926d64 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Configuration.java @@ -4,8 +4,12 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.Properties; +import javax.net.ssl.SSLContext; + import org.apache.log4j.Logger; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -24,6 +28,7 @@ public class Configuration { private static boolean masterSsl = true; private static int masterPort = 9091; private static String dbLocationTable; + private static SSLContext ctx = null; public static boolean load() throws IOException { // Load configuration from java properties file @@ -77,7 +82,7 @@ public class Configuration { public static String getDbPassword() { return dbPassword; } - + public static String getDbLocationTable() { return dbLocationTable; } @@ -104,4 +109,19 @@ public class Configuration { return new File(vmStoreProdPath, subdirDate.print(System.currentTimeMillis())); } + public static SSLContext getMasterServerSslContext() throws NoSuchAlgorithmException, + KeyManagementException { + if (!getMasterServerSsl()) + throw new RuntimeException("SSL not activated"); + if (ctx == null) { + synchronized (LOGGER) { + if (ctx == null) { + ctx = SSLContext.getInstance("TLSv1.2"); + ctx.init(null, null, null); + } + } + } + return ctx; + } + } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/GrowingThreadPoolExecutor.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/GrowingThreadPoolExecutor.java deleted file mode 100644 index a7265199..00000000 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/GrowingThreadPoolExecutor.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.openslx.bwlp.sat.util; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * Grows to maximum pool size before queueing. See - * http://stackoverflow.com/a/20153234/2043481 - */ -public class GrowingThreadPoolExecutor extends ThreadPoolExecutor { - private int userSpecifiedCorePoolSize; - private int taskCount; - - /** - * The default rejected execution handler - */ - private static final RejectedExecutionHandler defaultHandler = - new AbortPolicy(); - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, - TimeUnit unit, BlockingQueue<Runnable> workQueue) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, defaultHandler); - } - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); - } - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { - this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), - handler); - } - - public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); - userSpecifiedCorePoolSize = corePoolSize; - } - - @Override - public void execute(Runnable runnable) { - synchronized (this) { - taskCount++; - setCorePoolSizeToTaskCountWithinBounds(); - } - super.execute(runnable); - } - - @Override - protected void afterExecute(Runnable runnable, Throwable throwable) { - super.afterExecute(runnable, throwable); - synchronized (this) { - taskCount--; - setCorePoolSizeToTaskCountWithinBounds(); - } - } - - private void setCorePoolSizeToTaskCountWithinBounds() { - int threads = taskCount; - if (threads < userSpecifiedCorePoolSize) - threads = userSpecifiedCorePoolSize; - if (threads > getMaximumPoolSize()) - threads = getMaximumPoolSize(); - super.setCorePoolSize(threads); - } - - public void setCorePoolSize(int corePoolSize) { - synchronized (this) { - userSpecifiedCorePoolSize = corePoolSize; - } - } - - @Override - public int getCorePoolSize() { - synchronized (this) { - return userSpecifiedCorePoolSize; - } - } -}
\ No newline at end of file diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Util.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Util.java index 53bfa403..8e6950e8 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Util.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/util/Util.java @@ -1,6 +1,5 @@ package org.openslx.bwlp.sat.util; -import java.io.Closeable; import java.util.regex.Pattern; public class Util { @@ -9,25 +8,6 @@ public class Util { * Try to close everything passed to this method. Never throw an exception * if it fails, just silently continue. * - * @param item One or more objects that are Closeable - */ - public static void safeClose(Closeable... item) { - if (item == null) - return; - for (Closeable c : item) { - if (c == null) - continue; - try { - c.close(); - } catch (Exception e) { - } - } - } - - /** - * Try to close everything passed to this method. Never throw an exception - * if it fails, just silently continue. - * * @param item One or more objects that are AutoCloseable */ public static void safeClose(AutoCloseable... item) { |