diff options
author | Simon Rettberg | 2015-06-05 11:25:50 +0200 |
---|---|---|
committer | Simon Rettberg | 2015-06-05 11:25:50 +0200 |
commit | 693392fe6c0022e7ec5060192ee322c7753b0d90 (patch) | |
tree | 855245927778c6b069714909929684d1da7d449f /src/main/java/org/openslx/filetransfer | |
parent | Cleanup thrift shandling stuff (diff) | |
download | master-sync-shared-693392fe6c0022e7ec5060192ee322c7753b0d90.tar.gz master-sync-shared-693392fe6c0022e7ec5060192ee322c7753b0d90.tar.xz master-sync-shared-693392fe6c0022e7ec5060192ee322c7753b0d90.zip |
Changes for Dozmod v1.1
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
7 files changed, 219 insertions, 143 deletions
diff --git a/src/main/java/org/openslx/filetransfer/ClassTest.java b/src/main/java/org/openslx/filetransfer/ClassTest.java index 92269bb..37b37c0 100644 --- a/src/main/java/org/openslx/filetransfer/ClassTest.java +++ b/src/main/java/org/openslx/filetransfer/ClassTest.java @@ -141,7 +141,7 @@ public class ClassTest // Implementing IncomingEvent for testing case. static class Test implements IncomingEvent { - public void incomingUploader( Uploader uploader ) throws IOException + public void incomingDownloadRequest( Uploader uploader ) throws IOException { if ( uploader.getToken() == null ) { System.out.println( "Incoming uploader: could not get token!" ); @@ -153,7 +153,7 @@ public class ClassTest System.out.println( "Incomgin uploader OK" ); } - public void incomingDownloader( Downloader downloader ) throws IOException + public void incomingUploadRequest( Downloader downloader ) throws IOException { if ( downloader.getToken() == null ) { System.out.println( "Incoming downloader: could not get token!" ); diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java index 00c6f69..20a50e6 100644 --- a/src/main/java/org/openslx/filetransfer/Downloader.java +++ b/src/main/java/org/openslx/filetransfer/Downloader.java @@ -4,9 +4,9 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.net.Socket; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; import org.apache.log4j.Logger; @@ -38,11 +38,20 @@ public class Downloader extends Transfer * @param socket established connection to peer which requested an upload. * @throws IOException */ - protected Downloader( SSLSocket socket ) throws IOException + protected Downloader( Socket socket ) throws IOException { super( socket, log ); } + /** + * Initiate the download. This method does not return until the file transfer finished. + * + * @param destinationFile destination file name to download to + * @param rangeCallback this object's .get() method is called whenever the downloader needs to + * know which part of the file to request next. This method should return null if no + * more parts are needed, which in turn let's this method return true + * @return true on success, false otherwise + */ public boolean download( final String destinationFile, final WantRangeCallback callback ) { RandomAccessFile file = null; @@ -69,14 +78,20 @@ public class Downloader extends Transfer }; return download( cb, callback ); } finally { - if ( file != null ) - try { - file.close(); - } catch ( IOException e ) { - } + Transfer.safeClose( file ); } } + /** + * Initiate the download. This method does not return until the file transfer finished. + * + * @param dataCallback this object's .dataReceived() method is called whenever a chunk of data is + * received + * @param rangeCallback this object's .get() method is called whenever the downloader needs to + * know which part of the file to request next. This method should return null if no + * more parts are needed, which in turn let's this method return true + * @return true on success, false otherwise + */ public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback ) { if ( shouldGetToken() ) { diff --git a/src/main/java/org/openslx/filetransfer/IncomingEvent.java b/src/main/java/org/openslx/filetransfer/IncomingEvent.java index fc5dc63..d72ecdd 100644 --- a/src/main/java/org/openslx/filetransfer/IncomingEvent.java +++ b/src/main/java/org/openslx/filetransfer/IncomingEvent.java @@ -11,7 +11,7 @@ import java.io.IOException; */ public interface IncomingEvent { - void incomingUploader( Uploader uploader ) throws IOException; + void incomingDownloadRequest( Uploader uploader ) throws IOException; - void incomingDownloader( Downloader downloader ) throws IOException; + void incomingUploadRequest( Downloader downloader ) throws IOException; } diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java index 699c61d..e4e99e9 100644 --- a/src/main/java/org/openslx/filetransfer/Listener.java +++ b/src/main/java/org/openslx/filetransfer/Listener.java @@ -1,29 +1,35 @@ package org.openslx.filetransfer; import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLServerSocket; import javax.net.ssl.SSLServerSocketFactory; -import javax.net.ssl.SSLSocket; import org.apache.log4j.Logger; -public class Listener extends Thread +public class Listener { - private IncomingEvent incomingEvent; - private SSLContext context; - private int port; - final private int U = 85; // hex - code 'U' = 85. - final private int D = 68; // hex - code 'D' = 68. + private final IncomingEvent incomingEvent; + private final SSLContext context; + private final int port; + private ServerSocket listenSocket = null; + private Thread acceptThread = null; + private static final byte U = 85; // hex - code 'U' = 85. + private static final byte D = 68; // hex - code 'D' = 68. private static Logger log = Logger.getLogger( Listener.class ); /***********************************************************************/ /** - * Constructor for class Listener, which gets an instance of IncomingEvent. + * File transfer listener. This is the active side, opening a port and + * waiting for incoming connections. * - * @param e + * @param e the event handler for incoming connections + * @param context the SSL context used for encryption; if null, unencrypted connections will be + * used + * @param port port to listen on */ public Listener( IncomingEvent e, SSLContext context, int port ) { @@ -38,48 +44,74 @@ public class Listener extends Thread * connection, and start Downloader or Uploader. * */ - private void listen() + private boolean listen() { - SSLServerSocket welcomeSocket = null; try { - SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory(); - welcomeSocket = (SSLServerSocket)sslServerSocketFactory.createServerSocket( this.port ); + if ( this.context == null ) { + listenSocket = new ServerSocket( this.port ); + } else { + SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory(); + listenSocket = sslServerSocketFactory.createServerSocket( this.port ); + } + } catch ( Exception e ) { + log.error( "Cannot listen on port " + this.port ); + return false; + } + return true; + } - while ( !isInterrupted() ) { - SSLSocket connectionSocket = (SSLSocket)welcomeSocket.accept(); - connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections + private void run() + { + final Listener instance = this; + acceptThread = new Thread() { + @Override + public void run() + { + try { + while ( !isInterrupted() ) { + Socket connectionSocket = null; + try { + connectionSocket = listenSocket.accept(); + connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections - byte[] b = new byte[ 1 ]; - int length = connectionSocket.getInputStream().read( b ); + byte[] b = new byte[ 1 ]; + int length = connectionSocket.getInputStream().read( b ); - log.debug( "Length (Listener): " + length ); + connectionSocket.setSoTimeout( 10000 ); - if ( b[0] == U ) { - log.debug( "recognized U --> starting Downloader" ); - // --> start Downloader(socket). - Downloader d = new Downloader( connectionSocket ); - incomingEvent.incomingDownloader( d ); - } - else if ( b[0] == D ) { - log.debug( "recognized D --> starting Uploader" ); - // --> start Uploader(socket). - Uploader u = new Uploader( connectionSocket ); - incomingEvent.incomingUploader( u ); - } - else { - log.debug( "Got invalid option ... close connection" ); - connectionSocket.close(); + log.debug( "Length (Listener): " + length ); + + if ( b[0] == U ) { + log.debug( "recognized U --> starting Downloader" ); + // --> start Downloader(socket). + Downloader d = new Downloader( connectionSocket ); + incomingEvent.incomingUploadRequest( d ); + } + else if ( b[0] == D ) { + log.debug( "recognized D --> starting Uploader" ); + // --> start Uploader(socket). + Uploader u = new Uploader( connectionSocket ); + incomingEvent.incomingDownloadRequest( u ); + } + else { + log.debug( "Got invalid option ... close connection" ); + connectionSocket.close(); + } + } catch ( IOException e ) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } finally { + synchronized ( instance ) { + Transfer.safeClose( listenSocket ); + listenSocket = null; + } } } - } catch ( Exception e ) { - e.printStackTrace(); // same as writing to System.err.println(e.toString). - } finally { - try { - welcomeSocket.close(); - } catch ( IOException e ) { - // Nothing we can do - } - } + }; + acceptThread.start(); + log.info( "Starting to accept " + ( this.context == null ? "UNENCRYPTED" : "encrypted" ) + " connections on port " + this.port ); } public int getPort() @@ -87,14 +119,36 @@ public class Listener extends Thread return this.port; } - @Override - public void run() + /** + * Check whether this listener is running. + * + * @return true if this instance is currently listening for connections and runs the accept loop. + */ + public synchronized boolean isRunning() { - try { - this.listen(); - } catch ( Exception e ) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + return acceptThread != null && acceptThread.isAlive() && listenSocket != null && !listenSocket.isClosed(); + } + + /** + * Check whether this listener was started. + * + * @return true if this instance was started before, but might have been stopped already. + */ + public synchronized boolean wasStarted() + { + return acceptThread != null; + } + + /** + * Start this listener. + * + * @return true if the port could be openened and the accepting thread was started + */ + public synchronized boolean start() + { + if ( !this.listen() ) + return false; + this.run(); + return true; } } diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java index 3f2fdde..3e278c8 100644 --- a/src/main/java/org/openslx/filetransfer/Transfer.java +++ b/src/main/java/org/openslx/filetransfer/Transfer.java @@ -1,23 +1,23 @@ package org.openslx.filetransfer; +import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.Socket; import java.net.SocketTimeoutException; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; import org.apache.log4j.Logger; public abstract class Transfer { - protected final SSLSocket transferSocket; + protected final Socket transferSocket; protected final DataOutputStream outStream; protected final DataInputStream dataFromServer; protected String ERROR = null; @@ -30,7 +30,7 @@ public abstract class Transfer * * @param host Remote Host * @param port Remote Port - * @param context SSL Context for encryption + * @param context SSL Context for encryption, null if plain * @param log Logger to use * @throws IOException */ @@ -38,10 +38,13 @@ public abstract class Transfer { this.log = log; // create socket. - SSLSocketFactory sslSocketFactory = context.getSocketFactory(); - - transferSocket = (SSLSocket)sslSocketFactory.createSocket(); - transferSocket.setSoTimeout( 5000 ); // set socket timeout. + if ( context == null ) { + transferSocket = new Socket(); + } else { + SSLSocketFactory sslSocketFactory = context.getSocketFactory(); + transferSocket = sslSocketFactory.createSocket(); + } + transferSocket.setSoTimeout( 10000 ); // set socket timeout. transferSocket.connect( new InetSocketAddress( host, port ) ); outStream = new DataOutputStream( transferSocket.getOutputStream() ); @@ -56,7 +59,7 @@ public abstract class Transfer * @param log Logger to use * @throws IOException */ - protected Transfer( SSLSocket socket, Logger log ) throws IOException + protected Transfer( Socket socket, Logger log ) throws IOException { this.log = log; transferSocket = socket; @@ -122,7 +125,7 @@ public abstract class Transfer protected boolean sendEndOfMeta() { try { - outStream.writeByte( 0 ); + outStream.writeShort( 0 ); } catch ( SocketTimeoutException e ) { log.error( "Error sending end of meta - socket timeout" ); return false; @@ -146,39 +149,10 @@ public abstract class Transfer Map<String, String> entries = new HashMap<>(); try { while ( true ) { - byte[] incoming = new byte[ 255 ]; - - // First get length. - int retLengthByte; - retLengthByte = dataFromServer.read( incoming, 0, 1 ); - // If .read() didn't return 1, it was not able to read first byte. - if ( retLengthByte != 1 ) { - log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte ); - this.close( "Error occured while reading Metadata." ); - return null; - } + String data = dataFromServer.readUTF(); - int length = incoming[0] & 0xFF; - log.debug( "length (downloader): " + length ); - - if ( length == 0 ) - break; - - /* - * Read the next available bytes and split by '=' for - * getting TOKEN or RANGE. - */ - int hasRead = 0; - while ( hasRead < length ) { - int ret = dataFromServer.read( incoming, hasRead, length - hasRead ); - if ( ret == -1 ) { - this.close( "Error occured while reading Metadata." ); - return null; - } - hasRead += ret; - } - - String data = new String( incoming, 0, length, StandardCharsets.UTF_8 ); + if ( data == null || data.length() == 0 ) + break; // End of meta data String[] splitted = data.split( "=", 2 ); if ( splitted.length != 2 ) { @@ -208,12 +182,12 @@ public abstract class Transfer private void sendKeyValuePair( String key, String value ) throws IOException { - byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 ); + if ( outStream == null ) + return; try { - outStream.writeByte( data.length ); - outStream.write( data ); - } catch ( SocketTimeoutException e ) { - log.warn( "Socket timeout when sending KVP with key " + key ); + outStream.writeUTF( key + "=" + value ); + } catch ( Exception e ) { + this.close( e.getClass().getSimpleName() + " when sending KVP with key " + key ); } } @@ -222,20 +196,21 @@ public abstract class Transfer * Method for closing connection, if download has finished. * */ - public void close( String error ) + public void close( String error, UploadStatusCallback callback, boolean sendToPeer ) { - if ( error != null ) + if ( error != null ) { + if ( sendToPeer ) + sendErrorCode( error ); + if ( callback != null ) + callback.uploadError( error ); log.info( error ); - try { - if ( transferSocket != null ) - this.transferSocket.close(); - if ( dataFromServer != null ) - dataFromServer.close(); - if ( outStream != null ) - outStream.close(); - } catch ( IOException e ) { - e.printStackTrace(); } + safeClose( dataFromServer, outStream, transferSocket ); + } + + public void close( String error ) + { + close( error, null, false ); } /** @@ -295,6 +270,28 @@ public abstract class Transfer } /** + * Close given stream, socket, anything closeable. + * Never throws any exception, if it's not closeable there's + * not much else we can do. + * + * @param list one or more closeables. Pass one, many, or an array + */ + static protected void safeClose( Closeable... list ) + { + if ( list == null ) + return; + for ( Closeable c : list ) { + if ( c == null ) + continue; + try { + c.close(); + } catch ( Throwable t ) { + // Silcence... + } + } + } + + /** * High level access to key-value-pairs. */ class MetaData diff --git a/src/main/java/org/openslx/filetransfer/UploadStatusCallback.java b/src/main/java/org/openslx/filetransfer/UploadStatusCallback.java new file mode 100644 index 0000000..72f8f61 --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/UploadStatusCallback.java @@ -0,0 +1,10 @@ +package org.openslx.filetransfer; + +public interface UploadStatusCallback +{ + + public void uploadError( String message ); + + public void uploadProgress( long bytesSent ); + +} diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java index 87845fa..748b1e2 100644 --- a/src/main/java/org/openslx/filetransfer/Uploader.java +++ b/src/main/java/org/openslx/filetransfer/Uploader.java @@ -4,9 +4,9 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.net.Socket; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; import org.apache.log4j.Logger; @@ -39,7 +39,7 @@ public class Uploader extends Transfer * * @throws IOException */ - public Uploader( SSLSocket socket ) throws IOException + public Uploader( Socket socket ) throws IOException { super( socket, log ); } @@ -52,6 +52,12 @@ public class Uploader extends Transfer */ public boolean upload( String filename ) { + return upload( filename, null ); + } + + @SuppressWarnings( "resource" ) + public boolean upload( String filename, UploadStatusCallback callback ) + { if ( shouldGetToken() ) { log.error( "You didn't call getToken yet!" ); return false; @@ -61,14 +67,14 @@ public class Uploader extends Transfer try { file = new RandomAccessFile( new File( filename ), "r" ); } catch ( FileNotFoundException e ) { - log.error( "Could not open " + filename + " for reading." ); + this.close( "Could not open given file for reading.", callback, true ); return false; } for ( ;; ) { // Loop as long as remote peer is requesting chunks from this file // Read meta data of remote peer - either new range, or it's telling us it's done MetaData meta = readMetaData(); if ( meta == null ) { - log.error( "Did not get meta data from remote peer." ); + this.close( "Did not get meta data from remote peer.", callback, true ); return false; } if ( meta.isDone() ) // Download complete? @@ -76,37 +82,35 @@ public class Uploader extends Transfer // Not complete, so there must be another range request FileRange requestedRange = meta.getRange(); if ( requestedRange == null ) { - log.error( "Remote peer did not include RANGE in meta data." ); - sendErrorCode( "no (valid) range in request" ); + this.close( "Peer did not include RANGE in meta data.", callback, true ); return false; } // Range inside file? try { if ( requestedRange.endOffset > file.length() ) { - log.error( "Requested range is larger than file size, aborting." ); - sendErrorCode( "range out of file bounds" ); + this.close( "Requested range is larger than file size, aborting.", callback, true ); return false; } } catch ( IOException e ) { - log.error( "Could not get current length of file " + filename ); + this.close( "Could not get current length of file " + filename, callback, false ); return false; } // Seek to requested chunk try { file.seek( requestedRange.startOffset ); } catch ( IOException e ) { - log.error( "Could not seek to start of requested range in " + filename + " (" + requestedRange.startOffset + ")" ); + this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true ); return false; } // Send confirmation of range we're about to send try { long ptr = file.getFilePointer(); if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) { - log.error( "Could not send range confirmation" ); + this.close( "Could not send range confirmation" ); return false; } } catch ( IOException e ) { - log.error( "Could not determine current position in file " + filename ); + this.close( "Could not determine current position in file " + filename ); return false; } // Finally send requested chunk @@ -118,30 +122,26 @@ public class Uploader extends Transfer try { ret = file.read( data, 0, Math.min( length - hasRead, data.length ) ); } catch ( IOException e ) { - log.error( "Error reading from file " + filename ); + this.close( "Error reading from file ", callback, true ); return false; } if ( ret == -1 ) { - this.close( "Error occured in Uploader.sendFile()," - + " while reading from File to send." ); + this.close( "Error occured in Uploader.sendFile() while reading from File to send.", callback, true ); return false; } hasRead += ret; try { outStream.write( data, 0, ret ); } catch ( IOException e ) { - log.error( "Sending payload failed" ); + this.close( "Sending payload failed" ); return false; } + if ( callback != null ) + callback.uploadProgress( ret ); } } } finally { - if ( file != null ) { - try { - file.close(); - } catch ( IOException e ) { - } - } + Transfer.safeClose( file ); } return true; } |