package org.openslx.bwlp.sat.fileserv; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.sql.SQLException; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.mappers.DbImage; import org.openslx.bwlp.sat.database.models.LocalImageVersion; import org.openslx.bwlp.sat.util.Constants; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.bwlp.sat.util.Formatter; import org.openslx.bwlp.sat.util.Identity; import org.openslx.bwlp.thrift.iface.ImageDetailsRead; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; import org.openslx.thrifthelper.Comparators; import org.openslx.util.GrowingThreadPoolExecutor; import org.openslx.util.PrioThreadFactory; public class FileServer implements IncomingEvent { private static final Logger LOGGER = Logger.getLogger(FileServer.class); /** * Listener for incoming unencrypted connections */ private final Listener plainListener = new Listener(this, null, 9092, Constants.TRANSFER_TIMEOUT); // TODO: Config private final Listener sslListener; private final ExecutorService transferPool = new GrowingThreadPoolExecutor(1, Constants.MAX_UPLOADS + Constants.MAX_DOWNLOADS, 1, TimeUnit.MINUTES, new SynchronousQueue(), new PrioThreadFactory("ClientTransferPool", Thread.NORM_PRIORITY - 2)); /** * All currently running uploads, indexed by token */ private final Map uploads = new ConcurrentHashMap<>(); /** * All currently running downloads, indexed by token */ private final Map downloads = new ConcurrentHashMap<>(); private static final FileServer globalInstance = new FileServer(); private FileServer() { SSLContext ctx = Identity.getSSLContext(); sslListener = ctx == null ? null : new Listener(this, ctx, 9093, Constants.TRANSFER_TIMEOUT); LOGGER.info("Max allowed concurrent uploads from clients: " + Constants.MAX_UPLOADS); LOGGER.info("Max allowed concurrent downloads from clients: " + Constants.MAX_DOWNLOADS); LOGGER.info("Max allowed connections per transfer: " + Constants.MAX_CONNECTIONS_PER_TRANSFER); } public static FileServer instance() { return globalInstance; } public boolean start() { boolean ret = plainListener.start(); if (sslListener != null) { ret |= sslListener.start(); } return ret; } @Override public void incomingDownloadRequest(Uploader uploader) throws IOException { String token = uploader.getToken(); OutgoingDataTransfer download = downloads.get(token); if (download == null) { LOGGER.warn("Download request: Unknown token " + token); uploader.cancel(); return; } if (!download.addConnection(uploader, transferPool)) { uploader.cancel(); } } @Override public void incomingUploadRequest(Downloader downloader) throws IOException { String token = downloader.getToken(); IncomingDataTransfer upload = uploads.get(token); if (upload == null) { LOGGER.warn("Upload request: Unknown token " + token); downloader.cancel(); return; } if (!upload.addConnection(downloader, transferPool)) { if (upload.getErrorMessage() != null) { downloader.sendErrorCode(upload.getErrorMessage()); } downloader.cancel(); } } /** * Get an upload instance by given token. * * @param uploadToken * @return */ public IncomingDataTransfer getUploadByToken(String uploadToken) { if (uploadToken == null) return null; return uploads.get(uploadToken); } public OutgoingDataTransfer getDownloadByToken(String downloadToken) { if (downloadToken == null) return null; return downloads.get(downloadToken); } public IncomingDataTransfer createNewUserUpload(UserInfo owner, ImageDetailsRead image, long fileSize, List sha1Sums, byte[] machineDescription) throws TTransferRejectedException { Iterator it = uploads.values().iterator(); final long now = System.currentTimeMillis(); int activeUploads = 0; int activeUserUploads = 0; while (it.hasNext()) { IncomingDataTransfer upload = it.next(); if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) { upload.cancel(); it.remove(); continue; } if (upload.countsTowardsConnectionLimit(now)) { if (upload.getOwner() != null && Comparators.user.compare(owner, upload.getOwner()) == 0) { activeUserUploads += 1; } activeUploads += 1; } } if (activeUploads >= Constants.MAX_UPLOADS || activeUserUploads > Constants.MAX_UPLOADS_PER_USER) { throw new TTransferRejectedException("Server busy. Too many running uploads (User: " + activeUserUploads + "/" + Constants.MAX_UPLOADS_PER_USER + "; Total: " + activeUploads + "/" + Constants.MAX_UPLOADS + ")."); } File destinationFile = null; do { destinationFile = Formatter.getTempImageName(); } while (destinationFile.exists()); destinationFile.getParentFile().mkdirs(); String key = UUID.randomUUID().toString(); IncomingDataTransfer upload; try { upload = new IncomingDataTransfer(key, owner, image, destinationFile, fileSize, sha1Sums, machineDescription, false); } catch (FileNotFoundException e) { LOGGER.error("Could not open destination file for writing", e); throw new TTransferRejectedException("Destination file not writable!"); } uploads.put(key, upload); return upload; } public int getPlainPort() { if (plainListener == null) return 0; return plainListener.getPort(); } public int getSslPort() { if (sslListener == null) return 0; return sslListener.getPort(); } public OutgoingDataTransfer createNewUserDownload(LocalImageVersion localImageData) throws TTransferRejectedException { Iterator it = downloads.values().iterator(); final long now = System.currentTimeMillis(); int activeDownloads = 0; while (it.hasNext()) { OutgoingDataTransfer download = it.next(); if (download.isComplete(now) || download.hasReachedIdleTimeout(now)) { download.cancel(); it.remove(); continue; } if (download.countsTowardsConnectionLimit(now)) { activeDownloads += 1; } } if (activeDownloads >= Constants.MAX_DOWNLOADS) { throw new TTransferRejectedException("Server busy. Too many running uploads (" + activeDownloads + "/" + Constants.MAX_UPLOADS + ")."); } // Determine src file and go File srcFile = FileSystem.composeAbsoluteImagePath(localImageData); String errorMessage = null; if (srcFile == null) { LOGGER.warn("Rejecting download of VID " + localImageData.imageVersionId + ": Invalid local relative path"); errorMessage = "File has invalid path on server"; } else { if (!srcFile.canRead()) { LOGGER.warn("Rejecting download of VID " + localImageData.imageVersionId + ": Missing " + srcFile.getPath()); errorMessage = "File missing on server"; } if (srcFile.length() != localImageData.fileSize) { LOGGER.warn("Rejecting download of VID " + localImageData.imageVersionId + ": Size mismatch for " + srcFile.getPath() + " (expected " + localImageData.fileSize + ", is " + srcFile.length() + ")"); errorMessage = "File corrupted on server"; } } if (errorMessage != null) { if (localImageData.isValid) { try { DbImage.markValid(false, true, localImageData); } catch (SQLException e) { } } throw new TTransferRejectedException(errorMessage); } String key = UUID.randomUUID().toString(); OutgoingDataTransfer transfer = new OutgoingDataTransfer(key, srcFile, getPlainPort(), getSslPort(), localImageData.imageVersionId); downloads.put(key, transfer); return transfer; } public Status getStatus() { return new Status(); } /** * Check whether the given imageVersionId refers to an active transfer. */ public boolean isActiveTransfer(String baseId, String versionId) { long now = System.currentTimeMillis(); if (versionId != null) { for (OutgoingDataTransfer odt : downloads.values()) { if (versionId != null && versionId.equals(odt.getVersionId()) && !odt.isComplete(now) && odt.isActive()) return true; } } for (IncomingDataTransfer idt : uploads.values()) { if (idt.isComplete(now) || !idt.isActive()) continue; if (versionId != null && versionId.equals(idt.getVersionId())) return true; if (baseId != null && baseId.equals(idt.getBaseId())) return true; } return false; } class Status { public final int activeUploads; public final int activeDownloads; private Status() { long now = System.currentTimeMillis(); int d = 0, u = 0; for (OutgoingDataTransfer t : downloads.values()) { if (t.countsTowardsConnectionLimit(now)) { d += 1; } } for (IncomingDataTransfer t : uploads.values()) { if (t.countsTowardsConnectionLimit(now)) { u += 1; } } this.activeDownloads = d; this.activeUploads = u; } } }