summaryrefslogblamecommitdiffstats
path: root/dozentenmodulserver/src/main/java/org/openslx/bwlp/sat/fileserv/SyncTransferHandler.java
blob: f7a9de857885916a62150626b3be114a99a34fee (plain) (tree)
1
2
3
4
5
6
7
8







                                               





                                           
                                                           
                                                   
















                                                                                         
                                                                                                      
                                                                                                           

                                                                                                        































































































                                                                                                                 

















                                                                                     
 
package org.openslx.bwlp.sat.fileserv;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.sat.util.Constants;
import org.openslx.bwlp.sat.util.Formatter;
import org.openslx.bwlp.sat.util.GrowingThreadPoolExecutor;
import org.openslx.bwlp.sat.util.PrioThreadFactory;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TAuthorizationException;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TNotFoundException;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;

/**
 * Manages file transfers between this satellite and the master server.
 */
public class SyncTransferHandler {

	private static final Logger LOGGER = Logger.getLogger(SyncTransferHandler.class);

	private static final GrowingThreadPoolExecutor transferPool = new GrowingThreadPoolExecutor(1,
			Constants.MAX_MASTER_UPLOADS + Constants.MAX_MASTER_DOWNLOADS, 1, TimeUnit.MINUTES,
			new ArrayBlockingQueue<Runnable>(1), new PrioThreadFactory("MasterTransferPool",
					Thread.NORM_PRIORITY - 3));

	/**
	 * All currently running uploads, indexed by token
	 */
	private static final Map<String, IncomingDataTransfer> downloads = new ConcurrentHashMap<>();

	/**
	 * All currently running downloads, indexed by token
	 */
	private static final Map<String, OutgoingDataTransfer> uploads = new ConcurrentHashMap<>();

	private static Task heartBeatTask = new Task() {
		private final Runnable worker = new Runnable() {
			@Override
			public void run() {
				for (IncomingDataTransfer download : downloads.values()) {
					if (download.isActive())
						download.heartBeat(transferPool);
				}
				for (OutgoingDataTransfer upload : uploads.values()) {
					if (upload.isActive())
						upload.heartBeat(transferPool);
				}
			}
		};

		@Override
		public void fire() {
			if (transferPool.getMaximumPoolSize() - transferPool.getActiveCount() < 1)
				return;
			transferPool.execute(worker);
		}
	};

	//

	static {
		QuickTimer.scheduleAtFixedDelay(heartBeatTask, 123, TimeUnit.SECONDS.toMillis(56));
	}

	public synchronized static String requestImageDownload(ImagePublishData image)
			throws TInvocationException, TAuthorizationException, TNotFoundException {
		TransferInformation transferInfo;
		// Already replicating this one?
		if (downloads.containsKey(image.imageVersionId))
			return image.imageVersionId;
		checkDownloadCount();
		try {
			transferInfo = ThriftManager.getMasterClient().downloadImage(null, image.imageVersionId);
		} catch (TAuthorizationException e) {
			LOGGER.warn("Master server rejected our session on downloadImage", e);
			throw e;
		} catch (TNotFoundException e) {
			LOGGER.warn("Master server couldn't find image on downloadImage", e);
			throw e;
		} catch (TException e) {
			LOGGER.warn("Master server made a boo-boo on downloadImage", e);
			throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
					"Communication with master server failed");
		}
		File tmpFile = null;
		do {
			tmpFile = Formatter.getTempImageName();
		} while (tmpFile.exists());
		tmpFile.getParentFile().mkdirs();
		try {
			IncomingDataTransfer transfer = new IncomingDataTransfer(image, tmpFile, transferInfo);
			downloads.put(transfer.getId(), transfer);
			return transfer.getId();
		} catch (FileNotFoundException e) {
			LOGGER.warn("Could not open " + tmpFile.getAbsolutePath());
			throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
					"Could not access local file for writing");
		}
	}

	private static void checkDownloadCount() throws TInvocationException {
		Iterator<IncomingDataTransfer> it = downloads.values().iterator();
		final long now = System.currentTimeMillis();
		int activeDownloads = 0;
		while (it.hasNext()) {
			IncomingDataTransfer upload = it.next();
			if (upload.isComplete(now) || upload.hasReachedIdleTimeout(now)) {
				upload.cancel();
				it.remove();
				continue;
			}
			activeDownloads++;
		}
		if (activeDownloads > Constants.MAX_MASTER_DOWNLOADS) {
			throw new TInvocationException(InvocationError.INTERNAL_SERVER_ERROR,
					"Server busy. Too many running downloads (" + activeDownloads + "/"
							+ Constants.MAX_MASTER_DOWNLOADS + ").");
		}
	}

	/**
	 * Get an upload instance by given token.
	 * 
	 * @param uploadToken
	 * @return
	 */
	public static OutgoingDataTransfer getUploadByToken(String uploadToken) {
		if (uploadToken == null)
			return null;
		return uploads.get(uploadToken);
	}

	public static IncomingDataTransfer getDownloadByToken(String downloadToken) {
		if (downloadToken == null)
			return null;
		return downloads.get(downloadToken);
	}

}