diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Uploader.java')
-rw-r--r-- | src/main/java/org/openslx/filetransfer/Uploader.java | 50 |
1 files changed, 35 insertions, 15 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java index e82955b..6edc268 100644 --- a/src/main/java/org/openslx/filetransfer/Uploader.java +++ b/src/main/java/org/openslx/filetransfer/Uploader.java @@ -12,12 +12,13 @@ import javax.net.ssl.SSLContext; import net.jpountz.lz4.LZ4Compressor; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class Uploader extends Transfer { - private static final Logger log = Logger.getLogger( Uploader.class ); + private static final Logger log = LogManager.getLogger( Uploader.class ); private final LZ4Compressor compressor = lz4factory.fastCompressor(); @@ -65,6 +66,10 @@ public class Uploader extends Transfer return upload( filename, null ); } + /** + * Compressing output stream that will either write LZ4-compressed data, or if the data + * doesn't compress well, just the original uncompressed data. + */ private class Lz4OutStream extends OutputStream { @@ -72,7 +77,7 @@ public class Uploader extends Transfer private byte[] buffer; - private long compressed, uncompressed; + private long bytesSentTotal, bytesDecompressedTotal; private int chunksCompressed, chunksUncompressed; @@ -95,16 +100,17 @@ public class Uploader extends Transfer if ( buffer == null || buffer.length < maxCompressedLength ) { buffer = new byte[ maxCompressedLength ]; } - uncompressed += decompressedLength; + bytesDecompressedTotal += decompressedLength; int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength ); parentStream.writeInt( decompressedLength ); + // Only send compressed data if we got down to at least ~88% the original size if ( ( compressedLength * 9 / 8 ) < decompressedLength ) { - compressed += compressedLength; + bytesSentTotal += compressedLength; chunksCompressed++; parentStream.writeInt( compressedLength ); parentStream.write( buffer, 0, compressedLength ); } else { - compressed += decompressedLength; + bytesSentTotal += decompressedLength; chunksUncompressed++; parentStream.writeInt( decompressedLength ); parentStream.write( data, off, decompressedLength ); @@ -113,15 +119,14 @@ public class Uploader extends Transfer public void printStats() { - if ( compressed == 0 ) + if ( bytesSentTotal == 0 ) return; - log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed ); - log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); + log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal ); + log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); } } - @SuppressWarnings( "resource" ) public boolean upload( String filename, UploadStatusCallback callback ) { if ( shouldGetToken() ) { @@ -136,6 +141,18 @@ public class Uploader extends Transfer this.close( "Could not open given file for reading.", callback, true ); return false; } + byte[] data = null; + // Cannot go above 500000 for backwards compat + for ( int bufsiz = 500; bufsiz >= 100 && data == null; bufsiz -= 100 ) { + try { + data = new byte[ bufsiz * 1000 ]; + } catch ( OutOfMemoryError e ) { + } + } + if ( data == null ) { + this.close( "Could not allocate buffer for reading.", callback, true ); + return false; + } while ( !Thread.currentThread().isInterrupted() ) { // Loop as long as remote peer is requesting chunks from this file // Read meta data of remote peer - either new range, or it's telling us it's done MetaData meta = readMetaData(); @@ -145,6 +162,10 @@ public class Uploader extends Transfer } if ( meta.isDone() ) // Download complete? break; + if ( getRemoteError() != null ) { + this.close( "Remote peer sent error: " + getRemoteError(), callback, true ); + return false; + } // Not complete, so there must be another range request FileRange requestedRange = meta.getRange(); if ( requestedRange == null ) { @@ -158,14 +179,14 @@ public class Uploader extends Transfer return false; } } catch ( IOException e ) { - this.close( "Could not get current length of file " + filename, callback, false ); + this.close( "Could not get current length of file " + filename, callback, false, e ); return false; } // Seek to requested chunk try { file.seek( requestedRange.startOffset ); } catch ( IOException e ) { - this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true ); + this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true, e ); return false; } // Send confirmation of range and compression mode we're about to send @@ -185,7 +206,6 @@ public class Uploader extends Transfer return false; } // Finally send requested chunk - byte[] data = new byte[ 500000 ]; // 500kb int hasRead = 0; int length = requestedRange.getLength(); while ( hasRead < length ) { @@ -193,7 +213,7 @@ public class Uploader extends Transfer try { ret = file.read( data, 0, Math.min( length - hasRead, data.length ) ); } catch ( IOException e ) { - this.close( "Error reading from file ", callback, true ); + this.close( "Error reading from file ", callback, true, e ); return false; } if ( ret == -1 ) { @@ -204,7 +224,7 @@ public class Uploader extends Transfer try { outStr.write( data, 0, ret ); } catch ( IOException e ) { - this.close( "Sending payload failed" ); + this.close( "Sending payload failed", e ); return false; } if ( callback != null ) |