diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
12 files changed, 281 insertions, 112 deletions
diff --git a/src/main/java/org/openslx/filetransfer/ClassTest.java b/src/main/java/org/openslx/filetransfer/ClassTest.java index 9d5bc82..04dc40d 100644 --- a/src/main/java/org/openslx/filetransfer/ClassTest.java +++ b/src/main/java/org/openslx/filetransfer/ClassTest.java @@ -28,8 +28,8 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; -import org.apache.log4j.BasicConfigurator; -import org.slf4j.LoggerFactory; +import org.apache.logging.log4j.core.config.Configurator; +import org.apache.logging.log4j.core.config.DefaultConfiguration; public class ClassTest { @@ -39,15 +39,10 @@ public class ClassTest private static String inFile; private static String outFile; - static { - // This is a temporary workaround for this annoying log4j error msg. - // Initializing the logger before anything else is done. - BasicConfigurator.configure(); - LoggerFactory.getLogger( "ROOT" ); - } - public static void main( String[] args ) throws Exception { + Configurator.initialize(new DefaultConfiguration()); + if ( args.length != 4 ) { System.out.println( "Need 4 argument: <keystore> <passphrase> <infile> <outfile>" ); System.exit( 1 ); diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java index 50162fc..5aff94b 100644 --- a/src/main/java/org/openslx/filetransfer/Downloader.java +++ b/src/main/java/org/openslx/filetransfer/Downloader.java @@ -12,12 +12,13 @@ import javax.net.ssl.SSLContext; import net.jpountz.lz4.LZ4FastDecompressor; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class Downloader extends Transfer { - private static final Logger log = Logger.getLogger( Downloader.class ); + private static final Logger log = LogManager.getLogger( Downloader.class ); private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor(); @@ -115,6 +116,7 @@ public class Downloader extends Transfer compressed += compressedLength; uncompressed += decompressedLength; if ( decompressedLength > len ) { + // TODO: Partial reads with buffering, if remote payload is larger than our buffer throw new RuntimeException( "This should never happen! ;)" ); } if ( decompressedLength == compressedLength ) { @@ -167,6 +169,19 @@ public class Downloader extends Transfer } FileRange requestedRange; try { + byte[] incoming = new byte[ 500000 ]; + /* TODO once the Lz4InputStream can handle small buffer sizes / partial reads + for ( int bufsiz = 600; bufsiz >= 100 && incoming == null; bufsiz -= 100 ) { + try { + incoming = new byte[ bufsiz * 1024 ]; + } catch ( OutOfMemoryError e ) { + } + } + if ( incoming == null ) { + log.error( "Could not allocate buffer for receiving." ); + return false; + } + */ while ( ( requestedRange = rangeCallback.get() ) != null ) { if ( requestedRange.startOffset < 0 || requestedRange.startOffset >= requestedRange.endOffset ) { log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" ); @@ -187,16 +202,23 @@ public class Downloader extends Transfer log.error( "Did not receive meta data from uploading remote peer after requesting range, aborting." ); return false; } + if ( getRemoteError() != null ) { + log.error( "Remote peer sent error: " + getRemoteError() ); + return false; + } FileRange remoteRange = meta.getRange(); - if ( remoteRange == null || !remoteRange.equals( requestedRange ) ) { - log.error( "Confirmed range by remote peer does not match requested range, aborting download." ); + if ( remoteRange == null ) { + log.error( "Remote metadata does not contain range confirmation. " + meta ); + } + if ( !remoteRange.equals( requestedRange ) ) { + log.error( "Confirmed range by remote peer (" + remoteRange + + ") does not match requested range (" + requestedRange + "), aborting download." ); return false; } // Receive requested range int chunkLength = requestedRange.getLength(); // If the uploader sets the COMPRESS field, assume compressed chunk InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer; - byte[] incoming = new byte[ 500000 ]; // 500kb int hasRead = 0; while ( hasRead < chunkLength ) { int ret; @@ -207,7 +229,7 @@ public class Downloader extends Transfer return false; } } catch ( IOException e ) { - log.error( "Could not read payload from socket" ); + log.error( "Could not read payload from socket", e ); sendErrorCode( "payload read error" ); return false; } @@ -227,7 +249,7 @@ public class Downloader extends Transfer compressedIn.printStats(); try { transferSocket.shutdownOutput(); - } catch ( IOException e ) { + } catch ( Exception e ) { } } finally { this.close( null ); diff --git a/src/main/java/org/openslx/filetransfer/FileRange.java b/src/main/java/org/openslx/filetransfer/FileRange.java index e8a7d12..ed86e51 100644 --- a/src/main/java/org/openslx/filetransfer/FileRange.java +++ b/src/main/java/org/openslx/filetransfer/FileRange.java @@ -64,4 +64,10 @@ public class FileRange return (int)startOffset ^ Integer.rotateLeft( (int)endOffset, 16 ) ^ (int)(startOffset >> 32); } + @Override + public String toString() + { + return startOffset + "-" + endOffset; + } + } diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java index a0fc172..fc990fc 100644 --- a/src/main/java/org/openslx/filetransfer/Listener.java +++ b/src/main/java/org/openslx/filetransfer/Listener.java @@ -3,6 +3,7 @@ package org.openslx.filetransfer; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -11,9 +12,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLException; import javax.net.ssl.SSLServerSocketFactory; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.util.PrioThreadFactory; public class Listener @@ -25,11 +28,11 @@ public class Listener private Thread acceptThread = null; private final int readTimeoutMs; private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), - new PrioThreadFactory( "BFTP-Init" ) ); + new PrioThreadFactory( "BFTP-BS" ) ); - private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85. - private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68. - private static Logger log = Logger.getLogger( Listener.class ); + private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // ASCII 'U' = 85. + private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // ASCII 'D' = 68. + private static Logger log = LogManager.getLogger( Listener.class ); /***********************************************************************/ /** @@ -67,8 +70,10 @@ public class Listener SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory(); listenSocket = sslServerSocketFactory.createServerSocket(); } + listenSocket.setSoTimeout( 5000 ); listenSocket.setReuseAddress( true ); listenSocket.bind( new InetSocketAddress( this.port ) ); + listenSocket.setSoTimeout( 0 ); } catch ( Exception e ) { log.error( "Cannot listen on port " + this.port, e ); listenSocket = null; @@ -82,16 +87,16 @@ public class Listener if ( acceptThread != null ) return; final Listener instance = this; - acceptThread = new Thread( "BFTP-Listen-" + this.port ) { + acceptThread = new Thread( "BFTP:" + this.port ) { @Override public void run() { try { // Run accept loop in own thread while ( !isInterrupted() ) { - Socket acceptedSocket = null; + final Socket connection; try { - acceptedSocket = listenSocket.accept(); + connection = listenSocket.accept(); } catch ( SocketTimeoutException e ) { continue; } catch ( Exception e ) { @@ -105,14 +110,12 @@ public class Listener continue; } // Handle each accepted connection in a thread pool - final Socket connection = acceptedSocket; Runnable handler = new Runnable() { @Override public void run() { - try { - // Give initial byte signalling mode of operation 5 secs to arrive + // Give initial byte signaling mode of operation 5 secs to arrive connection.setSoTimeout( 5000 ); byte[] b = new byte[ 1 ]; @@ -127,25 +130,32 @@ public class Listener if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) { // --> start Downloader(socket). Downloader d = new Downloader( connection ); + // Will take care of connection cleanup incomingEvent.incomingUploadRequest( d ); } else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) { // --> start Uploader(socket). Uploader u = new Uploader( connection ); + // Will take care of connection cleanup incomingEvent.incomingDownloadRequest( u ); } else { - log.debug( "Got invalid init-byte ... close connection" ); + log.debug( "Got invalid init-byte ... closing connection" ); Transfer.safeClose( connection ); } + } catch ( SSLException e ) { + Transfer.safeClose( connection ); + log.warn( "SSL error when acceping client " + connection.getInetAddress().getHostAddress() ); + } catch ( SocketException e ) { + // No reason to log, probably - connection where client did nothing after connecting. } catch ( Exception e ) { - log.warn( "Error accepting client", e ); Transfer.safeClose( connection ); + log.warn( "Error handling client", e ); } } }; try { processingPool.execute( handler ); } catch ( RejectedExecutionException e ) { - Transfer.safeClose( acceptedSocket ); + Transfer.safeClose( connection ); } } } finally { diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java index 589d142..aebd3ce 100644 --- a/src/main/java/org/openslx/filetransfer/Transfer.java +++ b/src/main/java/org/openslx/filetransfer/Transfer.java @@ -9,13 +9,14 @@ import java.net.Socket; import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import net.jpountz.lz4.LZ4Factory; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.Logger; import org.openslx.util.Util; public abstract class Transfer @@ -23,7 +24,7 @@ public abstract class Transfer protected final Socket transferSocket; protected final DataOutputStream outStream; protected final DataInputStream dataFromServer; - protected String ERROR = null; + private String remoteError; private boolean shouldGetToken; protected boolean useCompression = true; @@ -51,7 +52,7 @@ public abstract class Transfer transferSocket = sslSocketFactory.createSocket(); } transferSocket.setSoTimeout( readTimeoutMs ); - transferSocket.connect( new InetSocketAddress( host, port ) ); + transferSocket.connect( new InetSocketAddress( host, port ), 4000 ); outStream = new DataOutputStream( transferSocket.getOutputStream() ); dataFromServer = new DataInputStream( transferSocket.getInputStream() ); @@ -169,6 +170,7 @@ public abstract class Transfer */ protected MetaData readMetaData() { + Map<String, String> entries = new HashMap<>(); try { while ( true ) { @@ -183,7 +185,7 @@ public abstract class Transfer continue; } if ( splitted[0].equals( "ERROR" ) ) - ERROR = splitted[1]; + remoteError = splitted[1]; if ( entries.containsKey( splitted[0] ) ) { log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" ); } else { @@ -192,10 +194,10 @@ public abstract class Transfer } } catch ( SocketTimeoutException ste ) { sendErrorCode( "timeout" ); - this.close( "Socket Timeout occured in readMetaData. " + ERROR ); + this.close( "Socket Timeout occured in readMetaData." ); return null; } catch ( Exception e ) { - this.close( "Exception occured in readMetaData: " + e.toString() + " " + ERROR ); + this.close( "Exception occured in readMetaData: " + e.toString() ); return null; } return new MetaData( entries ); @@ -219,11 +221,17 @@ public abstract class Transfer */ protected void close( String error, UploadStatusCallback callback, boolean sendToPeer ) { + close( error, callback, sendToPeer, null ); + } + + protected void close( String error, UploadStatusCallback callback, boolean sendToPeer, Exception e ) + { if ( error != null ) { if ( sendToPeer ) sendErrorCode( error ); if ( callback != null ) callback.uploadError( error ); + log.info( "Closing with error '" + error + "'", e ); } synchronized ( transferSocket ) { safeClose( dataFromServer, outStream, transferSocket ); @@ -232,7 +240,12 @@ public abstract class Transfer protected void close( String error ) { - close( error, null, false ); + close( error, null ); + } + + protected void close( String error, Exception e ) + { + close( error, null, false, e ); } public void cancel() @@ -272,7 +285,7 @@ public abstract class Transfer */ public String getRemoteError() { - return ERROR; + return remoteError; } /** @@ -406,6 +419,21 @@ public abstract class Transfer return meta.containsKey( "COMPRESS" ); } + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + for ( Entry<String, String> it : meta.entrySet() ) { + if ( sb.length() != 0 ) { + sb.append( ' ' ); + } + sb.append( it.getKey() ); + sb.append( '=' ); + sb.append( it.getValue() ); + } + return sb.toString(); + } + } } diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java index e82955b..6edc268 100644 --- a/src/main/java/org/openslx/filetransfer/Uploader.java +++ b/src/main/java/org/openslx/filetransfer/Uploader.java @@ -12,12 +12,13 @@ import javax.net.ssl.SSLContext; import net.jpountz.lz4.LZ4Compressor; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class Uploader extends Transfer { - private static final Logger log = Logger.getLogger( Uploader.class ); + private static final Logger log = LogManager.getLogger( Uploader.class ); private final LZ4Compressor compressor = lz4factory.fastCompressor(); @@ -65,6 +66,10 @@ public class Uploader extends Transfer return upload( filename, null ); } + /** + * Compressing output stream that will either write LZ4-compressed data, or if the data + * doesn't compress well, just the original uncompressed data. + */ private class Lz4OutStream extends OutputStream { @@ -72,7 +77,7 @@ public class Uploader extends Transfer private byte[] buffer; - private long compressed, uncompressed; + private long bytesSentTotal, bytesDecompressedTotal; private int chunksCompressed, chunksUncompressed; @@ -95,16 +100,17 @@ public class Uploader extends Transfer if ( buffer == null || buffer.length < maxCompressedLength ) { buffer = new byte[ maxCompressedLength ]; } - uncompressed += decompressedLength; + bytesDecompressedTotal += decompressedLength; int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength ); parentStream.writeInt( decompressedLength ); + // Only send compressed data if we got down to at least ~88% the original size if ( ( compressedLength * 9 / 8 ) < decompressedLength ) { - compressed += compressedLength; + bytesSentTotal += compressedLength; chunksCompressed++; parentStream.writeInt( compressedLength ); parentStream.write( buffer, 0, compressedLength ); } else { - compressed += decompressedLength; + bytesSentTotal += decompressedLength; chunksUncompressed++; parentStream.writeInt( decompressedLength ); parentStream.write( data, off, decompressedLength ); @@ -113,15 +119,14 @@ public class Uploader extends Transfer public void printStats() { - if ( compressed == 0 ) + if ( bytesSentTotal == 0 ) return; - log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed ); - log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); + log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal ); + log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed ); } } - @SuppressWarnings( "resource" ) public boolean upload( String filename, UploadStatusCallback callback ) { if ( shouldGetToken() ) { @@ -136,6 +141,18 @@ public class Uploader extends Transfer this.close( "Could not open given file for reading.", callback, true ); return false; } + byte[] data = null; + // Cannot go above 500000 for backwards compat + for ( int bufsiz = 500; bufsiz >= 100 && data == null; bufsiz -= 100 ) { + try { + data = new byte[ bufsiz * 1000 ]; + } catch ( OutOfMemoryError e ) { + } + } + if ( data == null ) { + this.close( "Could not allocate buffer for reading.", callback, true ); + return false; + } while ( !Thread.currentThread().isInterrupted() ) { // Loop as long as remote peer is requesting chunks from this file // Read meta data of remote peer - either new range, or it's telling us it's done MetaData meta = readMetaData(); @@ -145,6 +162,10 @@ public class Uploader extends Transfer } if ( meta.isDone() ) // Download complete? break; + if ( getRemoteError() != null ) { + this.close( "Remote peer sent error: " + getRemoteError(), callback, true ); + return false; + } // Not complete, so there must be another range request FileRange requestedRange = meta.getRange(); if ( requestedRange == null ) { @@ -158,14 +179,14 @@ public class Uploader extends Transfer return false; } } catch ( IOException e ) { - this.close( "Could not get current length of file " + filename, callback, false ); + this.close( "Could not get current length of file " + filename, callback, false, e ); return false; } // Seek to requested chunk try { file.seek( requestedRange.startOffset ); } catch ( IOException e ) { - this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true ); + this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true, e ); return false; } // Send confirmation of range and compression mode we're about to send @@ -185,7 +206,6 @@ public class Uploader extends Transfer return false; } // Finally send requested chunk - byte[] data = new byte[ 500000 ]; // 500kb int hasRead = 0; int length = requestedRange.getLength(); while ( hasRead < length ) { @@ -193,7 +213,7 @@ public class Uploader extends Transfer try { ret = file.read( data, 0, Math.min( length - hasRead, data.length ) ); } catch ( IOException e ) { - this.close( "Error reading from file ", callback, true ); + this.close( "Error reading from file ", callback, true, e ); return false; } if ( ret == -1 ) { @@ -204,7 +224,7 @@ public class Uploader extends Transfer try { outStr.write( data, 0, ret ); } catch ( IOException e ) { - this.close( "Sending payload failed" ); + this.close( "Sending payload failed", e ); return false; } if ( callback != null ) diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 91d6f1e..27f8e8c 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -10,14 +11,15 @@ import java.util.LinkedList; import java.util.List; import java.util.zip.CRC32; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.util.ThriftUtil; public class ChunkList { - private static final Logger LOGGER = Logger.getLogger( ChunkList.class ); + private static final Logger LOGGER = LogManager.getLogger( ChunkList.class ); /** * Here we keep a list of all chunks in the proper order, in case we quickly need to access one @@ -91,7 +93,7 @@ public class ChunkList * Get CRC32 list in DNBD3 format. All checksums are little * endian and prefixed by the crc32 sum of the list itself. */ - public synchronized byte[] getDnbd3Crc32List() throws IOException + public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException { byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master long nextChunkOffset = 0; @@ -142,7 +144,6 @@ public class ChunkList * Returns true if this list contains a chunk with state MISSING, * which means the chunk doesn't have a sha1 known to exist in * another image. - * @return */ public synchronized boolean hasLocallyMissingChunk() { @@ -204,7 +205,7 @@ public class ChunkList missingChunks.addAll( append ); } } catch ( Exception e ) { - LOGGER.warn( "chunk clone list if messed up", e ); + LOGGER.warn( "chunk clone list is messed up", e ); } } } @@ -414,6 +415,35 @@ public class ChunkList return sb.toString(); } + public synchronized String getStats() + { + int complete = 0, copying = 0, hashing = 0, missing = 0, qfc = 0, uploading = 0; + for ( FileChunk chunk : allChunks ) { + switch ( chunk.status ) { + case COMPLETE: + complete++; + break; + case COPYING: + copying++; + break; + case HASHING: + hashing++; + break; + case MISSING: + missing++; + break; + case QUEUED_FOR_COPY: + qfc++; + break; + case UPLOADING: + uploading++; + break; + } + } + return "(" + allChunks.size() + ":" + completeChunks.size() + "/" + pendingChunks.size() + "/" + missingChunks.size() + ")" + + " (" + complete + "/" + copying + "/" + hashing + "/" + missing + "/" + qfc + "/" + uploading + ")"; + } + public synchronized boolean isEmpty() { return allChunks.isEmpty(); @@ -491,4 +521,21 @@ public class ChunkList return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum ); } + /** + * Write DNBD3 CRC32 list to given file. + * + * @throws IllegalStateException + * @throws IOException + */ + public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException + { + byte[] dnbd3Crc32List = null; + dnbd3Crc32List = getDnbd3Crc32List(); + if ( dnbd3Crc32List != null ) { + try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) { + fos.write( dnbd3Crc32List ); + } + } + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java index 6594e31..99b30ea 100644 --- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java +++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java @@ -5,14 +5,15 @@ import java.util.Iterator; import java.util.List; import java.util.zip.CRC32; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.LocalChunkSource.ChunkSource; public class FileChunk { - private static final Logger LOGGER = Logger.getLogger( FileChunk.class ); + private static final Logger LOGGER = LogManager.getLogger( FileChunk.class ); /** * Length in bytes of binary sha1 representation diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index f6b27f7..abbcd35 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -9,15 +9,18 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class HashChecker { public static final int BLOCKING = 1; - public static final int CALC_HASH = 2; + public static final int CHECK_SHA1 = 2; public static final int CALC_CRC32 = 4; - - private static final Logger LOGGER = Logger.getLogger( HashChecker.class ); + public static final int CALC_SHA1 = 8; + public static final int NO_SLOW_WARN = 16; + + private static final Logger LOGGER = LogManager.getLogger( HashChecker.class ); private final BlockingQueue<HashTask> queue; @@ -26,7 +29,7 @@ public class HashChecker private final String algorithm; private boolean invalid = false; - + private final int queueCapacity; public HashChecker( String algorithm ) throws NoSuchAlgorithmException @@ -96,11 +99,12 @@ public class HashChecker public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException { boolean blocking = ( flags & BLOCKING ) != 0; - boolean doHash = ( flags & CALC_HASH ) != 0; - boolean doCrc32 = ( flags & CALC_CRC32 ) != 0; - if ( doHash && chunk.getSha1Sum() == null ) + boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0; + boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0; + boolean calcSha1 = ( flags & CALC_SHA1 ) != 0; + if ( checkSha1 && chunk.getSha1Sum() == null ) throw new NullPointerException( "Chunk has no sha1 hash" ); - HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 ); + HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 ); synchronized ( threads ) { if ( invalid ) { execCallback( task, HashResult.FAILURE ); @@ -132,11 +136,18 @@ public class HashChecker } } } - if ( doHash ) { + if ( checkSha1 ) { chunk.setStatus( ChunkStatus.HASHING ); } if ( blocking ) { + long pre = System.currentTimeMillis(); queue.put( task ); + if ( ( flags & NO_SLOW_WARN ) == 0 ) { + long duration = System.currentTimeMillis() - pre; + if ( duration > 1000 ) { + LOGGER.warn( "HashChecker.queue() took " + duration + "ms" ); + } + } } else { if ( !queue.offer( task ) ) { return false; @@ -152,7 +163,7 @@ public class HashChecker { return queue.size(); } - + public int getQueueCapacity() { return queueCapacity; @@ -202,15 +213,19 @@ public class HashChecker break; } HashResult result = HashResult.NONE; - if ( task.doHash ) { + if ( task.checkSha1 || task.calcSha1 ) { // Calculate digest - md.update( task.data, 0, task.chunk.range.getLength() ); - byte[] digest = md.digest(); - result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + md.update( task.data, 0, task.chunk.range.getLength() ); + byte[] digest = md.digest(); + if ( task.checkSha1 ) { + result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + } else { + task.chunk.setSha1Sum( digest ); + } } - if ( task.doCrc32 ) { - // Calculate CRC32 - task.chunk.calculateDnbd3Crc32( task.data ); + if ( task.calcCrc32 ) { + // Calculate CRC32 + task.chunk.calculateDnbd3Crc32( task.data ); } execCallback( task, result ); } @@ -223,7 +238,7 @@ public class HashChecker public static enum HashResult { - NONE, // No hashing tool place + NONE, // No hashing took place VALID, // Hash matches INVALID, // Hash does not match FAILURE // Error calculating hash @@ -234,16 +249,18 @@ public class HashChecker public final byte[] data; public final FileChunk chunk; public final HashCheckCallback callback; - public final boolean doHash; - public final boolean doCrc32; + public final boolean checkSha1; + public final boolean calcCrc32; + public final boolean calcSha1; - public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 ) + public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 ) { this.data = data; this.chunk = chunk; this.callback = callback; - this.doHash = doHash; - this.doCrc32 = doCrc32; + this.checkSha1 = checkSha1; + this.calcCrc32 = calcCrc32; + this.calcSha1 = calcSha1; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 8a69020..5cca7b8 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -14,7 +14,8 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.bwlp.thrift.iface.TransferState; import org.openslx.bwlp.thrift.iface.TransferStatus; import org.openslx.filetransfer.DataReceivedCallback; @@ -30,7 +31,7 @@ import org.openslx.util.ThriftUtil; public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback { - private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class ); + private static final Logger LOGGER = LogManager.getLogger( IncomingTransferBase.class ); /** * Remote peer is uploading, so on our end, we have Downloaders @@ -153,7 +154,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H @Override public final int getActiveConnectionCount() { - return downloads.size(); + synchronized ( downloads ) { + return downloads.size(); + } } public final boolean hashesEqual( List<ByteBuffer> blockHashes ) @@ -215,11 +218,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H public void updateBlockHashList( List<byte[]> hashList ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) { - LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state ); + LOGGER.info( this.getId() + ": Rejecting block hash list in state " + state ); return; } if ( hashList == null ) { - LOGGER.debug( this.getId() + ": Rejecting null block hash list" ); + LOGGER.info( this.getId() + ": Rejecting null block hash list" ); return; } int firstNew = chunks.updateSha1Sums( hashList ); @@ -249,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H continue; } try { - if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop + if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop chunks.markCompleted( chunk, false ); break; } @@ -286,6 +289,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( sources != null && !sources.isEmpty() ) { chunks.markLocalCopyCandidates( sources ); } + if ( state == TransferState.IDLE ) { + state = TransferState.WORKING; + } localCopyManager.trigger(); } @@ -389,7 +395,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H currentChunk = chunks.getMissing(); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); - cancel(); + LOGGER.info( "Incoming transfer connection was interrupted" ); return null; } if ( currentChunk == null ) { @@ -429,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H InterruptedException passEx = null; if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 ); return true; } catch ( InterruptedException e ) { passEx = e; @@ -437,7 +443,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } // We have no hash checker, or hasher rejected block, // or the hash for the current chunk is unknown - flush to disk + long pre = System.currentTimeMillis(); writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); + long duration = System.currentTimeMillis() - pre; + if ( duration > 2000 ) { + LOGGER.warn( "Writing chunk to disk before hash check took " + duration + "ms. Storage backend overloaded?" ); + } chunks.markCompleted( currentChunk, false ); chunkStatusChanged( currentChunk ); if ( passEx != null ) @@ -463,6 +474,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H @Override public void run() { + int active; try { CbHandler cbh = new CbHandler( connection ); if ( connection.download( cbh, cbh ) ) { @@ -481,7 +493,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } chunkStatusChanged( cbh.currentChunk ); } - LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); + LOGGER.info( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped prematurely" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { lastActivityTime.set( System.currentTimeMillis() ); @@ -489,6 +501,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } finally { synchronized ( downloads ) { downloads.remove( connection ); + active = downloads.size(); } } if ( chunks.isComplete() ) { @@ -499,6 +512,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( localCopyManager != null ) { localCopyManager.trigger(); } + LOGGER.info( "Downloader disconnected, " + active + " still running. " + chunks.getStats() ); + } else { + LOGGER.info( "Downloader disconnected, state=" + state + ". " + chunks.getStats() ); } } } ); @@ -563,7 +579,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk ) { if ( state != TransferState.IDLE && state != TransferState.WORKING ) { - LOGGER.debug( "hashCheckDone called in bad state " + state.name() ); + LOGGER.warn( "hashCheckDone called in bad state " + state.name() ); return; } switch ( result ) { @@ -576,7 +592,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markCompleted( chunk, true ); } else { try { + long pre = System.currentTimeMillis(); writeFileData( chunk.range.startOffset, chunk.range.getLength(), data ); + long duration = System.currentTimeMillis() - pre; + if ( duration > 2000 ) { + LOGGER.warn( "Writing chunk to disk after hash check took " + duration + "ms. Storage backend overloaded?" ); + } chunks.markCompleted( chunk, true ); } catch ( Exception e ) { LOGGER.warn( "Cannot write to file after hash check", e ); @@ -600,7 +621,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } // A block finished, see if we can queue a new one queueUnhashedChunk( false ); - if ( localCopyManager != null ) { + if ( localCopyManager != null && localCopyManager.isAlive() ) { localCopyManager.trigger(); } } @@ -617,7 +638,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H try { data = loadChunkFromFile( chunk ); } catch ( EOFException e1 ) { - LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." ); + LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." ); chunks.markFailed( chunk ); chunkStatusChanged( chunk ); return; @@ -629,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return; } try { - int flags = HashChecker.CALC_HASH; + int flags = HashChecker.CHECK_SHA1; if ( blocking ) { flags |= HashChecker.BLOCKING; } @@ -645,7 +666,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H final synchronized void finishUploadInternal() { - if ( state == TransferState.FINISHED ) { + if ( state == TransferState.FINISHED || state == TransferState.ERROR ) { return; } try { @@ -659,17 +680,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( localCopyManager != null ) { localCopyManager.interrupt(); } - if ( state != TransferState.WORKING ) { + state = TransferState.FINISHED; // Races... + if ( !finishIncomingTransfer() ) { state = TransferState.ERROR; - } else { - state = TransferState.FINISHED; // Races... - if ( !finishIncomingTransfer() ) { - state = TransferState.ERROR; - } } } - protected HashChecker getHashChecker() + public static HashChecker getHashChecker() { return hashChecker; } diff --git a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java index 54dd2d0..e1fad97 100644 --- a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java +++ b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java @@ -9,7 +9,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.filetransfer.LocalChunkSource.SourceFile; import org.openslx.util.Util; @@ -17,7 +18,7 @@ import org.openslx.util.Util; public class LocalCopyManager extends Thread { - private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class ); + private static final Logger LOGGER = LogManager.getLogger( LocalCopyManager.class ); private FileChunk currentChunk = null; diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java index 18296c5..ad2e96c 100644 --- a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java @@ -6,7 +6,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Uploader; @@ -17,7 +18,7 @@ public abstract class OutgoingTransferBase extends AbstractTransfer * Constants */ - private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class ); + private static final Logger LOGGER = LogManager.getLogger( OutgoingTransferBase.class ); private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 ); @@ -74,9 +75,13 @@ public abstract class OutgoingTransferBase extends AbstractTransfer @Override public void run() { - boolean ret = connection.upload( sourceFile.getAbsolutePath() ); - synchronized ( uploads ) { - uploads.remove( connection ); + boolean ret = false; + try { + ret = connection.upload( sourceFile.getAbsolutePath() ); + } finally { + synchronized ( uploads ) { + uploads.remove( connection ); + } } if ( ret ) { connectFails.set( 0 ); |