package org.openslx.filetransfer;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.Socket;
import javax.net.ssl.SSLContext;
import net.jpountz.lz4.LZ4Compressor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Uploader extends Transfer
{
private static final Logger log = LogManager.getLogger( Uploader.class );
private final LZ4Compressor compressor = lz4factory.fastCompressor();
private final Lz4OutStream compressedOut;
/***********************************************************************/
/**
* Actively establish upload connection to given peer.
*
* @param host Host name or address to connect to
* @param port Port to connect to
* @param context ssl context for establishing a secure connection
* @throws IOException
*/
public Uploader( String host, int port, int readTimeoutMs, SSLContext context, String token ) throws IOException
{
super( host, port, readTimeoutMs, context, log );
compressedOut = new Lz4OutStream( outStream );
outStream.writeByte( 'U' );
if ( !sendToken( token ) || !sendEndOfMeta() )
throw new IOException( "Sending token failed" );
}
/***********************************************************************/
/**
* Constructor for master uploader.
* Sends back the socket for datatransfer.
*
* @throws IOException
*/
public Uploader( Socket socket ) throws IOException
{
super( socket, log );
compressedOut = new Lz4OutStream( outStream );
}
/***********************************************************************/
/**
* Method for sending File with filename.
*
* @param filename
*/
public boolean upload( String filename )
{
return upload( filename, null );
}
/**
* Compressing output stream that will either write LZ4-compressed data, or if the data
* doesn't compress well, just the original uncompressed data.
*/
private class Lz4OutStream extends OutputStream
{
private final DataOutputStream parentStream;
private byte[] buffer;
private long bytesSentTotal, bytesDecompressedTotal;
private int chunksCompressed, chunksUncompressed;
public Lz4OutStream( DataOutputStream out )
{
parentStream = out;
log.info( "Compressor: " + compressor.getClass().getSimpleName() );
}
@Override
public void write( int b ) throws IOException
{
throw new UnsupportedOperationException( "Cannot do this" );
}
@Override
public void write( byte[] data, int off, int decompressedLength ) throws IOException
{
int maxCompressedLength = compressor.maxCompressedLength( decompressedLength );
if ( buffer == null || buffer.length < maxCompressedLength ) {
buffer = new byte[ maxCompressedLength ];
}
bytesDecompressedTotal += decompressedLength;
int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength );
parentStream.writeInt( decompressedLength );
// Only send compressed data if we got down to at least ~88% the original size
if ( ( compressedLength * 9 / 8 ) < decompressedLength ) {
bytesSentTotal += compressedLength;
chunksCompressed++;
parentStream.writeInt( compressedLength );
parentStream.write( buffer, 0, compressedLength );
} else {
bytesSentTotal += decompressedLength;
chunksUncompressed++;
parentStream.writeInt( decompressedLength );
parentStream.write( data, off, decompressedLength );
}
}
public void printStats()
{
if ( bytesSentTotal == 0 )
return;
log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal );
log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
}
}
public boolean upload( String filename, UploadStatusCallback callback )
{
if ( shouldGetToken() ) {
log.error( "You didn't call getToken yet!" );
return false;
}
RandomAccessFile file = null;
try {
try {
file = new RandomAccessFile( new File( filename ), "r" );
} catch ( FileNotFoundException e ) {
this.close( "Could not open given file for reading.", callback, true );
return false;
}
byte[] data = null;
// Cannot go above 500000 for backwards compat
for ( int bufsiz = 500; bufsiz >= 100 && data == null; bufsiz -= 100 ) {
try {
data = new byte[ bufsiz * 1000 ];
} catch ( OutOfMemoryError e ) {
}
}
if ( data == null ) {
this.close( "Could not allocate buffer for reading.", callback, true );
return false;
}
while ( !Thread.currentThread().isInterrupted() ) { // Loop as long as remote peer is requesting chunks from this file
// Read meta data of remote peer - either new range, or it's telling us it's done
MetaData meta = readMetaData();
if ( meta == null ) {
this.close( "Did not get meta data from remote peer.", callback, true );
return false;
}
if ( meta.isDone() ) // Download complete?
break;
if ( getRemoteError() != null ) {
this.close( "Remote peer sent error: " + getRemoteError(), callback, true );
return false;
}
// Not complete, so there must be another range request
FileRange requestedRange = meta.getRange();
if ( requestedRange == null ) {
this.close( "Peer did not include RANGE in meta data.", callback, true );
return false;
}
// Range inside file?
try {
if ( requestedRange.endOffset > file.length() ) {
this.close( "Requested range is larger than file size, aborting.", callback, true );
return false;
}
} catch ( IOException e ) {
this.close( "Could not get current length of file " + filename, callback, false, e );
return false;
}
// Seek to requested chunk
try {
file.seek( requestedRange.startOffset );
} catch ( IOException e ) {
this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true, e );
return false;
}
// Send confirmation of range and compression mode we're about to send
OutputStream outStr = outStream;
try {
if ( meta.peerWantsCompression() && useCompression ) {
sendUseCompression();
outStr = compressedOut;
}
long ptr = file.getFilePointer();
if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
this.close( "Could not send range confirmation" );
return false;
}
} catch ( IOException e ) {
this.close( "Could not determine current position in file " + filename );
return false;
}
// Finally send requested chunk
int hasRead = 0;
int length = requestedRange.getLength();
while ( hasRead < length ) {
int ret;
try {
ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
} catch ( IOException e ) {
this.close( "Error reading from file ", callback, true, e );
return false;
}
if ( ret == -1 ) {
this.close( "Error occured in Uploader.sendFile() while reading from File to send.", callback, true );
return false;
}
hasRead += ret;
try {
outStr.write( data, 0, ret );
} catch ( IOException e ) {
this.close( "Sending payload failed", e );
return false;
}
if ( callback != null )
callback.uploadProgress( ret );
}
}
} finally {
Transfer.safeClose( file, transferSocket );
compressedOut.printStats();
}
return true;
}
}