summaryrefslogtreecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv
diff options
context:
space:
mode:
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv')
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java24
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java8
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java12
-rw-r--r--dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java30
4 files changed, 65 insertions, 9 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
index 656a515b..b1466b5c 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/FileServer.java
@@ -236,7 +236,7 @@ public class FileServer implements IncomingEvent {
throw new TTransferRejectedException(errorMessage);
}
String key = UUID.randomUUID().toString();
- OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile, getPlainPort(), getSslPort());
+ OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile, getPlainPort(), getSslPort(), localImageData.imageVersionId);
downloads.put(key, transfer);
return transfer;
}
@@ -244,6 +244,28 @@ public class FileServer implements IncomingEvent {
public Status getStatus() {
return new Status();
}
+
+ /**
+ * Check whether the given imageVersionId refers to an active transfer.
+ */
+ public boolean isActiveTransfer(String baseId, String versionId) {
+ long now = System.currentTimeMillis();
+ if (versionId != null) {
+ for (OutgoingDataTransfer odt : downloads.values()) {
+ if (versionId != null && versionId.equals(odt.getVersionId()) && !odt.isComplete(now) && odt.isActive())
+ return true;
+ }
+ }
+ for (IncomingDataTransfer idt : uploads.values()) {
+ if (idt.isComplete(now) || !idt.isActive())
+ continue;
+ if (versionId != null && versionId.equals(idt.getVersionId()))
+ return true;
+ if (baseId != null && baseId.equals(idt.getBaseId()))
+ return true;
+ }
+ return false;
+ }
class Status {
public final int activeUploads;
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
index 5267cab4..02a5eb3d 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/IncomingDataTransfer.java
@@ -96,7 +96,7 @@ public class IncomingDataTransfer extends IncomingTransferBase {
public IncomingDataTransfer(ImagePublishData publishData, File tmpFile, TransferInformation transferInfo,
boolean repairUpload) throws FileNotFoundException {
- super(UUID.randomUUID().toString(), tmpFile, publishData.fileSize,
+ super(publishData.imageVersionId, tmpFile, publishData.fileSize,
ThriftUtil.unwrapByteBufferList(transferInfo.blockHashes), StorageChunkSource.instance);
ImageDetailsRead idr = new ImageDetailsRead();
idr.setCreateTime(publishData.createTime);
@@ -301,11 +301,15 @@ public class IncomingDataTransfer extends IncomingTransferBase {
return true;
}
- private String getVersionId() {
+ public String getVersionId() {
if (masterTransferInfo == null)
return getId();
return image.latestVersionId;
}
+
+ public String getBaseId() {
+ return image.imageBaseId;
+ }
@Override
public synchronized void cancel() {
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
index 6ef4d4a6..37510270 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/OutgoingDataTransfer.java
@@ -20,6 +20,8 @@ public class OutgoingDataTransfer extends OutgoingTransferBase {
private static final Logger LOGGER = Logger.getLogger(OutgoingDataTransfer.class);
private final TransferInformation masterTransferInfo;
+
+ private final String versionId;
/**
* For downloads by clients.
@@ -27,9 +29,10 @@ public class OutgoingDataTransfer extends OutgoingTransferBase {
* @param uuid UUID of this transfer
* @param file file to send to client
*/
- public OutgoingDataTransfer(String uuid, File file, int plainPort, int sslPort) {
+ public OutgoingDataTransfer(String uuid, File file, int plainPort, int sslPort, String versionId) {
super(uuid, file, plainPort, sslPort);
this.masterTransferInfo = null;
+ this.versionId = versionId;
}
/**
@@ -39,9 +42,10 @@ public class OutgoingDataTransfer extends OutgoingTransferBase {
* upload
* @param absFile file to send to master server
*/
- public OutgoingDataTransfer(TransferInformation transferInfo, File absFile) {
+ public OutgoingDataTransfer(TransferInformation transferInfo, File absFile, String versionId) {
super(transferInfo.token, absFile, 0, 0);
this.masterTransferInfo = transferInfo;
+ this.versionId = versionId;
}
/**
@@ -99,4 +103,8 @@ public class OutgoingDataTransfer extends OutgoingTransferBase {
throw new RuntimeException("Not implemented");
}
+ public Object getVersionId() {
+ return versionId;
+ }
+
}
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
index e868ceaa..cf3bddf2 100644
--- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
+++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
@@ -49,7 +49,7 @@ public class SyncTransferHandler {
Thread.NORM_PRIORITY - 3));
/**
- * All currently running downloads from master, indexed by token
+ * All currently running downloads from master, indexed by token, which is == versionId
*/
private static final Map<String, IncomingDataTransfer> downloads = new ConcurrentHashMap<>();
@@ -105,7 +105,7 @@ public class SyncTransferHandler {
};
@Override
- public void fire() {
+ public synchronized void fire() {
if (uploadsByTransferId.isEmpty() && uploadsByVersionId.isEmpty() && downloads.isEmpty())
return; // Nothing to do anyways, don't wake up another thread
if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2 && ++skips < 10)
@@ -182,12 +182,12 @@ public class SyncTransferHandler {
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
"Communication with master server failed");
}
- OutgoingDataTransfer transfer = new OutgoingDataTransfer(transferInfo, absFile);
+ OutgoingDataTransfer transfer = new OutgoingDataTransfer(transferInfo, absFile, imgVersion.imageVersionId);
uploadsByVersionId.put(imgVersion.imageVersionId, transfer);
uploadsByTransferId.put(transfer.getId(), transfer);
- transfer.heartBeat(transferPool);
LOGGER.info("Client wants to upload image " + imgVersion.imageVersionId
+ ", created transfer " + transfer.getId());
+ heartBeatTask.fire();
return transfer.getId();
}
@@ -239,6 +239,7 @@ public class SyncTransferHandler {
IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo,
localImageData != null);
downloads.put(transfer.getId(), transfer);
+ heartBeatTask.fire();
return transfer.getId();
} catch (FileNotFoundException e) {
LOGGER.warn("Could not open " + tmpFile.getAbsolutePath());
@@ -308,5 +309,26 @@ public class SyncTransferHandler {
return null;
return downloads.get(downloadToken);
}
+
+ /**
+ * Check whether the given imageVersionId refers to an active transfer.
+ */
+ public static boolean isActiveTransfer(String baseId, String versionId) {
+ if (versionId != null) {
+ OutgoingDataTransfer odt = uploadsByVersionId.get(versionId);
+ if (odt != null && !odt.isComplete(System.currentTimeMillis()) && odt.isActive())
+ return true;
+ }
+ long now = System.currentTimeMillis();
+ for (IncomingDataTransfer idt : downloads.values()) {
+ if (idt.isComplete(now) || !idt.isActive())
+ continue;
+ if (versionId != null && versionId.equals(idt.getVersionId()))
+ return true;
+ if (baseId != null && baseId.equals(idt.getBaseId()))
+ return true;
+ }
+ return false;
+ }
}