diff options
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
6 files changed, 470 insertions, 327 deletions
diff --git a/src/main/java/org/openslx/filetransfer/ClassTest.java b/src/main/java/org/openslx/filetransfer/ClassTest.java index 2a4d05c..1fecbcc 100644 --- a/src/main/java/org/openslx/filetransfer/ClassTest.java +++ b/src/main/java/org/openslx/filetransfer/ClassTest.java @@ -16,11 +16,10 @@ package org.openslx.filetransfer; -import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.RandomAccessFile; +import java.nio.file.Files; +import java.nio.file.Paths; import java.security.KeyStore; import javax.net.ssl.KeyManager; @@ -34,6 +33,8 @@ import org.slf4j.LoggerFactory; public class ClassTest { + + private static final int CHUNK_SIZE = 11111111; private static String inFile; private static String outFile; @@ -47,9 +48,9 @@ public class ClassTest public static void main( String[] args ) throws Exception { - if (args.length != 4) { - System.out.println("Need 4 argument: <keystore> <passphrase> <infile> <outfile>"); - System.exit(1); + if ( args.length != 4 ) { + System.out.println( "Need 4 argument: <keystore> <passphrase> <infile> <outfile>" ); + System.exit( 1 ); } String pathToKeyStore = args[0]; final char[] passphrase = args[1].toCharArray(); @@ -77,11 +78,33 @@ public class ClassTest context.init( null, trustManagers, null ); - Downloader d = new Downloader( "localhost", 6789, context ); - d.setOutputFilename( outFile ); - d.sendToken( "xyz" ); - while ( d.readMetaData() ) - d.receiveBinary(); + Downloader d = new Downloader( "localhost", 6789, context, "xyz" ); + boolean res = d.download( outFile, new WantRangeCallback() { + long pos = 0; + long size = -1; + + @Override + public FileRange get() + { + if ( size == -1 ) { + try { + size = Files.size( Paths.get( inFile ) ); + } catch ( IOException e ) { + return null; + } + } + if ( pos >= size ) + return null; + long end = Math.min( pos + CHUNK_SIZE, size ); + FileRange range = new FileRange( pos, end ); + pos += CHUNK_SIZE; + return range; + } + } ); + if ( res ) + System.out.println( "Active Download OK" ); + else + System.out.println( "Active Download FAILED" ); /* String pathToKeyStore = @@ -115,41 +138,29 @@ public class ClassTest */ } -// Implementing IncomingEvent for testing case. -static class Test implements IncomingEvent -{ - public void incomingUploader( Uploader uploader ) throws IOException + // Implementing IncomingEvent for testing case. + static class Test implements IncomingEvent { - RandomAccessFile file; - try { - file = new RandomAccessFile( new File( inFile ), "r" ); - } catch ( FileNotFoundException e ) { - e.printStackTrace(); - return; + public void incomingUploader( Uploader uploader ) throws IOException + { + if ( uploader.getToken() == null ) { + System.out.println( "Incoming uploader: could not get token!" ); + return; + } + if ( !uploader.upload( inFile ) ) + System.out.println( "Incoming uploader failed!" ); + else + System.out.println( "Incomgin uploader OK" ); } - long length = file.length(); - file.close(); - - int diff = 0; - for ( int i = 0; ( i + 254 ) < length; i += 254 ) { - if ( !uploader.sendRange( i, i + 254 ) || !uploader.sendFile( inFile ) ) { - System.out.println("FAIL"); + public void incomingDownloader( Downloader downloader ) throws IOException + { + if ( downloader.getToken() == null ) { + System.out.println( "Incoming downloader: could not get token!" ); return; } - diff = (int) ( length - i ); + // TODO: if (!downloader.download( destinationFile, callback )) } - - uploader.sendRange( (int) ( length - diff ), (int)length ); - uploader.sendFile( inFile ); - } - - public void incomingDownloader( Downloader downloader ) throws IOException - { - downloader.setOutputFilename( outFile ); - while ( downloader.readMetaData() ) - downloader.receiveBinary(); } -} } diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java index 6946173..298bcfc 100644 --- a/src/main/java/org/openslx/filetransfer/Downloader.java +++ b/src/main/java/org/openslx/filetransfer/Downloader.java @@ -1,9 +1,10 @@ package org.openslx.filetransfer; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; -import java.net.SocketTimeoutException; +import java.util.Map; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; @@ -12,8 +13,6 @@ import org.apache.log4j.Logger; public class Downloader extends Transfer { - // Some instance variables. - private String outputFilename = null; private static final Logger log = Logger.getLogger( Downloader.class ); @@ -25,10 +24,12 @@ public class Downloader extends Transfer * @param port Port to connect to * @throws IOException */ - public Downloader( String host, int port, SSLContext context ) throws IOException + public Downloader( String host, int port, SSLContext context, String token ) throws IOException { super( host, port, context, log ); - dataToServer.writeByte( 'D' ); + outStream.writeByte( 'D' ); + if ( !sendToken( token ) || !sendEndOfMeta() ) + throw new IOException( "Sending token failed" ); } /***********************************************************************/ @@ -36,94 +37,92 @@ public class Downloader extends Transfer * Constructor used by Listener to create an incoming download connection. * * @param socket established connection to peer which requested an upload. - * @throws IOException + * @throws IOException */ protected Downloader( SSLSocket socket ) throws IOException { super( socket, log ); } - /***********************************************************************/ - /** - * Method for setting outputFilename. - * - * @param filename - */ - public void setOutputFilename( String filename ) - { - outputFilename = filename; - } - - /***********************************************************************/ - /** - * Method for getting outputFilename. - * - * @return outputFilename - */ - public String getOutputFilename() - { - return outputFilename; - } - - /***********************************************************************/ - /** - * Method to request a byte range within the file to download. This - * method is called by the party that initiated the connection. - * - * @param startOffset offset in file where to start the transfer (inclusive) - * @param endOffset end offset where to end the transfer (exclusive) - * @return success or failure - */ - public boolean requestRange( long startOffset, long endOffset ) - { - return super.sendRange( startOffset, endOffset ); - } - - /***********************************************************************/ - /** - * Method for reading Binary. Reading the current Range of incoming binary. - * - */ - public boolean receiveBinary() + public boolean download( String destinationFile, WantRangeCallback callback ) { + if ( shouldGetToken() ) { + log.error( "You didn't call getToken yet!" ); + return false; + } + FileRange requestedRange; RandomAccessFile file = null; try { - int chunkLength = getDiffOfRange(); - byte[] incoming = new byte[ 64000 ]; - int hasRead = 0; - file = new RandomAccessFile( new File( outputFilename ), "rw" ); - file.seek( getStartOfRange() ); - while ( hasRead < chunkLength ) { - int ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) ); - // log.info("hasRead: " + hasRead + " length: " + length + " ret: " + ret); - if ( ret == -1 ) { - log.info( "Error occured while receiving payload." ); + try { + file = new RandomAccessFile( new File( destinationFile ), "rw" ); + } catch ( FileNotFoundException e2 ) { + log.error( "Cannot open " + destinationFile + " for writing." ); + return false; + } + while ( ( requestedRange = callback.get() ) != null ) { + if ( requestedRange.startOffset < 0 || requestedRange.startOffset >= requestedRange.endOffset ) { + log.error( "Callback supplied bad range (" + requestedRange.startOffset + " to " + requestedRange.endOffset + ")" ); return false; } - hasRead += ret; - file.write( incoming, 0, ret ); - + // Send range request + if ( !sendRange( requestedRange.startOffset, requestedRange.endOffset ) || !sendEndOfMeta() ) { + log.error( "Could not send next range request, download failed." ); + return false; + } + // See if remote peer acknowledges range request + MetaData meta = readMetaData(); + if ( meta == null ) { + log.error( "Did not receive meta data from uploading remote peer after requesting range, aborting." ); + return false; + } + FileRange remoteRange = meta.getRange(); + if ( remoteRange == null || !remoteRange.equals( requestedRange ) ) { + log.error( "Confirmed range by remote peer does not match requested range, aborting download." ); + return false; + } + // Receive requested range + int chunkLength = requestedRange.getLength(); + byte[] incoming = new byte[ 500000 ]; // 500kb + int hasRead = 0; + try { + file.seek( requestedRange.startOffset ); + } catch ( IOException e1 ) { + log.error( "Could not seek to " + requestedRange.startOffset + " in " + destinationFile + ". Disk full?" ); + return false; + } + while ( hasRead < chunkLength ) { + int ret; + try { + ret = dataFromServer.read( incoming, 0, Math.min( chunkLength - hasRead, incoming.length ) ); + } catch ( IOException e ) { + log.error( "Could not read payload from socket" ); + sendErrorCode( "payload read error" ); + return false; + } + if ( ret == -1 ) { + log.info( "Remote peer unexpectedly closed the connection." ); + return false; + } + hasRead += ret; + try { + file.write( incoming, 0, ret ); + } catch ( IOException e ) { + log.error( "Could not write to " + destinationFile + ". Disk full?" ); + return false; + } + } } - } catch ( SocketTimeoutException ste ) { - ste.printStackTrace(); - sendErrorCode( "timeout" ); - this.close( "Socket timeout occured ... close connection." ); - return false; - } catch ( Exception e ) { - e.printStackTrace(); - this.close( "Reading RANGE " + getStartOfRange() + ":" + getEndOfRange() - + " of file from socket failed..." ); - return false; + sendDone(); + sendEndOfMeta(); } finally { - if ( file != null ) { + if ( file != null ) try { file.close(); } catch ( IOException e ) { - e.printStackTrace(); } - } - RANGE = null; // Reset range for next iteration + this.close( null ); } return true; } + } diff --git a/src/main/java/org/openslx/filetransfer/FileRange.java b/src/main/java/org/openslx/filetransfer/FileRange.java new file mode 100644 index 0000000..30edefc --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/FileRange.java @@ -0,0 +1,46 @@ +package org.openslx.filetransfer; + +public class FileRange +{ + + /** + * Offset of first byte of range in file, inclusive + */ + public final long startOffset; + /** + * Offset of last byte of range in file, exclusive + */ + public final long endOffset; + + /** + * Create a FileRange instance + * + * @param startOffset Offset of first byte of range in file, inclusive + * @param endOffset Offset of last byte of range in file, exclusive + */ + public FileRange( long startOffset, long endOffset ) + { + this.startOffset = startOffset; + this.endOffset = endOffset; + } + + /** + * Get length of range + * + * @return length of range, in bytes + */ + public int getLength() + { + return (int) ( endOffset - startOffset ); + } + + @Override + public boolean equals( Object other ) + { + if ( other == null || ! ( other instanceof FileRange ) ) + return false; + FileRange o = (FileRange)other; + return o.startOffset == this.startOffset && o.endOffset == this.endOffset; + } + +} diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java index a8c9a8a..3f2fdde 100644 --- a/src/main/java/org/openslx/filetransfer/Transfer.java +++ b/src/main/java/org/openslx/filetransfer/Transfer.java @@ -3,8 +3,11 @@ package org.openslx.filetransfer; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; @@ -14,60 +17,61 @@ import org.apache.log4j.Logger; public abstract class Transfer { - protected final SSLSocketFactory sslSocketFactory; - protected final SSLSocket satelliteSocket; - protected final DataOutputStream dataToServer; + protected final SSLSocket transferSocket; + protected final DataOutputStream outStream; protected final DataInputStream dataFromServer; - protected String TOKEN = null; - protected long[] RANGE = null; protected String ERROR = null; + private boolean shouldGetToken; protected final Logger log; - protected Transfer( String ip, int port, SSLContext context, Logger log ) throws IOException + /** + * Actively initiated transfer. + * + * @param host Remote Host + * @param port Remote Port + * @param context SSL Context for encryption + * @param log Logger to use + * @throws IOException + */ + protected Transfer( String host, int port, SSLContext context, Logger log ) throws IOException { this.log = log; // create socket. - sslSocketFactory = context.getSocketFactory(); + SSLSocketFactory sslSocketFactory = context.getSocketFactory(); - satelliteSocket = (SSLSocket)sslSocketFactory.createSocket( ip, port ); - satelliteSocket.setSoTimeout( 2000 ); // set socket timeout. + transferSocket = (SSLSocket)sslSocketFactory.createSocket(); + transferSocket.setSoTimeout( 5000 ); // set socket timeout. + transferSocket.connect( new InetSocketAddress( host, port ) ); - dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() ); - dataFromServer = new DataInputStream( satelliteSocket.getInputStream() ); + outStream = new DataOutputStream( transferSocket.getOutputStream() ); + dataFromServer = new DataInputStream( transferSocket.getInputStream() ); + shouldGetToken = false; } + /** + * Passive transfer through incoming connection. + * + * @param socket already connected socket to remote peer + * @param log Logger to use + * @throws IOException + */ protected Transfer( SSLSocket socket, Logger log ) throws IOException { this.log = log; - satelliteSocket = socket; - dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() ); - dataFromServer = new DataInputStream( satelliteSocket.getInputStream() ); - sslSocketFactory = null; + transferSocket = socket; + outStream = new DataOutputStream( transferSocket.getOutputStream() ); + dataFromServer = new DataInputStream( transferSocket.getInputStream() ); + shouldGetToken = true; } protected boolean sendRange( long startOffset, long endOffset ) { - if ( RANGE != null ) { - log.warn( "Range already set!" ); - return false; - } try { + log.debug( "Sending range: " + startOffset + " to " + endOffset ); sendKeyValuePair( "RANGE", startOffset + ":" + endOffset ); - RANGE[0] = startOffset; - RANGE[1] = endOffset; - } catch ( SocketTimeoutException ste ) { - ste.printStackTrace(); - this.close( "Socket timeout occured ... close connection." ); } catch ( IOException e ) { e.printStackTrace(); - readMetaData(); - if ( ERROR != null ) { - if ( ERROR.equals( "timeout" ) ) { - this.close( "Remote Socket timeout occured ... close connection." ); - } - } - log.info( "Sending RANGE in Uploader failed..." ); return false; } return true; @@ -75,108 +79,58 @@ public abstract class Transfer /***********************************************************************/ /** - * Method for sending token for identification from satellite to master. + * Method for sending error Code to server. For example in case of wrong + * token, send code for wrong token. * - * @param token The token to send */ - public boolean sendToken( String token ) + public boolean sendErrorCode( String errString ) { - if ( TOKEN != null ) { - log.warn( "Trying to send token while a token is already set! Ignoring..." ); - return false; - } - TOKEN = token; try { - sendKeyValuePair( "TOKEN", TOKEN ); - } catch ( SocketTimeoutException ste ) { - ste.printStackTrace(); - this.close( "Socket timeout occured ... close connection." ); + sendKeyValuePair( "ERROR", errString ); } catch ( IOException e ) { e.printStackTrace(); - readMetaData(); - if ( ERROR != null ) { - if ( ERROR.equals( "timeout" ) ) { - this.close( "Remote Socket timeout occured ... close connection." ); - } - } - log.info( "Sending TOKEN in Downloader failed..." ); + this.close( e.toString() ); return false; } return true; } - /***********************************************************************/ - /** - * Method for reading incoming token for identification. - * - */ - public String getToken() + protected boolean sendToken( String token ) { - return TOKEN; - } - - private boolean parseRange( String range ) - { - if ( range == null ) - return true; - if ( RANGE != null ) { - log.warn( "Warning: RANGE already set when trying to parse from " + range ); - return false; - } - String parts[] = range.split( ":", 2 ); - long ret[] = new long[ 2 ]; try { - ret[0] = Long.parseLong( parts[0] ); - ret[1] = Long.parseLong( parts[1] ); - } catch ( Throwable t ) { - log.warn( "Not parsable range: '" + range + "'" ); - return false; - } - if ( ret[1] <= ret[0] ) { - log.warn( "Invalid range. Start >= end" ); + sendKeyValuePair( "TOKEN", token ); + } catch ( IOException e ) { + e.printStackTrace(); + this.close( e.toString() ); return false; } - RANGE = ret; return true; } - /***********************************************************************/ - /** - * Getter for beginning of RANGE. - * - * @return - */ - public long getStartOfRange() + public boolean sendDone() { - if ( RANGE != null ) { - return RANGE[0]; + try { + sendKeyValuePair( "DONE", "" ); + } catch ( IOException e ) { + e.printStackTrace(); + this.close( e.toString() ); + return false; } - return -1; + return true; } - /***********************************************************************/ - /** - * Getter for end of RANGE. - * - * @return - */ - public long getEndOfRange() + protected boolean sendEndOfMeta() { - if ( RANGE != null ) { - return RANGE[1]; + try { + outStream.writeByte( 0 ); + } catch ( SocketTimeoutException e ) { + log.error( "Error sending end of meta - socket timeout" ); + return false; + } catch ( IOException e ) { + log.error( "Error sending end of meta - " + e.toString() ); + return false; } - return -1; - } - - /***********************************************************************/ - /** - * Method for returning difference of current Range. - * - * @return - */ - public int getDiffOfRange() - { - return (int)Math.abs( getEndOfRange() - getStartOfRange() ); + return true; } /***********************************************************************/ @@ -185,10 +139,11 @@ public abstract class Transfer * Split incoming bytes after first '=' and store value to specific * variable. * - * @return true on success, false if reading failed + * @return map of meta data received, null on error */ - public boolean readMetaData() + protected MetaData readMetaData() { + Map<String, String> entries = new HashMap<>(); try { while ( true ) { byte[] incoming = new byte[ 255 ]; @@ -198,9 +153,9 @@ public abstract class Transfer retLengthByte = dataFromServer.read( incoming, 0, 1 ); // If .read() didn't return 1, it was not able to read first byte. if ( retLengthByte != 1 ) { - log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte); + log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte ); this.close( "Error occured while reading Metadata." ); - return false; + return null; } int length = incoming[0] & 0xFF; @@ -218,7 +173,7 @@ public abstract class Transfer int ret = dataFromServer.read( incoming, hasRead, length - hasRead ); if ( ret == -1 ) { this.close( "Error occured while reading Metadata." ); - return false; + return null; } hasRead += ret; } @@ -230,62 +185,36 @@ public abstract class Transfer log.warn( "Invalid key value pair received (" + data + ")" ); continue; } - if ( splitted[0].equals( "TOKEN" ) ) { - if ( TOKEN != null ) { - this.close( "Received a token when a token is already set!" ); - return false; - } - TOKEN = splitted[1]; - log.debug( "TOKEN: " + TOKEN ); - } - else if ( splitted[0].equals( "RANGE" ) ) { - if ( !parseRange( splitted[1] ) ) { - this.close( "Could not parse RANGE token" ); - return false; - } - log.debug( "RANGE: '" + splitted[1] + "'" ); - } - else if ( splitted[0].equals( "ERROR" ) ) { + if ( splitted[0].equals( "ERROR" ) ) ERROR = splitted[1]; - log.debug( "ERROR: " + ERROR ); + if ( entries.containsKey( splitted[0] ) ) { + log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" ); + } else { + entries.put( splitted[0], splitted[1] ); } } } catch ( SocketTimeoutException ste ) { ste.printStackTrace(); sendErrorCode( "timeout" ); this.close( "Socket Timeout occured in readMetaData." ); - return false; + return null; } catch ( Exception e ) { e.printStackTrace(); this.close( e.toString() ); - return false; + return null; } - return true; + return new MetaData( entries ); } private void sendKeyValuePair( String key, String value ) throws IOException { byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 ); - dataToServer.writeByte( data.length ); - dataToServer.write( data ); - } - - /***********************************************************************/ - /** - * Method for sending error Code to server. For example in case of wrong - * token, send code for wrong token. - * - */ - public Boolean sendErrorCode( String errString ) - { try { - sendKeyValuePair( "ERROR", errString ); - } catch ( IOException e ) { - e.printStackTrace(); - this.close( e.toString() ); - return false; + outStream.writeByte( data.length ); + outStream.write( data ); + } catch ( SocketTimeoutException e ) { + log.warn( "Socket timeout when sending KVP with key " + key ); } - return true; } /***********************************************************************/ @@ -298,12 +227,12 @@ public abstract class Transfer if ( error != null ) log.info( error ); try { - if ( satelliteSocket != null ) - this.satelliteSocket.close(); + if ( transferSocket != null ) + this.transferSocket.close(); if ( dataFromServer != null ) dataFromServer.close(); - if ( dataToServer != null ) - dataToServer.close(); + if ( outStream != null ) + outStream.close(); } catch ( IOException e ) { e.printStackTrace(); } @@ -317,8 +246,128 @@ public abstract class Transfer */ public boolean isValid() { - return satelliteSocket.isConnected() && !satelliteSocket.isClosed() - && !satelliteSocket.isInputShutdown() && !satelliteSocket.isOutputShutdown(); + return transferSocket.isConnected() && !transferSocket.isClosed() + && !transferSocket.isInputShutdown() && !transferSocket.isOutputShutdown(); + } + + /** + * Get error string received from remote side, if any. + * + * @return Error string, if received, or null. + */ + public String getRemoteError() + { + return ERROR; + } + + /** + * Get transfer token, sent by remote peer that initiated connection. + * Call this ONLY if all of the following conditions are met: + * - this is an incoming transfer connection + * - you didn't call it before + * - you didn't call download or upload yet + * + * @return The transfer token + */ + public String getToken() + { + if ( !shouldGetToken ) { + log.error( "Invalid call of getToken. You either initiated the connection yourself, or you already called getToken before." ); + this.close( null ); + return null; + } + shouldGetToken = false; + MetaData meta = readMetaData(); + if ( meta == null ) + return null; + return meta.getToken(); + } + + /** + * Should we call getToken()? Used internally for detecting wrong usage of + * the transfer classes. + * + * @return yes or no + */ + protected boolean shouldGetToken() + { + return shouldGetToken; + } + + /** + * High level access to key-value-pairs. + */ + class MetaData + { + + private Map<String, String> meta; + + private MetaData( Map<String, String> meta ) + { + this.meta = meta; + } + + /** + * Get transfer token, sent by remote peer that initiated connection. + * + * @return The transfer token + */ + public String getToken() + { + return meta.get( "TOKEN" ); + } + + /** + * Check if remote peer set the DONE key, telling us the transfer is complete. + * + * @return yes or no + */ + public boolean isDone() + { + return meta.containsKey( "DONE" ); + } + + /** + * Return range from this meta data class, or null + * if it doesn't contain a (valid) range key-value-pair. + * + * @return The range instance + */ + public FileRange getRange() + { + if ( meta.containsKey( "RANGE" ) ) + return parseRange( meta.get( "RANGE" ) ); + return null; + } + + /** + * Parse range in format START:END to {@link FileRange} instance. + * + * @param range String representation of range + * @return {@link FileRange} instance of range, or null on error + */ + private FileRange parseRange( String range ) + { + if ( range == null ) + return null; + String parts[] = range.split( ":" ); + if ( parts.length != 2 ) + return null; + long start, end; + try { + start = Long.parseLong( parts[0] ); + end = Long.parseLong( parts[1] ); + } catch ( Throwable t ) { + log.warn( "Not parsable range: '" + range + "'" ); + return null; + } + if ( start >= end ) { + log.warn( "Invalid range. Start >= end" ); + return null; + } + return new FileRange( start, end ); + } + } } diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java index 3deb272..87845fa 100644 --- a/src/main/java/org/openslx/filetransfer/Uploader.java +++ b/src/main/java/org/openslx/filetransfer/Uploader.java @@ -1,9 +1,9 @@ package org.openslx.filetransfer; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; -import java.net.SocketTimeoutException; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; @@ -24,10 +24,12 @@ public class Uploader extends Transfer * @param context ssl context for establishing a secure connection * @throws IOException */ - public Uploader( String host, int port, SSLContext context ) throws IOException + public Uploader( String host, int port, SSLContext context, String token ) throws IOException { super( host, port, context, log ); - dataToServer.writeByte( 'U' ); + outStream.writeByte( 'U' ); + if ( !sendToken( token ) || !sendEndOfMeta() ) + throw new IOException( "Sending token failed" ); } /***********************************************************************/ @@ -44,71 +46,95 @@ public class Uploader extends Transfer /***********************************************************************/ /** - * Used by the peer that initiated the connection to tell the remote - * peer which part of the file is being uploaded - * - * @param startOffset start offset in bytes in the file (inclusive) - * @param endOffset end offset in file (exclusive) - * @return - */ - public boolean prepareSendRange( long startOffset, long endOffset ) - { - return super.sendRange( startOffset, endOffset ); - } - - /***********************************************************************/ - /** * Method for sending File with filename. * * @param filename */ - public boolean sendFile( String filename ) + public boolean upload( String filename ) { - if ( getStartOfRange() == -1 ) { - this.close( "sendFile called when no range is set" ); + if ( shouldGetToken() ) { + log.error( "You didn't call getToken yet!" ); return false; } - RandomAccessFile file = null; try { - file = new RandomAccessFile( new File( filename ), "r" ); - file.seek( getStartOfRange() ); - - byte[] data = new byte[ 64000 ]; - int hasRead = 0; - int length = getDiffOfRange(); - // System.out.println( "diff of Range: " + length ); - while ( hasRead < length ) { - int ret = file.read( data, 0, Math.min( length - hasRead, data.length ) ); - if ( ret == -1 ) { - this.close( "Error occured in Uploader.sendFile()," - + " while reading from File to send." ); + try { + file = new RandomAccessFile( new File( filename ), "r" ); + } catch ( FileNotFoundException e ) { + log.error( "Could not open " + filename + " for reading." ); + return false; + } + for ( ;; ) { // Loop as long as remote peer is requesting chunks from this file + // Read meta data of remote peer - either new range, or it's telling us it's done + MetaData meta = readMetaData(); + if ( meta == null ) { + log.error( "Did not get meta data from remote peer." ); return false; } - hasRead += ret; - dataToServer.write( data, 0, ret ); - } - } catch ( SocketTimeoutException ste ) { - ste.printStackTrace(); - sendErrorCode( "timeout" ); - this.close( "Socket timeout occured ... close connection." ); - return false; - } catch ( IOException ioe ) { - ioe.printStackTrace(); - readMetaData(); - if ( ERROR != null ) { - if ( ERROR.equals( "timeout" ) ) { - this.close( "Remote Socket timeout occured ... close connection." ); + if ( meta.isDone() ) // Download complete? + break; + // Not complete, so there must be another range request + FileRange requestedRange = meta.getRange(); + if ( requestedRange == null ) { + log.error( "Remote peer did not include RANGE in meta data." ); + sendErrorCode( "no (valid) range in request" ); return false; } + // Range inside file? + try { + if ( requestedRange.endOffset > file.length() ) { + log.error( "Requested range is larger than file size, aborting." ); + sendErrorCode( "range out of file bounds" ); + return false; + } + } catch ( IOException e ) { + log.error( "Could not get current length of file " + filename ); + return false; + } + // Seek to requested chunk + try { + file.seek( requestedRange.startOffset ); + } catch ( IOException e ) { + log.error( "Could not seek to start of requested range in " + filename + " (" + requestedRange.startOffset + ")" ); + return false; + } + // Send confirmation of range we're about to send + try { + long ptr = file.getFilePointer(); + if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) { + log.error( "Could not send range confirmation" ); + return false; + } + } catch ( IOException e ) { + log.error( "Could not determine current position in file " + filename ); + return false; + } + // Finally send requested chunk + byte[] data = new byte[ 500000 ]; // 500kb + int hasRead = 0; + int length = requestedRange.getLength(); + while ( hasRead < length ) { + int ret; + try { + ret = file.read( data, 0, Math.min( length - hasRead, data.length ) ); + } catch ( IOException e ) { + log.error( "Error reading from file " + filename ); + return false; + } + if ( ret == -1 ) { + this.close( "Error occured in Uploader.sendFile()," + + " while reading from File to send." ); + return false; + } + hasRead += ret; + try { + outStream.write( data, 0, ret ); + } catch ( IOException e ) { + log.error( "Sending payload failed" ); + return false; + } + } } - this.close( "Sending RANGE " + getStartOfRange() + ":" + getEndOfRange() + " of File " - + filename + " failed..." ); - return false; - } catch ( Exception e ) { - e.printStackTrace(); - this.close( e.toString() ); - return false; } finally { if ( file != null ) { try { diff --git a/src/main/java/org/openslx/filetransfer/WantRangeCallback.java b/src/main/java/org/openslx/filetransfer/WantRangeCallback.java new file mode 100644 index 0000000..4581d63 --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/WantRangeCallback.java @@ -0,0 +1,12 @@ +package org.openslx.filetransfer; + +/** + * Callback interface - called when the downloader needs to send a + * range request to the remote peer. + */ +public interface WantRangeCallback +{ + + public FileRange get(); + +} |