summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer
diff options
context:
space:
mode:
authorSimon Rettberg2014-09-19 18:13:06 +0200
committerSimon Rettberg2014-09-19 18:13:06 +0200
commit52fa9a47498a3727d11a34205c9920f9a10e8aeb (patch)
treec99723aec9ef430132d1eeff08e43684cdfacbf3 /src/main/java/org/openslx/filetransfer
parentAdd debugging to file transfer (diff)
downloadmaster-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.tar.gz
master-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.tar.xz
master-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.zip
Rework file transfer, try to use callbacks for everything
No more juggling with sendRange() and sendData(), which was easy to use wrong, and cause lots of weird errors.
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
-rw-r--r--src/main/java/org/openslx/filetransfer/ClassTest.java91
-rw-r--r--src/main/java/org/openslx/filetransfer/Downloader.java149
-rw-r--r--src/main/java/org/openslx/filetransfer/FileRange.java46
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java365
-rw-r--r--src/main/java/org/openslx/filetransfer/Uploader.java134
-rw-r--r--src/main/java/org/openslx/filetransfer/WantRangeCallback.java12
6 files changed, 470 insertions, 327 deletions
diff --git a/src/main/java/org/openslx/filetransfer/ClassTest.java b/src/main/java/org/openslx/filetransfer/ClassTest.java
index 2a4d05c..1fecbcc 100644
--- a/src/main/java/org/openslx/filetransfer/ClassTest.java
+++ b/src/main/java/org/openslx/filetransfer/ClassTest.java
@@ -16,11 +16,10 @@
package org.openslx.filetransfer;
-import java.io.File;
import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
@@ -34,6 +33,8 @@ import org.slf4j.LoggerFactory;
public class ClassTest
{
+
+ private static final int CHUNK_SIZE = 11111111;
private static String inFile;
private static String outFile;
@@ -47,9 +48,9 @@ public class ClassTest
public static void main( String[] args ) throws Exception
{
- if (args.length != 4) {
- System.out.println("Need 4 argument: <keystore> <passphrase> <infile> <outfile>");
- System.exit(1);
+ if ( args.length != 4 ) {
+ System.out.println( "Need 4 argument: <keystore> <passphrase> <infile> <outfile>" );
+ System.exit( 1 );
}
String pathToKeyStore = args[0];
final char[] passphrase = args[1].toCharArray();
@@ -77,11 +78,33 @@ public class ClassTest
context.init( null, trustManagers, null );
- Downloader d = new Downloader( "localhost", 6789, context );
- d.setOutputFilename( outFile );
- d.sendToken( "xyz" );
- while ( d.readMetaData() )
- d.receiveBinary();
+ Downloader d = new Downloader( "localhost", 6789, context, "xyz" );
+ boolean res = d.download( outFile, new WantRangeCallback() {
+ long pos = 0;
+ long size = -1;
+
+ @Override
+ public FileRange get()
+ {
+ if ( size == -1 ) {
+ try {
+ size = Files.size( Paths.get( inFile ) );
+ } catch ( IOException e ) {
+ return null;
+ }
+ }
+ if ( pos >= size )
+ return null;
+ long end = Math.min( pos + CHUNK_SIZE, size );
+ FileRange range = new FileRange( pos, end );
+ pos += CHUNK_SIZE;
+ return range;
+ }
+ } );
+ if ( res )
+ System.out.println( "Active Download OK" );
+ else
+ System.out.println( "Active Download FAILED" );
/*
String pathToKeyStore =
@@ -115,41 +138,29 @@ public class ClassTest
*/
}
-// Implementing IncomingEvent for testing case.
-static class Test implements IncomingEvent
-{
- public void incomingUploader( Uploader uploader ) throws IOException
+ // Implementing IncomingEvent for testing case.
+ static class Test implements IncomingEvent
{
- RandomAccessFile file;
- try {
- file = new RandomAccessFile( new File( inFile ), "r" );
- } catch ( FileNotFoundException e ) {
- e.printStackTrace();
- return;
+ public void incomingUploader( Uploader uploader ) throws IOException
+ {
+ if ( uploader.getToken() == null ) {
+ System.out.println( "Incoming uploader: could not get token!" );
+ return;
+ }
+ if ( !uploader.upload( inFile ) )
+ System.out.println( "Incoming uploader failed!" );
+ else
+ System.out.println( "Incomgin uploader OK" );
}
- long length = file.length();
- file.close();
-
- int diff = 0;
- for ( int i = 0; ( i + 254 ) < length; i += 254 ) {
- if ( !uploader.sendRange( i, i + 254 ) || !uploader.sendFile( inFile ) ) {
- System.out.println("FAIL");
+ public void incomingDownloader( Downloader downloader ) throws IOException
+ {
+ if ( downloader.getToken() == null ) {
+ System.out.println( "Incoming downloader: could not get token!" );
return;
}
- diff = (int) ( length - i );
+ // TODO: if (!downloader.download( destinationFile, callback ))
}
-
- uploader.sendRange( (int) ( length - diff ), (int)length );
- uploader.sendFile( inFile );
- }
-
- public void incomingDownloader( Downloader downloader ) throws IOException
- {
- downloader.setOutputFilename( outFile );
- while ( downloader.readMetaData() )
- downloader.receiveBinary();
}
-}
}
diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java
index 6946173..298bcfc 100644
--- a/src/main/java/org/openslx/filetransfer/Downloader.java
+++ b/src/main/java/org/openslx/filetransfer/Downloader.java
@@ -1,9 +1,10 @@
package org.openslx.filetransfer;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.net.SocketTimeoutException;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -12,8 +13,6 @@ import org.apache.log4j.Logger;
public class Downloader extends Transfer
{
- // Some instance variables.
- private String outputFilename = null;
private static final Logger log = Logger.getLogger( Downloader.class );
@@ -25,10 +24,12 @@ public class Downloader extends Transfer
* @param port Port to connect to
* @throws IOException
*/
- public Downloader( String host, int port, SSLContext context ) throws IOException
+ public Downloader( String host, int port, SSLContext context, String token ) throws IOException
{
super( host, port, context, log );
- dataToServer.writeByte( 'D' );
+ outStream.writeByte( 'D' );
+ if ( !sendToken( token ) || !sendEndOfMeta() )
+ throw new IOException( "Sending token failed" );
}
/***********************************************************************/
@@ -36,94 +37,92 @@ public class Downloader extends Transfer
* Constructor used by Listener to create an incoming download connection.
*
* @param socket established connection to peer which requested an upload.
- * @throws IOException
+ * @throws IOException
*/
protected Downloader( SSLSocket socket ) throws IOException
{
super( socket, log );
}
- /***********************************************************************/
- /**
- * Method for setting outputFilename.
- *
- * @param filename
- */
- public void setOutputFilename( String filename )
- {
- outputFilename = filename;
- }
-
- /***********************************************************************/
- /**
- * Method for getting outputFilename.
- *
- * @return outputFilename
- */
- public String getOutputFilename()
- {
- return outputFilename;
- }
-
- /***********************************************************************/
- /**
- * Method to request a byte range within the file to download. This
- * method is called by the party that initiated the connection.
- *
- * @param startOffset offset in file where to start the transfer (inclusive)
- * @param endOffset end offset where to end the transfer (exclusive)
- * @return success or failure
- */
- public boolean requestRange( long startOffset, long endOffset )
- {
- return super.sendRange( startOffset, endOffset );
- }
-
- /***********************************************************************/
- /**
- * Method for reading Binary. Reading the current Range of incoming binary.
- *
- */
- public boolean receiveBinary()
+ public boolean download( String destinationFile, WantRangeCallback callback )
{
+ if ( shouldGetToken() ) {
+ log.error( "You didn't call getToken yet!" );
+ return false;
+ }
+ FileRange requestedRange;
RandomAccessFile file = null;
try {
- int chunkLength = getDiffOfRange();
- byte[] incoming = new byte[ 64000 ];
- int hasRead = 0;
- file = new RandomAccessFile( new File( outputFilename ), "rw" );
- file.seek( getStartOfRange() );
- while ( hasRead < chunkLength ) {
- int ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
- // log.info("hasRead: " + hasRead + " length: " + length + " ret: " + ret);
- if ( ret == -1 ) {
- log.info( "Error occured while receiving payload." );
+ try {
+ file = new RandomAccessFile( new File( destinationFile ), "rw" );
+ } catch ( FileNotFoundException e2 ) {
+ log.error( "Cannot open " + destinationFile + " for writing." );
+ return false;
+ }
+ while ( ( requestedRange = callback.get() ) != null ) {
+ if ( requestedRange.startOffset < 0 || requestedRange.startOffset >= requestedRange.endOffset ) {
+ log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" );
return false;
}
- hasRead += ret;
- file.write( incoming, 0, ret );
-
+ // Send range request
+ if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) {
+ log.error( "Could not send next range request, download failed." );
+ return false;
+ }
+ // See if remote peer acknowledges range request
+ MetaData meta = readMetaData();
+ if ( meta == null ) {
+ log.error( "Did not receive meta data from uploading remote peer after requesting range, aborting." );
+ return false;
+ }
+ FileRange remoteRange = meta.getRange();
+ if ( remoteRange == null || !remoteRange.equals( requestedRange ) ) {
+ log.error( "Confirmed range by remote peer does not match requested range, aborting download." );
+ return false;
+ }
+ // Receive requested range
+ int chunkLength = requestedRange.getLength();
+ byte[] incoming = new byte[ 500000 ]; // 500kb
+ int hasRead = 0;
+ try {
+ file.seek( requestedRange.startOffset );
+ } catch ( IOException e1 ) {
+ log.error( "Could not seek to " + requestedRange.startOffset + " in " + destinationFile + ". Disk full?" );
+ return false;
+ }
+ while ( hasRead < chunkLength ) {
+ int ret;
+ try {
+ ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) );
+ } catch ( IOException e ) {
+ log.error( "Could not read payload from socket" );
+ sendErrorCode( "payload read error" );
+ return false;
+ }
+ if ( ret == -1 ) {
+ log.info( "Remote peer unexpectedly closed the connection." );
+ return false;
+ }
+ hasRead += ret;
+ try {
+ file.write( incoming, 0, ret );
+ } catch ( IOException e ) {
+ log.error( "Could not write to " + destinationFile + ". Disk full?" );
+ return false;
+ }
+ }
}
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- sendErrorCode( "timeout" );
- this.close( "Socket timeout occured ... close connection." );
- return false;
- } catch ( Exception e ) {
- e.printStackTrace();
- this.close( "Reading RANGE " + getStartOfRange() + ":" + getEndOfRange()
- + " of file from socket failed..." );
- return false;
+ sendDone();
+ sendEndOfMeta();
} finally {
- if ( file != null ) {
+ if ( file != null )
try {
file.close();
} catch ( IOException e ) {
- e.printStackTrace();
}
- }
- RANGE = null; // Reset range for next iteration
+ this.close( null );
}
return true;
}
+
}
diff --git a/src/main/java/org/openslx/filetransfer/FileRange.java b/src/main/java/org/openslx/filetransfer/FileRange.java
new file mode 100644
index 0000000..30edefc
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/FileRange.java
@@ -0,0 +1,46 @@
+package org.openslx.filetransfer;
+
+public class FileRange
+{
+
+ /**
+ * Offset of first byte of range in file, inclusive
+ */
+ public final long startOffset;
+ /**
+ * Offset of last byte of range in file, exclusive
+ */
+ public final long endOffset;
+
+ /**
+ * Create a FileRange instance
+ *
+ * @param startOffset Offset of first byte of range in file, inclusive
+ * @param endOffset Offset of last byte of range in file, exclusive
+ */
+ public FileRange( long startOffset, long endOffset )
+ {
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ /**
+ * Get length of range
+ *
+ * @return length of range, in bytes
+ */
+ public int getLength()
+ {
+ return (int) ( endOffset - startOffset );
+ }
+
+ @Override
+ public boolean equals( Object other )
+ {
+ if ( other == null || ! ( other instanceof FileRange ) )
+ return false;
+ FileRange o = (FileRange)other;
+ return o.startOffset == this.startOffset && o.endOffset == this.endOffset;
+ }
+
+}
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index a8c9a8a..3f2fdde 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -3,8 +3,11 @@ package org.openslx.filetransfer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -14,60 +17,61 @@ import org.apache.log4j.Logger;
public abstract class Transfer
{
- protected final SSLSocketFactory sslSocketFactory;
- protected final SSLSocket satelliteSocket;
- protected final DataOutputStream dataToServer;
+ protected final SSLSocket transferSocket;
+ protected final DataOutputStream outStream;
protected final DataInputStream dataFromServer;
- protected String TOKEN = null;
- protected long[] RANGE = null;
protected String ERROR = null;
+ private boolean shouldGetToken;
protected final Logger log;
- protected Transfer( String ip, int port, SSLContext context, Logger log ) throws IOException
+ /**
+ * Actively initiated transfer.
+ *
+ * @param host Remote Host
+ * @param port Remote Port
+ * @param context SSL Context for encryption
+ * @param log Logger to use
+ * @throws IOException
+ */
+ protected Transfer( String host, int port, SSLContext context, Logger log ) throws IOException
{
this.log = log;
// create socket.
- sslSocketFactory = context.getSocketFactory();
+ SSLSocketFactory sslSocketFactory = context.getSocketFactory();
- satelliteSocket = (SSLSocket)sslSocketFactory.createSocket( ip, port );
- satelliteSocket.setSoTimeout( 2000 ); // set socket timeout.
+ transferSocket = (SSLSocket)sslSocketFactory.createSocket();
+ transferSocket.setSoTimeout( 5000 ); // set socket timeout.
+ transferSocket.connect( new InetSocketAddress( host, port ) );
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = false;
}
+ /**
+ * Passive transfer through incoming connection.
+ *
+ * @param socket already connected socket to remote peer
+ * @param log Logger to use
+ * @throws IOException
+ */
protected Transfer( SSLSocket socket, Logger log ) throws IOException
{
this.log = log;
- satelliteSocket = socket;
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
- sslSocketFactory = null;
+ transferSocket = socket;
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = true;
}
protected boolean sendRange( long startOffset, long endOffset )
{
- if ( RANGE != null ) {
- log.warn( "Range already set!" );
- return false;
- }
try {
+ log.debug( "Sending range: " + startOffset + " to " + endOffset );
sendKeyValuePair( "RANGE", startOffset + ":" + endOffset );
- RANGE[0] = startOffset;
- RANGE[1] = endOffset;
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending RANGE in Uploader failed..." );
return false;
}
return true;
@@ -75,108 +79,58 @@ public abstract class Transfer
/***********************************************************************/
/**
- * Method for sending token for identification from satellite to master.
+ * Method for sending error Code to server. For example in case of wrong
+ * token, send code for wrong token.
*
- * @param token The token to send
*/
- public boolean sendToken( String token )
+ public boolean sendErrorCode( String errString )
{
- if ( TOKEN != null ) {
- log.warn( "Trying to send token while a token is already set! Ignoring..." );
- return false;
- }
- TOKEN = token;
try {
- sendKeyValuePair( "TOKEN", TOKEN );
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
+ sendKeyValuePair( "ERROR", errString );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending TOKEN in Downloader failed..." );
+ this.close( e.toString() );
return false;
}
return true;
}
- /***********************************************************************/
- /**
- * Method for reading incoming token for identification.
- *
- */
- public String getToken()
+ protected boolean sendToken( String token )
{
- return TOKEN;
- }
-
- private boolean parseRange( String range )
- {
- if ( range == null )
- return true;
- if ( RANGE != null ) {
- log.warn( "Warning: RANGE already set when trying to parse from " + range );
- return false;
- }
- String parts[] = range.split( ":", 2 );
- long ret[] = new long[ 2 ];
try {
- ret[0] = Long.parseLong( parts[0] );
- ret[1] = Long.parseLong( parts[1] );
- } catch ( Throwable t ) {
- log.warn( "Not parsable range: '" + range + "'" );
- return false;
- }
- if ( ret[1] <= ret[0] ) {
- log.warn( "Invalid range. Start >= end" );
+ sendKeyValuePair( "TOKEN", token );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
return false;
}
- RANGE = ret;
return true;
}
- /***********************************************************************/
- /**
- * Getter for beginning of RANGE.
- *
- * @return
- */
- public long getStartOfRange()
+ public boolean sendDone()
{
- if ( RANGE != null ) {
- return RANGE[0];
+ try {
+ sendKeyValuePair( "DONE", "" );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
+ return false;
}
- return -1;
+ return true;
}
- /***********************************************************************/
- /**
- * Getter for end of RANGE.
- *
- * @return
- */
- public long getEndOfRange()
+ protected boolean sendEndOfMeta()
{
- if ( RANGE != null ) {
- return RANGE[1];
+ try {
+ outStream.writeByte( 0 );
+ } catch ( SocketTimeoutException e ) {
+ log.error( "Error sending end of meta - socket timeout" );
+ return false;
+ } catch ( IOException e ) {
+ log.error( "Error sending end of meta - " + e.toString() );
+ return false;
}
- return -1;
- }
-
- /***********************************************************************/
- /**
- * Method for returning difference of current Range.
- *
- * @return
- */
- public int getDiffOfRange()
- {
- return (int)Math.abs( getEndOfRange() - getStartOfRange() );
+ return true;
}
/***********************************************************************/
@@ -185,10 +139,11 @@ public abstract class Transfer
* Split incoming bytes after first '=' and store value to specific
* variable.
*
- * @return true on success, false if reading failed
+ * @return map of meta data received, null on error
*/
- public boolean readMetaData()
+ protected MetaData readMetaData()
{
+ Map<String, String> entries = new HashMap<>();
try {
while ( true ) {
byte[] incoming = new byte[ 255 ];
@@ -198,9 +153,9 @@ public abstract class Transfer
retLengthByte = dataFromServer.read( incoming, 0, 1 );
// If .read() didn't return 1, it was not able to read first byte.
if ( retLengthByte != 1 ) {
- log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte);
+ log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte );
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
int length = incoming[0] & 0xFF;
@@ -218,7 +173,7 @@ public abstract class Transfer
int ret = dataFromServer.read( incoming, hasRead, length - hasRead );
if ( ret == -1 ) {
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
hasRead += ret;
}
@@ -230,62 +185,36 @@ public abstract class Transfer
log.warn( "Invalid key value pair received (" + data + ")" );
continue;
}
- if ( splitted[0].equals( "TOKEN" ) ) {
- if ( TOKEN != null ) {
- this.close( "Received a token when a token is already set!" );
- return false;
- }
- TOKEN = splitted[1];
- log.debug( "TOKEN: " + TOKEN );
- }
- else if ( splitted[0].equals( "RANGE" ) ) {
- if ( !parseRange( splitted[1] ) ) {
- this.close( "Could not parse RANGE token" );
- return false;
- }
- log.debug( "RANGE: '" + splitted[1] + "'" );
- }
- else if ( splitted[0].equals( "ERROR" ) ) {
+ if ( splitted[0].equals( "ERROR" ) )
ERROR = splitted[1];
- log.debug( "ERROR: " + ERROR );
+ if ( entries.containsKey( splitted[0] ) ) {
+ log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" );
+ } else {
+ entries.put( splitted[0], splitted[1] );
}
}
} catch ( SocketTimeoutException ste ) {
ste.printStackTrace();
sendErrorCode( "timeout" );
this.close( "Socket Timeout occured in readMetaData." );
- return false;
+ return null;
} catch ( Exception e ) {
e.printStackTrace();
this.close( e.toString() );
- return false;
+ return null;
}
- return true;
+ return new MetaData( entries );
}
private void sendKeyValuePair( String key, String value ) throws IOException
{
byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 );
- dataToServer.writeByte( data.length );
- dataToServer.write( data );
- }
-
- /***********************************************************************/
- /**
- * Method for sending error Code to server. For example in case of wrong
- * token, send code for wrong token.
- *
- */
- public Boolean sendErrorCode( String errString )
- {
try {
- sendKeyValuePair( "ERROR", errString );
- } catch ( IOException e ) {
- e.printStackTrace();
- this.close( e.toString() );
- return false;
+ outStream.writeByte( data.length );
+ outStream.write( data );
+ } catch ( SocketTimeoutException e ) {
+ log.warn( "Socket timeout when sending KVP with key " + key );
}
- return true;
}
/***********************************************************************/
@@ -298,12 +227,12 @@ public abstract class Transfer
if ( error != null )
log.info( error );
try {
- if ( satelliteSocket != null )
- this.satelliteSocket.close();
+ if ( transferSocket != null )
+ this.transferSocket.close();
if ( dataFromServer != null )
dataFromServer.close();
- if ( dataToServer != null )
- dataToServer.close();
+ if ( outStream != null )
+ outStream.close();
} catch ( IOException e ) {
e.printStackTrace();
}
@@ -317,8 +246,128 @@ public abstract class Transfer
*/
public boolean isValid()
{
- return satelliteSocket.isConnected() && !satelliteSocket.isClosed()
- && !satelliteSocket.isInputShutdown() && !satelliteSocket.isOutputShutdown();
+ return transferSocket.isConnected() && !transferSocket.isClosed()
+ && !transferSocket.isInputShutdown() && !transferSocket.isOutputShutdown();
+ }
+
+ /**
+ * Get error string received from remote side, if any.
+ *
+ * @return Error string, if received, or null.
+ */
+ public String getRemoteError()
+ {
+ return ERROR;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ * Call this ONLY if all of the following conditions are met:
+ * - this is an incoming transfer connection
+ * - you didn't call it before
+ * - you didn't call download or upload yet
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ if ( !shouldGetToken ) {
+ log.error( "Invalid call of getToken. You either initiated the connection yourself, or you already called getToken before." );
+ this.close( null );
+ return null;
+ }
+ shouldGetToken = false;
+ MetaData meta = readMetaData();
+ if ( meta == null )
+ return null;
+ return meta.getToken();
+ }
+
+ /**
+ * Should we call getToken()? Used internally for detecting wrong usage of
+ * the transfer classes.
+ *
+ * @return yes or no
+ */
+ protected boolean shouldGetToken()
+ {
+ return shouldGetToken;
+ }
+
+ /**
+ * High level access to key-value-pairs.
+ */
+ class MetaData
+ {
+
+ private Map<String, String> meta;
+
+ private MetaData( Map<String, String> meta )
+ {
+ this.meta = meta;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ return meta.get( "TOKEN" );
+ }
+
+ /**
+ * Check if remote peer set the DONE key, telling us the transfer is complete.
+ *
+ * @return yes or no
+ */
+ public boolean isDone()
+ {
+ return meta.containsKey( "DONE" );
+ }
+
+ /**
+ * Return range from this meta data class, or null
+ * if it doesn't contain a (valid) range key-value-pair.
+ *
+ * @return The range instance
+ */
+ public FileRange getRange()
+ {
+ if ( meta.containsKey( "RANGE" ) )
+ return parseRange( meta.get( "RANGE" ) );
+ return null;
+ }
+
+ /**
+ * Parse range in format START:END to {@link FileRange} instance.
+ *
+ * @param range String representation of range
+ * @return {@link FileRange} instance of range, or null on error
+ */
+ private FileRange parseRange( String range )
+ {
+ if ( range == null )
+ return null;
+ String parts[] = range.split( ":" );
+ if ( parts.length != 2 )
+ return null;
+ long start, end;
+ try {
+ start = Long.parseLong( parts[0] );
+ end = Long.parseLong( parts[1] );
+ } catch ( Throwable t ) {
+ log.warn( "Not parsable range: '" + range + "'" );
+ return null;
+ }
+ if ( start >= end ) {
+ log.warn( "Invalid range. Start >= end" );
+ return null;
+ }
+ return new FileRange( start, end );
+ }
+
}
}
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java
index 3deb272..87845fa 100644
--- a/src/main/java/org/openslx/filetransfer/Uploader.java
+++ b/src/main/java/org/openslx/filetransfer/Uploader.java
@@ -1,9 +1,9 @@
package org.openslx.filetransfer;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.net.SocketTimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -24,10 +24,12 @@ public class Uploader extends Transfer
* @param context ssl context for establishing a secure connection
* @throws IOException
*/
- public Uploader( String host, int port, SSLContext context ) throws IOException
+ public Uploader( String host, int port, SSLContext context, String token ) throws IOException
{
super( host, port, context, log );
- dataToServer.writeByte( 'U' );
+ outStream.writeByte( 'U' );
+ if ( !sendToken( token ) || !sendEndOfMeta() )
+ throw new IOException( "Sending token failed" );
}
/***********************************************************************/
@@ -44,71 +46,95 @@ public class Uploader extends Transfer
/***********************************************************************/
/**
- * Used by the peer that initiated the connection to tell the remote
- * peer which part of the file is being uploaded
- *
- * @param startOffset start offset in bytes in the file (inclusive)
- * @param endOffset end offset in file (exclusive)
- * @return
- */
- public boolean prepareSendRange( long startOffset, long endOffset )
- {
- return super.sendRange( startOffset, endOffset );
- }
-
- /***********************************************************************/
- /**
* Method for sending File with filename.
*
* @param filename
*/
- public boolean sendFile( String filename )
+ public boolean upload( String filename )
{
- if ( getStartOfRange() == -1 ) {
- this.close( "sendFile called when no range is set" );
+ if ( shouldGetToken() ) {
+ log.error( "You didn't call getToken yet!" );
return false;
}
-
RandomAccessFile file = null;
try {
- file = new RandomAccessFile( new File( filename ), "r" );
- file.seek( getStartOfRange() );
-
- byte[] data = new byte[ 64000 ];
- int hasRead = 0;
- int length = getDiffOfRange();
- // System.out.println( "diff of Range: " + length );
- while ( hasRead < length ) {
- int ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
- if ( ret == -1 ) {
- this.close( "Error occured in Uploader.sendFile(),"
- + " while reading from File to send." );
+ try {
+ file = new RandomAccessFile( new File( filename ), "r" );
+ } catch ( FileNotFoundException e ) {
+ log.error( "Could not open " + filename + " for reading." );
+ return false;
+ }
+ for ( ;; ) { // Loop as long as remote peer is requesting chunks from this file
+ // Read meta data of remote peer - either new range, or it's telling us it's done
+ MetaData meta = readMetaData();
+ if ( meta == null ) {
+ log.error( "Did not get meta data from remote peer." );
return false;
}
- hasRead += ret;
- dataToServer.write( data, 0, ret );
- }
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- sendErrorCode( "timeout" );
- this.close( "Socket timeout occured ... close connection." );
- return false;
- } catch ( IOException ioe ) {
- ioe.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
+ if ( meta.isDone() ) // Download complete?
+ break;
+ // Not complete, so there must be another range request
+ FileRange requestedRange = meta.getRange();
+ if ( requestedRange == null ) {
+ log.error( "Remote peer did not include RANGE in meta data." );
+ sendErrorCode( "no (valid) range in request" );
return false;
}
+ // Range inside file?
+ try {
+ if ( requestedRange.endOffset > file.length() ) {
+ log.error( "Requested range is larger than file size, aborting." );
+ sendErrorCode( "range out of file bounds" );
+ return false;
+ }
+ } catch ( IOException e ) {
+ log.error( "Could not get current length of file " + filename );
+ return false;
+ }
+ // Seek to requested chunk
+ try {
+ file.seek( requestedRange.startOffset );
+ } catch ( IOException e ) {
+ log.error( "Could not seek to start of requested range in " + filename + " (" + requestedRange.startOffset + ")" );
+ return false;
+ }
+ // Send confirmation of range we're about to send
+ try {
+ long ptr = file.getFilePointer();
+ if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
+ log.error( "Could not send range confirmation" );
+ return false;
+ }
+ } catch ( IOException e ) {
+ log.error( "Could not determine current position in file " + filename );
+ return false;
+ }
+ // Finally send requested chunk
+ byte[] data = new byte[ 500000 ]; // 500kb
+ int hasRead = 0;
+ int length = requestedRange.getLength();
+ while ( hasRead < length ) {
+ int ret;
+ try {
+ ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
+ } catch ( IOException e ) {
+ log.error( "Error reading from file " + filename );
+ return false;
+ }
+ if ( ret == -1 ) {
+ this.close( "Error occured in Uploader.sendFile(),"
+ + " while reading from File to send." );
+ return false;
+ }
+ hasRead += ret;
+ try {
+ outStream.write( data, 0, ret );
+ } catch ( IOException e ) {
+ log.error( "Sending payload failed" );
+ return false;
+ }
+ }
}
- this.close( "Sending RANGE " + getStartOfRange() + ":" + getEndOfRange() + " of File "
- + filename + " failed..." );
- return false;
- } catch ( Exception e ) {
- e.printStackTrace();
- this.close( e.toString() );
- return false;
} finally {
if ( file != null ) {
try {
diff --git a/src/main/java/org/openslx/filetransfer/WantRangeCallback.java b/src/main/java/org/openslx/filetransfer/WantRangeCallback.java
new file mode 100644
index 0000000..4581d63
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/WantRangeCallback.java
@@ -0,0 +1,12 @@
+package org.openslx.filetransfer;
+
+/**
+ * Callback interface - called when the downloader needs to send a
+ * range request to the remote peer.
+ */
+public interface WantRangeCallback
+{
+
+ public FileRange get();
+
+}