blob: 910d0056c21a0ef55732cf0663854d0fc9a7fb70 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
package org.openslx.dozmod.filetransfer;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.TInvalidTokenException;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.thrifthelper.ThriftManager;
import org.openslx.util.QuickTimer;
import org.openslx.util.QuickTimer.Task;
/**
* Used to "watch" a transfer between a satellite server and the master server.
* We simply query the status of the upload from the master server.
*/
public class PassiveTransfer implements TransferEventEmitter {
private static final Logger LOGGER = Logger.getLogger(PassiveTransfer.class);
private static final long THRIFT_INTERVAL_MS = 2000;
/**
* List of listeners that want to get status updates about the transfer.
*/
private final List<TransferEventListener> listeners = new ArrayList<>();
private final String uploadToken;
private final boolean queryMaster;
private long lastThriftUpdate = 0;
private boolean isCancelled = false;
public PassiveTransfer(String token, boolean queryMaster) {
this.uploadToken = token;
this.queryMaster = queryMaster;
QuickTimer.scheduleAtFixedDelay(new Task() {
@Override
public void fire() {
update();
if (isCancelled) {
this.cancel();
}
}
}, 1, THRIFT_INTERVAL_MS + 1);
}
@Override
public void addListener(TransferEventListener listener) {
synchronized (listeners) {
listeners.add(listener);
}
}
@Override
public void removeListener(TransferEventListener listener) {
synchronized (listeners) {
while (listeners.remove(listener)) {
}
}
}
@Override
public boolean isCanceled() {
return isCancelled;
}
private void update() {
final long now = System.currentTimeMillis();
if (lastThriftUpdate + THRIFT_INTERVAL_MS > now)
return;
TransferState state = null;
byte[] blocks = null;
String error = null;
lastThriftUpdate = now;
try {
TransferStatus uploadStatus;
if (queryMaster) {
uploadStatus = ThriftManager.getMasterClient().queryUploadStatus(uploadToken);
} else {
uploadStatus = ThriftManager.getSatClient().queryUploadStatus(uploadToken);
}
state = uploadStatus.getState();
blocks = uploadStatus.getBlockStatus();
} catch (TInvalidTokenException e) {
error = "Upload token unknown!?";
state = TransferState.ERROR;
} catch (TException e) {
error = "Exception quering upload status: " + e.toString();
}
if (error != null) {
error = queryMaster ? "Master: " : "Satellite: " + error;
}
TransferEvent event = new TransferEvent(state, blocks, 0, 0, 0, error);
isCancelled = state == null || state == TransferState.ERROR || state == TransferState.FINISHED;
synchronized (listeners) {
for (int i = listeners.size() - 1; i >= 0; --i) {
listeners.get(i).update(event);
}
}
}
}
|