summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'dozentenmodulserver/src/main')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java36
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImageBlock.java176
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java2
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java57
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java29
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java7
6 files changed, 252 insertions, 55 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
index 6261bcb7..c5e79389 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImage.java
@@ -1,7 +1,6 @@
package org.openslx.bwlp.sat.database.mappers;
import java.io.File;
-import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -881,46 +880,13 @@ public class DbImage {
if (!rs.next())
throw new TNotFoundException();
return new ImageVersionMeta(imageVersionId, rs.getString("imagebaseid"),
- rs.getBytes("virtualizerconfig"), getBlockHashes(connection, imageVersionId));
+ rs.getBytes("virtualizerconfig"), DbImageBlock.getBlockHashes(connection, imageVersionId));
} catch (SQLException e) {
LOGGER.error("Query failed in DbImage.getVersionDetails()", e);
throw e;
}
}
- public static List<ByteBuffer> getBlockHashes(String imageVersionId) throws SQLException {
- try (MysqlConnection connection = Database.getConnection()) {
- return getBlockHashes(connection, imageVersionId);
- } catch (SQLException e) {
- LOGGER.error("Query failed in DbImage.getBlockHashes()", e);
- throw e;
- }
- }
-
- private static List<ByteBuffer> getBlockHashes(MysqlConnection connection, String imageVersionId)
- throws SQLException {
- MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, blocksha1 FROM imageblock"
- + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC");
- stmt.setString("imageversionid", imageVersionId);
- ResultSet rs = stmt.executeQuery();
- List<ByteBuffer> list = new ArrayList<>();
- long expectedOffset = 0;
- while (rs.next()) {
- long currentOffset = rs.getLong("startbyte");
- if (currentOffset < expectedOffset)
- continue;
- while (currentOffset > expectedOffset) {
- list.add(null);
- expectedOffset += FileChunk.CHUNK_SIZE;
- }
- if (currentOffset == expectedOffset) {
- list.add(ByteBuffer.wrap(rs.getBytes("blocksha1")));
- expectedOffset += FileChunk.CHUNK_SIZE;
- }
- }
- return list;
- }
-
public enum DeleteState {
KEEP,
SHOULD_DELETE,
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImageBlock.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImageBlock.java
new file mode 100644
index 00000000..bdaa356e
--- /dev/null
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/database/mappers/DbImageBlock.java
@@ -0,0 +1,176 @@
+package org.openslx.bwlp.sat.database.mappers;
+
+import java.nio.ByteBuffer;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.openslx.bwlp.sat.database.Database;
+import org.openslx.bwlp.sat.database.MysqlConnection;
+import org.openslx.bwlp.sat.database.MysqlStatement;
+import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.util.ChunkStatus;
+import org.openslx.filetransfer.util.FileChunk;
+
+public class DbImageBlock {
+
+ private static final Logger LOGGER = Logger.getLogger(DbImageBlock.class);
+
+ private static AsyncThread asyncBlockUpdate = null;
+
+ private static synchronized void initAsyncThread()
+ {
+ if ( asyncBlockUpdate == null ) {
+ asyncBlockUpdate = new AsyncThread();
+ asyncBlockUpdate.start();
+ }
+ }
+
+ public static void asyncUpdate( String imageVersionId, FileChunk chunk ) throws InterruptedException
+ {
+ initAsyncThread();
+ asyncBlockUpdate.put( new ChunkUpdate( imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE ) );
+ }
+
+ private static class AsyncThread extends Thread
+ {
+ private final ArrayBlockingQueue<ChunkUpdate> queue = new ArrayBlockingQueue<>( 100 );
+
+ public void put( ChunkUpdate chunk ) throws InterruptedException
+ {
+ queue.put( chunk );
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ while ( !interrupted() ) {
+ ChunkUpdate chunk = queue.take();
+ Thread.sleep( 100 );
+ try ( MysqlConnection connection = Database.getConnection() ) {
+ MysqlStatement stmt = connection.prepareStatement( "UPDATE imageblock SET ismissing = :ismissing"
+ + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize" );
+ do {
+ stmt.setBoolean( "ismissing", chunk.isMissing );
+ stmt.setString( "imageversionid", chunk.imageVersionId );
+ stmt.setLong( "startbyte", chunk.range.startOffset );
+ stmt.setInt( "blocksize", chunk.range.getLength() );
+ stmt.executeUpdate();
+ chunk = queue.poll();
+ } while ( chunk != null );
+ connection.commit();
+ } catch ( SQLException e ) {
+ LOGGER.error( "Query failed in DbImageBlock.AsyncThread.run()", e );
+ continue;
+ }
+ Thread.sleep( 2000 );
+ }
+ } catch ( InterruptedException e ) {
+ LOGGER.debug( "async thread interrupted" );
+ interrupt();
+ }
+ }
+ }
+
+ private static class ChunkUpdate
+ {
+ public final String imageVersionId;
+ public final FileRange range;
+ public final boolean isMissing;
+
+ public ChunkUpdate( String imageVersionId, FileRange range, boolean isMissing )
+ {
+ this.imageVersionId = imageVersionId;
+ this.range = range;
+ this.isMissing = isMissing;
+ }
+ }
+
+ public static void insertChunkList( String imageVersionId, List<FileChunk> all, boolean missing ) throws SQLException
+ {
+ try ( MysqlConnection connection = Database.getConnection() ) {
+ MysqlStatement stmt = connection.prepareStatement( "INSERT IGNORE INTO imageblock"
+ + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES"
+ + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)" );
+ stmt.setString( "imageversionid", imageVersionId );
+ stmt.setBoolean( "ismissing", missing );
+ for ( FileChunk chunk : all ) {
+ stmt.setLong( "startbyte", chunk.range.startOffset );
+ stmt.setInt( "blocksize", chunk.range.getLength() );
+ stmt.setBinary( "blocksha1", chunk.getSha1Sum() );
+ stmt.executeUpdate();
+ }
+ connection.commit();
+ } catch ( SQLException e ) {
+ LOGGER.error( "Query failed in DbImageBlock.insertChunkList()", e );
+ throw e;
+ }
+ }
+
+ static List<ByteBuffer> getBlockHashes(MysqlConnection connection, String imageVersionId)
+ throws SQLException {
+ MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, blocksha1 FROM imageblock"
+ + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC");
+ stmt.setString("imageversionid", imageVersionId);
+ ResultSet rs = stmt.executeQuery();
+ List<ByteBuffer> list = new ArrayList<>();
+ long expectedOffset = 0;
+ while (rs.next()) {
+ long currentOffset = rs.getLong("startbyte");
+ if (currentOffset < expectedOffset)
+ continue;
+ while (currentOffset > expectedOffset) {
+ list.add(null);
+ expectedOffset += FileChunk.CHUNK_SIZE;
+ }
+ if (currentOffset == expectedOffset) {
+ list.add(ByteBuffer.wrap(rs.getBytes("blocksha1")));
+ expectedOffset += FileChunk.CHUNK_SIZE;
+ }
+ }
+ return list;
+ }
+
+ public static List<ByteBuffer> getBlockHashes(String imageVersionId) throws SQLException {
+ try (MysqlConnection connection = Database.getConnection()) {
+ return getBlockHashes(connection, imageVersionId);
+ } catch (SQLException e) {
+ LOGGER.error("Query failed in DbImage.getBlockHashes()", e);
+ throw e;
+ }
+ }
+
+ public static List<Boolean> getMissingStatusList( String imageVersionId ) throws SQLException
+ {
+ try ( MysqlConnection connection = Database.getConnection() ) {
+ MysqlStatement stmt = connection.prepareStatement( "SELECT startbyte, ismissing FROM imageblock"
+ + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC" );
+ stmt.setString( "imageversionid", imageVersionId );
+ ResultSet rs = stmt.executeQuery();
+ List<Boolean> list = new ArrayList<>();
+ long expectedOffset = 0;
+ while ( rs.next() ) {
+ long currentOffset = rs.getLong( "startbyte" );
+ if ( currentOffset < expectedOffset )
+ continue;
+ while ( currentOffset > expectedOffset ) {
+ list.add( Boolean.TRUE );
+ expectedOffset += FileChunk.CHUNK_SIZE;
+ }
+ if ( currentOffset == expectedOffset ) {
+ list.add( rs.getBoolean( "ismissing" ) );
+ expectedOffset += FileChunk.CHUNK_SIZE;
+ }
+ }
+ return list;
+ } catch ( SQLException e ) {
+ LOGGER.error( "Query failed in DbImageBlock.getBlockStatuses()", e );
+ throw e;
+ }
+ }
+
+}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
index 8c00ad60..549bfaec 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
@@ -161,7 +161,7 @@ public class FileServer implements IncomingEvent {
IncomingDataTransfer upload;
try {
upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums,
- machineDescription);
+ machineDescription, false);
} catch (FileNotFoundException e) {
LOGGER.error("Could not open destination file for writing", e);
throw new TTransferRejectedException("Destination file not writable!");
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
index 5c7621a9..8bda7cab 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
@@ -13,6 +13,7 @@ import javax.net.ssl.SSLContext;
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.mappers.DbImage;
+import org.openslx.bwlp.sat.database.mappers.DbImageBlock;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.FileSystem;
@@ -20,10 +21,12 @@ import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.ImageVersionWrite;
+import org.openslx.bwlp.thrift.iface.TNotFoundException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.filetransfer.Downloader;
+import org.openslx.filetransfer.util.ChunkStatus;
import org.openslx.filetransfer.util.FileChunk;
import org.openslx.filetransfer.util.IncomingTransferBase;
import org.openslx.util.ThriftUtil;
@@ -68,17 +71,18 @@ public class IncomingDataTransfer extends IncomingTransferBase {
private final TransferInformation masterTransferInfo;
public IncomingDataTransfer(String uploadId, UserInfo owner, ImageDetailsRead image,
- File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription)
- throws FileNotFoundException {
+ File destinationFile, long fileSize, List<byte[]> sha1Sums, byte[] machineDescription,
+ boolean repairUpload) throws FileNotFoundException {
super(uploadId, destinationFile, fileSize, sha1Sums);
- this.owner = owner;
+ this.owner = repairUpload ? null : owner;
this.image = image;
this.machineDescription = machineDescription;
this.masterTransferInfo = null;
+ initRepairUpload();
}
- public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo)
- throws FileNotFoundException {
+ public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo,
+ boolean repairUpload) throws FileNotFoundException {
super(UUID.randomUUID().toString(), tmpFile, publishData.fileSize,
ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes));
ImageDetailsRead idr = new ImageDetailsRead();
@@ -94,11 +98,29 @@ public class IncomingDataTransfer extends IncomingTransferBase {
idr.setUpdaterId(publishData.uploader.userId);
idr.setUpdateTime(publishData.createTime);
idr.setVirtId(publishData.virtId);
- this.owner = publishData.uploader;
+ this.owner = repairUpload ? null : publishData.uploader;
this.image = idr;
this.machineDescription = ThriftUtil.unwrapByteBuffer(transferInfo.machineDescription);
this.masterTransferInfo = transferInfo;
this.versionSettings = new ImageVersionWrite(false);
+ initRepairUpload();
+ }
+
+ private void initRepairUpload() {
+ if (!isRepairUpload())
+ return;
+ if (getTmpFileName().exists() && getTmpFileName().length() > 0) {
+ try {
+ List<Boolean> statusList = DbImageBlock.getMissingStatusList(getVersionId());
+ if (!statusList.isEmpty()) {
+ getChunks().resumeFromStatusList(statusList, getTmpFileName().length());
+ for (int i = 0; i < 3; ++i) {
+ queueUnhashedChunk(false);
+ }
+ }
+ } catch (SQLException e) {
+ }
+ }
}
/**
@@ -148,6 +170,8 @@ public class IncomingDataTransfer extends IncomingTransferBase {
* @param data
*/
public boolean setVersionData(UserInfo user, ImageVersionWrite data) {
+ if (isRepairUpload())
+ return false;
synchronized (versionWrittenToDb) {
if (versionWrittenToDb.get()) {
return false;
@@ -170,8 +194,17 @@ public class IncomingDataTransfer extends IncomingTransferBase {
potentialFinishTime.set(System.currentTimeMillis());
// If owner is not set, this was a repair-transfer, which downloads directly to the existing target file.
// Nothing more to do in that case.
- if (isRepairUpload())
+ if (isRepairUpload()) {
+ try {
+ DbImage.markValid(true, false, DbImage.getLocalImageData(getVersionId()));
+ } catch (TNotFoundException e) {
+ LOGGER.warn("Apparently, the image " + getVersionId()
+ + " that was just repaired doesn't exist...");
+ } catch (SQLException e) {
+ }
return true;
+ }
+ // It's a fresh upload
LOGGER.info("Finalizing uploaded image " + image.imageName);
// Ready to go. First step: Rename temp file to something usable
String ext = "img";
@@ -291,7 +324,15 @@ public class IncomingDataTransfer extends IncomingTransferBase {
@Override
protected void chunkStatusChanged(FileChunk chunk) {
- // TODO Update in DB in case this is a repair upload
+ if (!isRepairUpload())
+ return;
+ ChunkStatus status = chunk.getStatus();
+ if (status == ChunkStatus.MISSING || status == ChunkStatus.COMPLETE) {
+ try {
+ DbImageBlock.asyncUpdate(getVersionId(), chunk);
+ } catch (InterruptedException e) {
+ }
+ }
}
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
index cd46a1c5..f3d64784 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
@@ -16,6 +16,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.sat.database.mappers.DbImage;
+import org.openslx.bwlp.sat.database.mappers.DbImageBlock;
import org.openslx.bwlp.sat.database.mappers.DbUser;
import org.openslx.bwlp.sat.database.models.ImageVersionMeta;
import org.openslx.bwlp.sat.database.models.LocalImageVersion;
@@ -132,7 +133,7 @@ public class SyncTransferHandler {
throw new TInvocationException(InvocationError.MISSING_DATA,
"Given virtual machine has no hardware description");
ImageDetailsRead details = DbImage.getImageDetails(null, imgVersion.imageBaseId);
- List<ByteBuffer> blockHashes = DbImage.getBlockHashes(imgVersion.imageVersionId);
+ List<ByteBuffer> blockHashes = DbImageBlock.getBlockHashes(imgVersion.imageVersionId);
ImagePublishData publishData = new ImagePublishData();
publishData.createTime = imgVersion.createTime;
publishData.description = details.description;
@@ -193,13 +194,29 @@ public class SyncTransferHandler {
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
"Communication with master server failed");
}
- File tmpFile = null;
- do {
- tmpFile = Formatter.getTempImageName();
- } while (tmpFile.exists());
+ // Already exists? Already complete?
+ LocalImageVersion localImageData;
+ try {
+ localImageData = DbImage.getLocalImageData(image.imageVersionId);
+ } catch (TNotFoundException e) {
+ localImageData = null;
+ } catch (SQLException e) {
+ throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Database error");
+ }
+ File tmpFile;
+ if (localImageData == null) {
+ // New
+ tmpFile = null;
+ do {
+ tmpFile = Formatter.getTempImageName();
+ } while (tmpFile.exists());
+ } else {
+ tmpFile = FileSystem.composeAbsoluteImagePath(localImageData);
+ }
tmpFile.getParentFile().mkdirs();
try {
- IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo);
+ IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo,
+ localImageData != null);
downloads.put(transfer.getId(), transfer);
return transfer.getId();
} catch (FileNotFoundException e) {
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java
index add5d13b..fc767460 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/thrift/ServerHandler.java
@@ -505,9 +505,8 @@ public class ServerHandler implements SatelliteServer.Iface {
try {
imagePublishData = ThriftManager.getMasterClient().getImageData(userToken, imageVersionId);
} catch (TException e) {
- LOGGER.error(
- "Could not query image data from master server for an image that a client wants to replicate",
- e);
+ LOGGER.error("Could not query image data from master server for"
+ + " an image that a client wants to replicate", e);
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
"Cannot query master server for image information");
}
@@ -516,7 +515,6 @@ public class ServerHandler implements SatelliteServer.Iface {
if (imagePublishData.owner == null) {
imagePublishData.owner = imagePublishData.uploader;
}
- LOGGER.debug("Writing user " + imagePublishData.owner);
DbUser.writeUserOnReplication(imagePublishData.owner);
DbImage.writeBaseImage(imagePublishData);
} catch (SQLException e) {
@@ -524,7 +522,6 @@ public class ServerHandler implements SatelliteServer.Iface {
"Could not write to local DB");
}
imagePublishData.uploader = user;
- LOGGER.debug("Setting uploader to " + user);
return SyncTransferHandler.requestImageDownload(userToken, imagePublishData);
}