summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2014-09-19 18:13:06 +0200
committerSimon Rettberg2014-09-19 18:13:06 +0200
commit52fa9a47498a3727d11a34205c9920f9a10e8aeb (patch)
treec99723aec9ef430132d1eeff08e43684cdfacbf3
parentAdd debugging to file transfer (diff)
downloadmaster-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.tar.gz
master-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.tar.xz
master-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.zip
Rework file transfer, try to use callbacks for everything
No more juggling with sendRange() and sendData(), which was easy to use wrong, and cause lots of weird errors.
-rw-r--r--src/main/java/org/openslx/filetransfer/ClassTest.java91
-rw-r--r--src/main/java/org/openslx/filetransfer/Downloader.java149
-rw-r--r--src/main/java/org/openslx/filetransfer/FileRange.java46
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java365
-rw-r--r--src/main/java/org/openslx/filetransfer/Uploader.java134
-rw-r--r--src/main/java/org/openslx/filetransfer/WantRangeCallback.java12
6 files changed, 470 insertions, 327 deletions
diff --git a/src/main/java/org/openslx/filetransfer/ClassTest.java b/src/main/java/org/openslx/filetransfer/ClassTest.java
index 2a4d05c..1fecbcc 100644
--- a/src/main/java/org/openslx/filetransfer/ClassTest.java
+++ b/src/main/java/org/openslx/filetransfer/ClassTest.java
@@ -16,11 +16,10 @@
package org.openslx.filetransfer;
-import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
@@ -34,6 +33,8 @@ import org.slf4j.LoggerFactory;
public class ClassTest
{
+
+ private static final int CHUNK_SIZE = 11111111;
private static String inFile;
private static String outFile;
@@ -47,9 +48,9 @@ public class ClassTest
public static void main( String[] args ) throws Exception
{
- if (args.length != 4) {
- System.out.println("Need 4 argument: <keystore> <passphrase> <infile> <outfile>");
- System.exit(1);
+ if ( args.length != 4 ) {
+ System.out.println( "Need 4 argument: <keystore> <passphrase> <infile> <outfile>" );
+ System.exit( 1 );
}
String pathToKeyStore = args[0];
final char[] passphrase = args[1].toCharArray();
@@ -77,11 +78,33 @@ public class ClassTest
context.init( null, trustManagers, null );
- Downloader d = new Downloader( "localhost", 6789, context );
- d.setOutputFilename( outFile );
- d.sendToken( "xyz" );
- while ( d.readMetaData() )
- d.receiveBinary();
+ Downloader d = new Downloader( "localhost", 6789, context, "xyz" );
+ boolean res = d.download( outFile, new WantRangeCallback() {
+ long pos = 0;
+ long size = -1;
+
+ @Override
+ public FileRange get()
+ {
+ if ( size == -1 ) {
+ try {
+ size = Files.size( Paths.get( inFile ) );
+ } catch ( IOException e ) {
+ return null;
+ }
+ }
+ if ( pos >= size )
+ return null;
+ long end = Math.min( pos + CHUNK_SIZE, size );
+ FileRange range = new FileRange( pos, end );
+ pos += CHUNK_SIZE;
+ return range;
+ }
+ } );
+ if ( res )
+ System.out.println( "Active Download OK" );
+ else
+ System.out.println( "Active Download FAILED" );
/*
String pathToKeyStore =
@@ -115,41 +138,29 @@ public class ClassTest
*/
}
-// Implementing IncomingEvent for testing case.
-static class Test implements IncomingEvent
-{
- public void incomingUploader( Uploader uploader ) throws IOException
+ // Implementing IncomingEvent for testing case.
+ static class Test implements IncomingEvent
{
- RandomAccessFile file;
- try {
- file = new RandomAccessFile( new File( inFile ), "r" );
- } catch ( FileNotFoundException e ) {
- e.printStackTrace();
- return;
+ public void incomingUploader( Uploader uploader ) throws IOException
+ {
+ if ( uploader.getToken() == null ) {
+ System.out.println( "Incoming uploader: could not get token!" );
+ return;
+ }
+ if ( !uploader.upload( inFile ) )
+ System.out.println( "Incoming uploader failed!" );
+ else
+ System.out.println( "Incomgin uploader OK" );
}
- long length = file.length();
- file.close();
-
- int diff = 0;
- for ( int i = 0; ( i + 254 ) < length; i += 254 ) {
- if ( !uploader.sendRange( i, i + 254 ) || !uploader.sendFile( inFile ) ) {
- System.out.println("FAIL");
+ public void incomingDownloader( Downloader downloader ) throws IOException
+ {
+ if ( downloader.getToken() == null ) {
+ System.out.println( "Incoming downloader: could not get token!" );
return;
}
- diff = (int) ( length - i );
+ // TODO: if (!downloader.download( destinationFile, callback ))
}
-
- uploader.sendRange( (int) ( length - diff ), (int)length );
- uploader.sendFile( inFile );
- }
-
- public void incomingDownloader( Downloader downloader ) throws IOException
- {
- downloader.setOutputFilename( outFile );
- while ( downloader.readMetaData() )
- downloader.receiveBinary();
}
-}
}
diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java
index 6946173..298bcfc 100644
--- a/src/main/java/org/openslx/filetransfer/Downloader.java
+++ b/src/main/java/org/openslx/filetransfer/Downloader.java
@@ -1,9 +1,10 @@
package org.openslx.filetransfer;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.net.SocketTimeoutException;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -12,8 +13,6 @@ import org.apache.log4j.Logger;
public class Downloader extends Transfer
{
- // Some instance variables.
- private String outputFilename = null;
private static final Logger log = Logger.getLogger( Downloader.class );
@@ -25,10 +24,12 @@ public class Downloader extends Transfer
* @param port Port to connect to
* @throws IOException
*/
- public Downloader( String host, int port, SSLContext context ) throws IOException
+ public Downloader( String host, int port, SSLContext context, String token ) throws IOException
{
super( host, port, context, log );
- dataToServer.writeByte( 'D' );
+ outStream.writeByte( 'D' );
+ if ( !sendToken( token ) || !sendEndOfMeta() )
+ throw new IOException( "Sending token failed" );
}
/***********************************************************************/
@@ -36,94 +37,92 @@ public class Downloader extends Transfer
* Constructor used by Listener to create an incoming download connection.
*
* @param socket established connection to peer which requested an upload.
- * @throws IOException
+ * @throws IOException
*/
protected Downloader( SSLSocket socket ) throws IOException
{
super( socket, log );
}
- /***********************************************************************/
- /**
- * Method for setting outputFilename.
- *
- * @param filename
- */
- public void setOutputFilename( String filename )
- {
- outputFilename = filename;
- }
-
- /***********************************************************************/
- /**
- * Method for getting outputFilename.
- *
- * @return outputFilename
- */
- public String getOutputFilename()
- {
- return outputFilename;
- }
-
- /***********************************************************************/
- /**
- * Method to request a byte range within the file to download. This
- * method is called by the party that initiated the connection.
- *
- * @param startOffset offset in file where to start the transfer (inclusive)
- * @param endOffset end offset where to end the transfer (exclusive)
- * @return success or failure
- */
- public boolean requestRange( long startOffset, long endOffset )
- {
- return super.sendRange( startOffset, endOffset );
- }
-
- /***********************************************************************/
- /**
- * Method for reading Binary. Reading the current Range of incoming binary.
- *
- */
- public boolean receiveBinary()
+ public boolean download( String destinationFile, WantRangeCallback callback )
{
+ if ( shouldGetToken() ) {
+ log.error( "You didn't call getToken yet!" );
+ return false;
+ }
+ FileRange requestedRange;
RandomAccessFile file = null;
try {
- int chunkLength = getDiffOfRange();
- byte[] incoming = new byte[ 64000 ];
- int hasRead = 0;
- file = new RandomAccessFile( new File( outputFilename ), "rw" );
- file.seek( getStartOfRange() );
- while ( hasRead < chunkLength ) {
- int ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
- // log.info("hasRead: " + hasRead + " length: " + length + " ret: " + ret);
- if ( ret == -1 ) {
- log.info( "Error occured while receiving payload." );
+ try {
+ file = new RandomAccessFile( new File( destinationFile ), "rw" );
+ } catch ( FileNotFoundException e2 ) {
+ log.error( "Cannot open " + destinationFile + " for writing." );
+ return false;
+ }
+ while ( ( requestedRange = callback.get() ) != null ) {
+ if ( requestedRange.startOffset < 0 || requestedRange.startOffset >= requestedRange.endOffset ) {
+ log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" );
return false;
}
- hasRead += ret;
- file.write( incoming, 0, ret );
-
+ // Send range request
+ if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) {
+ log.error( "Could not send next range request, download failed." );
+ return false;
+ }
+ // See if remote peer acknowledges range request
+ MetaData meta = readMetaData();
+ if ( meta == null ) {
+ log.error( "Did not receive meta data from uploading remote peer after requesting range, aborting." );
+ return false;
+ }
+ FileRange remoteRange = meta.getRange();
+ if ( remoteRange == null || !remoteRange.equals( requestedRange ) ) {
+ log.error( "Confirmed range by remote peer does not match requested range, aborting download." );
+ return false;
+ }
+ // Receive requested range
+ int chunkLength = requestedRange.getLength();
+ byte[] incoming = new byte[ 500000 ]; // 500kb
+ int hasRead = 0;
+ try {
+ file.seek( requestedRange.startOffset );
+ } catch ( IOException e1 ) {
+ log.error( "Could not seek to " + requestedRange.startOffset + " in " + destinationFile + ". Disk full?" );
+ return false;
+ }
+ while ( hasRead < chunkLength ) {
+ int ret;
+ try {
+ ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
+ } catch ( IOException e ) {
+ log.error( "Could not read payload from socket" );
+ sendErrorCode( "payload read error" );
+ return false;
+ }
+ if ( ret == -1 ) {
+ log.info( "Remote peer unexpectedly closed the connection." );
+ return false;
+ }
+ hasRead += ret;
+ try {
+ file.write( incoming, 0, ret );
+ } catch ( IOException e ) {
+ log.error( "Could not write to " + destinationFile + ". Disk full?" );
+ return false;
+ }
+ }
}
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- sendErrorCode( "timeout" );
- this.close( "Socket timeout occured ... close connection." );
- return false;
- } catch ( Exception e ) {
- e.printStackTrace();
- this.close( "Reading RANGE " + getStartOfRange() + ":" + getEndOfRange()
- + " of file from socket failed..." );
- return false;
+ sendDone();
+ sendEndOfMeta();
} finally {
- if ( file != null ) {
+ if ( file != null )
try {
file.close();
} catch ( IOException e ) {
- e.printStackTrace();
}
- }
- RANGE = null; // Reset range for next iteration
+ this.close( null );
}
return true;
}
+
}
diff --git a/src/main/java/org/openslx/filetransfer/FileRange.java b/src/main/java/org/openslx/filetransfer/FileRange.java
new file mode 100644
index 0000000..30edefc
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/FileRange.java
@@ -0,0 +1,46 @@
+package org.openslx.filetransfer;
+
+public class FileRange
+{
+
+ /**
+ * Offset of first byte of range in file, inclusive
+ */
+ public final long startOffset;
+ /**
+ * Offset of last byte of range in file, exclusive
+ */
+ public final long endOffset;
+
+ /**
+ * Create a FileRange instance
+ *
+ * @param startOffset Offset of first byte of range in file, inclusive
+ * @param endOffset Offset of last byte of range in file, exclusive
+ */
+ public FileRange( long startOffset, long endOffset )
+ {
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ /**
+ * Get length of range
+ *
+ * @return length of range, in bytes
+ */
+ public int getLength()
+ {
+ return (int) ( endOffset - startOffset );
+ }
+
+ @Override
+ public boolean equals( Object other )
+ {
+ if ( other == null || ! ( other instanceof FileRange ) )
+ return false;
+ FileRange o = (FileRange)other;
+ return o.startOffset == this.startOffset && o.endOffset == this.endOffset;
+ }
+
+}
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index a8c9a8a..3f2fdde 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -3,8 +3,11 @@ package org.openslx.filetransfer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -14,60 +17,61 @@ import org.apache.log4j.Logger;
public abstract class Transfer
{
- protected final SSLSocketFactory sslSocketFactory;
- protected final SSLSocket satelliteSocket;
- protected final DataOutputStream dataToServer;
+ protected final SSLSocket transferSocket;
+ protected final DataOutputStream outStream;
protected final DataInputStream dataFromServer;
- protected String TOKEN = null;
- protected long[] RANGE = null;
protected String ERROR = null;
+ private boolean shouldGetToken;
protected final Logger log;
- protected Transfer( String ip, int port, SSLContext context, Logger log ) throws IOException
+ /**
+ * Actively initiated transfer.
+ *
+ * @param host Remote Host
+ * @param port Remote Port
+ * @param context SSL Context for encryption
+ * @param log Logger to use
+ * @throws IOException
+ */
+ protected Transfer( String host, int port, SSLContext context, Logger log ) throws IOException
{
this.log = log;
// create socket.
- sslSocketFactory = context.getSocketFactory();
+ SSLSocketFactory sslSocketFactory = context.getSocketFactory();
- satelliteSocket = (SSLSocket)sslSocketFactory.createSocket( ip, port );
- satelliteSocket.setSoTimeout( 2000 ); // set socket timeout.
+ transferSocket = (SSLSocket)sslSocketFactory.createSocket();
+ transferSocket.setSoTimeout( 5000 ); // set socket timeout.
+ transferSocket.connect( new InetSocketAddress( host, port ) );
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = false;
}
+ /**
+ * Passive transfer through incoming connection.
+ *
+ * @param socket already connected socket to remote peer
+ * @param log Logger to use
+ * @throws IOException
+ */
protected Transfer( SSLSocket socket, Logger log ) throws IOException
{
this.log = log;
- satelliteSocket = socket;
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
- sslSocketFactory = null;
+ transferSocket = socket;
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = true;
}
protected boolean sendRange( long startOffset, long endOffset )
{
- if ( RANGE != null ) {
- log.warn( "Range already set!" );
- return false;
- }
try {
+ log.debug( "Sending range: " + startOffset + " to " + endOffset );
sendKeyValuePair( "RANGE", startOffset + ":" + endOffset );
- RANGE[0] = startOffset;
- RANGE[1] = endOffset;
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending RANGE in Uploader failed..." );
return false;
}
return true;
@@ -75,108 +79,58 @@ public abstract class Transfer
/***********************************************************************/
/**
- * Method for sending token for identification from satellite to master.
+ * Method for sending error Code to server. For example in case of wrong
+ * token, send code for wrong token.
*
- * @param token The token to send
*/
- public boolean sendToken( String token )
+ public boolean sendErrorCode( String errString )
{
- if ( TOKEN != null ) {
- log.warn( "Trying to send token while a token is already set! Ignoring..." );
- return false;
- }
- TOKEN = token;
try {
- sendKeyValuePair( "TOKEN", TOKEN );
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
+ sendKeyValuePair( "ERROR", errString );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending TOKEN in Downloader failed..." );
+ this.close( e.toString() );
return false;
}
return true;
}
- /***********************************************************************/
- /**
- * Method for reading incoming token for identification.
- *
- */
- public String getToken()
+ protected boolean sendToken( String token )
{
- return TOKEN;
- }
-
- private boolean parseRange( String range )
- {
- if ( range == null )
- return true;
- if ( RANGE != null ) {
- log.warn( "Warning: RANGE already set when trying to parse from " + range );
- return false;
- }
- String parts[] = range.split( ":", 2 );
- long ret[] = new long[ 2 ];
try {
- ret[0] = Long.parseLong( parts[0] );
- ret[1] = Long.parseLong( parts[1] );
- } catch ( Throwable t ) {
- log.warn( "Not parsable range: '" + range + "'" );
- return false;
- }
- if ( ret[1] <= ret[0] ) {
- log.warn( "Invalid range. Start >= end" );
+ sendKeyValuePair( "TOKEN", token );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
return false;
}
- RANGE = ret;
return true;
}
- /***********************************************************************/
- /**
- * Getter for beginning of RANGE.
- *
- * @return
- */
- public long getStartOfRange()
+ public boolean sendDone()
{
- if ( RANGE != null ) {
- return RANGE[0];
+ try {
+ sendKeyValuePair( "DONE", "" );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
+ return false;
}
- return -1;
+ return true;
}
- /***********************************************************************/
- /**
- * Getter for end of RANGE.
- *
- * @return
- */
- public long getEndOfRange()
+ protected boolean sendEndOfMeta()
{
- if ( RANGE != null ) {
- return RANGE[1];
+ try {
+ outStream.writeByte( 0 );
+ } catch ( SocketTimeoutException e ) {
+ log.error( "Error sending end of meta - socket timeout" );
+ return false;
+ } catch ( IOException e ) {
+ log.error( "Error sending end of meta - " + e.toString() );
+ return false;
}
- return -1;
- }
-
- /***********************************************************************/
- /**
- * Method for returning difference of current Range.
- *
- * @return
- */
- public int getDiffOfRange()
- {
- return (int)Math.abs( getEndOfRange() - getStartOfRange() );
+ return true;
}
/***********************************************************************/
@@ -185,10 +139,11 @@ public abstract class Transfer
* Split incoming bytes after first '=' and store value to specific
* variable.
*
- * @return true on success, false if reading failed
+ * @return map of meta data received, null on error
*/
- public boolean readMetaData()
+ protected MetaData readMetaData()
{
+ Map<String, String> entries = new HashMap<>();
try {
while ( true ) {
byte[] incoming = new byte[ 255 ];
@@ -198,9 +153,9 @@ public abstract class Transfer
retLengthByte = dataFromServer.read( incoming, 0, 1 );
// If .read() didn't return 1, it was not able to read first byte.
if ( retLengthByte != 1 ) {
- log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte);
+ log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte );
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
int length = incoming[0] & 0xFF;
@@ -218,7 +173,7 @@ public abstract class Transfer
int ret = dataFromServer.read( incoming, hasRead, length - hasRead );
if ( ret == -1 ) {
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
hasRead += ret;
}
@@ -230,62 +185,36 @@ public abstract class Transfer
log.warn( "Invalid key value pair received (" + data + ")" );
continue;
}
- if ( splitted[0].equals( "TOKEN" ) ) {
- if ( TOKEN != null ) {
- this.close( "Received a token when a token is already set!" );
- return false;
- }
- TOKEN = splitted[1];
- log.debug( "TOKEN: " + TOKEN );
- }
- else if ( splitted[0].equals( "RANGE" ) ) {
- if ( !parseRange( splitted[1] ) ) {
- this.close( "Could not parse RANGE token" );
- return false;
- }
- log.debug( "RANGE: '" + splitted[1] + "'" );
- }
- else if ( splitted[0].equals( "ERROR" ) ) {
+ if ( splitted[0].equals( "ERROR" ) )
ERROR = splitted[1];
- log.debug( "ERROR: " + ERROR );
+ if ( entries.containsKey( splitted[0] ) ) {
+ log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" );
+ } else {
+ entries.put( splitted[0], splitted[1] );
}
}
} catch ( SocketTimeoutException ste ) {
ste.printStackTrace();
sendErrorCode( "timeout" );
this.close( "Socket Timeout occured in readMetaData." );
- return false;
+ return null;
} catch ( Exception e ) {
e.printStackTrace();
this.close( e.toString() );
- return false;
+ return null;
}
- return true;
+ return new MetaData( entries );
}
private void sendKeyValuePair( String key, String value ) throws IOException
{
byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 );
- dataToServer.writeByte( data.length );
- dataToServer.write( data );
- }
-
- /***********************************************************************/
- /**
- * Method for sending error Code to server. For example in case of wrong
- * token, send code for wrong token.
- *
- */
- public Boolean sendErrorCode( String errString )
- {
try {
- sendKeyValuePair( "ERROR", errString );
- } catch ( IOException e ) {
- e.printStackTrace();
- this.close( e.toString() );
- return false;
+ outStream.writeByte( data.length );
+ outStream.write( data );
+ } catch ( SocketTimeoutException e ) {
+ log.warn( "Socket timeout when sending KVP with key " + key );
}
- return true;
}
/***********************************************************************/
@@ -298,12 +227,12 @@ public abstract class Transfer
if ( error != null )
log.info( error );
try {
- if ( satelliteSocket != null )
- this.satelliteSocket.close();
+ if ( transferSocket != null )
+ this.transferSocket.close();
if ( dataFromServer != null )
dataFromServer.close();
- if ( dataToServer != null )
- dataToServer.close();
+ if ( outStream != null )
+ outStream.close();
} catch ( IOException e ) {
e.printStackTrace();
}
@@ -317,8 +246,128 @@ public abstract class Transfer
*/
public boolean isValid()
{
- return satelliteSocket.isConnected() && !satelliteSocket.isClosed()
- && !satelliteSocket.isInputShutdown() && !satelliteSocket.isOutputShutdown();
+ return transferSocket.isConnected() && !transferSocket.isClosed()
+ && !transferSocket.isInputShutdown() && !transferSocket.isOutputShutdown();
+ }
+
+ /**
+ * Get error string received from remote side, if any.
+ *
+ * @return Error string, if received, or null.
+ */
+ public String getRemoteError()
+ {
+ return ERROR;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ * Call this ONLY if all of the following conditions are met:
+ * - this is an incoming transfer connection
+ * - you didn't call it before
+ * - you didn't call download or upload yet
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ if ( !shouldGetToken ) {
+ log.error( "Invalid call of getToken. You either initiated the connection yourself, or you already called getToken before." );
+ this.close( null );
+ return null;
+ }
+ shouldGetToken = false;
+ MetaData meta = readMetaData();
+ if ( meta == null )
+ return null;
+ return meta.getToken();
+ }
+
+ /**
+ * Should we call getToken()? Used internally for detecting wrong usage of
+ * the transfer classes.
+ *
+ * @return yes or no
+ */
+ protected boolean shouldGetToken()
+ {
+ return shouldGetToken;
+ }
+
+ /**
+ * High level access to key-value-pairs.
+ */
+ class MetaData
+ {
+
+ private Map<String, String> meta;
+
+ private MetaData( Map<String, String> meta )
+ {
+ this.meta = meta;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ return meta.get( "TOKEN" );
+ }
+
+ /**
+ * Check if remote peer set the DONE key, telling us the transfer is complete.
+ *
+ * @return yes or no
+ */
+ public boolean isDone()
+ {
+ return meta.containsKey( "DONE" );
+ }
+
+ /**
+ * Return range from this meta data class, or null
+ * if it doesn't contain a (valid) range key-value-pair.
+ *
+ * @return The range instance
+ */
+ public FileRange getRange()
+ {
+ if ( meta.containsKey( "RANGE" ) )
+ return parseRange( meta.get( "RANGE" ) );
+ return null;
+ }
+
+ /**
+ * Parse range in format START:END to {@link FileRange} instance.
+ *
+ * @param range String representation of range
+ * @return {@link FileRange} instance of range, or null on error
+ */
+ private FileRange parseRange( String range )
+ {
+ if ( range == null )
+ return null;
+ String parts[] = range.split( ":" );
+ if ( parts.length != 2 )
+ return null;
+ long start, end;
+ try {
+ start = Long.parseLong( parts[0] );
+ end = Long.parseLong( parts[1] );
+ } catch ( Throwable t ) {
+ log.warn( "Not parsable range: '" + range + "'" );
+ return null;
+ }
+ if ( start >= end ) {
+ log.warn( "Invalid range. Start >= end" );
+ return null;
+ }
+ return new FileRange( start, end );
+ }
+
}
}
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java
index 3deb272..87845fa 100644
--- a/src/main/java/org/openslx/filetransfer/Uploader.java
+++ b/src/main/java/org/openslx/filetransfer/Uploader.java
@@ -1,9 +1,9 @@
package org.openslx.filetransfer;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.net.SocketTimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -24,10 +24,12 @@ public class Uploader extends Transfer
* @param context ssl context for establishing a secure connection
* @throws IOException
*/
- public Uploader( String host, int port, SSLContext context ) throws IOException
+ public Uploader( String host, int port, SSLContext context, String token ) throws IOException
{
super( host, port, context, log );
- dataToServer.writeByte( 'U' );
+ outStream.writeByte( 'U' );
+ if ( !sendToken( token ) || !sendEndOfMeta() )
+ throw new IOException( "Sending token failed" );
}
/***********************************************************************/
@@ -44,71 +46,95 @@ public class Uploader extends Transfer
/***********************************************************************/
/**
- * Used by the peer that initiated the connection to tell the remote
- * peer which part of the file is being uploaded
- *
- * @param startOffset start offset in bytes in the file (inclusive)
- * @param endOffset end offset in file (exclusive)
- * @return
- */
- public boolean prepareSendRange( long startOffset, long endOffset )
- {
- return super.sendRange( startOffset, endOffset );
- }
-
- /***********************************************************************/
- /**
* Method for sending File with filename.
*
* @param filename
*/
- public boolean sendFile( String filename )
+ public boolean upload( String filename )
{
- if ( getStartOfRange() == -1 ) {
- this.close( "sendFile called when no range is set" );
+ if ( shouldGetToken() ) {
+ log.error( "You didn't call getToken yet!" );
return false;
}
-
RandomAccessFile file = null;
try {
- file = new RandomAccessFile( new File( filename ), "r" );
- file.seek( getStartOfRange() );
-
- byte[] data = new byte[ 64000 ];
- int hasRead = 0;
- int length = getDiffOfRange();
- // System.out.println( "diff of Range: " + length );
- while ( hasRead < length ) {
- int ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
- if ( ret == -1 ) {
- this.close( "Error occured in Uploader.sendFile(),"
- + " while reading from File to send." );
+ try {
+ file = new RandomAccessFile( new File( filename ), "r" );
+ } catch ( FileNotFoundException e ) {
+ log.error( "Could not open " + filename + " for reading." );
+ return false;
+ }
+ for ( ;; ) { // Loop as long as remote peer is requesting chunks from this file
+ // Read meta data of remote peer - either new range, or it's telling us it's done
+ MetaData meta = readMetaData();
+ if ( meta == null ) {
+ log.error( "Did not get meta data from remote peer." );
return false;
}
- hasRead += ret;
- dataToServer.write( data, 0, ret );
- }
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- sendErrorCode( "timeout" );
- this.close( "Socket timeout occured ... close connection." );
- return false;
- } catch ( IOException ioe ) {
- ioe.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
+ if ( meta.isDone() ) // Download complete?
+ break;
+ // Not complete, so there must be another range request
+ FileRange requestedRange = meta.getRange();
+ if ( requestedRange == null ) {
+ log.error( "Remote peer did not include RANGE in meta data." );
+ sendErrorCode( "no (valid) range in request" );
return false;
}
+ // Range inside file?
+ try {
+ if ( requestedRange.endOffset > file.length() ) {
+ log.error( "Requested range is larger than file size, aborting." );
+ sendErrorCode( "range out of file bounds" );
+ return false;
+ }
+ } catch ( IOException e ) {
+ log.error( "Could not get current length of file " + filename );
+ return false;
+ }
+ // Seek to requested chunk
+ try {
+ file.seek( requestedRange.startOffset );
+ } catch ( IOException e ) {
+ log.error( "Could not seek to start of requested range in " + filename + " (" + requestedRange.startOffset + ")" );
+ return false;
+ }
+ // Send confirmation of range we're about to send
+ try {
+ long ptr = file.getFilePointer();
+ if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
+ log.error( "Could not send range confirmation" );
+ return false;
+ }
+ } catch ( IOException e ) {
+ log.error( "Could not determine current position in file " + filename );
+ return false;
+ }
+ // Finally send requested chunk
+ byte[] data = new byte[ 500000 ]; // 500kb
+ int hasRead = 0;
+ int length = requestedRange.getLength();
+ while ( hasRead < length ) {
+ int ret;
+ try {
+ ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
+ } catch ( IOException e ) {
+ log.error( "Error reading from file " + filename );
+ return false;
+ }
+ if ( ret == -1 ) {
+ this.close( "Error occured in Uploader.sendFile(),"
+ + " while reading from File to send." );
+ return false;
+ }
+ hasRead += ret;
+ try {
+ outStream.write( data, 0, ret );
+ } catch ( IOException e ) {
+ log.error( "Sending payload failed" );
+ return false;
+ }
+ }
}
- this.close( "Sending RANGE " + getStartOfRange() + ":" + getEndOfRange() + " of File "
- + filename + " failed..." );
- return false;
- } catch ( Exception e ) {
- e.printStackTrace();
- this.close( e.toString() );
- return false;
} finally {
if ( file != null ) {
try {
diff --git a/src/main/java/org/openslx/filetransfer/WantRangeCallback.java b/src/main/java/org/openslx/filetransfer/WantRangeCallback.java
new file mode 100644
index 0000000..4581d63
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/WantRangeCallback.java
@@ -0,0 +1,12 @@
+package org.openslx.filetransfer;
+
+/**
+ * Callback interface - called when the downloader needs to send a
+ * range request to the remote peer.
+ */
+public interface WantRangeCallback
+{
+
+ public FileRange get();
+
+}