summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
blob: 8943524ddfc81064d5e8a99a383331f01a1688fd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package org.openslx.filetransfer.util;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.LocalChunkSource.SourceFile;
import org.openslx.util.Util;

public class LocalCopyManager extends Thread
{

	private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class );

	private FileChunk currentChunk = null;

	private final ChunkList chunkList;

	private final IncomingTransferBase transfer;

	private final Map<String, RandomAccessFile> sources = new HashMap<>();

	private Semaphore hasWork = new Semaphore( 0 );

	private AtomicInteger copyCount = new AtomicInteger();

	private boolean paused = true;

	public LocalCopyManager( IncomingTransferBase transfer, ChunkList list )
	{
		super( "LocalCopyManager" );
		this.transfer = transfer;
		this.chunkList = list;
	}

	/**
	 * Trigger copying of another block if possible
	 */
	public synchronized void trigger()
	{
		if ( this.paused )
			return;
		if ( !isAlive() ) {
			LOGGER.warn( "Cannot be triggered when Thread is not running." );
			if ( currentChunk != null ) {
				chunkList.markFailed( currentChunk );
				currentChunk = null;
			}
			return;
		}
		if ( currentChunk == null ) {
			currentChunk = chunkList.getCopyCandidate();
			hasWork.release();
		}
	}

	@Override
	public void run()
	{
		try {
			while ( !interrupted() ) {
				while ( currentChunk != null ) {
					hasWork.drainPermits();
					copyChunk();
				}
				if ( !hasWork.tryAcquire( 10, TimeUnit.SECONDS ) ) {
					if ( chunkList.isComplete() ) {
						transfer.finishUploadInternal();
						break;
					} else if ( !transfer.isActive() ) {
						break;
					} else {
						trigger();
					}
				}
			}
		} catch ( InterruptedException | IllegalStateException e ) {
			interrupt();
		}
		synchronized ( this ) {
			if ( currentChunk != null ) {
				LOGGER.warn( "Still had a chunk when thread was interrupted." );
				chunkList.markFailed( currentChunk );
				currentChunk = null;
			}
		}
		for ( RandomAccessFile file : sources.values() ) {
			Util.safeClose( file );
		}
		LOGGER.debug( "My work here is done. Copied " + copyCount.get() + " chunks from " + sources.size() + " files." );
	}

	private void copyChunk() throws InterruptedException
	{
		ChunkSource source = currentChunk.getSources();
		if ( source != null ) {
			// OK
			for ( ;; ) {
				// Try every possible source file
				SourceFile sourceFile = getOpenFile( source, currentChunk.range.getLength() );
				if ( sourceFile == null ) {
					// Was marked as having a source file, but now we got null -- most likely
					// the source file doesn't exist or isn't readable
					LOGGER.warn( "No open file for local copying!" );
					break;
				}
				// OK
				RandomAccessFile raf = sources.get( sourceFile.fileName );
				byte[] buffer;
				try {
					raf.seek( sourceFile.offset );
					// In order not to hinder (fast) upload of unknown blocks, throttle
					// local copying as long as chunks are missing - do before allocating buffer
					// so we don't hold allocated unused memory for no reason, but the seek has
					// been done so we know the file handle is not goofed up
					if ( chunkList.hasLocallyMissingChunk() ) {
						int delay;
						HashChecker hc = transfer.getHashChecker();
						if ( hc == null ) {
							delay = 50;
						} else {
							delay = ( hc.getQueueFill() * 500 ) / hc.getQueueCapacity();
						}
						Thread.sleep( delay );
					}
					buffer = new byte[ sourceFile.chunkSize ];
					raf.readFully( buffer );
				} catch ( InterruptedException e ) {
					throw e;
				} catch ( Exception e ) {
					LOGGER.warn( "Could not read chunk to replicate from " + sourceFile.fileName, e );
					buffer = null;
					if ( e instanceof IOException ) {
						// Mark file as messed up
						sources.put( sourceFile.fileName, null );
					}
				}
				if ( buffer != null ) {
					// All is well, read chunk locally, pass on
					transfer.chunkReceivedInternal( currentChunk, buffer );
					synchronized ( this ) {
						currentChunk = null;
					}
					copyCount.incrementAndGet();
					trigger();
					return;
				}
				// Reaching here means failure
				// We'll keep looping as long as there are source files available
			}
			// End of loop over source files
		}
		// FAILED
		LOGGER.info( "Local copying failed, queueing for normal upload..." );
		synchronized ( this ) {
			chunkList.markFailed( currentChunk );
			currentChunk = null;
		}
	}

	private SourceFile getOpenFile( ChunkSource source, int requiredSize )
	{
		for ( SourceFile candidate : source.sourceCandidates ) {
			if ( sources.get( candidate.fileName ) != null )
				return candidate;
		}
		// Have to open
		for ( SourceFile candidate : source.sourceCandidates ) {
			if ( sources.containsKey( candidate.fileName ) ) // Maps to null (otherwise upper loop would have returned)
				continue; // File is broken, don't use
			if ( candidate.chunkSize != requiredSize )
				continue;
			File f = new File( candidate.fileName );
			if ( !f.exists() ) {
				sources.put( candidate.fileName, null ); // Mark for future
				continue;
			}
			try {
				RandomAccessFile raf = new RandomAccessFile( f, "r" );
				sources.put( candidate.fileName, raf );
				return candidate;
			} catch ( Exception e ) {
				LOGGER.info( "Cannot open " + candidate.fileName, e );
				sources.put( candidate.fileName, null ); // Mark for future
			}
		}
		// Nothing worked
		return null;
	}

	public boolean isPaused()
	{
		return paused;
	}

	public void setPaused( boolean paused )
	{
		this.paused = paused;
	}

}