summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Uploader.java
blob: 6edc268d2c69a9d66411895670702635b54a657a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package org.openslx.filetransfer;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;

import javax.net.ssl.SSLContext;

import net.jpountz.lz4.LZ4Compressor;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Uploader extends Transfer
{

	private static final Logger log = LogManager.getLogger( Uploader.class );
	
	private final LZ4Compressor compressor = lz4factory.fastCompressor();
	
	private final Lz4OutStream compressedOut;
	
	/***********************************************************************/
	/**
	 * Actively establish upload connection to given peer.
	 * 
	 * @param host Host name or address to connect to
	 * @param port Port to connect to
	 * @param context ssl context for establishing a secure connection
	 * @throws IOException
	 */
	public Uploader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
	{
		super( host, port, readTimeoutMs, context, log );
		compressedOut = new Lz4OutStream( outStream );
		outStream.writeByte( 'U' );
		if ( !sendToken( token ) || !sendEndOfMeta() )
			throw new IOException( "Sending token failed" );
	}

	/***********************************************************************/
	/**
	 * Constructor for master uploader.
	 * Sends back the socket for datatransfer.
	 * 
	 * @throws IOException
	 */
	public Uploader( Socket socket ) throws IOException
	{
		super( socket, log );
		compressedOut = new Lz4OutStream( outStream );
	}

	/***********************************************************************/
	/**
	 * Method for sending File with filename.
	 * 
	 * @param filename
	 */
	public boolean upload( String filename )
	{
		return upload( filename, null );
	}
	
	/**
	 * Compressing output stream that will either write LZ4-compressed data, or if the data
	 * doesn't compress well, just the original uncompressed data.
	 */
	private class Lz4OutStream extends OutputStream
	{
		
		private final DataOutputStream parentStream;
		
		private byte[] buffer;
		
		private long bytesSentTotal, bytesDecompressedTotal;
		
		private int chunksCompressed, chunksUncompressed;
		
		public Lz4OutStream( DataOutputStream out )
		{
			parentStream = out;
			log.info( "Compressor: " + compressor.getClass().getSimpleName() );
		}

		@Override
		public void write( int b ) throws IOException
		{
			throw new UnsupportedOperationException( "Cannot do this" );
		}

		@Override
		public void write( byte[] data, int off, int decompressedLength ) throws IOException
		{
			int maxCompressedLength = compressor.maxCompressedLength( decompressedLength );
			if ( buffer == null || buffer.length < maxCompressedLength ) {
				buffer = new byte[ maxCompressedLength ];
			}
			bytesDecompressedTotal += decompressedLength;
			int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength );
			parentStream.writeInt( decompressedLength );
			// Only send compressed data if we got down to at least ~88% the original size
			if ( ( compressedLength * 9 / 8 ) < decompressedLength ) {
				bytesSentTotal += compressedLength;
				chunksCompressed++;
				parentStream.writeInt( compressedLength );
				parentStream.write( buffer, 0, compressedLength );
			} else {
				bytesSentTotal += decompressedLength;
				chunksUncompressed++;
				parentStream.writeInt( decompressedLength );
				parentStream.write( data, off, decompressedLength );
			}
		}
		
		public void printStats()
		{
			if ( bytesSentTotal == 0 )
				return;
			log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal );
			log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
		}

	}

	public boolean upload( String filename, UploadStatusCallback callback )
	{
		if ( shouldGetToken() ) {
			log.error( "You didn't call getToken yet!" );
			return false;
		}
		RandomAccessFile file = null;
		try {
			try {
				file = new RandomAccessFile( new File( filename ), "r" );
			} catch ( FileNotFoundException e ) {
				this.close( "Could not open given file for reading.", callback, true );
				return false;
			}
			byte[] data = null;
			// Cannot go above 500000 for backwards compat
			for ( int bufsiz = 500; bufsiz >= 100 && data == null; bufsiz -= 100 ) {
				try {
					data = new byte[ bufsiz * 1000 ];
				} catch ( OutOfMemoryError e ) {
				}
			}
			if ( data == null ) {
				this.close( "Could not allocate buffer for reading.", callback, true );
				return false;
			}
			while ( !Thread.currentThread().isInterrupted() ) { // Loop as long as remote peer is requesting chunks from this file
				// Read meta data of remote peer - either new range, or it's telling us it's done
				MetaData meta = readMetaData();
				if ( meta == null ) {
					this.close( "Did not get meta data from remote peer.", callback, true );
					return false;
				}
				if ( meta.isDone() ) // Download complete?
					break;
				if ( getRemoteError() != null ) {
					this.close( "Remote peer sent error: " + getRemoteError(), callback, true );
					return false;
				}
				// Not complete, so there must be another range request
				FileRange requestedRange = meta.getRange();
				if ( requestedRange == null ) {
					this.close( "Peer did not include RANGE in meta data.", callback, true );
					return false;
				}
				// Range inside file?
				try {
					if ( requestedRange.endOffset > file.length() ) {
						this.close( "Requested range is larger than file size, aborting.", callback, true );
						return false;
					}
				} catch ( IOException e ) {
					this.close( "Could not get current length of file " + filename, callback, false, e );
					return false;
				}
				// Seek to requested chunk
				try {
					file.seek( requestedRange.startOffset );
				} catch ( IOException e ) {
					this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true, e );
					return false;
				}
				// Send confirmation of range and compression mode we're about to send
				OutputStream outStr = outStream;
				try {
					if ( meta.peerWantsCompression() && useCompression ) {
						sendUseCompression();
						outStr = compressedOut;
					}
					long ptr = file.getFilePointer();
					if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
						this.close( "Could not send range confirmation" );
						return false;
					}
				} catch ( IOException e ) {
					this.close( "Could not determine current position in file " + filename );
					return false;
				}
				// Finally send requested chunk
				int hasRead = 0;
				int length = requestedRange.getLength();
				while ( hasRead < length ) {
					int ret;
					try {
						ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
					} catch ( IOException e ) {
						this.close( "Error reading from file ", callback, true, e );
						return false;
					}
					if ( ret == -1 ) {
						this.close( "Error occured in Uploader.sendFile() while reading from File to send.", callback, true );
						return false;
					}
					hasRead += ret;
					try {
						outStr.write( data, 0, ret );
					} catch ( IOException e ) {
						this.close( "Sending payload failed", e );
						return false;
					}
					if ( callback != null )
						callback.uploadProgress( ret );
				}
			}
		} finally {
			Transfer.safeClose( file, transferSocket );
			compressedOut.printStats();
		}
		return true;
	}

}