1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
package fileserv;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
import org.openslx.imagemaster.thrift.iface.UserInfo;
public class ActiveUpload {
	private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);

	/**
	 * This is an active upload, so on our end, we have a Downloader.
	 */
	private Downloader download = null;

	/** File the uploaded data is written to. */
	private final File destinationFile;

	/**
	 * Random-access handle for writing received chunks at their offset.
	 * Must be released via {@link #close()} when the upload is finished or
	 * abandoned, otherwise the file handle leaks.
	 */
	private final RandomAccessFile outFile;

	/** Tracks which chunks of the file are missing, in flight, or complete. */
	private final ChunkList chunks;

	/** Expected total size of the uploaded file, in bytes. */
	private final long fileSize;

	/**
	 * User owning this uploaded file.
	 */
	private final UserInfo owner;

	// TODO: Use HashList for verification

	/**
	 * Create a new active upload writing to the given destination file.
	 *
	 * @param owner user owning this upload; may be null in special cases
	 * @param destinationFile local file to write the uploaded data to
	 * @param fileSize expected total size of the file, in bytes
	 * @param sha1Sums per-chunk sha1 sums used by the ChunkList (verification
	 *            itself is still TODO, see above)
	 * @throws FileNotFoundException if the destination file cannot be opened
	 *             for writing
	 */
	public ActiveUpload(UserInfo owner, File destinationFile, long fileSize, List<ByteBuffer> sha1Sums)
			throws FileNotFoundException {
		this.destinationFile = destinationFile;
		this.outFile = new RandomAccessFile(destinationFile, "rw");
		this.chunks = new ChunkList(fileSize, sha1Sums);
		this.owner = owner;
		this.fileSize = fileSize;
	}

	/**
	 * Add another connection for this file transfer. Currently only one
	 * connection is allowed, but this might change in the future.
	 *
	 * @param connection incoming transfer connection (our downloading end)
	 * @param pool pool to run the transfer task on
	 * @return true if the connection is accepted, false if it should be
	 *         discarded
	 */
	public synchronized boolean addConnection(final Downloader connection, ThreadPoolExecutor pool) {
		if (download != null || chunks.isComplete())
			return false;
		download = connection;
		try {
			pool.execute(new Runnable() {
				@Override
				public void run() {
					CbHandler cbh = new CbHandler();
					if (!download.download(cbh, cbh) && cbh.currentChunk != null) {
						// If the download failed and we have a current chunk, put it back into
						// the queue, so it will be handled again later...
						chunks.markFailed(cbh.currentChunk);
					}
				}
			});
		} catch (Exception e) {
			// Pool rejected the task (saturated or shut down): roll back, otherwise
			// this upload would be stuck forever with a connection that never runs.
			LOGGER.warn("Could not start transfer task for '" + destinationFile + "'", e);
			download = null;
			return false;
		}
		return true;
	}

	/**
	 * Release the file handle backing this upload. Call this when the
	 * transfer is complete or being discarded; without it the underlying
	 * RandomAccessFile is leaked.
	 */
	public synchronized void close() {
		synchronized (outFile) {
			try {
				outFile.close();
			} catch (IOException e) {
				LOGGER.warn("Could not close '" + destinationFile + "'", e);
			}
		}
	}

	/**
	 * Write some data to the local file. Thread safe so we could
	 * have multiple concurrent connections later.
	 *
	 * @param fileOffset offset in the destination file to write at
	 * @param dataLength number of valid bytes in data
	 * @param data buffer holding the received bytes (only the first
	 *            dataLength bytes are written)
	 * @return true on success, false if the write failed
	 */
	private boolean writeFileData(long fileOffset, int dataLength, byte[] data) {
		synchronized (outFile) {
			try {
				outFile.seek(fileOffset);
				outFile.write(data, 0, dataLength);
			} catch (IOException e) {
				LOGGER.error("Cannot write to '" + destinationFile
						+ "'. Disk full, network storage error, bad permissions, ...?", e);
				return false;
			}
		}
		return true;
	}

	/**
	 * Get user owning this upload. Can be null in special cases.
	 *
	 * @return instance of UserInfo for the according user.
	 */
	public UserInfo getOwner() {
		return this.owner;
	}

	/**
	 * Whether the upload finished: all chunks received and the file on disk
	 * has the expected size.
	 *
	 * @return true if the upload is complete
	 */
	public boolean isComplete() {
		return chunks.isComplete() && destinationFile.length() == this.fileSize;
	}

	/**
	 * @return the local file this upload writes to
	 */
	public File getDestinationFile() {
		return this.destinationFile;
	}

	/**
	 * @return expected total size of the uploaded file, in bytes
	 */
	public long getSize() {
		return this.fileSize;
	}

	/**
	 * Callback class for an instance of the Downloader, which supplies
	 * the Downloader with wanted file ranges, and handles incoming data.
	 */
	private class CbHandler implements WantRangeCallback, DataReceivedCallback {
		/**
		 * The current chunk being transfered.
		 */
		public FileChunk currentChunk = null;

		@Override
		public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
			// TODO: Maybe cache in RAM and write full CHUNK_SIZE blocks at a time?
			// Would probably help with slower storage, especially if it's using
			// rotating disks and we're running multiple uploads.
			// Also we wouldn't have to re-read a block form disk for sha1 checking.
			return writeFileData(fileOffset, dataLength, data);
		}

		@Override
		public FileRange get() {
			if (currentChunk != null) {
				// TODO: A chunk was requested before, check hash and requeue if not matching
				// This needs to be async (own thread) so will be a little complicated
			}
			// Get next missing chunk
			currentChunk = chunks.getMissing();
			if (currentChunk == null)
				return null; // No more chunks, returning null tells the Downloader we're done.
			return currentChunk.range;
		}
	}

	// TODO: Clean up old stale uploads
}
|