diff options
Diffstat (limited to 'dozentenmodulserver/src/main')
6 files changed, 252 insertions, 55 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java index 6261bcb7..c5e79389 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java @@ -1,7 +1,6 @@ package org.openslx.bwlp.sat.database.mappers; import java.io.File; -import java.nio.ByteBuffer; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; @@ -881,46 +880,13 @@ public class DbImage { if (!rs.next()) throw new TNotFoundException(); return new ImageVersionMeta(imageVersionId, rs.getString("imagebaseid"), - rs.getBytes("virtualizerconfig"), getBlockHashes(connection, imageVersionId)); + rs.getBytes("virtualizerconfig"), DbImageBlock.getBlockHashes(connection, imageVersionId)); } catch (SQLException e) { LOGGER.error("Query failed in DbImage.getVersionDetails()", e); throw e; } } - public static List<ByteBuffer> getBlockHashes(String imageVersionId) throws SQLException { - try (MysqlConnection connection = Database.getConnection()) { - return getBlockHashes(connection, imageVersionId); - } catch (SQLException e) { - LOGGER.error("Query failed in DbImage.getBlockHashes()", e); - throw e; - } - } - - private static List<ByteBuffer> getBlockHashes(MysqlConnection connection, String imageVersionId) - throws SQLException { - MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, blocksha1 FROM imageblock" - + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC"); - stmt.setString("imageversionid", imageVersionId); - ResultSet rs = stmt.executeQuery(); - List<ByteBuffer> list = new ArrayList<>(); - long expectedOffset = 0; - while (rs.next()) { - long currentOffset = rs.getLong("startbyte"); - if (currentOffset < expectedOffset) - continue; - while (currentOffset > expectedOffset) { - list.add(null); - expectedOffset += FileChunk.CHUNK_SIZE; - } - if (currentOffset == expectedOffset) { - list.add(ByteBuffer.wrap(rs.getBytes("blocksha1"))); - expectedOffset += FileChunk.CHUNK_SIZE; - } - } - return list; - } - public enum DeleteState { KEEP, SHOULD_DELETE, diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImageBlock.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImageBlock.java new file mode 100644 index 00000000..bdaa356e --- /dev/null +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImageBlock.java @@ -0,0 +1,176 @@ +package org.openslx.bwlp.sat.database.mappers; + +import java.nio.ByteBuffer; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.sat.database.Database; +import org.openslx.bwlp.sat.database.MysqlConnection; +import org.openslx.bwlp.sat.database.MysqlStatement; +import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.util.ChunkStatus; +import org.openslx.filetransfer.util.FileChunk; + +public class DbImageBlock { + + private static final Logger LOGGER = Logger.getLogger(DbImageBlock.class); + + private static AsyncThread asyncBlockUpdate = null; + + private static synchronized void initAsyncThread() + { + if ( asyncBlockUpdate == null ) { + asyncBlockUpdate = new AsyncThread(); + asyncBlockUpdate.start(); + } + } + + public static void asyncUpdate( String imageVersionId, FileChunk chunk ) throws InterruptedException + { + initAsyncThread(); + asyncBlockUpdate.put( new ChunkUpdate( imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE ) ); + } + + private static class AsyncThread extends Thread + { + private final ArrayBlockingQueue<ChunkUpdate> queue = new ArrayBlockingQueue<>( 100 ); + + public void put( ChunkUpdate chunk ) throws InterruptedException + { + queue.put( chunk ); + } + + @Override + public void run() + { + try { + while ( !interrupted() ) { + ChunkUpdate chunk = queue.take(); + Thread.sleep( 100 ); + try ( MysqlConnection connection = Database.getConnection() ) { + MysqlStatement stmt = connection.prepareStatement( "UPDATE imageblock SET ismissing = :ismissing" + + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize" ); + do { + stmt.setBoolean( "ismissing", chunk.isMissing ); + stmt.setString( "imageversionid", chunk.imageVersionId ); + stmt.setLong( "startbyte", chunk.range.startOffset ); + stmt.setInt( "blocksize", chunk.range.getLength() ); + stmt.executeUpdate(); + chunk = queue.poll(); + } while ( chunk != null ); + connection.commit(); + } catch ( SQLException e ) { + LOGGER.error( "Query failed in DbImageBlock.AsyncThread.run()", e ); + continue; + } + Thread.sleep( 2000 ); + } + } catch ( InterruptedException e ) { + LOGGER.debug( "async thread interrupted" ); + interrupt(); + } + } + } + + private static class ChunkUpdate + { + public final String imageVersionId; + public final FileRange range; + public final boolean isMissing; + + public ChunkUpdate( String imageVersionId, FileRange range, boolean isMissing ) + { + this.imageVersionId = imageVersionId; + this.range = range; + this.isMissing = isMissing; + } + } + + public static void insertChunkList( String imageVersionId, List<FileChunk> all, boolean missing ) throws SQLException + { + try ( MysqlConnection connection = Database.getConnection() ) { + MysqlStatement stmt = connection.prepareStatement( "INSERT IGNORE INTO imageblock" + + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES" + + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)" ); + stmt.setString( "imageversionid", imageVersionId ); + stmt.setBoolean( "ismissing", missing ); + for ( FileChunk chunk : all ) { + stmt.setLong( "startbyte", chunk.range.startOffset ); + stmt.setInt( "blocksize", chunk.range.getLength() ); + stmt.setBinary( "blocksha1", chunk.getSha1Sum() ); + stmt.executeUpdate(); + } + connection.commit(); + } catch ( SQLException e ) { + LOGGER.error( "Query failed in DbImageBlock.insertChunkList()", e ); + throw e; + } + } + + static List<ByteBuffer> getBlockHashes(MysqlConnection connection, String imageVersionId) + throws SQLException { + MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, blocksha1 FROM imageblock" + + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC"); + stmt.setString("imageversionid", imageVersionId); + ResultSet rs = stmt.executeQuery(); + List<ByteBuffer> list = new ArrayList<>(); + long expectedOffset = 0; + while (rs.next()) { + long currentOffset = rs.getLong("startbyte"); + if (currentOffset < expectedOffset) + continue; + while (currentOffset > expectedOffset) { + list.add(null); + expectedOffset += FileChunk.CHUNK_SIZE; + } + if (currentOffset == expectedOffset) { + list.add(ByteBuffer.wrap(rs.getBytes("blocksha1"))); + expectedOffset += FileChunk.CHUNK_SIZE; + } + } + return list; + } + + public static List<ByteBuffer> getBlockHashes(String imageVersionId) throws SQLException { + try (MysqlConnection connection = Database.getConnection()) { + return getBlockHashes(connection, imageVersionId); + } catch (SQLException e) { + LOGGER.error("Query failed in DbImage.getBlockHashes()", e); + throw e; + } + } + + public static List<Boolean> getMissingStatusList( String imageVersionId ) throws SQLException + { + try ( MysqlConnection connection = Database.getConnection() ) { + MysqlStatement stmt = connection.prepareStatement( "SELECT startbyte, ismissing FROM imageblock" + + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC" ); + stmt.setString( "imageversionid", imageVersionId ); + ResultSet rs = stmt.executeQuery(); + List<Boolean> list = new ArrayList<>(); + long expectedOffset = 0; + while ( rs.next() ) { + long currentOffset = rs.getLong( "startbyte" ); + if ( currentOffset < expectedOffset ) + continue; + while ( currentOffset > expectedOffset ) { + list.add( Boolean.TRUE ); + expectedOffset += FileChunk.CHUNK_SIZE; + } + if ( currentOffset == expectedOffset ) { + list.add( rs.getBoolean( "ismissing" ) ); + expectedOffset += FileChunk.CHUNK_SIZE; + } + } + return list; + } catch ( SQLException e ) { + LOGGER.error( "Query failed in DbImageBlock.getBlockStatuses()", e ); + throw e; + } + } + +} diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java index 8c00ad60..549bfaec 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java @@ -161,7 +161,7 @@ public class FileServer implements IncomingEvent { IncomingDataTransfer upload; try { upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums, - machineDescription); + machineDescription, false); } catch (FileNotFoundException e) { LOGGER.error("Could not open destination file for writing", e); throw new TTransferRejectedException("Destination file not writable!"); diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java index 5c7621a9..8bda7cab 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java @@ -13,6 +13,7 @@ import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.database.mappers.DbImageBlock; import org.openslx.bwlp.sat.util.Configuration; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; @@ -20,10 +21,12 @@ import org.openslx.bwlp.sat.util.Formatter; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.ImageVersionWrite; +import org.openslx.bwlp.thrift.iface.TNotFoundException; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.filetransfer.Downloader; +import org.openslx.filetransfer.util.ChunkStatus; import org.openslx.filetransfer.util.FileChunk; import org.openslx.filetransfer.util.IncomingTransferBase; import org.openslx.util.ThriftUtil; @@ -68,17 +71,18 @@ public class IncomingDataTransfer extends IncomingTransferBase { private final TransferInformation masterTransferInfo; public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image, - File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription) - throws FileNotFoundException { + File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription, + boolean repairUpload) throws FileNotFoundException { super(uploadId, destinationFile, fileSize, sha1Sums); - this.owner = owner; + this.owner = repairUpload ? null : owner; this.image = image; this.machineDescription = machineDescription; this.masterTransferInfo = null; + initRepairUpload(); } - public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo) - throws FileNotFoundException { + public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo, + boolean repairUpload) throws FileNotFoundException { super(UUID.randomUUID().toString(), tmpFile, publishData.fileSize, ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes)); ImageDetailsRead idr = new ImageDetailsRead(); @@ -94,11 +98,29 @@ public class IncomingDataTransfer extends IncomingTransferBase { idr.setUpdaterId(publishData.uploader.userId); idr.setUpdateTime(publishData.createTime); idr.setVirtId(publishData.virtId); - this.owner = publishData.uploader; + this.owner = repairUpload ? null : publishData.uploader; this.image = idr; this.machineDescription = ThriftUtil.unwrapByteBuffer(transferInfo.machineDescription); this.masterTransferInfo = transferInfo; this.versionSettings = new ImageVersionWrite(false); + initRepairUpload(); + } + + private void initRepairUpload() { + if (!isRepairUpload()) + return; + if (getTmpFileName().exists() && getTmpFileName().length() > 0) { + try { + List<Boolean> statusList = DbImageBlock.getMissingStatusList(getVersionId()); + if (!statusList.isEmpty()) { + getChunks().resumeFromStatusList(statusList, getTmpFileName().length()); + for (int i = 0; i < 3; ++i) { + queueUnhashedChunk(false); + } + } + } catch (SQLException e) { + } + } } /** @@ -148,6 +170,8 @@ public class IncomingDataTransfer extends IncomingTransferBase { * @param data */ public boolean setVersionData(UserInfo user, ImageVersionWrite data) { + if (isRepairUpload()) + return false; synchronized (versionWrittenToDb) { if (versionWrittenToDb.get()) { return false; @@ -170,8 +194,17 @@ public class IncomingDataTransfer extends IncomingTransferBase { potentialFinishTime.set(System.currentTimeMillis()); // If owner is not set, this was a repair-transfer, which downloads directly to the existing target file. // Nothing more to do in that case. - if (isRepairUpload()) + if (isRepairUpload()) { + try { + DbImage.markValid(true, false, DbImage.getLocalImageData(getVersionId())); + } catch (TNotFoundException e) { + LOGGER.warn("Apparently, the image " + getVersionId() + + " that was just repaired doesn't exist..."); + } catch (SQLException e) { + } return true; + } + // It's a fresh upload LOGGER.info("Finalizing uploaded image " + image.imageName); // Ready to go. First step: Rename temp file to something usable String ext = "img"; @@ -291,7 +324,15 @@ public class IncomingDataTransfer extends IncomingTransferBase { @Override protected void chunkStatusChanged(FileChunk chunk) { - // TODO Update in DB in case this is a repair upload + if (!isRepairUpload()) + return; + ChunkStatus status = chunk.getStatus(); + if (status == ChunkStatus.MISSING || status == ChunkStatus.COMPLETE) { + try { + DbImageBlock.asyncUpdate(getVersionId(), chunk); + } catch (InterruptedException e) { + } + } } } diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java index cd46a1c5..f3d64784 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.database.mappers.DbImageBlock; import org.openslx.bwlp.sat.database.mappers.DbUser; import org.openslx.bwlp.sat.database.models.ImageVersionMeta; import org.openslx.bwlp.sat.database.models.LocalImageVersion; @@ -132,7 +133,7 @@ public class SyncTransferHandler { throw new TInvocationException(InvocationError.MISSING_DATA, "Given virtual machine has no hardware description"); ImageDetailsRead details = DbImage.getImageDetails(null, imgVersion.imageBaseId); - List<ByteBuffer> blockHashes = DbImage.getBlockHashes(imgVersion.imageVersionId); + List<ByteBuffer> blockHashes = DbImageBlock.getBlockHashes(imgVersion.imageVersionId); ImagePublishData publishData = new ImagePublishData(); publishData.createTime = imgVersion.createTime; publishData.description = details.description; @@ -193,13 +194,29 @@ public class SyncTransferHandler { throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Communication with master server failed"); } - File tmpFile = null; - do { - tmpFile = Formatter.getTempImageName(); - } while (tmpFile.exists()); + // Already exists? Already complete? + LocalImageVersion localImageData; + try { + localImageData = DbImage.getLocalImageData(image.imageVersionId); + } catch (TNotFoundException e) { + localImageData = null; + } catch (SQLException e) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Database error"); + } + File tmpFile; + if (localImageData == null) { + // New + tmpFile = null; + do { + tmpFile = Formatter.getTempImageName(); + } while (tmpFile.exists()); + } else { + tmpFile = FileSystem.composeAbsoluteImagePath(localImageData); + } tmpFile.getParentFile().mkdirs(); try { - IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo); + IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo, + localImageData != null); downloads.put(transfer.getId(), transfer); return transfer.getId(); } catch (FileNotFoundException e) { diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java index add5d13b..fc767460 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java @@ -505,9 +505,8 @@ public class ServerHandler implements SatelliteServer.Iface { try { imagePublishData = ThriftManager.getMasterClient().getImageData(userToken, imageVersionId); } catch (TException e) { - LOGGER.error( - "Could not query image data from master server for an image that a client wants to replicate", - e); + LOGGER.error("Could not query image data from master server for" + + " an image that a client wants to replicate", e); throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Cannot query master server for image information"); } @@ -516,7 +515,6 @@ public class ServerHandler implements SatelliteServer.Iface { if (imagePublishData.owner == null) { imagePublishData.owner = imagePublishData.uploader; } - LOGGER.debug("Writing user " + imagePublishData.owner); DbUser.writeUserOnReplication(imagePublishData.owner); DbImage.writeBaseImage(imagePublishData); } catch (SQLException e) { @@ -524,7 +522,6 @@ public class ServerHandler implements SatelliteServer.Iface { "Could not write to local DB"); } imagePublishData.uploader = user; - LOGGER.debug("Setting uploader to " + user); return SyncTransferHandler.requestImageDownload(userToken, imagePublishData); } |