summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openslx/filetransfer/Transfer.java
diff options
context:
space:
mode:
authorSimon Rettberg2014-09-19 18:13:06 +0200
committerSimon Rettberg2014-09-19 18:13:06 +0200
commit52fa9a47498a3727d11a34205c9920f9a10e8aeb (patch)
treec99723aec9ef430132d1eeff08e43684cdfacbf3 /src/main/java/org/openslx/filetransfer/Transfer.java
parentAdd debugging to file transfer (diff)
downloadmaster-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.tar.gz
master-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.tar.xz
master-sync-shared-52fa9a47498a3727d11a34205c9920f9a10e8aeb.zip
Rework file transfer, try to use callbacks for everything
No more juggling with sendRange() and sendData(), which was easy to use wrong, and cause lots of weird errors.
Diffstat (limited to 'src/main/java/org/openslx/filetransfer/Transfer.java')
-rw-r--r--src/main/java/org/openslx/filetransfer/Transfer.java365
1 files changed, 207 insertions, 158 deletions
diff --git a/src/main/java/org/openslx/filetransfer/Transfer.java b/src/main/java/org/openslx/filetransfer/Transfer.java
index a8c9a8a..3f2fdde 100644
--- a/src/main/java/org/openslx/filetransfer/Transfer.java
+++ b/src/main/java/org/openslx/filetransfer/Transfer.java
@@ -3,8 +3,11 @@ package org.openslx.filetransfer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
@@ -14,60 +17,61 @@ import org.apache.log4j.Logger;
public abstract class Transfer
{
- protected final SSLSocketFactory sslSocketFactory;
- protected final SSLSocket satelliteSocket;
- protected final DataOutputStream dataToServer;
+ protected final SSLSocket transferSocket;
+ protected final DataOutputStream outStream;
protected final DataInputStream dataFromServer;
- protected String TOKEN = null;
- protected long[] RANGE = null;
protected String ERROR = null;
+ private boolean shouldGetToken;
protected final Logger log;
- protected Transfer( String ip, int port, SSLContext context, Logger log ) throws IOException
+ /**
+ * Actively initiated transfer.
+ *
+ * @param host Remote Host
+ * @param port Remote Port
+ * @param context SSL Context for encryption
+ * @param log Logger to use
+ * @throws IOException
+ */
+ protected Transfer( String host, int port, SSLContext context, Logger log ) throws IOException
{
this.log = log;
// create socket.
- sslSocketFactory = context.getSocketFactory();
+ SSLSocketFactory sslSocketFactory = context.getSocketFactory();
- satelliteSocket = (SSLSocket)sslSocketFactory.createSocket( ip, port );
- satelliteSocket.setSoTimeout( 2000 ); // set socket timeout.
+ transferSocket = (SSLSocket)sslSocketFactory.createSocket();
+ transferSocket.setSoTimeout( 5000 ); // set socket timeout.
+ transferSocket.connect( new InetSocketAddress( host, port ) );
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = false;
}
+ /**
+ * Passive transfer through incoming connection.
+ *
+ * @param socket already connected socket to remote peer
+ * @param log Logger to use
+ * @throws IOException
+ */
protected Transfer( SSLSocket socket, Logger log ) throws IOException
{
this.log = log;
- satelliteSocket = socket;
- dataToServer = new DataOutputStream( satelliteSocket.getOutputStream() );
- dataFromServer = new DataInputStream( satelliteSocket.getInputStream() );
- sslSocketFactory = null;
+ transferSocket = socket;
+ outStream = new DataOutputStream( transferSocket.getOutputStream() );
+ dataFromServer = new DataInputStream( transferSocket.getInputStream() );
+ shouldGetToken = true;
}
protected boolean sendRange( long startOffset, long endOffset )
{
- if ( RANGE != null ) {
- log.warn( "Range already set!" );
- return false;
- }
try {
+ log.debug( "Sending range: " + startOffset + " to " + endOffset );
sendKeyValuePair( "RANGE", startOffset + ":" + endOffset );
- RANGE[0] = startOffset;
- RANGE[1] = endOffset;
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending RANGE in Uploader failed..." );
return false;
}
return true;
@@ -75,108 +79,58 @@ public abstract class Transfer
/***********************************************************************/
/**
- * Method for sending token for identification from satellite to master.
+ * Method for sending error Code to server. For example in case of wrong
+ * token, send code for wrong token.
*
- * @param token The token to send
*/
- public boolean sendToken( String token )
+ public boolean sendErrorCode( String errString )
{
- if ( TOKEN != null ) {
- log.warn( "Trying to send token while a token is already set! Ignoring..." );
- return false;
- }
- TOKEN = token;
try {
- sendKeyValuePair( "TOKEN", TOKEN );
- } catch ( SocketTimeoutException ste ) {
- ste.printStackTrace();
- this.close( "Socket timeout occured ... close connection." );
+ sendKeyValuePair( "ERROR", errString );
} catch ( IOException e ) {
e.printStackTrace();
- readMetaData();
- if ( ERROR != null ) {
- if ( ERROR.equals( "timeout" ) ) {
- this.close( "Remote Socket timeout occured ... close connection." );
- }
- }
- log.info( "Sending TOKEN in Downloader failed..." );
+ this.close( e.toString() );
return false;
}
return true;
}
- /***********************************************************************/
- /**
- * Method for reading incoming token for identification.
- *
- */
- public String getToken()
+ protected boolean sendToken( String token )
{
- return TOKEN;
- }
-
- private boolean parseRange( String range )
- {
- if ( range == null )
- return true;
- if ( RANGE != null ) {
- log.warn( "Warning: RANGE already set when trying to parse from " + range );
- return false;
- }
- String parts[] = range.split( ":", 2 );
- long ret[] = new long[ 2 ];
try {
- ret[0] = Long.parseLong( parts[0] );
- ret[1] = Long.parseLong( parts[1] );
- } catch ( Throwable t ) {
- log.warn( "Not parsable range: '" + range + "'" );
- return false;
- }
- if ( ret[1] <= ret[0] ) {
- log.warn( "Invalid range. Start >= end" );
+ sendKeyValuePair( "TOKEN", token );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
return false;
}
- RANGE = ret;
return true;
}
- /***********************************************************************/
- /**
- * Getter for beginning of RANGE.
- *
- * @return
- */
- public long getStartOfRange()
+ public boolean sendDone()
{
- if ( RANGE != null ) {
- return RANGE[0];
+ try {
+ sendKeyValuePair( "DONE", "" );
+ } catch ( IOException e ) {
+ e.printStackTrace();
+ this.close( e.toString() );
+ return false;
}
- return -1;
+ return true;
}
- /***********************************************************************/
- /**
- * Getter for end of RANGE.
- *
- * @return
- */
- public long getEndOfRange()
+ protected boolean sendEndOfMeta()
{
- if ( RANGE != null ) {
- return RANGE[1];
+ try {
+ outStream.writeByte( 0 );
+ } catch ( SocketTimeoutException e ) {
+ log.error( "Error sending end of meta - socket timeout" );
+ return false;
+ } catch ( IOException e ) {
+ log.error( "Error sending end of meta - " + e.toString() );
+ return false;
}
- return -1;
- }
-
- /***********************************************************************/
- /**
- * Method for returning difference of current Range.
- *
- * @return
- */
- public int getDiffOfRange()
- {
- return (int)Math.abs( getEndOfRange() - getStartOfRange() );
+ return true;
}
/***********************************************************************/
@@ -185,10 +139,11 @@ public abstract class Transfer
* Split incoming bytes after first '=' and store value to specific
* variable.
*
- * @return true on success, false if reading failed
+ * @return map of meta data received, null on error
*/
- public boolean readMetaData()
+ protected MetaData readMetaData()
{
+ Map<String, String> entries = new HashMap<>();
try {
while ( true ) {
byte[] incoming = new byte[ 255 ];
@@ -198,9 +153,9 @@ public abstract class Transfer
retLengthByte = dataFromServer.read( incoming, 0, 1 );
// If .read() didn't return 1, it was not able to read first byte.
if ( retLengthByte != 1 ) {
- log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte);
+ log.debug( " retLenthByte was not 1! retLengthByte = " + retLengthByte );
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
int length = incoming[0] & 0xFF;
@@ -218,7 +173,7 @@ public abstract class Transfer
int ret = dataFromServer.read( incoming, hasRead, length - hasRead );
if ( ret == -1 ) {
this.close( "Error occured while reading Metadata." );
- return false;
+ return null;
}
hasRead += ret;
}
@@ -230,62 +185,36 @@ public abstract class Transfer
log.warn( "Invalid key value pair received (" + data + ")" );
continue;
}
- if ( splitted[0].equals( "TOKEN" ) ) {
- if ( TOKEN != null ) {
- this.close( "Received a token when a token is already set!" );
- return false;
- }
- TOKEN = splitted[1];
- log.debug( "TOKEN: " + TOKEN );
- }
- else if ( splitted[0].equals( "RANGE" ) ) {
- if ( !parseRange( splitted[1] ) ) {
- this.close( "Could not parse RANGE token" );
- return false;
- }
- log.debug( "RANGE: '" + splitted[1] + "'" );
- }
- else if ( splitted[0].equals( "ERROR" ) ) {
+ if ( splitted[0].equals( "ERROR" ) )
ERROR = splitted[1];
- log.debug( "ERROR: " + ERROR );
+ if ( entries.containsKey( splitted[0] ) ) {
+ log.warn( "Received meta data key " + splitted[0] + " when already received, ignoring!" );
+ } else {
+ entries.put( splitted[0], splitted[1] );
}
}
} catch ( SocketTimeoutException ste ) {
ste.printStackTrace();
sendErrorCode( "timeout" );
this.close( "Socket Timeout occured in readMetaData." );
- return false;
+ return null;
} catch ( Exception e ) {
e.printStackTrace();
this.close( e.toString() );
- return false;
+ return null;
}
- return true;
+ return new MetaData( entries );
}
private void sendKeyValuePair( String key, String value ) throws IOException
{
byte[] data = ( key + "=" + value ).getBytes( StandardCharsets.UTF_8 );
- dataToServer.writeByte( data.length );
- dataToServer.write( data );
- }
-
- /***********************************************************************/
- /**
- * Method for sending error Code to server. For example in case of wrong
- * token, send code for wrong token.
- *
- */
- public Boolean sendErrorCode( String errString )
- {
try {
- sendKeyValuePair( "ERROR", errString );
- } catch ( IOException e ) {
- e.printStackTrace();
- this.close( e.toString() );
- return false;
+ outStream.writeByte( data.length );
+ outStream.write( data );
+ } catch ( SocketTimeoutException e ) {
+ log.warn( "Socket timeout when sending KVP with key " + key );
}
- return true;
}
/***********************************************************************/
@@ -298,12 +227,12 @@ public abstract class Transfer
if ( error != null )
log.info( error );
try {
- if ( satelliteSocket != null )
- this.satelliteSocket.close();
+ if ( transferSocket != null )
+ this.transferSocket.close();
if ( dataFromServer != null )
dataFromServer.close();
- if ( dataToServer != null )
- dataToServer.close();
+ if ( outStream != null )
+ outStream.close();
} catch ( IOException e ) {
e.printStackTrace();
}
@@ -317,8 +246,128 @@ public abstract class Transfer
*/
public boolean isValid()
{
- return satelliteSocket.isConnected() && !satelliteSocket.isClosed()
- && !satelliteSocket.isInputShutdown() && !satelliteSocket.isOutputShutdown();
+ return transferSocket.isConnected() && !transferSocket.isClosed()
+ && !transferSocket.isInputShutdown() && !transferSocket.isOutputShutdown();
+ }
+
+ /**
+ * Get error string received from remote side, if any.
+ *
+ * @return Error string, if received, or null.
+ */
+ public String getRemoteError()
+ {
+ return ERROR;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ * Call this ONLY if all of the following conditions are met:
+ * - this is an incoming transfer connection
+ * - you didn't call it before
+ * - you didn't call download or upload yet
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ if ( !shouldGetToken ) {
+ log.error( "Invalid call of getToken. You either initiated the connection yourself, or you already called getToken before." );
+ this.close( null );
+ return null;
+ }
+ shouldGetToken = false;
+ MetaData meta = readMetaData();
+ if ( meta == null )
+ return null;
+ return meta.getToken();
+ }
+
+ /**
+ * Should we call getToken()? Used internally for detecting wrong usage of
+ * the transfer classes.
+ *
+ * @return yes or no
+ */
+ protected boolean shouldGetToken()
+ {
+ return shouldGetToken;
+ }
+
+ /**
+ * High level access to key-value-pairs.
+ */
+ class MetaData
+ {
+
+ private Map<String, String> meta;
+
+ private MetaData( Map<String, String> meta )
+ {
+ this.meta = meta;
+ }
+
+ /**
+ * Get transfer token, sent by remote peer that initiated connection.
+ *
+ * @return The transfer token
+ */
+ public String getToken()
+ {
+ return meta.get( "TOKEN" );
+ }
+
+ /**
+ * Check if remote peer set the DONE key, telling us the transfer is complete.
+ *
+ * @return yes or no
+ */
+ public boolean isDone()
+ {
+ return meta.containsKey( "DONE" );
+ }
+
+ /**
+ * Return range from this meta data class, or null
+ * if it doesn't contain a (valid) range key-value-pair.
+ *
+ * @return The range instance
+ */
+ public FileRange getRange()
+ {
+ if ( meta.containsKey( "RANGE" ) )
+ return parseRange( meta.get( "RANGE" ) );
+ return null;
+ }
+
+ /**
+ * Parse range in format START:END to {@link FileRange} instance.
+ *
+ * @param range String representation of range
+ * @return {@link FileRange} instance of range, or null on error
+ */
+ private FileRange parseRange( String range )
+ {
+ if ( range == null )
+ return null;
+ String parts[] = range.split( ":" );
+ if ( parts.length != 2 )
+ return null;
+ long start, end;
+ try {
+ start = Long.parseLong( parts[0] );
+ end = Long.parseLong( parts[1] );
+ } catch ( Throwable t ) {
+ log.warn( "Not parsable range: '" + range + "'" );
+ return null;
+ }
+ if ( start >= end ) {
+ log.warn( "Invalid range. Start >= end" );
+ return null;
+ }
+ return new FileRange( start, end );
+ }
+
}
}