From 52fa9a47498a3727d11a34205c9920f9a10e8aeb Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Fri, 19 Sep 2014 18:13:06 +0200 Subject: Rework file transfer, try to use callbacks for everything No more juggling with sendRange() and sendData(), which was easy to use wrong, and cause lots of weird errors. --- .../java/org/openslx/filetransfer/Transfer.java | 365 ++++++++++++--------- 1 file changed, 207 insertions(+), 158 deletions(-) (limited to 'src/main/java/org/openslx/filetransfer/Transfer.java') diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java index a8c9a8a..3f2fdde 100644 --- a/src/main/java/org/openslx/filetransfer/Transfer.java +++ b/src/main/java/org/openslx/filetransfer/Transfer.java @@ -3,8 +3,11 @@ package org.openslx.filetransfer; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocket; @@ -14,60 +17,61 @@ import org.apache.log4j.Logger; public abstract class Transfer { - protected final SSLSocketFactory sslSocketFactory; - protected final SSLSocket satelliteSocket; - protected final DataOutputStream dataToServer; + protected final SSLSocket transferSocket; + protected final DataOutputStream outStream; protected final DataInputStream dataFromServer; - protected String TOKEN = null; - protected long[] RANGE = null; protected String ERROR = null; + private boolean shouldGetToken; protected final Logger log; - protected Transfer( String ip, int port, SSLContext context, Logger log ) throws IOException + /** + * Actively initiated transfer. + * + * @param host Remote Host + * @param port Remote Port + * @param context SSL Context for encryption + * @param log Logger to use + * @throws IOException + */ + protected Transfer( String host, int port, SSLContext context, Logger log ) throws IOException { this.log = log; // create socket. - sslSocketFactory = context.getSocketFactory(); + SSLSocketFactory sslSocketFactory = context.getSocketFactory(); - satelliteSocket = (SSLSocket)sslSocketFactory.createSocket( ip, port ); - satelliteSocket.setSoTimeout( 2000 ); // set socket timeout. + transferSocket = (SSLSocket)sslSocketFactory.createSocket(); + transferSocket.setSoTimeout( 5000 ); // set socket timeout. + transferSocket.connect( new InetSocketAddress( host, port ) ); - dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() ); - dataFromServer = new DataInputStream( satelliteSocket.getInputStream() ); + outStream = new DataOutputStream( transferSocket.getOutputStream() ); + dataFromServer = new DataInputStream( transferSocket.getInputStream() ); + shouldGetToken = false; } + /** + * Passive transfer through incoming connection. + * + * @param socket already connected socket to remote peer + * @param log Logger to use + * @throws IOException + */ protected Transfer( SSLSocket socket, Logger log ) throws IOException { this.log = log; - satelliteSocket = socket; - dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() ); - dataFromServer = new DataInputStream( satelliteSocket.getInputStream() ); - sslSocketFactory = null; + transferSocket = socket; + outStream = new DataOutputStream( transferSocket.getOutputStream() ); + dataFromServer = new DataInputStream( transferSocket.getInputStream() ); + shouldGetToken = true; } protected boolean sendRange( long startOffset, long endOffset ) { - if ( RANGE != null ) { - log.warn( "Range already set!" ); - return false; - } try { + log.debug( "Sending range: " + startOffset + " to " + endOffset ); sendKeyValuePair( "RANGE", startOffset + ":" + endOffset ); - RANGE[0] = startOffset; - RANGE[1] = endOffset; - } catch ( SocketTimeoutException ste ) { - ste.printStackTrace(); - this.close( "Socket timeout occured ... close connection." ); } catch ( IOException e ) { e.printStackTrace(); - readMetaData(); - if ( ERROR != null ) { - if ( ERROR.equals( "timeout" ) ) { - this.close( "Remote Socket timeout occured ... close connection." ); - } - } - log.info( "Sending RANGE in Uploader failed..." ); return false; } return true; @@ -75,108 +79,58 @@ public abstract class Transfer /***********************************************************************/ /** - * Method for sending token for identification from satellite to master. + * Method for sending error Code to server. For example in case of wrong + * token, send code for wrong token. * - * @param token The token to send */ - public boolean sendToken( String token ) + public boolean sendErrorCode( String errString ) { - if ( TOKEN != null ) { - log.warn( "Trying to send token while a token is already set! Ignoring..." ); - return false; - } - TOKEN = token; try { - sendKeyValuePair( "TOKEN", TOKEN ); - } catch ( SocketTimeoutException ste ) { - ste.printStackTrace(); - this.close( "Socket timeout occured ... close connection." ); + sendKeyValuePair( "ERROR", errString ); } catch ( IOException e ) { e.printStackTrace(); - readMetaData(); - if ( ERROR != null ) { - if ( ERROR.equals( "timeout" ) ) { - this.close( "Remote Socket timeout occured ... close connection." ); - } - } - log.info( "Sending TOKEN in Downloader failed..." ); + this.close( e.toString() ); return false; } return true; } - /***********************************************************************/ - /** - * Method for reading incoming token for identification. - * - */ - public String getToken() + protected boolean sendToken( String token ) { - return TOKEN; - } - - private boolean parseRange( String range ) - { - if ( range == null ) - return true; - if ( RANGE != null ) { - log.warn( "Warning: RANGE already set when trying to parse from " + range ); - return false; - } - String parts[] = range.split( ":", 2 ); - long ret[] = new long[ 2 ]; try { - ret[0] = Long.parseLong( parts[0] ); - ret[1] = Long.parseLong( parts[1] ); - } catch ( Throwable t ) { - log.warn( "Not parsable range: '" + range + "'" ); - return false; - } - if ( ret[1] <= ret[0] ) { - log.warn( "Invalid range. Start >= end" ); + sendKeyValuePair( "TOKEN", token ); + } catch ( IOException e ) { + e.printStackTrace(); + this.close( e.toString() ); return false; } - RANGE = ret; return true; } - /***********************************************************************/ - /** - * Getter for beginning of RANGE. - * - * @return - */ - public long getStartOfRange() + public boolean sendDone() { - if ( RANGE != null ) { - return RANGE[0]; + try { + sendKeyValuePair( "DONE", "" ); + } catch ( IOException e ) { + e.printStackTrace(); + this.close( e.toString() ); + return false; } - return -1; + return true; } - /***********************************************************************/ - /** - * Getter for end of RANGE. - * - * @return - */ - public long getEndOfRange() + protected boolean sendEndOfMeta() { - if ( RANGE != null ) { - return RANGE[1]; + try { + outStream.writeByte( 0 ); + } catch ( SocketTimeoutException e ) { + log.error( "Error sending end of meta - socket timeout" ); + return false; + } catch ( IOException e ) { + log.error( "Error sending end of meta - " + e.toString() ); + return false; } - return -1; - } - - /***********************************************************************/ - /** - * Method for returning difference of current Range. - * - * @return - */ - public int getDiffOfRange() - { - return (int)Math.abs( getEndOfRange() - getStartOfRange() ); + return true; } /***********************************************************************/ @@ -185,10 +139,11 @@ public abstract class Transfer * Split incoming bytes after first '=' and store value to specific * variable. * - * @return true on success, false if reading failed + * @return map of meta data received, null on error */ - public boolean readMetaData() + protected MetaData readMetaData() { + Map entries = new HashMap<>(); try { while ( true ) { byte[] incoming = new byte[ 255 ]; @@ -198,9 +153,9 @@ public abstract class Transfer retLengthByte = dataFromServer.read( incoming, 0, 1 ); // If .read() didn't return 1, it was not able to read first byte. if ( retLengthByte != 1 ) { - log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte); + log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte ); this.close( "Error occured while reading Metadata." ); - return false; + return null; } int length = incoming[0] & 0xFF; @@ -218,7 +173,7 @@ public abstract class Transfer int ret = dataFromServer.read( incoming, hasRead, length - hasRead ); if ( ret == -1 ) { this.close( "Error occured while reading Metadata." ); - return false; + return null; } hasRead += ret; } @@ -230,62 +185,36 @@ public abstract class Transfer log.warn( "Invalid key value pair received (" + data + ")" ); continue; } - if ( splitted[0].equals( "TOKEN" ) ) { - if ( TOKEN != null ) { - this.close( "Received a token when a token is already set!" ); - return false; - } - TOKEN = splitted[1]; - log.debug( "TOKEN: " + TOKEN ); - } - else if ( splitted[0].equals( "RANGE" ) ) { - if ( !parseRange( splitted[1] ) ) { - this.close( "Could not parse RANGE token" ); - return false; - } - log.debug( "RANGE: '" + splitted[1] + "'" ); - } - else if ( splitted[0].equals( "ERROR" ) ) { + if ( splitted[0].equals( "ERROR" ) ) ERROR = splitted[1]; - log.debug( "ERROR: " + ERROR ); + if ( entries.containsKey( splitted[0] ) ) { + log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" ); + } else { + entries.put( splitted[0], splitted[1] ); } } } catch ( SocketTimeoutException ste ) { ste.printStackTrace(); sendErrorCode( "timeout" ); this.close( "Socket Timeout occured in readMetaData." ); - return false; + return null; } catch ( Exception e ) { e.printStackTrace(); this.close( e.toString() ); - return false; + return null; } - return true; + return new MetaData( entries ); } private void sendKeyValuePair( String key, String value ) throws IOException { byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 ); - dataToServer.writeByte( data.length ); - dataToServer.write( data ); - } - - /***********************************************************************/ - /** - * Method for sending error Code to server. For example in case of wrong - * token, send code for wrong token. - * - */ - public Boolean sendErrorCode( String errString ) - { try { - sendKeyValuePair( "ERROR", errString ); - } catch ( IOException e ) { - e.printStackTrace(); - this.close( e.toString() ); - return false; + outStream.writeByte( data.length ); + outStream.write( data ); + } catch ( SocketTimeoutException e ) { + log.warn( "Socket timeout when sending KVP with key " + key ); } - return true; } /***********************************************************************/ @@ -298,12 +227,12 @@ public abstract class Transfer if ( error != null ) log.info( error ); try { - if ( satelliteSocket != null ) - this.satelliteSocket.close(); + if ( transferSocket != null ) + this.transferSocket.close(); if ( dataFromServer != null ) dataFromServer.close(); - if ( dataToServer != null ) - dataToServer.close(); + if ( outStream != null ) + outStream.close(); } catch ( IOException e ) { e.printStackTrace(); } @@ -317,8 +246,128 @@ public abstract class Transfer */ public boolean isValid() { - return satelliteSocket.isConnected() && !satelliteSocket.isClosed() - && !satelliteSocket.isInputShutdown() && !satelliteSocket.isOutputShutdown(); + return transferSocket.isConnected() && !transferSocket.isClosed() + && !transferSocket.isInputShutdown() && !transferSocket.isOutputShutdown(); + } + + /** + * Get error string received from remote side, if any. + * + * @return Error string, if received, or null. + */ + public String getRemoteError() + { + return ERROR; + } + + /** + * Get transfer token, sent by remote peer that initiated connection. + * Call this ONLY if all of the following conditions are met: + * - this is an incoming transfer connection + * - you didn't call it before + * - you didn't call download or upload yet + * + * @return The transfer token + */ + public String getToken() + { + if ( !shouldGetToken ) { + log.error( "Invalid call of getToken. You either initiated the connection yourself, or you already called getToken before." ); + this.close( null ); + return null; + } + shouldGetToken = false; + MetaData meta = readMetaData(); + if ( meta == null ) + return null; + return meta.getToken(); + } + + /** + * Should we call getToken()? Used internally for detecting wrong usage of + * the transfer classes. + * + * @return yes or no + */ + protected boolean shouldGetToken() + { + return shouldGetToken; + } + + /** + * High level access to key-value-pairs. + */ + class MetaData + { + + private Map meta; + + private MetaData( Map meta ) + { + this.meta = meta; + } + + /** + * Get transfer token, sent by remote peer that initiated connection. + * + * @return The transfer token + */ + public String getToken() + { + return meta.get( "TOKEN" ); + } + + /** + * Check if remote peer set the DONE key, telling us the transfer is complete. + * + * @return yes or no + */ + public boolean isDone() + { + return meta.containsKey( "DONE" ); + } + + /** + * Return range from this meta data class, or null + * if it doesn't contain a (valid) range key-value-pair. + * + * @return The range instance + */ + public FileRange getRange() + { + if ( meta.containsKey( "RANGE" ) ) + return parseRange( meta.get( "RANGE" ) ); + return null; + } + + /** + * Parse range in format START:END to {@link FileRange} instance. + * + * @param range String representation of range + * @return {@link FileRange} instance of range, or null on error + */ + private FileRange parseRange( String range ) + { + if ( range == null ) + return null; + String parts[] = range.split( ":" ); + if ( parts.length != 2 ) + return null; + long start, end; + try { + start = Long.parseLong( parts[0] ); + end = Long.parseLong( parts[1] ); + } catch ( Throwable t ) { + log.warn( "Not parsable range: '" + range + "'" ); + return null; + } + if ( start >= end ) { + log.warn( "Invalid range. Start >= end" ); + return null; + } + return new FileRange( start, end ); + } + } } -- cgit v1.2.3-55-g7522