1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
package org.openslx.filetransfer.util;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.log4j.Logger;
/**
 * Bookkeeping for the chunks of a single file while it is being transferred.
 * <p>
 * Every chunk is held in exactly one of three lists: missing, pending, or
 * complete. All public methods synchronize on this instance, and the methods
 * that move a chunk out of the pending list call {@code notifyAll()} so that a
 * thread waiting in {@link #getMissing()} wakes up.
 */
public class ChunkList
{

	private static final Logger LOGGER = Logger.getLogger( ChunkList.class );

	// Status byte values written by getStatusArray().
	// 0 = complete, 1 = missing, 2 = uploading, 3 = queued for copying, 4 = copying
	// (values 3 and 4 are never written by this class)
	private static final byte STATUS_COMPLETE = 0;
	private static final byte STATUS_MISSING = 1;
	private static final byte STATUS_PENDING = 2;

	/**
	 * Chunks that are missing from the file
	 */
	private final List<FileChunk> missingChunks = new LinkedList<>();

	/**
	 * Chunks that are currently being uploaded or hash-checked
	 */
	private final List<FileChunk> pendingChunks = new LinkedList<>();

	/**
	 * Chunks that were reported as successfully transferred
	 */
	private final List<FileChunk> completeChunks = new ArrayList<>( 100 );

	/**
	 * One status byte per chunk, indexed by chunk index. Only refreshed when
	 * {@link #getStatusArray()} is called.
	 */
	private final ByteBuffer statusArray;

	// Do we need to keep valid chunks, or chunks that failed too many times?

	/**
	 * Create the chunk list for a file. All chunks produced by
	 * {@code FileChunk.createChunkList} start out in the missing list, and the
	 * status buffer is sized to that number of chunks.
	 *
	 * @param fileSize size of the file, passed on to {@code FileChunk.createChunkList}
	 * @param sha1Sums checksums passed on to {@code FileChunk.createChunkList}
	 *           (presumably one SHA-1 per chunk; confirm against FileChunk)
	 */
	public ChunkList( long fileSize, List<ByteBuffer> sha1Sums )
	{
		FileChunk.createChunkList( missingChunks, fileSize, sha1Sums );
		statusArray = ByteBuffer.allocate( missingChunks.size() );
	}

	/**
	 * Get a missing chunk, marking it pending.
	 * <p>
	 * If no chunk is missing but some are still pending, this waits up to three
	 * seconds for a notification (sent by {@link #markSuccessful(FileChunk)} and
	 * {@link #markFailed(FileChunk)}) and then checks once more. A {@code null}
	 * return therefore does not imply that the file is complete; use
	 * {@link #isComplete()} for that.
	 *
	 * @return chunk that was missing and is now pending, or {@code null} if none
	 *         is available
	 * @throws InterruptedException if the thread is interrupted while waiting
	 */
	public synchronized FileChunk getMissing() throws InterruptedException
	{
		if ( missingChunks.isEmpty() && pendingChunks.isEmpty() )
			return null;
		if ( missingChunks.isEmpty() ) {
			// Only pending chunks left; one of them might fail and become missing again
			this.wait( 3000 );
			if ( missingChunks.isEmpty() )
				return null;
		}
		FileChunk c = missingChunks.remove( 0 );
		pendingChunks.add( c );
		return c;
	}

	/**
	 * Get the block status as byte representation.
	 * <p>
	 * The backing array is rewritten from the three chunk lists on every call.
	 * The returned object is the internal buffer itself, not a copy, so its
	 * contents change on the next call and its position/limit are shared between
	 * all callers.
	 *
	 * @return buffer holding one status byte per chunk (0 = complete,
	 *         1 = missing, 2 = pending)
	 */
	public synchronized ByteBuffer getStatusArray()
	{
		byte[] array = statusArray.array();
		for ( FileChunk c : missingChunks ) {
			array[c.getChunkIndex()] = STATUS_MISSING;
		}
		for ( FileChunk c : pendingChunks ) {
			array[c.getChunkIndex()] = STATUS_PENDING;
		}
		for ( FileChunk c : completeChunks ) {
			array[c.getChunkIndex()] = STATUS_COMPLETE;
		}
		return statusArray;
	}

	/**
	 * Get completed chunks as list
	 *
	 * @return new list containing all successfully transferred chunks; changes to
	 *         it do not affect this ChunkList
	 */
	public synchronized List<FileChunk> getCompleted()
	{
		return new ArrayList<>( completeChunks );
	}

	/**
	 * Mark a chunk currently transferring as successfully transferred.
	 * If the chunk is not in the pending list, a warning is logged and nothing
	 * else happens.
	 *
	 * @param c The chunk in question
	 */
	public synchronized void markSuccessful( FileChunk c )
	{
		if ( !pendingChunks.remove( c ) ) {
			LOGGER.warn( "Inconsistent state: markSuccessful called for Chunk " + c
					+ ", but chunk is not marked as currently transferring!" );
			return;
		}
		completeChunks.add( c );
		// Wake up threads blocked in getMissing()
		this.notifyAll();
	}

	/**
	 * Mark a chunk currently transferring or being hash checked as failed
	 * transfer. This increases its fail count and re-adds it to the list of
	 * missing chunks.
	 *
	 * @param c The chunk in question
	 * @return Number of times transfer of this chunk failed, as returned by
	 *         {@code FileChunk.incFailed()}, or -1 if the chunk was not pending
	 */
	public synchronized int markFailed( FileChunk c )
	{
		if ( !pendingChunks.remove( c ) ) {
			LOGGER.warn( "Inconsistent state: markFailed called for Chunk " + c
					+ ", but chunk is not marked as currently transferring!" );
			return -1;
		}
		// Add as first element so it will be re-transmitted immediately
		missingChunks.add( 0, c );
		// Wake up threads blocked in getMissing()
		this.notifyAll();
		return c.incFailed();
	}

	/**
	 * Check whether the transfer is finished.
	 *
	 * @return true if no chunk is missing and no chunk is pending
	 */
	public synchronized boolean isComplete()
	{
		return missingChunks.isEmpty() && pendingChunks.isEmpty();
	}

}
|