summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-20 17:10:14 +0200
committerSimon Rettberg2016-04-20 17:10:14 +0200
commitecd3d22510aa2f1aa0c44cee015bd690d19f45ce (patch)
tree8ec91bf9500a9575308898f0f70b5a90f0ba4737
parentAdd queryUploadStatus to master server (diff)
downloadmaster-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.gz
master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.xz
master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.zip
More imgsync stuff
-rw-r--r--src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java125
-rw-r--r--src/main/java/org/openslx/filetransfer/Listener.java86
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java4
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java2
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java36
-rw-r--r--src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java121
-rw-r--r--src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java1
-rw-r--r--src/main/thrift/bwlp.thrift1
8 files changed, 323 insertions, 53 deletions
diff --git a/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java b/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java
index f04fd6b..9226df6 100644
--- a/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java
+++ b/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-04-18")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-04-20")
public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishData, ImagePublishData._Fields>, java.io.Serializable, Cloneable, Comparable<ImagePublishData> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ImagePublishData");
@@ -51,6 +51,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
private static final org.apache.thrift.protocol.TField VIRT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("virtId", org.apache.thrift.protocol.TType.STRING, (short)11);
private static final org.apache.thrift.protocol.TField IS_TEMPLATE_FIELD_DESC = new org.apache.thrift.protocol.TField("isTemplate", org.apache.thrift.protocol.TType.BOOL, (short)12);
private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRUCT, (short)13);
+ private static final org.apache.thrift.protocol.TField MACHINE_DESCRIPTION_FIELD_DESC = new org.apache.thrift.protocol.TField("machineDescription", org.apache.thrift.protocol.TType.STRING, (short)14);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -71,6 +72,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
public String virtId; // required
public boolean isTemplate; // required
public UserInfo owner; // required
+ public ByteBuffer machineDescription; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -86,7 +88,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
OS_ID((short)10, "osId"),
VIRT_ID((short)11, "virtId"),
IS_TEMPLATE((short)12, "isTemplate"),
- OWNER((short)13, "owner");
+ OWNER((short)13, "owner"),
+ MACHINE_DESCRIPTION((short)14, "machineDescription");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -127,6 +130,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
return IS_TEMPLATE;
case 13: // OWNER
return OWNER;
+ case 14: // MACHINE_DESCRIPTION
+ return MACHINE_DESCRIPTION;
default:
return null;
}
@@ -203,6 +208,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, UserInfo.class)));
+ tmpMap.put(_Fields.MACHINE_DESCRIPTION, new org.apache.thrift.meta_data.FieldMetaData("machineDescription", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ImagePublishData.class, metaDataMap);
}
@@ -223,7 +230,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
int osId,
String virtId,
boolean isTemplate,
- UserInfo owner)
+ UserInfo owner,
+ ByteBuffer machineDescription)
{
this();
this.imageBaseId = imageBaseId;
@@ -243,6 +251,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
this.isTemplate = isTemplate;
setIsTemplateIsSet(true);
this.owner = owner;
+ this.machineDescription = org.apache.thrift.TBaseHelper.copyBinary(machineDescription);
}
/**
@@ -283,6 +292,9 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
if (other.isSetOwner()) {
this.owner = new UserInfo(other.owner);
}
+ if (other.isSetMachineDescription()) {
+ this.machineDescription = org.apache.thrift.TBaseHelper.copyBinary(other.machineDescription);
+ }
}
public ImagePublishData deepCopy() {
@@ -308,6 +320,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
setIsTemplateIsSet(false);
this.isTemplate = false;
this.owner = null;
+ this.machineDescription = null;
}
public String getImageBaseId() {
@@ -648,6 +661,40 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
}
}
+ public byte[] getMachineDescription() {
+ setMachineDescription(org.apache.thrift.TBaseHelper.rightSize(machineDescription));
+ return machineDescription == null ? null : machineDescription.array();
+ }
+
+ public ByteBuffer bufferForMachineDescription() {
+ return org.apache.thrift.TBaseHelper.copyBinary(machineDescription);
+ }
+
+ public ImagePublishData setMachineDescription(byte[] machineDescription) {
+ this.machineDescription = machineDescription == null ? (ByteBuffer)null : ByteBuffer.wrap(Arrays.copyOf(machineDescription, machineDescription.length));
+ return this;
+ }
+
+ public ImagePublishData setMachineDescription(ByteBuffer machineDescription) {
+ this.machineDescription = org.apache.thrift.TBaseHelper.copyBinary(machineDescription);
+ return this;
+ }
+
+ public void unsetMachineDescription() {
+ this.machineDescription = null;
+ }
+
+ /** Returns true if field machineDescription is set (has been assigned a value) and false otherwise */
+ public boolean isSetMachineDescription() {
+ return this.machineDescription != null;
+ }
+
+ public void setMachineDescriptionIsSet(boolean value) {
+ if (!value) {
+ this.machineDescription = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case IMAGE_BASE_ID:
@@ -754,6 +801,14 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
}
break;
+ case MACHINE_DESCRIPTION:
+ if (value == null) {
+ unsetMachineDescription();
+ } else {
+ setMachineDescription((ByteBuffer)value);
+ }
+ break;
+
}
}
@@ -798,6 +853,9 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
case OWNER:
return getOwner();
+ case MACHINE_DESCRIPTION:
+ return getMachineDescription();
+
}
throw new IllegalStateException();
}
@@ -835,6 +893,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
return isSetIsTemplate();
case OWNER:
return isSetOwner();
+ case MACHINE_DESCRIPTION:
+ return isSetMachineDescription();
}
throw new IllegalStateException();
}
@@ -969,6 +1029,15 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
return false;
}
+ boolean this_present_machineDescription = true && this.isSetMachineDescription();
+ boolean that_present_machineDescription = true && that.isSetMachineDescription();
+ if (this_present_machineDescription || that_present_machineDescription) {
+ if (!(this_present_machineDescription && that_present_machineDescription))
+ return false;
+ if (!this.machineDescription.equals(that.machineDescription))
+ return false;
+ }
+
return true;
}
@@ -1041,6 +1110,11 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
if (present_owner)
list.add(owner);
+ boolean present_machineDescription = true && (isSetMachineDescription());
+ list.add(present_machineDescription);
+ if (present_machineDescription)
+ list.add(machineDescription);
+
return list.hashCode();
}
@@ -1182,6 +1256,16 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetMachineDescription()).compareTo(other.isSetMachineDescription());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetMachineDescription()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.machineDescription, other.machineDescription);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1289,6 +1373,14 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
sb.append(this.owner);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("machineDescription:");
+ if (this.machineDescription == null) {
+ sb.append("null");
+ } else {
+ org.apache.thrift.TBaseHelper.toString(this.machineDescription, sb);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -1466,6 +1558,14 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 14: // MACHINE_DESCRIPTION
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.machineDescription = iprot.readBinary();
+ struct.setMachineDescriptionIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1552,6 +1652,11 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
struct.owner.write(oprot);
oprot.writeFieldEnd();
}
+ if (struct.machineDescription != null) {
+ oprot.writeFieldBegin(MACHINE_DESCRIPTION_FIELD_DESC);
+ oprot.writeBinary(struct.machineDescription);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1609,7 +1714,10 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
if (struct.isSetOwner()) {
optionals.set(12);
}
- oprot.writeBitSet(optionals, 13);
+ if (struct.isSetMachineDescription()) {
+ optionals.set(13);
+ }
+ oprot.writeBitSet(optionals, 14);
if (struct.isSetImageBaseId()) {
oprot.writeString(struct.imageBaseId);
}
@@ -1661,12 +1769,15 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
if (struct.isSetOwner()) {
struct.owner.write(oprot);
}
+ if (struct.isSetMachineDescription()) {
+ oprot.writeBinary(struct.machineDescription);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, ImagePublishData struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(13);
+ BitSet incoming = iprot.readBitSet(14);
if (incoming.get(0)) {
struct.imageBaseId = iprot.readString();
struct.setImageBaseIdIsSet(true);
@@ -1739,6 +1850,10 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat
struct.owner.read(iprot);
struct.setOwnerIsSet(true);
}
+ if (incoming.get(13)) {
+ struct.machineDescription = iprot.readBinary();
+ struct.setMachineDescriptionIsSet(true);
+ }
}
}
diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java
index e6bbb62..f7d4225 100644
--- a/src/main/java/org/openslx/filetransfer/Listener.java
+++ b/src/main/java/org/openslx/filetransfer/Listener.java
@@ -4,6 +4,11 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
@@ -18,9 +23,10 @@ public class Listener
private ServerSocket listenSocket = null;
private Thread acceptThread = null;
private final int readTimeoutMs;
+ private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
- private static final byte U = 85; // hex - code 'U' = 85.
- private static final byte D = 68; // hex - code 'D' = 68.
+ private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85.
+ private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68.
private static Logger log = Logger.getLogger( Listener.class );
/***********************************************************************/
@@ -48,8 +54,10 @@ public class Listener
* connection, and start Downloader or Uploader.
*
*/
- private boolean listen()
+ private synchronized boolean listen()
{
+ if ( listenSocket != null )
+ return true;
try {
if ( this.context == null ) {
listenSocket = new ServerSocket();
@@ -61,61 +69,81 @@ public class Listener
listenSocket.bind( new InetSocketAddress( this.port ) );
} catch ( Exception e ) {
log.error( "Cannot listen on port " + this.port, e );
+ listenSocket = null;
return false;
}
return true;
}
- private void run()
+ private synchronized void run()
{
+ if ( acceptThread != null )
+ return;
final Listener instance = this;
acceptThread = new Thread( "BFTP-Listen-" + this.port ) {
@Override
public void run()
{
try {
+ // Run accept loop in own thread
while ( !isInterrupted() ) {
- Socket connectionSocket = null;
+ Socket acceptedSocket = null;
try {
- connectionSocket = listenSocket.accept();
+ acceptedSocket = listenSocket.accept();
} catch ( SocketTimeoutException e ) {
continue;
} catch ( Exception e ) {
log.warn( "Some exception when accepting! Trying to resume...", e );
Transfer.safeClose( listenSocket );
+ listenSocket = null;
if ( !listen() ) {
log.error( "Could not re-open listening socket" );
break;
}
continue;
}
- try {
- connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections
+ // Handle each accepted connection in a thread pool
+ final Socket connection = acceptedSocket;
+ Runnable handler = new Runnable() {
+ @Override
+ public void run()
+ {
- byte[] b = new byte[ 1 ];
- int length = connectionSocket.getInputStream().read( b );
- if ( length == -1 )
- continue;
+ try {
+ // Give initial byte signalling mode of operation 5 secs to arrive
+ connection.setSoTimeout( 5000 );
- connectionSocket.setSoTimeout( readTimeoutMs );
+ byte[] b = new byte[ 1 ];
+ int length = connection.getInputStream().read( b );
+ if ( length == -1 ) {
+ Transfer.safeClose( connection );
+ return;
+ }
+ // Byte arrived, now set desired timeout
+ connection.setSoTimeout( readTimeoutMs );
- if ( b[0] == U ) {
- // --> start Downloader(socket).
- Downloader d = new Downloader( connectionSocket );
- incomingEvent.incomingUploadRequest( d );
- }
- else if ( b[0] == D ) {
- // --> start Uploader(socket).
- Uploader u = new Uploader( connectionSocket );
- incomingEvent.incomingDownloadRequest( u );
- }
- else {
- log.debug( "Got invalid init-byte ... close connection" );
- connectionSocket.close();
+ if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) {
+ // --> start Downloader(socket).
+ Downloader d = new Downloader( connection );
+ incomingEvent.incomingUploadRequest( d );
+ } else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) {
+ // --> start Uploader(socket).
+ Uploader u = new Uploader( connection );
+ incomingEvent.incomingDownloadRequest( u );
+ } else {
+ log.debug( "Got invalid init-byte ... close connection" );
+ Transfer.safeClose( connection );
+ }
+ } catch ( Exception e ) {
+ log.warn( "Error accepting client", e );
+ Transfer.safeClose( connection );
+ }
}
- } catch ( Exception e ) {
- log.warn( "Error accepting client", e );
- Transfer.safeClose( connectionSocket );
+ };
+ try {
+ processingPool.execute( handler );
+ } catch ( RejectedExecutionException e ) {
+ Transfer.safeClose( acceptedSocket );
}
}
} finally {
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index 372b082..c497be0 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -174,14 +174,14 @@ public class ChunkList
*
* @param c The chunk in question
*/
- public synchronized void markSuccessful( FileChunk c )
+ public synchronized void markCompleted( FileChunk c, boolean hashCheckSuccessful )
{
if ( !pendingChunks.remove( c ) ) {
LOGGER.warn( "Inconsistent state: markSuccessful called for Chunk " + c.toString()
+ ", but chunk is not marked as currently transferring!" );
return;
}
- c.setStatus( ChunkStatus.COMPLETE );
+ c.setStatus( ( hashCheckSuccessful || c.getSha1Sum() == null ) ? ChunkStatus.COMPLETE : ChunkStatus.HASHING );
completeChunks.add( c );
this.notifyAll();
}
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index d9db7df..5fdf582 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -105,13 +105,11 @@ public class HashChecker
}
}
}
- ChunkStatus old = chunk.getStatus();
chunk.setStatus( ChunkStatus.HASHING );
if ( blocking ) {
queue.put( task );
} else {
if ( !queue.offer( task ) ) {
- chunk.setStatus( old );
return false;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 04ddc17..c2d8ee9 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -155,8 +155,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return state;
}
- public synchronized TransferStatus getStatus() {
- return new TransferStatus(chunks.getStatusArray(), getState());
+ public synchronized TransferStatus getStatus()
+ {
+ return new TransferStatus( chunks.getStatusArray(), getState() );
}
public final ChunkList getChunks()
@@ -186,9 +187,10 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.updateSha1Sums( hashList );
if ( hashChecker == null )
return;
- FileChunk chunk;
- int cnt = 0;
- while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) {
+ for ( int cnt = 0; cnt < 3; ++cnt ) {
+ FileChunk chunk = chunks.getUnhashedComplete();
+ if ( chunk == null )
+ break;
byte[] data;
try {
data = loadChunkFromFile( chunk );
@@ -200,14 +202,18 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
if ( data == null ) {
LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" );
- chunks.markSuccessful( chunk );
+ chunks.markCompleted( chunk, true );
chunkStatusChanged( chunk );
continue;
}
try {
- if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop
+ if ( !hashChecker.queue( chunk, data, this, false ) ) { // false == queue full, stop
+ chunks.markCompleted( chunk, false );
break;
+ }
} catch ( InterruptedException e ) {
+ LOGGER.debug( "updateBlockHashList got interrupted" );
+ chunks.markCompleted( chunk, false );
Thread.currentThread().interrupt();
return;
}
@@ -304,7 +310,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
} else {
// We have no hash checker or the hash for the current chunk is unknown - flush to disk
writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer );
- chunks.markSuccessful( currentChunk );
+ chunks.markCompleted( currentChunk, true );
chunkStatusChanged( currentChunk );
}
}
@@ -356,7 +362,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
chunks.markFailed( cbh.currentChunk );
chunkStatusChanged( cbh.currentChunk );
}
- LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" );
+ LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" );
}
if ( state != TransferState.FINISHED && state != TransferState.ERROR ) {
lastActivityTime.set( System.currentTimeMillis() );
@@ -368,7 +374,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
finishUploadInternal();
} else {
// Keep pumping unhashed chunks into the hasher
- queueUnhashedChunk();
+ queueUnhashedChunk( true );
}
}
} );
@@ -443,7 +449,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
if ( !chunk.isWrittenToDisk() ) {
writeFileData( chunk.range.startOffset, chunk.range.getLength(), data );
}
- chunks.markSuccessful( chunk );
+ chunks.markCompleted( chunk, true );
chunkStatusChanged( chunk );
if ( chunks.isComplete() ) {
finishUploadInternal();
@@ -457,13 +463,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
break;
}
// A block finished, see if we can queue a new one
- queueUnhashedChunk();
+ queueUnhashedChunk( false );
}
/**
* Gets an unhashed chunk (if existent) and queues it for hashing
*/
- protected void queueUnhashedChunk()
+ protected void queueUnhashedChunk( boolean blocking )
{
FileChunk chunk = chunks.getUnhashedComplete();
if ( chunk == null )
@@ -479,12 +485,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
if ( data == null ) {
LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" );
- chunks.markSuccessful( chunk );
+ chunks.markCompleted( chunk, true );
chunkStatusChanged( chunk );
return;
}
try {
- hashChecker.queue( chunk, data, this, true );
+ hashChecker.queue( chunk, data, this, blocking );
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
}
diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
new file mode 100644
index 0000000..12deddb
--- /dev/null
+++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java
@@ -0,0 +1,121 @@
+package org.openslx.filetransfer.util;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+import org.openslx.filetransfer.Uploader;
+
+public abstract class OutgoingTransferBase extends AbstractTransfer
+{
+
+ /*
+ * Constants
+ */
+
+ private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class );
+
+ private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 );
+
+ /*
+ * Overridable constants
+ */
+
+ protected static int MAX_CONNECTIONS_PER_TRANSFER = 2;
+
+ /*
+ * Class members
+ */
+
+ /**
+ * Remote peer is downloading, so we have Uploaders
+ */
+ private final List<Uploader> uploads = new ArrayList<>();
+
+ /**
+ * File being uploaded
+ */
+ private final File sourceFile;
+
+ private final TransferInformation transferInformation;
+
+ public OutgoingTransferBase( String transferId, File sourceFile, int plainPort, int sslPort )
+ {
+ super( transferId );
+ this.sourceFile = sourceFile;
+ this.transferInformation = new TransferInformation( transferId, plainPort, sslPort );
+ }
+
+ /**
+ * Add another connection for this file transfer.
+ *
+ * @param connection
+ * @return true if the connection is accepted, false if it should be
+ * discarded
+ */
+ public synchronized boolean addConnection( final Uploader connection, ExecutorService pool )
+ {
+ synchronized ( uploads ) {
+ if ( uploads.size() >= MAX_CONNECTIONS_PER_TRANSFER )
+ return false;
+ uploads.add( connection );
+ }
+ return runConnectionInternal( connection, pool );
+ }
+
+ protected boolean runConnectionInternal( final Uploader connection, ExecutorService pool )
+ {
+ try {
+ pool.execute( new Runnable() {
+ @Override
+ public void run()
+ {
+ boolean ret = connection.upload( sourceFile.getAbsolutePath() );
+ synchronized ( uploads ) {
+ uploads.remove( connection );
+ }
+ if ( ret && uploads.isEmpty() && potentialFinishTime.get() == 0 ) {
+ potentialFinishTime.set( System.currentTimeMillis() );
+ }
+ lastActivityTime.set( System.currentTimeMillis() );
+ }
+ } );
+ } catch ( Exception e ) {
+ LOGGER.warn( "threadpool rejected the incoming file transfer", e );
+ synchronized ( uploads ) {
+ uploads.remove( connection );
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public TransferInformation getTransferInfo()
+ {
+ return transferInformation;
+ }
+
+ @Override
+ public final boolean isActive()
+ {
+ return uploads.size() > 0 || lastActivityTime.get() + INACTIVITY_TIMEOUT > System.currentTimeMillis();
+ }
+
+ @Override
+ public void cancel()
+ {
+ // Void
+ }
+
+ @Override
+ public final int getActiveConnectionCount()
+ {
+ return uploads.size();
+ }
+
+}
diff --git a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java
index 39accef..5cf0a10 100644
--- a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java
+++ b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java
@@ -6,4 +6,5 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData;
public class ImagePublishDataEx extends ImagePublishData
{
public String exImagePath;
+ public boolean exIsValid;
}
diff --git a/src/main/thrift/bwlp.thrift b/src/main/thrift/bwlp.thrift
index b0e866a..1389008 100644
--- a/src/main/thrift/bwlp.thrift
+++ b/src/main/thrift/bwlp.thrift
@@ -228,6 +228,7 @@ struct ImagePublishData {
11: string virtId,
12: bool isTemplate,
13: UserInfo owner,
+ 14: binary machineDescription,
}
struct NetRule {