package org.openslx.filetransfer; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.HashMap; import java.util.Map; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import net.jpountz.lz4.LZ4Factory; import org.apache.log4j.Logger; import org.openslx.util.Util; public abstract class Transfer { protected final Socket transferSocket; protected final DataOutputStream outStream; protected final DataInputStream dataFromServer; protected String ERROR = null; private boolean shouldGetToken; protected boolean useCompression = true; protected final Logger log; protected final static LZ4Factory lz4factory = LZ4Factory.fastestInstance(); /** * Actively initiated transfer. * * @param host Remote Host * @param port Remote Port * @param context SSL Context for encryption, null if plain * @param log Logger to use * @throws IOException */ protected Transfer( String host, int port, int readTimeoutMs, SSLContext context, Logger log ) throws IOException { this.log = log; // create socket. if ( context == null ) { transferSocket = new Socket(); } else { SSLSocketFactory sslSocketFactory = context.getSocketFactory(); transferSocket = sslSocketFactory.createSocket(); } transferSocket.setSoTimeout( readTimeoutMs ); transferSocket.connect( new InetSocketAddress( host, port ) ); outStream = new DataOutputStream( transferSocket.getOutputStream() ); dataFromServer = new DataInputStream( transferSocket.getInputStream() ); shouldGetToken = false; } /** * Passive transfer through incoming connection. * * @param socket already connected socket to remote peer * @param log Logger to use * @throws IOException */ protected Transfer( Socket socket, Logger log ) throws IOException { this.log = log; transferSocket = socket; outStream = new DataOutputStream( transferSocket.getOutputStream() ); dataFromServer = new DataInputStream( transferSocket.getInputStream() ); shouldGetToken = true; } protected boolean sendRange( long startOffset, long endOffset ) { try { sendKeyValuePair( "RANGE", startOffset + ":" + endOffset ); } catch ( IOException e ) { e.printStackTrace(); return false; } return true; } protected void sendUseCompression() { try { sendKeyValuePair( "COMPRESS", "true" ); } catch ( IOException e ) { e.printStackTrace(); } } /***********************************************************************/ /** * Method for sending error Code to server. For example in case of wrong * token, send code for wrong token. * */ public boolean sendErrorCode( String errString ) { try { sendKeyValuePair( "ERROR", errString ); sendEndOfMeta(); } catch ( IOException e ) { e.printStackTrace(); this.close( e.toString() ); return false; } return true; } protected boolean sendToken( String token ) { try { sendKeyValuePair( "TOKEN", token ); } catch ( IOException e ) { e.printStackTrace(); this.close( e.toString() ); return false; } return true; } public void sendDoneAndClose() { sendDone(); sendEndOfMeta(); close( "Transfer finished" ); } protected boolean sendDone() { try { sendKeyValuePair( "DONE", "" ); } catch ( IOException e ) { e.printStackTrace(); this.close( e.toString() ); return false; } return true; } protected boolean sendEndOfMeta() { try { outStream.writeShort( 0 ); outStream.flush(); } catch ( SocketTimeoutException e ) { log.error( "Error sending end of meta - socket timeout" ); return false; } catch ( IOException e ) { log.error( "Error sending end of meta - " + e.toString() ); return false; } return true; } /***********************************************************************/ /** * Method for reading MetaData, like TOKEN and FileRange. * Split incoming bytes after first '=' and store value to specific * variable. * * @return map of meta data received, null on error */ protected MetaData readMetaData() { Map entries = new HashMap<>(); try { while ( true ) { String data = dataFromServer.readUTF(); if ( data == null || data.length() == 0 ) break; // End of meta data String[] splitted = data.split( "=", 2 ); if ( splitted.length != 2 ) { log.warn( "Invalid key value pair received (" + data + ")" ); continue; } if ( splitted[0].equals( "ERROR" ) ) ERROR = splitted[1]; if ( entries.containsKey( splitted[0] ) ) { log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" ); } else { entries.put( splitted[0], splitted[1] ); } } } catch ( SocketTimeoutException ste ) { sendErrorCode( "timeout" ); this.close( "Socket Timeout occured in readMetaData. " + ERROR ); return null; } catch ( Exception e ) { this.close( "Exception occured in readMetaData: " + e.toString() + " " + ERROR ); return null; } return new MetaData( entries ); } private void sendKeyValuePair( String key, String value ) throws IOException { if ( outStream == null ) return; try { outStream.writeUTF( key + "=" + value ); } catch ( Exception e ) { this.close( e.getClass().getSimpleName() + " when sending KVP with key " + key ); } } /***********************************************************************/ /** * Method for closing connection, if download has finished. * */ protected void close( String error, UploadStatusCallback callback, boolean sendToPeer ) { if ( error != null ) { if ( sendToPeer ) sendErrorCode( error ); if ( callback != null ) callback.uploadError( error ); } synchronized ( transferSocket ) { safeClose( dataFromServer, outStream, transferSocket ); } } protected void close( String error ) { close( error, null, false ); } public void cancel() { synchronized ( transferSocket ) { try { transferSocket.shutdownOutput(); } catch ( Exception e ) { // Silence } try { transferSocket.shutdownInput(); } catch ( Exception e ) { // Silence } } } /** * Returns whether this transfer/connection is considered valid or usable, * which means the socket is still properly connected to the remote peer. * * @return true or false */ public boolean isValid() { synchronized ( transferSocket ) { return transferSocket.isConnected() && !transferSocket.isClosed() && !transferSocket.isInputShutdown() && !transferSocket.isOutputShutdown(); } } /** * Get error string received from remote side, if any. * * @return Error string, if received, or null. */ public String getRemoteError() { return ERROR; } /** * Get transfer token, sent by remote peer that initiated connection. * Call this ONLY if all of the following conditions are met: * - this is an incoming transfer connection * - you didn't call it before * - you didn't call download or upload yet * * @return The transfer token */ public String getToken() { if ( !shouldGetToken ) { log.error( "Invalid call of getToken. You either initiated the connection yourself, or you already called getToken before." ); this.close( null ); return null; } shouldGetToken = false; MetaData meta = readMetaData(); if ( meta == null ) return null; if (meta.peerWantsCompression()) { useCompression = true; } return meta.getToken(); } /** * Should we call getToken()? Used internally for detecting wrong usage of * the transfer classes. * * @return yes or no */ protected boolean shouldGetToken() { return shouldGetToken; } /** * Close given stream, socket, anything closeable. * Never throws any exception, if it's not closeable there's * not much else we can do. * * @param list one or more closeables. Pass one, many, or an array */ static protected void safeClose( Closeable... list ) { Util.safeClose( list ); } /** * High level access to key-value-pairs. */ class MetaData { private Map meta; private MetaData( Map meta ) { this.meta = meta; } /** * Get transfer token, sent by remote peer that initiated connection. * * @return The transfer token */ public String getToken() { return meta.get( "TOKEN" ); } /** * Check if remote peer set the DONE key, telling us the transfer is complete. * * @return yes or no */ public boolean isDone() { return meta.containsKey( "DONE" ); } /** * Return range from this meta data class, or null * if it doesn't contain a (valid) range key-value-pair. * * @return The range instance */ public FileRange getRange() { if ( meta.containsKey( "RANGE" ) ) return parseRange( meta.get( "RANGE" ) ); return null; } /** * Parse range in format START:END to {@link FileRange} instance. * * @param range String representation of range * @return {@link FileRange} instance of range, or null on error */ private FileRange parseRange( String range ) { if ( range == null ) return null; String parts[] = range.split( ":" ); if ( parts.length != 2 ) return null; long start, end; try { start = Long.parseLong( parts[0] ); end = Long.parseLong( parts[1] ); } catch ( Throwable t ) { log.warn( "Not parsable range: '" + range + "'" ); return null; } if ( start >= end ) { log.warn( "Invalid range. Start >= end" ); return null; } return new FileRange( start, end ); } /** * Peer indicated that it wants to use snappy compression. */ public boolean peerWantsCompression() { return meta.containsKey( "COMPRESS" ); } } }