summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
-rw-r--r--src/main/java/org/openslx/filetransfer/ClassTest.java13
-rw-r--r--src/main/java/org/openslx/filetransfer/Downloader.java36
-rw-r--r--src/main/java/org/openslx/filetransfer/FileRange.java6
-rw-r--r--src/main/java/org/openslx/filetransfer/Listener.java38
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java44
-rw-r--r--src/main/java/org/openslx/filetransfer/Uploader.java50
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java57
-rw-r--r--src/main/java/org/openslx/filetransfer/util/FileChunk.java5
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java65
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java59
-rw-r--r--src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java5
-rw-r--r--src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java15
12 files changed, 281 insertions, 112 deletions
diff --git a/src/main/java/org/openslx/filetransfer/ClassTest.java b/src/main/java/org/openslx/filetransfer/ClassTest.java
index 9d5bc82..04dc40d 100644
--- a/src/main/java/org/openslx/filetransfer/ClassTest.java
+++ b/src/main/java/org/openslx/filetransfer/ClassTest.java
@@ -28,8 +28,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
-import org.apache.log4j.BasicConfigurator;
-import org.slf4j.LoggerFactory;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.DefaultConfiguration;
public class ClassTest
{
@@ -39,15 +39,10 @@ public class ClassTest
private static String inFile;
private static String outFile;
- static {
- // This is a temporary workaround for this annoying log4j error msg.
- // Initializing the logger before anything else is done.
- BasicConfigurator.configure();
- LoggerFactory.getLogger( "ROOT" );
- }
-
public static void main( String[] args ) throws Exception
{
+ Configurator.initialize(new DefaultConfiguration());
+
if ( args.length != 4 ) {
System.out.println( "Need 4 argument: <keystore> <passphrase> <infile> <outfile>" );
System.exit( 1 );
diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java
index 50162fc..5aff94b 100644
--- a/src/main/java/org/openslx/filetransfer/Downloader.java
+++ b/src/main/java/org/openslx/filetransfer/Downloader.java
@@ -12,12 +12,13 @@ import javax.net.ssl.SSLContext;
import net.jpountz.lz4.LZ4FastDecompressor;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class Downloader extends Transfer
{
- private static final Logger log = Logger.getLogger( Downloader.class );
+ private static final Logger log = LogManager.getLogger( Downloader.class );
private final LZ4FastDecompressor decompressor = lz4factory.fastDecompressor();
@@ -115,6 +116,7 @@ public class Downloader extends Transfer
compressed += compressedLength;
uncompressed += decompressedLength;
if ( decompressedLength > len ) {
+ // TODO: Partial reads with buffering, if remote payload is larger than our buffer
throw new RuntimeException( "This should never happen! ;)" );
}
if ( decompressedLength == compressedLength ) {
@@ -167,6 +169,19 @@ public class Downloader extends Transfer
}
FileRange requestedRange;
try {
+ byte[] incoming = new byte[ 500000 ];
+ /* TODO once the Lz4InputStream can handle small buffer sizes / partial reads
+ for ( int bufsiz = 600; bufsiz >= 100 && incoming == null; bufsiz -= 100 ) {
+ try {
+ incoming = new byte[ bufsiz * 1024 ];
+ } catch ( OutOfMemoryError e ) {
+ }
+ }
+ if ( incoming == null ) {
+ log.error( "Could not allocate buffer for receiving." );
+ return false;
+ }
+ */
while ( ( requestedRange = rangeCallback.get() ) != null ) {
if ( requestedRange.startOffset < 0 || requestedRange.startOffset >= requestedRange.endOffset ) {
log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" );
@@ -187,16 +202,23 @@ public class Downloader extends Transfer
log.error( "Did not receive meta data from uploading remote peer after requesting range, aborting." );
return false;
}
+ if ( getRemoteError() != null ) {
+ log.error( "Remote peer sent error: " + getRemoteError() );
+ return false;
+ }
FileRange remoteRange = meta.getRange();
- if ( remoteRange == null || !remoteRange.equals( requestedRange ) ) {
- log.error( "Confirmed range by remote peer does not match requested range, aborting download." );
+ if ( remoteRange == null ) {
+ log.error( "Remote metadata does not contain range confirmation. " + meta );
+ }
+ if ( !remoteRange.equals( requestedRange ) ) {
+ log.error( "Confirmed range by remote peer (" + remoteRange
+ + ") does not match requested range (" + requestedRange + "), aborting download." );
return false;
}
// Receive requested range
int chunkLength = requestedRange.getLength();
// If the uploader sets the COMPRESS field, assume compressed chunk
InputStream inStream = meta.peerWantsCompression() ? compressedIn : dataFromServer;
- byte[] incoming = new byte[ 500000 ]; // 500kb
int hasRead = 0;
while ( hasRead < chunkLength ) {
int ret;
@@ -207,7 +229,7 @@ public class Downloader extends Transfer
return false;
}
} catch ( IOException e ) {
- log.error( "Could not read payload from socket" );
+ log.error( "Could not read payload from socket", e );
sendErrorCode( "payload read error" );
return false;
}
@@ -227,7 +249,7 @@ public class Downloader extends Transfer
compressedIn.printStats();
try {
transferSocket.shutdownOutput();
- } catch ( IOException e ) {
+ } catch ( Exception e ) {
}
} finally {
this.close( null );
diff --git a/src/main/java/org/openslx/filetransfer/FileRange.java b/src/main/java/org/openslx/filetransfer/FileRange.java
index e8a7d12..ed86e51 100644
--- a/src/main/java/org/openslx/filetransfer/FileRange.java
+++ b/src/main/java/org/openslx/filetransfer/FileRange.java
@@ -64,4 +64,10 @@ public class FileRange
return (int)startOffset ^ Integer.rotateLeft( (int)endOffset, 16 ) ^ (int)(startOffset >> 32);
}
+ @Override
+ public String toString()
+ {
+ return startOffset + "-" + endOffset;
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java
index a0fc172..fc990fc 100644
--- a/src/main/java/org/openslx/filetransfer/Listener.java
+++ b/src/main/java/org/openslx/filetransfer/Listener.java
@@ -3,6 +3,7 @@ package org.openslx.filetransfer;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -11,9 +12,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
import javax.net.ssl.SSLServerSocketFactory;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.util.PrioThreadFactory;
public class Listener
@@ -25,11 +28,11 @@ public class Listener
private Thread acceptThread = null;
private final int readTimeoutMs;
private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(),
- new PrioThreadFactory( "BFTP-Init" ) );
+ new PrioThreadFactory( "BFTP-BS" ) );
- private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85.
- private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68.
- private static Logger log = Logger.getLogger( Listener.class );
+ private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // ASCII 'U' = 85.
+ private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // ASCII 'D' = 68.
+ private static Logger log = LogManager.getLogger( Listener.class );
/***********************************************************************/
/**
@@ -67,8 +70,10 @@ public class Listener
SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory();
listenSocket = sslServerSocketFactory.createServerSocket();
}
+ listenSocket.setSoTimeout( 5000 );
listenSocket.setReuseAddress( true );
listenSocket.bind( new InetSocketAddress( this.port ) );
+ listenSocket.setSoTimeout( 0 );
} catch ( Exception e ) {
log.error( "Cannot listen on port " + this.port, e );
listenSocket = null;
@@ -82,16 +87,16 @@ public class Listener
if ( acceptThread != null )
return;
final Listener instance = this;
- acceptThread = new Thread( "BFTP-Listen-" + this.port ) {
+ acceptThread = new Thread( "BFTP:" + this.port ) {
@Override
public void run()
{
try {
// Run accept loop in own thread
while ( !isInterrupted() ) {
- Socket acceptedSocket = null;
+ final Socket connection;
try {
- acceptedSocket = listenSocket.accept();
+ connection = listenSocket.accept();
} catch ( SocketTimeoutException e ) {
continue;
} catch ( Exception e ) {
@@ -105,14 +110,12 @@ public class Listener
continue;
}
// Handle each accepted connection in a thread pool
- final Socket connection = acceptedSocket;
Runnable handler = new Runnable() {
@Override
public void run()
{
-
try {
- // Give initial byte signalling mode of operation 5 secs to arrive
+ // Give initial byte signaling mode of operation 5 secs to arrive
connection.setSoTimeout( 5000 );
byte[] b = new byte[ 1 ];
@@ -127,25 +130,32 @@ public class Listener
if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) {
// --> start Downloader(socket).
Downloader d = new Downloader( connection );
+ // Will take care of connection cleanup
incomingEvent.incomingUploadRequest( d );
} else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) {
// --> start Uploader(socket).
Uploader u = new Uploader( connection );
+ // Will take care of connection cleanup
incomingEvent.incomingDownloadRequest( u );
} else {
- log.debug( "Got invalid init-byte ... close connection" );
+ log.debug( "Got invalid init-byte ... closing connection" );
Transfer.safeClose( connection );
}
+ } catch ( SSLException e ) {
+ Transfer.safeClose( connection );
+ log.warn( "SSL error when acceping client " + connection.getInetAddress().getHostAddress() );
+ } catch ( SocketException e ) {
+ // No reason to log, probably - connection where client did nothing after connecting.
} catch ( Exception e ) {
- log.warn( "Error accepting client", e );
Transfer.safeClose( connection );
+ log.warn( "Error handling client", e );
}
}
};
try {
processingPool.execute( handler );
} catch ( RejectedExecutionException e ) {
- Transfer.safeClose( acceptedSocket );
+ Transfer.safeClose( connection );
}
}
} finally {
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index 589d142..aebd3ce 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -9,13 +9,14 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import net.jpountz.lz4.LZ4Factory;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.Logger;
import org.openslx.util.Util;
public abstract class Transfer
@@ -23,7 +24,7 @@ public abstract class Transfer
protected final Socket transferSocket;
protected final DataOutputStream outStream;
protected final DataInputStream dataFromServer;
- protected String ERROR = null;
+ private String remoteError;
private boolean shouldGetToken;
protected boolean useCompression = true;
@@ -51,7 +52,7 @@ public abstract class Transfer
transferSocket = sslSocketFactory.createSocket();
}
transferSocket.setSoTimeout( readTimeoutMs );
- transferSocket.connect( new InetSocketAddress( host, port ) );
+ transferSocket.connect( new InetSocketAddress( host, port ), 4000 );
outStream = new DataOutputStream( transferSocket.getOutputStream() );
dataFromServer = new DataInputStream( transferSocket.getInputStream() );
@@ -169,6 +170,7 @@ public abstract class Transfer
*/
protected MetaData readMetaData()
{
+
Map<String, String> entries = new HashMap<>();
try {
while ( true ) {
@@ -183,7 +185,7 @@ public abstract class Transfer
continue;
}
if ( splitted[0].equals( "ERROR" ) )
- ERROR = splitted[1];
+ remoteError = splitted[1];
if ( entries.containsKey( splitted[0] ) ) {
log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" );
} else {
@@ -192,10 +194,10 @@ public abstract class Transfer
}
} catch ( SocketTimeoutException ste ) {
sendErrorCode( "timeout" );
- this.close( "Socket Timeout occured in readMetaData. " + ERROR );
+ this.close( "Socket Timeout occured in readMetaData." );
return null;
} catch ( Exception e ) {
- this.close( "Exception occured in readMetaData: " + e.toString() + " " + ERROR );
+ this.close( "Exception occured in readMetaData: " + e.toString() );
return null;
}
return new MetaData( entries );
@@ -219,11 +221,17 @@ public abstract class Transfer
*/
protected void close( String error, UploadStatusCallback callback, boolean sendToPeer )
{
+ close( error, callback, sendToPeer, null );
+ }
+
+ protected void close( String error, UploadStatusCallback callback, boolean sendToPeer, Exception e )
+ {
if ( error != null ) {
if ( sendToPeer )
sendErrorCode( error );
if ( callback != null )
callback.uploadError( error );
+ log.info( "Closing with error '" + error + "'", e );
}
synchronized ( transferSocket ) {
safeClose( dataFromServer, outStream, transferSocket );
@@ -232,7 +240,12 @@ public abstract class Transfer
protected void close( String error )
{
- close( error, null, false );
+ close( error, null );
+ }
+
+ protected void close( String error, Exception e )
+ {
+ close( error, null, false, e );
}
public void cancel()
@@ -272,7 +285,7 @@ public abstract class Transfer
*/
public String getRemoteError()
{
- return ERROR;
+ return remoteError;
}
/**
@@ -406,6 +419,21 @@ public abstract class Transfer
return meta.containsKey( "COMPRESS" );
}
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ for ( Entry<String, String> it : meta.entrySet() ) {
+ if ( sb.length() != 0 ) {
+ sb.append( ' ' );
+ }
+ sb.append( it.getKey() );
+ sb.append( '=' );
+ sb.append( it.getValue() );
+ }
+ return sb.toString();
+ }
+
}
}
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java
index e82955b..6edc268 100644
--- a/src/main/java/org/openslx/filetransfer/Uploader.java
+++ b/src/main/java/org/openslx/filetransfer/Uploader.java
@@ -12,12 +12,13 @@ import javax.net.ssl.SSLContext;
import net.jpountz.lz4.LZ4Compressor;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class Uploader extends Transfer
{
- private static final Logger log = Logger.getLogger( Uploader.class );
+ private static final Logger log = LogManager.getLogger( Uploader.class );
private final LZ4Compressor compressor = lz4factory.fastCompressor();
@@ -65,6 +66,10 @@ public class Uploader extends Transfer
return upload( filename, null );
}
+ /**
+ * Compressing output stream that will either write LZ4-compressed data, or if the data
+ * doesn't compress well, just the original uncompressed data.
+ */
private class Lz4OutStream extends OutputStream
{
@@ -72,7 +77,7 @@ public class Uploader extends Transfer
private byte[] buffer;
- private long compressed, uncompressed;
+ private long bytesSentTotal, bytesDecompressedTotal;
private int chunksCompressed, chunksUncompressed;
@@ -95,16 +100,17 @@ public class Uploader extends Transfer
if ( buffer == null || buffer.length < maxCompressedLength ) {
buffer = new byte[ maxCompressedLength ];
}
- uncompressed += decompressedLength;
+ bytesDecompressedTotal += decompressedLength;
int compressedLength = compressor.compress( data, off, decompressedLength, buffer, 0, maxCompressedLength );
parentStream.writeInt( decompressedLength );
+ // Only send compressed data if we got down to at least ~88% the original size
if ( ( compressedLength * 9 / 8 ) < decompressedLength ) {
- compressed += compressedLength;
+ bytesSentTotal += compressedLength;
chunksCompressed++;
parentStream.writeInt( compressedLength );
parentStream.write( buffer, 0, compressedLength );
} else {
- compressed += decompressedLength;
+ bytesSentTotal += decompressedLength;
chunksUncompressed++;
parentStream.writeInt( decompressedLength );
parentStream.write( data, off, decompressedLength );
@@ -113,15 +119,14 @@ public class Uploader extends Transfer
public void printStats()
{
- if ( compressed == 0 )
+ if ( bytesSentTotal == 0 )
return;
- log.info( "Sent bytes: " + compressed + ", decompressed bytes: " + uncompressed );
- log.info( "Sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
+ log.info( "Bytes sent: " + bytesSentTotal + ", decompressed to: " + bytesDecompressedTotal );
+ log.info( "Chunks sent compressed: " + chunksCompressed + ", uncompressed: " + chunksUncompressed );
}
}
- @SuppressWarnings( "resource" )
public boolean upload( String filename, UploadStatusCallback callback )
{
if ( shouldGetToken() ) {
@@ -136,6 +141,18 @@ public class Uploader extends Transfer
this.close( "Could not open given file for reading.", callback, true );
return false;
}
+ byte[] data = null;
+ // Cannot go above 500000 for backwards compat
+ for ( int bufsiz = 500; bufsiz >= 100 && data == null; bufsiz -= 100 ) {
+ try {
+ data = new byte[ bufsiz * 1000 ];
+ } catch ( OutOfMemoryError e ) {
+ }
+ }
+ if ( data == null ) {
+ this.close( "Could not allocate buffer for reading.", callback, true );
+ return false;
+ }
while ( !Thread.currentThread().isInterrupted() ) { // Loop as long as remote peer is requesting chunks from this file
// Read meta data of remote peer - either new range, or it's telling us it's done
MetaData meta = readMetaData();
@@ -145,6 +162,10 @@ public class Uploader extends Transfer
}
if ( meta.isDone() ) // Download complete?
break;
+ if ( getRemoteError() != null ) {
+ this.close( "Remote peer sent error: " + getRemoteError(), callback, true );
+ return false;
+ }
// Not complete, so there must be another range request
FileRange requestedRange = meta.getRange();
if ( requestedRange == null ) {
@@ -158,14 +179,14 @@ public class Uploader extends Transfer
return false;
}
} catch ( IOException e ) {
- this.close( "Could not get current length of file " + filename, callback, false );
+ this.close( "Could not get current length of file " + filename, callback, false, e );
return false;
}
// Seek to requested chunk
try {
file.seek( requestedRange.startOffset );
} catch ( IOException e ) {
- this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true );
+ this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true, e );
return false;
}
// Send confirmation of range and compression mode we're about to send
@@ -185,7 +206,6 @@ public class Uploader extends Transfer
return false;
}
// Finally send requested chunk
- byte[] data = new byte[ 500000 ]; // 500kb
int hasRead = 0;
int length = requestedRange.getLength();
while ( hasRead < length ) {
@@ -193,7 +213,7 @@ public class Uploader extends Transfer
try {
ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
} catch ( IOException e ) {
- this.close( "Error reading from file ", callback, true );
+ this.close( "Error reading from file ", callback, true, e );
return false;
}
if ( ret == -1 ) {
@@ -204,7 +224,7 @@ public class Uploader extends Transfer
try {
outStr.write( data, 0, ret );
} catch ( IOException e ) {
- this.close( "Sending payload failed" );
+ this.close( "Sending payload failed", e );
return false;
}
if ( callback != null )
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index 91d6f1e..27f8e8c 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -10,14 +11,15 @@ import java.util.LinkedList;
import java.util.List;
import java.util.zip.CRC32;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.util.ThriftUtil;
public class ChunkList
{
- private static final Logger LOGGER = Logger.getLogger( ChunkList.class );
+ private static final Logger LOGGER = LogManager.getLogger( ChunkList.class );
/**
* Here we keep a list of all chunks in the proper order, in case we quickly need to access one
@@ -91,7 +93,7 @@ public class ChunkList
* Get CRC32 list in DNBD3 format. All checksums are little
* endian and prefixed by the crc32 sum of the list itself.
*/
- public synchronized byte[] getDnbd3Crc32List() throws IOException
+ public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException
{
byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master
long nextChunkOffset = 0;
@@ -142,7 +144,6 @@ public class ChunkList
* Returns true if this list contains a chunk with state MISSING,
* which means the chunk doesn't have a sha1 known to exist in
* another image.
- * @return
*/
public synchronized boolean hasLocallyMissingChunk()
{
@@ -204,7 +205,7 @@ public class ChunkList
missingChunks.addAll( append );
}
} catch ( Exception e ) {
- LOGGER.warn( "chunk clone list if messed up", e );
+ LOGGER.warn( "chunk clone list is messed up", e );
}
}
}
@@ -414,6 +415,35 @@ public class ChunkList
return sb.toString();
}
+ public synchronized String getStats()
+ {
+ int complete = 0, copying = 0, hashing = 0, missing = 0, qfc = 0, uploading = 0;
+ for ( FileChunk chunk : allChunks ) {
+ switch ( chunk.status ) {
+ case COMPLETE:
+ complete++;
+ break;
+ case COPYING:
+ copying++;
+ break;
+ case HASHING:
+ hashing++;
+ break;
+ case MISSING:
+ missing++;
+ break;
+ case QUEUED_FOR_COPY:
+ qfc++;
+ break;
+ case UPLOADING:
+ uploading++;
+ break;
+ }
+ }
+ return "(" + allChunks.size() + ":" + completeChunks.size() + "/" + pendingChunks.size() + "/" + missingChunks.size() + ")"
+ + " (" + complete + "/" + copying + "/" + hashing + "/" + missing + "/" + qfc + "/" + uploading + ")";
+ }
+
public synchronized boolean isEmpty()
{
return allChunks.isEmpty();
@@ -491,4 +521,21 @@ public class ChunkList
return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum );
}
+ /**
+ * Write DNBD3 CRC32 list to given file.
+ *
+ * @throws IllegalStateException
+ * @throws IOException
+ */
+ public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException
+ {
+ byte[] dnbd3Crc32List = null;
+ dnbd3Crc32List = getDnbd3Crc32List();
+ if ( dnbd3Crc32List != null ) {
+ try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) {
+ fos.write( dnbd3Crc32List );
+ }
+ }
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/util/FileChunk.java b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
index 6594e31..99b30ea 100644
--- a/src/main/java/org/openslx/filetransfer/util/FileChunk.java
+++ b/src/main/java/org/openslx/filetransfer/util/FileChunk.java
@@ -5,14 +5,15 @@ import java.util.Iterator;
import java.util.List;
import java.util.zip.CRC32;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
public class FileChunk
{
- private static final Logger LOGGER = Logger.getLogger( FileChunk.class );
+ private static final Logger LOGGER = LogManager.getLogger( FileChunk.class );
/**
* Length in bytes of binary sha1 representation
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index f6b27f7..abbcd35 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -9,15 +9,18 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class HashChecker
{
public static final int BLOCKING = 1;
- public static final int CALC_HASH = 2;
+ public static final int CHECK_SHA1 = 2;
public static final int CALC_CRC32 = 4;
-
- private static final Logger LOGGER = Logger.getLogger( HashChecker.class );
+ public static final int CALC_SHA1 = 8;
+ public static final int NO_SLOW_WARN = 16;
+
+ private static final Logger LOGGER = LogManager.getLogger( HashChecker.class );
private final BlockingQueue<HashTask> queue;
@@ -26,7 +29,7 @@ public class HashChecker
private final String algorithm;
private boolean invalid = false;
-
+
private final int queueCapacity;
public HashChecker( String algorithm ) throws NoSuchAlgorithmException
@@ -96,11 +99,12 @@ public class HashChecker
public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException
{
boolean blocking = ( flags & BLOCKING ) != 0;
- boolean doHash = ( flags & CALC_HASH ) != 0;
- boolean doCrc32 = ( flags & CALC_CRC32 ) != 0;
- if ( doHash && chunk.getSha1Sum() == null )
+ boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0;
+ boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0;
+ boolean calcSha1 = ( flags & CALC_SHA1 ) != 0;
+ if ( checkSha1 && chunk.getSha1Sum() == null )
throw new NullPointerException( "Chunk has no sha1 hash" );
- HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 );
+ HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 );
synchronized ( threads ) {
if ( invalid ) {
execCallback( task, HashResult.FAILURE );
@@ -132,11 +136,18 @@ public class HashChecker
}
}
}
- if ( doHash ) {
+ if ( checkSha1 ) {
chunk.setStatus( ChunkStatus.HASHING );
}
if ( blocking ) {
+ long pre = System.currentTimeMillis();
queue.put( task );
+ if ( ( flags & NO_SLOW_WARN ) == 0 ) {
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 1000 ) {
+ LOGGER.warn( "HashChecker.queue() took " + duration + "ms" );
+ }
+ }
} else {
if ( !queue.offer( task ) ) {
return false;
@@ -152,7 +163,7 @@ public class HashChecker
{
return queue.size();
}
-
+
public int getQueueCapacity()
{
return queueCapacity;
@@ -202,15 +213,19 @@ public class HashChecker
break;
}
HashResult result = HashResult.NONE;
- if ( task.doHash ) {
+ if ( task.checkSha1 || task.calcSha1 ) {
// Calculate digest
- md.update( task.data, 0, task.chunk.range.getLength() );
- byte[] digest = md.digest();
- result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ md.update( task.data, 0, task.chunk.range.getLength() );
+ byte[] digest = md.digest();
+ if ( task.checkSha1 ) {
+ result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ } else {
+ task.chunk.setSha1Sum( digest );
+ }
}
- if ( task.doCrc32 ) {
- // Calculate CRC32
- task.chunk.calculateDnbd3Crc32( task.data );
+ if ( task.calcCrc32 ) {
+ // Calculate CRC32
+ task.chunk.calculateDnbd3Crc32( task.data );
}
execCallback( task, result );
}
@@ -223,7 +238,7 @@ public class HashChecker
public static enum HashResult
{
- NONE, // No hashing tool place
+ NONE, // No hashing took place
VALID, // Hash matches
INVALID, // Hash does not match
FAILURE // Error calculating hash
@@ -234,16 +249,18 @@ public class HashChecker
public final byte[] data;
public final FileChunk chunk;
public final HashCheckCallback callback;
- public final boolean doHash;
- public final boolean doCrc32;
+ public final boolean checkSha1;
+ public final boolean calcCrc32;
+ public final boolean calcSha1;
- public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 )
+ public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 )
{
this.data = data;
this.chunk = chunk;
this.callback = callback;
- this.doHash = doHash;
- this.doCrc32 = doCrc32;
+ this.checkSha1 = checkSha1;
+ this.calcCrc32 = calcCrc32;
+ this.calcSha1 = calcSha1;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 8a69020..5cca7b8 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -14,7 +14,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferState;
import org.openslx.bwlp.thrift.iface.TransferStatus;
import org.openslx.filetransfer.DataReceivedCallback;
@@ -30,7 +31,7 @@ import org.openslx.util.ThriftUtil;
public abstract class IncomingTransferBase extends AbstractTransfer implements HashCheckCallback
{
- private static final Logger LOGGER = Logger.getLogger( IncomingTransferBase.class );
+ private static final Logger LOGGER = LogManager.getLogger( IncomingTransferBase.class );
/**
* Remote peer is uploading, so on our end, we have Downloaders
@@ -153,7 +154,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
@Override
public final int getActiveConnectionCount()
{
- return downloads.size();
+ synchronized ( downloads ) {
+ return downloads.size();
+ }
}
public final boolean hashesEqual( List<ByteBuffer> blockHashes )
@@ -215,11 +218,11 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
public void updateBlockHashList( List<byte[]> hashList )
{
if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
- LOGGER.debug( this.getId() + ": Rejecting block hash list in state " + state );
+ LOGGER.info( this.getId() + ": Rejecting block hash list in state " + state );
return;
}
if ( hashList == null ) {
- LOGGER.debug( this.getId() + ": Rejecting null block hash list" );
+ LOGGER.info( this.getId() + ": Rejecting null block hash list" );
return;
}
int firstNew = chunks.updateSha1Sums( hashList );
@@ -249,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
continue;
}
try {
- if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop
+ if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop
chunks.markCompleted( chunk, false );
break;
}
@@ -286,6 +289,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( sources != null && !sources.isEmpty() ) {
chunks.markLocalCopyCandidates( sources );
}
+ if ( state == TransferState.IDLE ) {
+ state = TransferState.WORKING;
+ }
localCopyManager.trigger();
}
@@ -389,7 +395,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
currentChunk = chunks.getMissing();
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
- cancel();
+ LOGGER.info( "Incoming transfer connection was interrupted" );
return null;
}
if ( currentChunk == null ) {
@@ -429,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
InterruptedException passEx = null;
if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 );
return true;
} catch ( InterruptedException e ) {
passEx = e;
@@ -437,7 +443,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
// We have no hash checker, or hasher rejected block,
// or the hash for the current chunk is unknown - flush to disk
+ long pre = System.currentTimeMillis();
writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 2000 ) {
+ LOGGER.warn( "Writing chunk to disk before hash check took " + duration + "ms. Storage backend overloaded?" );
+ }
chunks.markCompleted( currentChunk, false );
chunkStatusChanged( currentChunk );
if ( passEx != null )
@@ -463,6 +474,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
@Override
public void run()
{
+ int active;
try {
CbHandler cbh = new CbHandler( connection );
if ( connection.download( cbh, cbh ) ) {
@@ -481,7 +493,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
chunkStatusChanged( cbh.currentChunk );
}
- LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
+ LOGGER.info( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped prematurely" );
}
if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
lastActivityTime.set( System.currentTimeMillis() );
@@ -489,6 +501,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
} finally {
synchronized ( downloads ) {
downloads.remove( connection );
+ active = downloads.size();
}
}
if ( chunks.isComplete() ) {
@@ -499,6 +512,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( localCopyManager != null ) {
localCopyManager.trigger();
}
+ LOGGER.info( "Downloader disconnected, " + active + " still running. " + chunks.getStats() );
+ } else {
+ LOGGER.info( "Downloader disconnected, state=" + state + ". " + chunks.getStats() );
}
}
} );
@@ -563,7 +579,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
public void hashCheckDone( HashResult result, byte[] data, FileChunk chunk )
{
if ( state != TransferState.IDLE && state != TransferState.WORKING ) {
- LOGGER.debug( "hashCheckDone called in bad state " + state.name() );
+ LOGGER.warn( "hashCheckDone called in bad state " + state.name() );
return;
}
switch ( result ) {
@@ -576,7 +592,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.markCompleted( chunk, true );
} else {
try {
+ long pre = System.currentTimeMillis();
writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 2000 ) {
+ LOGGER.warn( "Writing chunk to disk after hash check took " + duration + "ms. Storage backend overloaded?" );
+ }
chunks.markCompleted( chunk, true );
} catch ( Exception e ) {
LOGGER.warn( "Cannot write to file after hash check", e );
@@ -600,7 +621,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
// A block finished, see if we can queue a new one
queueUnhashedChunk( false );
- if ( localCopyManager != null ) {
+ if ( localCopyManager != null && localCopyManager.isAlive() ) {
localCopyManager.trigger();
}
}
@@ -617,7 +638,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
try {
data = loadChunkFromFile( chunk );
} catch ( EOFException e1 ) {
- LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking is invalid." );
+ LOGGER.warn( "Cannot queue unhashed chunk: file too short. Marking as invalid." );
chunks.markFailed( chunk );
chunkStatusChanged( chunk );
return;
@@ -629,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return;
}
try {
- int flags = HashChecker.CALC_HASH;
+ int flags = HashChecker.CHECK_SHA1;
if ( blocking ) {
flags |= HashChecker.BLOCKING;
}
@@ -645,7 +666,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
final synchronized void finishUploadInternal()
{
- if ( state == TransferState.FINISHED ) {
+ if ( state == TransferState.FINISHED || state == TransferState.ERROR ) {
return;
}
try {
@@ -659,17 +680,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( localCopyManager != null ) {
localCopyManager.interrupt();
}
- if ( state != TransferState.WORKING ) {
+ state = TransferState.FINISHED; // Races...
+ if ( !finishIncomingTransfer() ) {
state = TransferState.ERROR;
- } else {
- state = TransferState.FINISHED; // Races...
- if ( !finishIncomingTransfer() ) {
- state = TransferState.ERROR;
- }
}
}
- protected HashChecker getHashChecker()
+ public static HashChecker getHashChecker()
{
return hashChecker;
}
diff --git a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
index 54dd2d0..e1fad97 100644
--- a/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
+++ b/src/main/java/org/openslx/filetransfer/util/LocalCopyManager.java
@@ -9,7 +9,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.filetransfer.LocalChunkSource.ChunkSource;
import org.openslx.filetransfer.LocalChunkSource.SourceFile;
import org.openslx.util.Util;
@@ -17,7 +18,7 @@ import org.openslx.util.Util;
public class LocalCopyManager extends Thread
{
- private static final Logger LOGGER = Logger.getLogger( LocalCopyManager.class );
+ private static final Logger LOGGER = LogManager.getLogger( LocalCopyManager.class );
private FileChunk currentChunk = null;
diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
index 18296c5..ad2e96c 100644
--- a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
@@ -6,7 +6,8 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import org.apache.log4j.Logger;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Uploader;
@@ -17,7 +18,7 @@ public abstract class OutgoingTransferBase extends AbstractTransfer
* Constants
*/
- private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class );
+ private static final Logger LOGGER = LogManager.getLogger( OutgoingTransferBase.class );
private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 );
@@ -74,9 +75,13 @@ public abstract class OutgoingTransferBase extends AbstractTransfer
@Override
public void run()
{
- boolean ret = connection.upload( sourceFile.getAbsolutePath() );
- synchronized ( uploads ) {
- uploads.remove( connection );
+ boolean ret = false;
+ try {
+ ret = connection.upload( sourceFile.getAbsolutePath() );
+ } finally {
+ synchronized ( uploads ) {
+ uploads.remove( connection );
+ }
}
if ( ret ) {
connectFails.set( 0 );