package org.openslx.dozmod.filetransfer; import java.util.ArrayList; import java.util.List; import org.apache.log4j.Logger; import org.apache.thrift.TException; import org.openslx.bwlp.thrift.iface.TInvalidTokenException; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.thrifthelper.ThriftManager; import org.openslx.util.QuickTimer; import org.openslx.util.QuickTimer.Task; /** * Used to "watch" a transfer between a satellite server and the master server. * We simply query the status of the upload from the master server. */ public class PassiveTransfer implements TransferEventEmitter { private static final Logger LOGGER = Logger.getLogger(PassiveTransfer.class); private static final long THRIFT_INTERVAL_MS = 2000; /** * List of listeners that want to get status updates about the transfer. */ private final List listeners = new ArrayList<>(); private final String uploadToken; private final boolean queryMaster; private long lastThriftUpdate = 0; private boolean isCancelled = false; public PassiveTransfer(String token, boolean queryMaster) { this.uploadToken = token; this.queryMaster = queryMaster; QuickTimer.scheduleAtFixedDelay(new Task() { @Override public void fire() { update(); if (isCancelled) { this.cancel(); } } }, 1, THRIFT_INTERVAL_MS + 1); } @Override public void addListener(TransferEventListener listener) { synchronized (listeners) { listeners.add(listener); } } @Override public void removeListener(TransferEventListener listener) { synchronized (listeners) { while (listeners.remove(listener)) { } } } @Override public boolean isCanceled() { return isCancelled; } private void update() { final long now = System.currentTimeMillis(); if (lastThriftUpdate + THRIFT_INTERVAL_MS > now) return; TransferState state = null; byte[] blocks = null; String error = null; lastThriftUpdate = now; try { TransferStatus uploadStatus; if (queryMaster) { uploadStatus = ThriftManager.getMasterClient().queryUploadStatus(uploadToken); } else { uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken); } state = uploadStatus.getState(); blocks = uploadStatus.getBlockStatus(); } catch (TInvalidTokenException e) { error = "Upload token unknown!?"; state = TransferState.ERROR; } catch (TException e) { error = "Exception quering upload status: " + e.toString(); } if (error != null) { error = queryMaster ? "Master: " : "Satellite: " + error; } TransferEvent event = new TransferEvent(state, blocks, 0, 0, 0, error); isCancelled = state == null || state == TransferState.ERROR || state == TransferState.FINISHED; synchronized (listeners) { for (int i = listeners.size() - 1; i >= 0; --i) { listeners.get(i).update(event); } } } }