summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Transfer.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Transfer.java')
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java365
1 files changed, 207 insertions, 158 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index a8c9a8a..3f2fdde 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -3,8 +3,11 @@ package org.openslx.filetransfer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -14,60 +17,61 @@ import org.apache.log4j.Logger;
public abstract class Transfer
{
- protected final SSLSocketFactory sslSocketFactory;
- protected final SSLSocket satelliteSocket;
- protected final DataOutputStream dataToServer;
+ protected final SSLSocket transferSocket;
+ protected final DataOutputStream outStream;
protected final DataInputStream dataFromServer;
- protected String TOKEN = null;
- protected long[] RANGE = null;
protected String ERROR = null;
+ private boolean shouldGetToken;
protected final Logger log;
- protected Transfer( String ip, int port, SSLContext context, Logger log ) throws IOException
+ /**
+ * Actively initiated transfer.
+ *
+ * @param host Remote Host
+ * @param port Remote Port
+ * @param context SSL Context for encryption
+ * @param log Logger to use
+ * @throws IOException
+ */
+ protected Transfer( String host, int port, SSLContext context, Logger log ) throws IOException
{
this.log = log;
// create socket.
- sslSocketFactory = context.getSocketFactory();
+ SSLSocketFactory sslSocketFactory = context.getSocketFactory();
- satelliteSocket = (SSLSocket)sslSocketFactory.createSocket( ip, port );
- satelliteSocket.setSoTimeout( 2000 ); // set socket timeout.
+ transferSocket = (SSLSocket)sslSocketFactory.createSocket();
+ transferSocket.setSoTimeout( 5000 ); // set socket timeout.
+ transferSocket.connect( new InetSocketAddress( host, port ) );
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = false;
}
+ /**
+ * Passive transfer through incoming connection.
+ *
+ * @param socket already connected socket to remote peer
+ * @param log Logger to use
+ * @throws IOException
+ */
protected Transfer( SSLSocket socket, Logger log ) throws IOException
{
this.log = log;
- satelliteSocket = socket;
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
- sslSocketFactory = null;
+ transferSocket = socket;
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = true;
}
protected boolean sendRange( long startOffset, long endOffset )
{
- if ( RANGE != null ) {
- log.warn( "Range already set!" );
- return false;
- }
try {
+ log.debug( "Sending range: " + startOffset + " to " + endOffset );
sendKeyValuePair( "RANGE", startOffset + ":" + endOffset );
- RANGE[0] = startOffset;
- RANGE[1] = endOffset;
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending RANGE in Uploader failed..." );
return false;
}
return true;
@@ -75,108 +79,58 @@ public abstract class Transfer
/***********************************************************************/
/**
- * Method for sending token for identification from satellite to master.
+ * Method for sending error Code to server. For example in case of wrong
+ * token, send code for wrong token.
*
- * @param token The token to send
*/
- public boolean sendToken( String token )
+ public boolean sendErrorCode( String errString )
{
- if ( TOKEN != null ) {
- log.warn( "Trying to send token while a token is already set! Ignoring..." );
- return false;
- }
- TOKEN = token;
try {
- sendKeyValuePair( "TOKEN", TOKEN );
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
+ sendKeyValuePair( "ERROR", errString );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending TOKEN in Downloader failed..." );
+ this.close( e.toString() );
return false;
}
return true;
}
- /***********************************************************************/
- /**
- * Method for reading incoming token for identification.
- *
- */
- public String getToken()
+ protected boolean sendToken( String token )
{
- return TOKEN;
- }
-
- private boolean parseRange( String range )
- {
- if ( range == null )
- return true;
- if ( RANGE != null ) {
- log.warn( "Warning: RANGE already set when trying to parse from " + range );
- return false;
- }
- String parts[] = range.split( ":", 2 );
- long ret[] = new long[ 2 ];
try {
- ret[0] = Long.parseLong( parts[0] );
- ret[1] = Long.parseLong( parts[1] );
- } catch ( Throwable t ) {
- log.warn( "Not parsable range: '" + range + "'" );
- return false;
- }
- if ( ret[1] <= ret[0] ) {
- log.warn( "Invalid range. Start >= end" );
+ sendKeyValuePair( "TOKEN", token );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
return false;
}
- RANGE = ret;
return true;
}
- /***********************************************************************/
- /**
- * Getter for beginning of RANGE.
- *
- * @return
- */
- public long getStartOfRange()
+ public boolean sendDone()
{
- if ( RANGE != null ) {
- return RANGE[0];
+ try {
+ sendKeyValuePair( "DONE", "" );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
+ return false;
}
- return -1;
+ return true;
}
- /***********************************************************************/
- /**
- * Getter for end of RANGE.
- *
- * @return
- */
- public long getEndOfRange()
+ protected boolean sendEndOfMeta()
{
- if ( RANGE != null ) {
- return RANGE[1];
+ try {
+ outStream.writeByte( 0 );
+ } catch ( SocketTimeoutException e ) {
+ log.error( "Error sending end of meta - socket timeout" );
+ return false;
+ } catch ( IOException e ) {
+ log.error( "Error sending end of meta - " + e.toString() );
+ return false;
}
- return -1;
- }
-
- /***********************************************************************/
- /**
- * Method for returning difference of current Range.
- *
- * @return
- */
- public int getDiffOfRange()
- {
- return (int)Math.abs( getEndOfRange() - getStartOfRange() );
+ return true;
}
/***********************************************************************/
@@ -185,10 +139,11 @@ public abstract class Transfer
* Split incoming bytes after first '=' and store value to specific
* variable.
*
- * @return true on success, false if reading failed
+ * @return map of meta data received, null on error
*/
- public boolean readMetaData()
+ protected MetaData readMetaData()
{
+ Map<String, String> entries = new HashMap<>();
try {
while ( true ) {
byte[] incoming = new byte[ 255 ];
@@ -198,9 +153,9 @@ public abstract class Transfer
retLengthByte = dataFromServer.read( incoming, 0, 1 );
// If .read() didn't return 1, it was not able to read first byte.
if ( retLengthByte != 1 ) {
- log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte);
+ log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte );
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
int length = incoming[0] & 0xFF;
@@ -218,7 +173,7 @@ public abstract class Transfer
int ret = dataFromServer.read( incoming, hasRead, length - hasRead );
if ( ret == -1 ) {
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
hasRead += ret;
}
@@ -230,62 +185,36 @@ public abstract class Transfer
log.warn( "Invalid key value pair received (" + data + ")" );
continue;
}
- if ( splitted[0].equals( "TOKEN" ) ) {
- if ( TOKEN != null ) {
- this.close( "Received a token when a token is already set!" );
- return false;
- }
- TOKEN = splitted[1];
- log.debug( "TOKEN: " + TOKEN );
- }
- else if ( splitted[0].equals( "RANGE" ) ) {
- if ( !parseRange( splitted[1] ) ) {
- this.close( "Could not parse RANGE token" );
- return false;
- }
- log.debug( "RANGE: '" + splitted[1] + "'" );
- }
- else if ( splitted[0].equals( "ERROR" ) ) {
+ if ( splitted[0].equals( "ERROR" ) )
ERROR = splitted[1];
- log.debug( "ERROR: " + ERROR );
+ if ( entries.containsKey( splitted[0] ) ) {
+ log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" );
+ } else {
+ entries.put( splitted[0], splitted[1] );
}
}
} catch ( SocketTimeoutException ste ) {
ste.printStackTrace();
sendErrorCode( "timeout" );
this.close( "Socket Timeout occured in readMetaData." );
- return false;
+ return null;
} catch ( Exception e ) {
e.printStackTrace();
this.close( e.toString() );
- return false;
+ return null;
}
- return true;
+ return new MetaData( entries );
}
private void sendKeyValuePair( String key, String value ) throws IOException
{
byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 );
- dataToServer.writeByte( data.length );
- dataToServer.write( data );
- }
-
- /***********************************************************************/
- /**
- * Method for sending error Code to server. For example in case of wrong
- * token, send code for wrong token.
- *
- */
- public Boolean sendErrorCode( String errString )
- {
try {
- sendKeyValuePair( "ERROR", errString );
- } catch ( IOException e ) {
- e.printStackTrace();
- this.close( e.toString() );
- return false;
+ outStream.writeByte( data.length );
+ outStream.write( data );
+ } catch ( SocketTimeoutException e ) {
+ log.warn( "Socket timeout when sending KVP with key " + key );
}
- return true;
}
/***********************************************************************/
@@ -298,12 +227,12 @@ public abstract class Transfer
if ( error != null )
log.info( error );
try {
- if ( satelliteSocket != null )
- this.satelliteSocket.close();
+ if ( transferSocket != null )
+ this.transferSocket.close();
if ( dataFromServer != null )
dataFromServer.close();
- if ( dataToServer != null )
- dataToServer.close();
+ if ( outStream != null )
+ outStream.close();
} catch ( IOException e ) {
e.printStackTrace();
}
@@ -317,8 +246,128 @@ public abstract class Transfer
*/
public boolean isValid()
{
- return satelliteSocket.isConnected() && !satelliteSocket.isClosed()
- && !satelliteSocket.isInputShutdown() && !satelliteSocket.isOutputShutdown();
+ return transferSocket.isConnected() && !transferSocket.isClosed()
+ && !transferSocket.isInputShutdown() && !transferSocket.isOutputShutdown();
+ }
+
+ /**
+ * Get error string received from remote side, if any.
+ *
+ * @return Error string, if received, or null.
+ */
+ public String getRemoteError()
+ {
+ return ERROR;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ * Call this ONLY if all of the following conditions are met:
+ * - this is an incoming transfer connection
+ * - you didn't call it before
+ * - you didn't call download or upload yet
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ if ( !shouldGetToken ) {
+ log.error( "Invalid call of getToken. You either initiated the connection yourself, or you already called getToken before." );
+ this.close( null );
+ return null;
+ }
+ shouldGetToken = false;
+ MetaData meta = readMetaData();
+ if ( meta == null )
+ return null;
+ return meta.getToken();
+ }
+
+ /**
+ * Should we call getToken()? Used internally for detecting wrong usage of
+ * the transfer classes.
+ *
+ * @return yes or no
+ */
+ protected boolean shouldGetToken()
+ {
+ return shouldGetToken;
+ }
+
+ /**
+ * High level access to key-value-pairs.
+ */
+ class MetaData
+ {
+
+ private Map<String, String> meta;
+
+ private MetaData( Map<String, String> meta )
+ {
+ this.meta = meta;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ return meta.get( "TOKEN" );
+ }
+
+ /**
+ * Check if remote peer set the DONE key, telling us the transfer is complete.
+ *
+ * @return yes or no
+ */
+ public boolean isDone()
+ {
+ return meta.containsKey( "DONE" );
+ }
+
+ /**
+ * Return range from this meta data class, or null
+ * if it doesn't contain a (valid) range key-value-pair.
+ *
+ * @return The range instance
+ */
+ public FileRange getRange()
+ {
+ if ( meta.containsKey( "RANGE" ) )
+ return parseRange( meta.get( "RANGE" ) );
+ return null;
+ }
+
+ /**
+ * Parse range in format START:END to {@link FileRange} instance.
+ *
+ * @param range String representation of range
+ * @return {@link FileRange} instance of range, or null on error
+ */
+ private FileRange parseRange( String range )
+ {
+ if ( range == null )
+ return null;
+ String parts[] = range.split( ":" );
+ if ( parts.length != 2 )
+ return null;
+ long start, end;
+ try {
+ start = Long.parseLong( parts[0] );
+ end = Long.parseLong( parts[1] );
+ } catch ( Throwable t ) {
+ log.warn( "Not parsable range: '" + range + "'" );
+ return null;
+ }
+ if ( start >= end ) {
+ log.warn( "Invalid range. Start >= end" );
+ return null;
+ }
+ return new FileRange( start, end );
+ }
+
}
}