summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer
diff options
context:
space:
mode:
authorSimon Rettberg2015-06-05 11:25:50 +0200
committerSimon Rettberg2015-06-05 11:25:50 +0200
commit693392fe6c0022e7ec5060192ee322c7753b0d90 (patch)
tree855245927778c6b069714909929684d1da7d449f /src/main/java/org/openslx/filetransfer
parentCleanup thrift shandling stuff (diff)
downloadmaster-sync-shared-693392fe6c0022e7ec5060192ee322c7753b0d90.tar.gz
master-sync-shared-693392fe6c0022e7ec5060192ee322c7753b0d90.tar.xz
master-sync-shared-693392fe6c0022e7ec5060192ee322c7753b0d90.zip
Changes for Dozmod v1.1
Diffstat (limited to 'src/main/java/org/openslx/filetransfer')
-rw-r--r--src/main/java/org/openslx/filetransfer/ClassTest.java4
-rw-r--r--src/main/java/org/openslx/filetransfer/Downloader.java29
-rw-r--r--src/main/java/org/openslx/filetransfer/IncomingEvent.java4
-rw-r--r--src/main/java/org/openslx/filetransfer/Listener.java158
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java113
-rw-r--r--src/main/java/org/openslx/filetransfer/UploadStatusCallback.java10
-rw-r--r--src/main/java/org/openslx/filetransfer/Uploader.java44
7 files changed, 219 insertions, 143 deletions
diff --git a/src/main/java/org/openslx/filetransfer/ClassTest.java b/src/main/java/org/openslx/filetransfer/ClassTest.java
index 92269bb..37b37c0 100644
--- a/src/main/java/org/openslx/filetransfer/ClassTest.java
+++ b/src/main/java/org/openslx/filetransfer/ClassTest.java
@@ -141,7 +141,7 @@ public class ClassTest
// Implementing IncomingEvent for testing case.
static class Test implements IncomingEvent
{
- public void incomingUploader( Uploader uploader ) throws IOException
+ public void incomingDownloadRequest( Uploader uploader ) throws IOException
{
if ( uploader.getToken() == null ) {
System.out.println( "Incoming uploader: could not get token!" );
@@ -153,7 +153,7 @@ public class ClassTest
System.out.println( "Incomgin uploader OK" );
}
- public void incomingDownloader( Downloader downloader ) throws IOException
+ public void incomingUploadRequest( Downloader downloader ) throws IOException
{
if ( downloader.getToken() == null ) {
System.out.println( "Incoming downloader: could not get token!" );
diff --git a/src/main/java/org/openslx/filetransfer/Downloader.java b/src/main/java/org/openslx/filetransfer/Downloader.java
index 00c6f69..20a50e6 100644
--- a/src/main/java/org/openslx/filetransfer/Downloader.java
+++ b/src/main/java/org/openslx/filetransfer/Downloader.java
@@ -4,9 +4,9 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.net.Socket;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
import org.apache.log4j.Logger;
@@ -38,11 +38,20 @@ public class Downloader extends Transfer
* @param socket established connection to peer which requested an upload.
* @throws IOException
*/
- protected Downloader( SSLSocket socket ) throws IOException
+ protected Downloader( Socket socket ) throws IOException
{
super( socket, log );
}
+ /**
+ * Initiate the download. This method does not return until the file transfer finished.
+ *
+ * @param destinationFile destination file name to download to
+ * @param rangeCallback this object's .get() method is called whenever the downloader needs to
+ * know which part of the file to request next. This method should return null if no
+ * more parts are needed, which in turn let's this method return true
+ * @return true on success, false otherwise
+ */
public boolean download( final String destinationFile, final WantRangeCallback callback )
{
RandomAccessFile file = null;
@@ -69,14 +78,20 @@ public class Downloader extends Transfer
};
return download( cb, callback );
} finally {
- if ( file != null )
- try {
- file.close();
- } catch ( IOException e ) {
- }
+ Transfer.safeClose( file );
}
}
+ /**
+ * Initiate the download. This method does not return until the file transfer finished.
+ *
+ * @param dataCallback this object's .dataReceived() method is called whenever a chunk of data is
+ * received
+ * @param rangeCallback this object's .get() method is called whenever the downloader needs to
+ * know which part of the file to request next. This method should return null if no
+ * more parts are needed, which in turn let's this method return true
+ * @return true on success, false otherwise
+ */
public boolean download( DataReceivedCallback dataCallback, WantRangeCallback rangeCallback )
{
if ( shouldGetToken() ) {
diff --git a/src/main/java/org/openslx/filetransfer/IncomingEvent.java b/src/main/java/org/openslx/filetransfer/IncomingEvent.java
index fc5dc63..d72ecdd 100644
--- a/src/main/java/org/openslx/filetransfer/IncomingEvent.java
+++ b/src/main/java/org/openslx/filetransfer/IncomingEvent.java
@@ -11,7 +11,7 @@ import java.io.IOException;
*/
public interface IncomingEvent
{
- void incomingUploader( Uploader uploader ) throws IOException;
+ void incomingDownloadRequest( Uploader uploader ) throws IOException;
- void incomingDownloader( Downloader downloader ) throws IOException;
+ void incomingUploadRequest( Downloader downloader ) throws IOException;
}
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java
index 699c61d..e4e99e9 100644
--- a/src/main/java/org/openslx/filetransfer/Listener.java
+++ b/src/main/java/org/openslx/filetransfer/Listener.java
@@ -1,29 +1,35 @@
package org.openslx.filetransfer;
import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;
-import javax.net.ssl.SSLSocket;
import org.apache.log4j.Logger;
-public class Listener extends Thread
+public class Listener
{
- private IncomingEvent incomingEvent;
- private SSLContext context;
- private int port;
- final private int U = 85; // hex - code 'U' = 85.
- final private int D = 68; // hex - code 'D' = 68.
+ private final IncomingEvent incomingEvent;
+ private final SSLContext context;
+ private final int port;
+ private ServerSocket listenSocket = null;
+ private Thread acceptThread = null;
+ private static final byte U = 85; // hex - code 'U' = 85.
+ private static final byte D = 68; // hex - code 'D' = 68.
private static Logger log = Logger.getLogger( Listener.class );
/***********************************************************************/
/**
- * Constructor for class Listener, which gets an instance of IncomingEvent.
+ * File transfer listener. This is the active side, opening a port and
+ * waiting for incoming connections.
*
- * @param e
+ * @param e the event handler for incoming connections
+ * @param context the SSL context used for encryption; if null, unencrypted connections will be
+ * used
+ * @param port port to listen on
*/
public Listener( IncomingEvent e, SSLContext context, int port )
{
@@ -38,48 +44,74 @@ public class Listener extends Thread
* connection, and start Downloader or Uploader.
*
*/
- private void listen()
+ private boolean listen()
{
- SSLServerSocket welcomeSocket = null;
try {
- SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory();
- welcomeSocket = (SSLServerSocket)sslServerSocketFactory.createServerSocket( this.port );
+ if ( this.context == null ) {
+ listenSocket = new ServerSocket( this.port );
+ } else {
+ SSLServerSocketFactory sslServerSocketFactory = context.getServerSocketFactory();
+ listenSocket = sslServerSocketFactory.createServerSocket( this.port );
+ }
+ } catch ( Exception e ) {
+ log.error( "Cannot listen on port " + this.port );
+ return false;
+ }
+ return true;
+ }
- while ( !isInterrupted() ) {
- SSLSocket connectionSocket = (SSLSocket)welcomeSocket.accept();
- connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections
+ private void run()
+ {
+ final Listener instance = this;
+ acceptThread = new Thread() {
+ @Override
+ public void run()
+ {
+ try {
+ while ( !isInterrupted() ) {
+ Socket connectionSocket = null;
+ try {
+ connectionSocket = listenSocket.accept();
+ connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections
- byte[] b = new byte[ 1 ];
- int length = connectionSocket.getInputStream().read( b );
+ byte[] b = new byte[ 1 ];
+ int length = connectionSocket.getInputStream().read( b );
- log.debug( "Length (Listener): " + length );
+ connectionSocket.setSoTimeout( 10000 );
- if ( b[0] == U ) {
- log.debug( "recognized U --> starting Downloader" );
- // --> start Downloader(socket).
- Downloader d = new Downloader( connectionSocket );
- incomingEvent.incomingDownloader( d );
- }
- else if ( b[0] == D ) {
- log.debug( "recognized D --> starting Uploader" );
- // --> start Uploader(socket).
- Uploader u = new Uploader( connectionSocket );
- incomingEvent.incomingUploader( u );
- }
- else {
- log.debug( "Got invalid option ... close connection" );
- connectionSocket.close();
+ log.debug( "Length (Listener): " + length );
+
+ if ( b[0] == U ) {
+ log.debug( "recognized U --> starting Downloader" );
+ // --> start Downloader(socket).
+ Downloader d = new Downloader( connectionSocket );
+ incomingEvent.incomingUploadRequest( d );
+ }
+ else if ( b[0] == D ) {
+ log.debug( "recognized D --> starting Uploader" );
+ // --> start Uploader(socket).
+ Uploader u = new Uploader( connectionSocket );
+ incomingEvent.incomingDownloadRequest( u );
+ }
+ else {
+ log.debug( "Got invalid option ... close connection" );
+ connectionSocket.close();
+ }
+ } catch ( IOException e ) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ } finally {
+ synchronized ( instance ) {
+ Transfer.safeClose( listenSocket );
+ listenSocket = null;
+ }
}
}
- } catch ( Exception e ) {
- e.printStackTrace(); // same as writing to System.err.println(e.toString).
- } finally {
- try {
- welcomeSocket.close();
- } catch ( IOException e ) {
- // Nothing we can do
- }
- }
+ };
+ acceptThread.start();
+ log.info( "Starting to accept " + ( this.context == null ? "UNENCRYPTED" : "encrypted" ) + " connections on port " + this.port );
}
public int getPort()
@@ -87,14 +119,36 @@ public class Listener extends Thread
return this.port;
}
- @Override
- public void run()
+ /**
+ * Check whether this listener is running.
+ *
+ * @return true if this instance is currently listening for connections and runs the accept loop.
+ */
+ public synchronized boolean isRunning()
{
- try {
- this.listen();
- } catch ( Exception e ) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ return acceptThread != null && acceptThread.isAlive() && listenSocket != null && !listenSocket.isClosed();
+ }
+
+ /**
+ * Check whether this listener was started.
+ *
+ * @return true if this instance was started before, but might have been stopped already.
+ */
+ public synchronized boolean wasStarted()
+ {
+ return acceptThread != null;
+ }
+
+ /**
+ * Start this listener.
+ *
+ * @return true if the port could be openened and the accepting thread was started
+ */
+ public synchronized boolean start()
+ {
+ if ( !this.listen() )
+ return false;
+ this.run();
+ return true;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index 3f2fdde..3e278c8 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -1,23 +1,23 @@
package org.openslx.filetransfer;
+import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.net.SocketTimeoutException;
-import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.apache.log4j.Logger;
public abstract class Transfer
{
- protected final SSLSocket transferSocket;
+ protected final Socket transferSocket;
protected final DataOutputStream outStream;
protected final DataInputStream dataFromServer;
protected String ERROR = null;
@@ -30,7 +30,7 @@ public abstract class Transfer
*
* @param host Remote Host
* @param port Remote Port
- * @param context SSL Context for encryption
+ * @param context SSL Context for encryption, null if plain
* @param log Logger to use
* @throws IOException
*/
@@ -38,10 +38,13 @@ public abstract class Transfer
{
this.log = log;
// create socket.
- SSLSocketFactory sslSocketFactory = context.getSocketFactory();
-
- transferSocket = (SSLSocket)sslSocketFactory.createSocket();
- transferSocket.setSoTimeout( 5000 ); // set socket timeout.
+ if ( context == null ) {
+ transferSocket = new Socket();
+ } else {
+ SSLSocketFactory sslSocketFactory = context.getSocketFactory();
+ transferSocket = sslSocketFactory.createSocket();
+ }
+ transferSocket.setSoTimeout( 10000 ); // set socket timeout.
transferSocket.connect( new InetSocketAddress( host, port ) );
outStream = new DataOutputStream( transferSocket.getOutputStream() );
@@ -56,7 +59,7 @@ public abstract class Transfer
* @param log Logger to use
* @throws IOException
*/
- protected Transfer( SSLSocket socket, Logger log ) throws IOException
+ protected Transfer( Socket socket, Logger log ) throws IOException
{
this.log = log;
transferSocket = socket;
@@ -122,7 +125,7 @@ public abstract class Transfer
protected boolean sendEndOfMeta()
{
try {
- outStream.writeByte( 0 );
+ outStream.writeShort( 0 );
} catch ( SocketTimeoutException e ) {
log.error( "Error sending end of meta - socket timeout" );
return false;
@@ -146,39 +149,10 @@ public abstract class Transfer
Map<String, String> entries = new HashMap<>();
try {
while ( true ) {
- byte[] incoming = new byte[ 255 ];
-
- // First get length.
- int retLengthByte;
- retLengthByte = dataFromServer.read( incoming, 0, 1 );
- // If .read() didn't return 1, it was not able to read first byte.
- if ( retLengthByte != 1 ) {
- log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte );
- this.close( "Error occured while reading Metadata." );
- return null;
- }
+ String data = dataFromServer.readUTF();
- int length = incoming[0] & 0xFF;
- log.debug( "length (downloader): " + length );
-
- if ( length == 0 )
- break;
-
- /*
- * Read the next available bytes and split by '=' for
- * getting TOKEN or RANGE.
- */
- int hasRead = 0;
- while ( hasRead < length ) {
- int ret = dataFromServer.read( incoming, hasRead, length - hasRead );
- if ( ret == -1 ) {
- this.close( "Error occured while reading Metadata." );
- return null;
- }
- hasRead += ret;
- }
-
- String data = new String( incoming, 0, length, StandardCharsets.UTF_8 );
+ if ( data == null || data.length() == 0 )
+ break; // End of meta data
String[] splitted = data.split( "=", 2 );
if ( splitted.length != 2 ) {
@@ -208,12 +182,12 @@ public abstract class Transfer
private void sendKeyValuePair( String key, String value ) throws IOException
{
- byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 );
+ if ( outStream == null )
+ return;
try {
- outStream.writeByte( data.length );
- outStream.write( data );
- } catch ( SocketTimeoutException e ) {
- log.warn( "Socket timeout when sending KVP with key " + key );
+ outStream.writeUTF( key + "=" + value );
+ } catch ( Exception e ) {
+ this.close( e.getClass().getSimpleName() + " when sending KVP with key " + key );
}
}
@@ -222,20 +196,21 @@ public abstract class Transfer
* Method for closing connection, if download has finished.
*
*/
- public void close( String error )
+ public void close( String error, UploadStatusCallback callback, boolean sendToPeer )
{
- if ( error != null )
+ if ( error != null ) {
+ if ( sendToPeer )
+ sendErrorCode( error );
+ if ( callback != null )
+ callback.uploadError( error );
log.info( error );
- try {
- if ( transferSocket != null )
- this.transferSocket.close();
- if ( dataFromServer != null )
- dataFromServer.close();
- if ( outStream != null )
- outStream.close();
- } catch ( IOException e ) {
- e.printStackTrace();
}
+ safeClose( dataFromServer, outStream, transferSocket );
+ }
+
+ public void close( String error )
+ {
+ close( error, null, false );
}
/**
@@ -295,6 +270,28 @@ public abstract class Transfer
}
/**
+ * Close given stream, socket, anything closeable.
+ * Never throws any exception, if it's not closeable there's
+ * not much else we can do.
+ *
+ * @param list one or more closeables. Pass one, many, or an array
+ */
+ static protected void safeClose( Closeable... list )
+ {
+ if ( list == null )
+ return;
+ for ( Closeable c : list ) {
+ if ( c == null )
+ continue;
+ try {
+ c.close();
+ } catch ( Throwable t ) {
+ // Silcence...
+ }
+ }
+ }
+
+ /**
* High level access to key-value-pairs.
*/
class MetaData
diff --git a/src/main/java/org/openslx/filetransfer/UploadStatusCallback.java b/src/main/java/org/openslx/filetransfer/UploadStatusCallback.java
new file mode 100644
index 0000000..72f8f61
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/UploadStatusCallback.java
@@ -0,0 +1,10 @@
+package org.openslx.filetransfer;
+
+public interface UploadStatusCallback
+{
+
+ public void uploadError( String message );
+
+ public void uploadProgress( long bytesSent );
+
+}
diff --git a/src/main/java/org/openslx/filetransfer/Uploader.java b/src/main/java/org/openslx/filetransfer/Uploader.java
index 87845fa..748b1e2 100644
--- a/src/main/java/org/openslx/filetransfer/Uploader.java
+++ b/src/main/java/org/openslx/filetransfer/Uploader.java
@@ -4,9 +4,9 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.net.Socket;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
import org.apache.log4j.Logger;
@@ -39,7 +39,7 @@ public class Uploader extends Transfer
*
* @throws IOException
*/
- public Uploader( SSLSocket socket ) throws IOException
+ public Uploader( Socket socket ) throws IOException
{
super( socket, log );
}
@@ -52,6 +52,12 @@ public class Uploader extends Transfer
*/
public boolean upload( String filename )
{
+ return upload( filename, null );
+ }
+
+ @SuppressWarnings( "resource" )
+ public boolean upload( String filename, UploadStatusCallback callback )
+ {
if ( shouldGetToken() ) {
log.error( "You didn't call getToken yet!" );
return false;
@@ -61,14 +67,14 @@ public class Uploader extends Transfer
try {
file = new RandomAccessFile( new File( filename ), "r" );
} catch ( FileNotFoundException e ) {
- log.error( "Could not open " + filename + " for reading." );
+ this.close( "Could not open given file for reading.", callback, true );
return false;
}
for ( ;; ) { // Loop as long as remote peer is requesting chunks from this file
// Read meta data of remote peer - either new range, or it's telling us it's done
MetaData meta = readMetaData();
if ( meta == null ) {
- log.error( "Did not get meta data from remote peer." );
+ this.close( "Did not get meta data from remote peer.", callback, true );
return false;
}
if ( meta.isDone() ) // Download complete?
@@ -76,37 +82,35 @@ public class Uploader extends Transfer
// Not complete, so there must be another range request
FileRange requestedRange = meta.getRange();
if ( requestedRange == null ) {
- log.error( "Remote peer did not include RANGE in meta data." );
- sendErrorCode( "no (valid) range in request" );
+ this.close( "Peer did not include RANGE in meta data.", callback, true );
return false;
}
// Range inside file?
try {
if ( requestedRange.endOffset > file.length() ) {
- log.error( "Requested range is larger than file size, aborting." );
- sendErrorCode( "range out of file bounds" );
+ this.close( "Requested range is larger than file size, aborting.", callback, true );
return false;
}
} catch ( IOException e ) {
- log.error( "Could not get current length of file " + filename );
+ this.close( "Could not get current length of file " + filename, callback, false );
return false;
}
// Seek to requested chunk
try {
file.seek( requestedRange.startOffset );
} catch ( IOException e ) {
- log.error( "Could not seek to start of requested range in " + filename + " (" + requestedRange.startOffset + ")" );
+ this.close( "Could not seek to start of requested range in given file (" + requestedRange.startOffset + ")", callback, true );
return false;
}
// Send confirmation of range we're about to send
try {
long ptr = file.getFilePointer();
if ( !sendRange( ptr, ptr + requestedRange.getLength() ) || !sendEndOfMeta() ) {
- log.error( "Could not send range confirmation" );
+ this.close( "Could not send range confirmation" );
return false;
}
} catch ( IOException e ) {
- log.error( "Could not determine current position in file " + filename );
+ this.close( "Could not determine current position in file " + filename );
return false;
}
// Finally send requested chunk
@@ -118,30 +122,26 @@ public class Uploader extends Transfer
try {
ret = file.read( data, 0, Math.min( length - hasRead, data.length ) );
} catch ( IOException e ) {
- log.error( "Error reading from file " + filename );
+ this.close( "Error reading from file ", callback, true );
return false;
}
if ( ret == -1 ) {
- this.close( "Error occured in Uploader.sendFile(),"
- + " while reading from File to send." );
+ this.close( "Error occured in Uploader.sendFile() while reading from File to send.", callback, true );
return false;
}
hasRead += ret;
try {
outStream.write( data, 0, ret );
} catch ( IOException e ) {
- log.error( "Sending payload failed" );
+ this.close( "Sending payload failed" );
return false;
}
+ if ( callback != null )
+ callback.uploadProgress( ret );
}
}
} finally {
- if ( file != null ) {
- try {
- file.close();
- } catch ( IOException e ) {
- }
- }
+ Transfer.safeClose( file );
}
return true;
}