diff options
author | Simon Rettberg | 2016-04-13 18:39:26 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-13 18:39:26 +0200 |
commit | 5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622 (patch) | |
tree | 5bdc5411cd9954577e5489d5e4271c800d826e9c /dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java | |
parent | [client] fix bad commit (diff) | |
download | tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.gz tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.tar.xz tutor-module-5a2b7a8a2f0a9ea5d01895b00f75beaa7af55622.zip |
(WiP) Global image sync
Diffstat (limited to 'dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java')
-rw-r--r-- | dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java | 115 |
1 files changed, 107 insertions, 8 deletions
diff --git a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java index f7a9de85..27e11185 100644 --- a/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java +++ b/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java @@ -2,7 +2,10 @@ package org.openslx.bwlp.sat.fileserv; import java.io.File; import java.io.FileNotFoundException; +import java.nio.ByteBuffer; +import java.sql.SQLException; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -10,17 +13,23 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.thrift.TException; +import org.openslx.bwlp.sat.database.mappers.DbImage; +import org.openslx.bwlp.sat.database.mappers.DbUser; +import org.openslx.bwlp.sat.database.models.LocalImageVersion; import org.openslx.bwlp.sat.util.Constants; +import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; -import org.openslx.bwlp.sat.util.GrowingThreadPoolExecutor; import org.openslx.bwlp.sat.util.PrioThreadFactory; +import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TAuthorizationException; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TNotFoundException; +import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.thrifthelper.ThriftManager; +import org.openslx.util.GrowingThreadPoolExecutor; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; @@ -50,20 +59,31 @@ public class SyncTransferHandler { private final Runnable worker = new Runnable() { @Override public void run() { - for (IncomingDataTransfer download : downloads.values()) { + final long now = System.currentTimeMillis(); + for (Iterator<IncomingDataTransfer> it = downloads.values().iterator(); it.hasNext();) { + IncomingDataTransfer download = it.next(); if (download.isActive()) download.heartBeat(transferPool); + if (download.isComplete(now)) { + LOGGER.info("Download from master server complete"); + it.remove(); + } } - for (OutgoingDataTransfer upload : uploads.values()) { + for (Iterator<OutgoingDataTransfer> it = uploads.values().iterator(); it.hasNext();) { + OutgoingDataTransfer upload = it.next(); if (upload.isActive()) upload.heartBeat(transferPool); + if (upload.isComplete(now)) { + LOGGER.info("Upload to master server complete"); + it.remove(); + } } } }; @Override public void fire() { - if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1) + if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 2) return; transferPool.execute(worker); } @@ -75,18 +95,77 @@ public class SyncTransferHandler { QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56)); } - public synchronized static String requestImageDownload(ImagePublishData image) + public synchronized static String requestImageUpload(String userToken, LocalImageVersion img) + throws SQLException, TNotFoundException, TInvocationException, TAuthorizationException, + TTransferRejectedException { + TransferInformation transferInfo; + OutgoingDataTransfer existing = uploads.get(img.imageVersionId); + if (existing != null) + return existing.getId(); + File absFile = FileSystem.composeAbsoluteImagePath(img); + if (!absFile.isFile() || !absFile.canRead()) { + LOGGER.error("Cannot upload " + img.imageVersionId + ": file missing: " + + absFile.getAbsolutePath()); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, "Source file not readable"); + } + if (absFile.length() != img.fileSize) { + LOGGER.error("Cannot upload" + img.imageVersionId + ": wrong file size - expected " + + img.fileSize + ", got " + absFile.length()); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "File corrupted on satellite server"); + } + checkUploadCount(); + ImageDetailsRead details = DbImage.getImageDetails(null, img.imageBaseId); + List<ByteBuffer> blockHashes = DbImage.getBlockHashes(img.imageVersionId); + ImagePublishData publishData = new ImagePublishData(); + publishData.createTime = img.createTime; + publishData.description = details.description; + publishData.fileSize = img.fileSize; + publishData.imageBaseId = img.imageBaseId; + publishData.imageName = details.imageName; + publishData.imageVersionId = img.imageVersionId; + publishData.isTemplate = details.isTemplate; + publishData.osId = details.osId; + publishData.user = DbUser.getOrNull(img.uploaderId); + publishData.virtId = details.virtId; + try { + transferInfo = ThriftManager.getMasterClient().submitImage(userToken, publishData, blockHashes); + } catch (TAuthorizationException e) { + LOGGER.warn("Master server rejected our session on uploadImage", e); + throw e; + } catch (TInvocationException e) { + LOGGER.warn("Master server made a boo-boo on uploadImage", e); + throw e; + } catch (TTransferRejectedException e) { + LOGGER.warn("Master server rejected our upload request", e); + throw e; + } catch (TException e) { + LOGGER.warn("Unknown exception on uploadImage to master server", e); + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Communication with master server failed"); + } + OutgoingDataTransfer transfer = new OutgoingDataTransfer(transferInfo, absFile); + uploads.put(transfer.getId(), transfer); + transfer.heartBeat(transferPool); + return transfer.getId(); + } + + public synchronized static String requestImageDownload(String userToken, ImagePublishData image) throws TInvocationException, TAuthorizationException, TNotFoundException { TransferInformation transferInfo; // Already replicating this one? - if (downloads.containsKey(image.imageVersionId)) - return image.imageVersionId; + IncomingDataTransfer existing = downloads.get(image.imageVersionId); + if (existing != null) + return existing.getId(); checkDownloadCount(); try { - transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId); + transferInfo = ThriftManager.getMasterClient().downloadImage(userToken, image.imageVersionId); } catch (TAuthorizationException e) { LOGGER.warn("Master server rejected our session on downloadImage", e); throw e; + } catch (TInvocationException e) { + LOGGER.warn("Master server made a boo-boo on downloadImage", e); + throw e; } catch (TNotFoundException e) { LOGGER.warn("Master server couldn't find image on downloadImage", e); throw e; @@ -131,6 +210,26 @@ public class SyncTransferHandler { } } + private static void checkUploadCount() throws TInvocationException { + Iterator<OutgoingDataTransfer> it = uploads.values().iterator(); + final long now = System.currentTimeMillis(); + int activeUploads = 0; + while (it.hasNext()) { + OutgoingDataTransfer download = it.next(); + if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) { + download.cancel(); + it.remove(); + continue; + } + activeUploads++; + } + if (activeUploads > Constants.MAX_MASTER_UPLOADS) { + throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR, + "Server busy. Too many running uploads (" + activeUploads + "/" + + Constants.MAX_MASTER_UPLOADS + ")."); + } + } + /** * Get an upload instance by given token. * |