package org.openslx.bwlp.sat.fileserv;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.openslx.bwlp.sat.database.models.LocalImageVersion;
import org.openslx.bwlp.sat.util.Configuration;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.thrift.iface.ImageDetailsRead;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
/**
 * Singleton that accepts incoming file transfer connections and hands them
 * over to the matching {@link ActiveUpload} or {@link ActiveDownload}, which
 * are looked up by the token the connecting peer presents.
 * <p>
 * Transfers are registered via {@link #createNewUserUpload} and
 * {@link #createNewUserDownload}; the token they are stored under is a random
 * UUID generated at registration time.
 */
public class FileServer implements IncomingEvent {

	private static final Logger LOGGER = Logger.getLogger(FileServer.class);

	/**
	 * Listener for incoming unencrypted connections
	 */
	private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT); // TODO: Config

	/**
	 * Pool that is handed to the transfers when a new connection is added.
	 * Two core threads, at most MAX_UPLOADS + MAX_DOWNLOADS threads, idle
	 * threads above the core size are kept for one minute, and the work
	 * queue holds a single pending task.
	 */
	private final ThreadPoolExecutor transferPool = new ThreadPoolExecutor(2, Constants.MAX_UPLOADS
			+ Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));

	/**
	 * All currently running uploads, indexed by token
	 */
	private final Map<String, ActiveUpload> uploads = new ConcurrentHashMap<>();

	/**
	 * All currently running downloads, indexed by token
	 */
	private final Map<String, ActiveDownload> downloads = new ConcurrentHashMap<>();

	private static final FileServer globalInstance = new FileServer();

	private FileServer() {
	}

	/**
	 * @return the one shared instance of this class
	 */
	public static FileServer instance() {
		return globalInstance;
	}

	/**
	 * Start the listener for unencrypted connections.
	 *
	 * @return whatever the plain listener's start() call reported
	 */
	public boolean start() {
		boolean ret = plainListener.start();
		// TODO: Start SSL listener too
		return ret;
	}

	/**
	 * A peer connected and wants to receive data. The connection is attached
	 * to the download registered under the presented token; it is cancelled
	 * if the token is missing or unknown, or if the download refuses the
	 * connection.
	 *
	 * @param uploader connection object used to send data to the peer
	 */
	@Override
	public void incomingDownloadRequest(Uploader uploader) throws IOException {
		String token = uploader.getToken();
		LOGGER.info("Incoming filetransfer with token " + token);
		// ConcurrentHashMap rejects null keys with an NPE, so treat a missing token as unknown
		ActiveDownload download = (token == null) ? null : downloads.get(token);
		if (download == null) {
			LOGGER.warn("Unknown token " + token);
			uploader.cancel();
			return;
		}
		if (!download.addConnection(uploader, transferPool)) {
			uploader.cancel();
		}
	}

	/**
	 * A peer connected and wants to send data. The connection is attached to
	 * the upload registered under the presented token; it is cancelled if the
	 * token is missing or unknown, or if the upload refuses the connection.
	 *
	 * @param downloader connection object used to receive data from the peer
	 */
	@Override
	public void incomingUploadRequest(Downloader downloader) throws IOException {
		String token = downloader.getToken();
		LOGGER.info("Incoming filetransfer with token " + token);
		// ConcurrentHashMap rejects null keys with an NPE, so treat a missing token as unknown
		ActiveUpload upload = (token == null) ? null : uploads.get(token);
		if (upload == null) {
			LOGGER.warn("Unknown token " + token);
			downloader.cancel();
			return;
		}
		if (!upload.addConnection(downloader, transferPool)) {
			downloader.cancel();
		}
	}

	/**
	 * Get an upload instance by given token.
	 *
	 * @param uploadToken token the upload was registered under, may be null
	 * @return the matching upload, or null if the token is null or unknown
	 */
	public ActiveUpload getUploadByToken(String uploadToken) {
		if (uploadToken == null)
			return null;
		return uploads.get(uploadToken);
	}

	/**
	 * Register a new upload and open a fresh temporary file as its
	 * destination.
	 *
	 * @param owner passed through to the new {@link ActiveUpload}
	 * @param image passed through to the new {@link ActiveUpload}
	 * @param fileSize passed through to the new {@link ActiveUpload}
	 * @param sha1Sums passed through to the new {@link ActiveUpload}
	 * @param machineDescription passed through to the new {@link ActiveUpload}
	 * @return the newly registered upload
	 * @throws TTransferRejectedException if the number of uploads that are
	 *             not complete has already reached Constants.MAX_UPLOADS, or
	 *             if the destination file could not be opened
	 */
	public ActiveUpload createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize,
			List<ByteBuffer> sha1Sums, byte[] machineDescription) throws TTransferRejectedException {
		int activeUploads = 0;
		for (ActiveUpload upload : uploads.values()) {
			if (upload.isComplete()) {
				// TODO: Check age (short timeout) and remove
				continue;
			}
			// TODO: Check age (long timeout) and remove
			activeUploads++;
		}
		// ">=": the upload created below counts towards the limit too
		if (activeUploads >= Constants.MAX_UPLOADS)
			throw new TTransferRejectedException("Server busy. Too many running uploads.");
		File destinationFile = null;
		do {
			destinationFile = Formatter.getTempImageName();
		} while (destinationFile.exists());
		File parentDir = destinationFile.getParentFile();
		// mkdirs() also returns false if the directory already exists, hence the second check
		if (parentDir != null && !parentDir.mkdirs() && !parentDir.isDirectory()) {
			LOGGER.warn("Could not create directory " + parentDir.getPath());
		}
		String key = UUID.randomUUID().toString();
		ActiveUpload upload;
		try {
			upload = new ActiveUpload(key, owner, image, destinationFile, fileSize, sha1Sums,
					machineDescription);
		} catch (FileNotFoundException e) {
			LOGGER.error("Could not open destination file for writing", e);
			throw new TTransferRejectedException("Destination file not writable!");
		}
		uploads.put(key, upload);
		return upload;
	}

	/**
	 * @return port reported by the listener for unencrypted connections
	 */
	public int getPlainPort() {
		return plainListener.getPort();
	}

	/**
	 * @return always 0 for now, there is no SSL listener yet
	 */
	public int getSslPort() {
		return 0; // TODO
	}

	/**
	 * Register a new download for the given image version. The source file
	 * is localImageData.filePath, resolved against the configured VM store
	 * base path.
	 *
	 * @param localImageData image version to serve
	 * @return the newly registered download
	 * @throws TTransferRejectedException if the number of downloads that are
	 *             not complete has already reached Constants.MAX_DOWNLOADS,
	 *             or if the source file cannot be read
	 */
	public ActiveDownload createNewUserDownload(LocalImageVersion localImageData)
			throws TTransferRejectedException {
		int activeDownloads = 0;
		for (ActiveDownload download : downloads.values()) {
			if (download.isComplete()) {
				// TODO: Check age (short timeout) and remove
				continue;
			}
			// TODO: Check age (long timeout) and remove
			activeDownloads++;
		}
		// ">=": the download created below counts towards the limit too
		if (activeDownloads >= Constants.MAX_DOWNLOADS)
			throw new TTransferRejectedException("Server busy. Too many running downloads.");
		// Determine src file and go
		File srcFile = new File(Configuration.getVmStoreBasePath(), localImageData.filePath);
		if (!srcFile.canRead()) {
			LOGGER.warn("Rejecting download of VID " + localImageData.imageVersionId + ": Missing "
					+ srcFile.getPath());
			// TODO: Mark as invalid in DB
			throw new TTransferRejectedException("File missing on server");
		}
		String key = UUID.randomUUID().toString();
		ActiveDownload transfer = new ActiveDownload(key, srcFile);
		downloads.put(key, transfer);
		return transfer;
	}
}