summaryrefslogtreecommitdiffstats
path: root/dozentenmodul/src/main/java/org/openslx/dozmod/filetransfer/PassiveTransfer.java
blob: 910d0056c21a0ef55732cf0663854d0fc9a7fb70 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package org.openslx.dozmod.filetransfer;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;

/**
 * Used to "watch" a transfer between a satellite server and the master server.
 * We simply poll the status of the upload identified by a token, either from
 * the master server or from the satellite server, and forward every result to
 * the registered listeners as a {@link TransferEvent}.
 * <p>
 * Polling starts as soon as the instance is constructed and ends once a query
 * fails or the reported state is {@link TransferState#ERROR} or
 * {@link TransferState#FINISHED}.
 */
public class PassiveTransfer implements TransferEventEmitter {

	private static final Logger LOGGER = Logger.getLogger(PassiveTransfer.class);

	/**
	 * Minimum time in milliseconds between two status queries.
	 */
	private static final long THRIFT_INTERVAL_MS = 2000;

	/**
	 * List of listeners that want to get status updates about the transfer.
	 * Every access is guarded by synchronizing on the list itself.
	 */
	private final List<TransferEventListener> listeners = new ArrayList<>();

	/**
	 * Token identifying the upload whose status is queried.
	 */
	private final String uploadToken;

	/**
	 * If true, the master server is queried, otherwise the satellite server.
	 */
	private final boolean queryMaster;

	/**
	 * Timestamp (System.currentTimeMillis()) of the most recent status query.
	 */
	private long lastThriftUpdate = 0;

	/**
	 * Set once polling should stop. Written by the timer task and exposed via
	 * {@link #isCanceled()}, hence volatile so that other threads see the change.
	 */
	private volatile boolean isCancelled = false;

	/**
	 * Creates the watcher and immediately schedules the periodic status query.
	 *
	 * @param token token of the upload to watch
	 * @param queryMaster true to query the master server, false to query the
	 *            satellite server
	 */
	public PassiveTransfer(String token, boolean queryMaster) {
		this.uploadToken = token;
		this.queryMaster = queryMaster;
		// NOTE(review): the delay is one millisecond longer than THRIFT_INTERVAL_MS,
		// presumably so the rate limit check in update() never rejects a regular
		// tick - confirm against QuickTimer's scheduling semantics.
		QuickTimer.scheduleAtFixedDelay(new Task() {
			@Override
			public void fire() {
				update();
				if (isCancelled) {
					// Here "this" is the timer task, so the task unschedules itself
					this.cancel();
				}
			}
		}, 1, THRIFT_INTERVAL_MS + 1);
	}

	/**
	 * Registers a listener for status updates. Adding the same listener more
	 * than once results in it being notified more than once per update.
	 *
	 * @param listener listener to add
	 */
	@Override
	public void addListener(TransferEventListener listener) {
		synchronized (listeners) {
			listeners.add(listener);
		}
	}

	/**
	 * Unregisters a listener, removing every occurrence of it from the list.
	 *
	 * @param listener listener to remove
	 */
	@Override
	public void removeListener(TransferEventListener listener) {
		synchronized (listeners) {
			// remove() only drops the first match, so repeat until none is left
			while (listeners.remove(listener)) {
			}
		}
	}

	/**
	 * @return true once polling has stopped, i.e. after a failed query or after
	 *         the transfer was reported as ERROR or FINISHED
	 */
	@Override
	public boolean isCanceled() {
		return isCancelled;
	}

	/**
	 * Queries the current upload status from the configured server and passes
	 * the result on to all listeners. Does nothing if the previous query was
	 * less than {@link #THRIFT_INTERVAL_MS} ago.
	 */
	private void update() {
		final long now = System.currentTimeMillis();
		if (lastThriftUpdate + THRIFT_INTERVAL_MS > now)
			return;
		TransferState state = null;
		byte[] blocks = null;
		String error = null;
		lastThriftUpdate = now;
		try {
			TransferStatus uploadStatus;
			if (queryMaster) {
				uploadStatus = ThriftManager.getMasterClient().queryUploadStatus(uploadToken);
			} else {
				uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
			}
			state = uploadStatus.getState();
			blocks = uploadStatus.getBlockStatus();
		} catch (TInvalidTokenException e) {
			error = "Upload token unknown!?";
			state = TransferState.ERROR;
		} catch (TException e) {
			// state stays null here, which ends polling further down
			LOGGER.warn("Could not query upload status", e);
			error = "Exception quering upload status: " + e.toString();
		}
		if (error != null) {
			// Parentheses are required: '+' binds tighter than '?:', so without
			// them the error text would be dropped whenever queryMaster is true.
			error = (queryMaster ? "Master: " : "Satellite: ") + error;
		}
		// NOTE(review): the three numeric arguments are always passed as 0 -
		// check TransferEvent for their meaning.
		TransferEvent event = new TransferEvent(state, blocks, 0, 0, 0, error);
		isCancelled = state == null || state == TransferState.ERROR || state == TransferState.FINISHED;
		synchronized (listeners) {
			// Iterate backwards by index; presumably so a listener may remove
			// itself from within update() without entries being skipped.
			for (int i = listeners.size() - 1; i >= 0; --i) {
				listeners.get(i).update(event);
			}
		}
	}

}