|
|
package org.openslx.bwlp.sat.fileserv;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.GrowingThreadPoolExecutor;
import org.openslx.bwlp.sat.util.PrioThreadFactory;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TAuthorizationException;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TNotFoundException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
/**
* Manages file transfers between this satellite and the master server.
*/
public class SyncTransferHandler {
private static final Logger LOGGER = Logger.getLogger(SyncTransferHandler.class);
private static final GrowingThreadPoolExecutor transferPool = new GrowingThreadPoolExecutor(1,
Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(1), new PrioThreadFactory("MasterTransferPool",
Thread.NORM_PRIORITY - 3));
/**
* All currently running uploads, indexed by token
*/
private static final Map<String, IncomingDataTransfer> downloads = new ConcurrentHashMap<>();
/**
* All currently running downloads, indexed by token
*/
private static final Map<String, OutgoingDataTransfer> uploads = new ConcurrentHashMap<>();
private static Task heartBeatTask = new Task() {
private final Runnable worker = new Runnable() {
@Override
public void run() {
for (IncomingDataTransfer download : downloads.values()) {
if (download.isActive())
download.heartBeat(transferPool);
}
for (OutgoingDataTransfer upload : uploads.values()) {
if (upload.isActive())
upload.heartBeat(transferPool);
}
}
};
@Override
public void fire() {
if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1)
return;
transferPool.execute(worker);
}
};
//
static {
QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56));
}
public synchronized static String requestImageDownload(ImagePublishData image)
throws TInvocationException, TAuthorizationException, TNotFoundException {
TransferInformation transferInfo;
// Already replicating this one?
if (downloads.containsKey(image.imageVersionId))
return image.imageVersionId;
checkDownloadCount();
try {
transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId);
} catch (TAuthorizationException e) {
LOGGER.warn("Master server rejected our session on downloadImage", e);
throw e;
} catch (TNotFoundException e) {
LOGGER.warn("Master server couldn't find image on downloadImage", e);
throw e;
} catch (TException e) {
LOGGER.warn("Master server made a boo-boo on downloadImage", e);
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
"Communication with master server failed");
}
File tmpFile = null;
do {
tmpFile = Formatter.getTempImageName();
} while (tmpFile.exists());
tmpFile.getParentFile().mkdirs();
try {
IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo);
downloads.put(transfer.getId(), transfer);
return transfer.getId();
} catch (FileNotFoundException e) {
LOGGER.warn("Could not open " + tmpFile.getAbsolutePath());
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
"Could not access local file for writing");
}
}
private static void checkDownloadCount() throws TInvocationException {
Iterator<IncomingDataTransfer> it = downloads.values().iterator();
final long now = System.currentTimeMillis();
int activeDownloads = 0;
while (it.hasNext()) {
IncomingDataTransfer upload = it.next();
if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) {
upload.cancel();
it.remove();
continue;
}
activeDownloads++;
}
if (activeDownloads > Constants.MAX_MASTER_DOWNLOADS) {
throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
"Server busy. Too many running downloads (" + activeDownloads + "/"
+ Constants.MAX_MASTER_DOWNLOADS + ").");
}
}
/**
* Get an upload instance by given token.
*
* @param uploadToken
* @return
*/
public static OutgoingDataTransfer getUploadByToken(String uploadToken) {
if (uploadToken == null)
return null;
return uploads.get(uploadToken);
}
public static IncomingDataTransfer getDownloadByToken(String downloadToken) {
if (downloadToken == null)
return null;
return downloads.get(downloadToken);
}
}
|