diff options
author | Simon Rettberg | 2016-04-20 17:10:14 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-04-20 17:10:14 +0200 |
commit | ecd3d22510aa2f1aa0c44cee015bd690d19f45ce (patch) | |
tree | 8ec91bf9500a9575308898f0f70b5a90f0ba4737 | |
parent | Add queryUploadStatus to master server (diff) | |
download | master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.gz master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.tar.xz master-sync-shared-ecd3d22510aa2f1aa0c44cee015bd690d19f45ce.zip |
More imgsync stuff
8 files changed, 323 insertions, 53 deletions
diff --git a/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java b/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java index f04fd6b..9226df6 100644 --- a/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java +++ b/src/main/java/org/openslx/bwlp/thrift/iface/ImagePublishData.java @@ -34,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-04-18") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-04-20") public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishData, ImagePublishData._Fields>, java.io.Serializable, Cloneable, Comparable<ImagePublishData> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ImagePublishData"); @@ -51,6 +51,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat private static final org.apache.thrift.protocol.TField VIRT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("virtId", org.apache.thrift.protocol.TType.STRING, (short)11); private static final org.apache.thrift.protocol.TField IS_TEMPLATE_FIELD_DESC = new org.apache.thrift.protocol.TField("isTemplate", org.apache.thrift.protocol.TType.BOOL, (short)12); private static final org.apache.thrift.protocol.TField OWNER_FIELD_DESC = new org.apache.thrift.protocol.TField("owner", org.apache.thrift.protocol.TType.STRUCT, (short)13); + private static final org.apache.thrift.protocol.TField MACHINE_DESCRIPTION_FIELD_DESC = new org.apache.thrift.protocol.TField("machineDescription", org.apache.thrift.protocol.TType.STRING, (short)14); private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); static { @@ -71,6 +72,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat public String virtId; // required public boolean isTemplate; // required public UserInfo owner; // required + public ByteBuffer machineDescription; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -86,7 +88,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat OS_ID((short)10, "osId"), VIRT_ID((short)11, "virtId"), IS_TEMPLATE((short)12, "isTemplate"), - OWNER((short)13, "owner"); + OWNER((short)13, "owner"), + MACHINE_DESCRIPTION((short)14, "machineDescription"); private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); @@ -127,6 +130,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat return IS_TEMPLATE; case 13: // OWNER return OWNER; + case 14: // MACHINE_DESCRIPTION + return MACHINE_DESCRIPTION; default: return null; } @@ -203,6 +208,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.OWNER, new org.apache.thrift.meta_data.FieldMetaData("owner", org.apache.thrift.TFieldRequirementType.DEFAULT, new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, UserInfo.class))); + tmpMap.put(_Fields.MACHINE_DESCRIPTION, new org.apache.thrift.meta_data.FieldMetaData("machineDescription", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING , true))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ImagePublishData.class, metaDataMap); } @@ -223,7 +230,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat int osId, String virtId, boolean isTemplate, - UserInfo owner) + UserInfo owner, + ByteBuffer machineDescription) { this(); this.imageBaseId = imageBaseId; @@ -243,6 +251,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat this.isTemplate = isTemplate; setIsTemplateIsSet(true); this.owner = owner; + this.machineDescription = org.apache.thrift.TBaseHelper.copyBinary(machineDescription); } /** @@ -283,6 +292,9 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat if (other.isSetOwner()) { this.owner = new UserInfo(other.owner); } + if (other.isSetMachineDescription()) { + this.machineDescription = org.apache.thrift.TBaseHelper.copyBinary(other.machineDescription); + } } public ImagePublishData deepCopy() { @@ -308,6 +320,7 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat setIsTemplateIsSet(false); this.isTemplate = false; this.owner = null; + this.machineDescription = null; } public String getImageBaseId() { @@ -648,6 +661,40 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat } } + public byte[] getMachineDescription() { + setMachineDescription(org.apache.thrift.TBaseHelper.rightSize(machineDescription)); + return machineDescription == null ? null : machineDescription.array(); + } + + public ByteBuffer bufferForMachineDescription() { + return org.apache.thrift.TBaseHelper.copyBinary(machineDescription); + } + + public ImagePublishData setMachineDescription(byte[] machineDescription) { + this.machineDescription = machineDescription == null ? (ByteBuffer)null : ByteBuffer.wrap(Arrays.copyOf(machineDescription, machineDescription.length)); + return this; + } + + public ImagePublishData setMachineDescription(ByteBuffer machineDescription) { + this.machineDescription = org.apache.thrift.TBaseHelper.copyBinary(machineDescription); + return this; + } + + public void unsetMachineDescription() { + this.machineDescription = null; + } + + /** Returns true if field machineDescription is set (has been assigned a value) and false otherwise */ + public boolean isSetMachineDescription() { + return this.machineDescription != null; + } + + public void setMachineDescriptionIsSet(boolean value) { + if (!value) { + this.machineDescription = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case IMAGE_BASE_ID: @@ -754,6 +801,14 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat } break; + case MACHINE_DESCRIPTION: + if (value == null) { + unsetMachineDescription(); + } else { + setMachineDescription((ByteBuffer)value); + } + break; + } } @@ -798,6 +853,9 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat case OWNER: return getOwner(); + case MACHINE_DESCRIPTION: + return getMachineDescription(); + } throw new IllegalStateException(); } @@ -835,6 +893,8 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat return isSetIsTemplate(); case OWNER: return isSetOwner(); + case MACHINE_DESCRIPTION: + return isSetMachineDescription(); } throw new IllegalStateException(); } @@ -969,6 +1029,15 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat return false; } + boolean this_present_machineDescription = true && this.isSetMachineDescription(); + boolean that_present_machineDescription = true && that.isSetMachineDescription(); + if (this_present_machineDescription || that_present_machineDescription) { + if (!(this_present_machineDescription && that_present_machineDescription)) + return false; + if (!this.machineDescription.equals(that.machineDescription)) + return false; + } + return true; } @@ -1041,6 +1110,11 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat if (present_owner) list.add(owner); + boolean present_machineDescription = true && (isSetMachineDescription()); + list.add(present_machineDescription); + if (present_machineDescription) + list.add(machineDescription); + return list.hashCode(); } @@ -1182,6 +1256,16 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat return lastComparison; } } + lastComparison = Boolean.valueOf(isSetMachineDescription()).compareTo(other.isSetMachineDescription()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMachineDescription()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.machineDescription, other.machineDescription); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1289,6 +1373,14 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat sb.append(this.owner); } first = false; + if (!first) sb.append(", "); + sb.append("machineDescription:"); + if (this.machineDescription == null) { + sb.append("null"); + } else { + org.apache.thrift.TBaseHelper.toString(this.machineDescription, sb); + } + first = false; sb.append(")"); return sb.toString(); } @@ -1466,6 +1558,14 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 14: // MACHINE_DESCRIPTION + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.machineDescription = iprot.readBinary(); + struct.setMachineDescriptionIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1552,6 +1652,11 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat struct.owner.write(oprot); oprot.writeFieldEnd(); } + if (struct.machineDescription != null) { + oprot.writeFieldBegin(MACHINE_DESCRIPTION_FIELD_DESC); + oprot.writeBinary(struct.machineDescription); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1609,7 +1714,10 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat if (struct.isSetOwner()) { optionals.set(12); } - oprot.writeBitSet(optionals, 13); + if (struct.isSetMachineDescription()) { + optionals.set(13); + } + oprot.writeBitSet(optionals, 14); if (struct.isSetImageBaseId()) { oprot.writeString(struct.imageBaseId); } @@ -1661,12 +1769,15 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat if (struct.isSetOwner()) { struct.owner.write(oprot); } + if (struct.isSetMachineDescription()) { + oprot.writeBinary(struct.machineDescription); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, ImagePublishData struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(13); + BitSet incoming = iprot.readBitSet(14); if (incoming.get(0)) { struct.imageBaseId = iprot.readString(); struct.setImageBaseIdIsSet(true); @@ -1739,6 +1850,10 @@ public class ImagePublishData implements org.apache.thrift.TBase<ImagePublishDat struct.owner.read(iprot); struct.setOwnerIsSet(true); } + if (incoming.get(13)) { + struct.machineDescription = iprot.readBinary(); + struct.setMachineDescriptionIsSet(true); + } } } diff --git a/src/main/java/org/openslx/filetransfer/Listener.java b/src/main/java/org/openslx/filetransfer/Listener.java index e6bbb62..f7d4225 100644 --- a/src/main/java/org/openslx/filetransfer/Listener.java +++ b/src/main/java/org/openslx/filetransfer/Listener.java @@ -4,6 +4,11 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLServerSocketFactory; @@ -18,9 +23,10 @@ public class Listener private ServerSocket listenSocket = null; private Thread acceptThread = null; private final int readTimeoutMs; + private final ExecutorService processingPool = new ThreadPoolExecutor( 0, 8, 5, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() ); - private static final byte U = 85; // hex - code 'U' = 85. - private static final byte D = 68; // hex - code 'D' = 68. + private static final byte CONNECTING_PEER_WANTS_TO_UPLOAD = 85; // hex - code 'U' = 85. + private static final byte CONNECTING_PEER_WANTS_TO_DOWNLOAD = 68; // hex - code 'D' = 68. private static Logger log = Logger.getLogger( Listener.class ); /***********************************************************************/ @@ -48,8 +54,10 @@ public class Listener * connection, and start Downloader or Uploader. * */ - private boolean listen() + private synchronized boolean listen() { + if ( listenSocket != null ) + return true; try { if ( this.context == null ) { listenSocket = new ServerSocket(); @@ -61,61 +69,81 @@ public class Listener listenSocket.bind( new InetSocketAddress( this.port ) ); } catch ( Exception e ) { log.error( "Cannot listen on port " + this.port, e ); + listenSocket = null; return false; } return true; } - private void run() + private synchronized void run() { + if ( acceptThread != null ) + return; final Listener instance = this; acceptThread = new Thread( "BFTP-Listen-" + this.port ) { @Override public void run() { try { + // Run accept loop in own thread while ( !isInterrupted() ) { - Socket connectionSocket = null; + Socket acceptedSocket = null; try { - connectionSocket = listenSocket.accept(); + acceptedSocket = listenSocket.accept(); } catch ( SocketTimeoutException e ) { continue; } catch ( Exception e ) { log.warn( "Some exception when accepting! Trying to resume...", e ); Transfer.safeClose( listenSocket ); + listenSocket = null; if ( !listen() ) { log.error( "Could not re-open listening socket" ); break; } continue; } - try { - connectionSocket.setSoTimeout( 2000 ); // 2 second timeout enough? Maybe even use a small thread pool for handling accepted connections + // Handle each accepted connection in a thread pool + final Socket connection = acceptedSocket; + Runnable handler = new Runnable() { + @Override + public void run() + { - byte[] b = new byte[ 1 ]; - int length = connectionSocket.getInputStream().read( b ); - if ( length == -1 ) - continue; + try { + // Give initial byte signalling mode of operation 5 secs to arrive + connection.setSoTimeout( 5000 ); - connectionSocket.setSoTimeout( readTimeoutMs ); + byte[] b = new byte[ 1 ]; + int length = connection.getInputStream().read( b ); + if ( length == -1 ) { + Transfer.safeClose( connection ); + return; + } + // Byte arrived, now set desired timeout + connection.setSoTimeout( readTimeoutMs ); - if ( b[0] == U ) { - // --> start Downloader(socket). - Downloader d = new Downloader( connectionSocket ); - incomingEvent.incomingUploadRequest( d ); - } - else if ( b[0] == D ) { - // --> start Uploader(socket). - Uploader u = new Uploader( connectionSocket ); - incomingEvent.incomingDownloadRequest( u ); - } - else { - log.debug( "Got invalid init-byte ... close connection" ); - connectionSocket.close(); + if ( b[0] == CONNECTING_PEER_WANTS_TO_UPLOAD ) { + // --> start Downloader(socket). + Downloader d = new Downloader( connection ); + incomingEvent.incomingUploadRequest( d ); + } else if ( b[0] == CONNECTING_PEER_WANTS_TO_DOWNLOAD ) { + // --> start Uploader(socket). + Uploader u = new Uploader( connection ); + incomingEvent.incomingDownloadRequest( u ); + } else { + log.debug( "Got invalid init-byte ... close connection" ); + Transfer.safeClose( connection ); + } + } catch ( Exception e ) { + log.warn( "Error accepting client", e ); + Transfer.safeClose( connection ); + } } - } catch ( Exception e ) { - log.warn( "Error accepting client", e ); - Transfer.safeClose( connectionSocket ); + }; + try { + processingPool.execute( handler ); + } catch ( RejectedExecutionException e ) { + Transfer.safeClose( acceptedSocket ); } } } finally { diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index 372b082..c497be0 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -174,14 +174,14 @@ public class ChunkList * * @param c The chunk in question */ - public synchronized void markSuccessful( FileChunk c ) + public synchronized void markCompleted( FileChunk c, boolean hashCheckSuccessful ) { if ( !pendingChunks.remove( c ) ) { LOGGER.warn( "Inconsistent state: markSuccessful called for Chunk " + c.toString() + ", but chunk is not marked as currently transferring!" ); return; } - c.setStatus( ChunkStatus.COMPLETE ); + c.setStatus( ( hashCheckSuccessful || c.getSha1Sum() == null ) ? ChunkStatus.COMPLETE : ChunkStatus.HASHING ); completeChunks.add( c ); this.notifyAll(); } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index d9db7df..5fdf582 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -105,13 +105,11 @@ public class HashChecker } } } - ChunkStatus old = chunk.getStatus(); chunk.setStatus( ChunkStatus.HASHING ); if ( blocking ) { queue.put( task ); } else { if ( !queue.offer( task ) ) { - chunk.setStatus( old ); return false; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 04ddc17..c2d8ee9 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -155,8 +155,9 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return state; } - public synchronized TransferStatus getStatus() { - return new TransferStatus(chunks.getStatusArray(), getState()); + public synchronized TransferStatus getStatus() + { + return new TransferStatus( chunks.getStatusArray(), getState() ); } public final ChunkList getChunks() @@ -186,9 +187,10 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.updateSha1Sums( hashList ); if ( hashChecker == null ) return; - FileChunk chunk; - int cnt = 0; - while ( null != ( chunk = chunks.getUnhashedComplete() ) && ++cnt <= 3 ) { + for ( int cnt = 0; cnt < 3; ++cnt ) { + FileChunk chunk = chunks.getUnhashedComplete(); + if ( chunk == null ) + break; byte[] data; try { data = loadChunkFromFile( chunk ); @@ -200,14 +202,18 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( data == null ) { LOGGER.warn( "blockhash update: Will mark unloadable unhashed chunk as valid :-(" ); - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); continue; } try { - if ( !hashChecker.queue( chunk, data, this, false ) ) // false == blocked while adding, so stop + if ( !hashChecker.queue( chunk, data, this, false ) ) { // false == queue full, stop + chunks.markCompleted( chunk, false ); break; + } } catch ( InterruptedException e ) { + LOGGER.debug( "updateBlockHashList got interrupted" ); + chunks.markCompleted( chunk, false ); Thread.currentThread().interrupt(); return; } @@ -304,7 +310,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } else { // We have no hash checker or the hash for the current chunk is unknown - flush to disk writeFileData( currentChunk.range.startOffset, currentChunk.range.getLength(), buffer ); - chunks.markSuccessful( currentChunk ); + chunks.markCompleted( currentChunk, true ); chunkStatusChanged( currentChunk ); } } @@ -356,7 +362,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H chunks.markFailed( cbh.currentChunk ); chunkStatusChanged( cbh.currentChunk ); } - LOGGER.warn( "Download of " + getTmpFileName().getAbsolutePath() + " failed" ); + LOGGER.debug( "Connection for " + getTmpFileName().getAbsolutePath() + " dropped" ); } if ( state != TransferState.FINISHED && state != TransferState.ERROR ) { lastActivityTime.set( System.currentTimeMillis() ); @@ -368,7 +374,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H finishUploadInternal(); } else { // Keep pumping unhashed chunks into the hasher - queueUnhashedChunk(); + queueUnhashedChunk( true ); } } } ); @@ -443,7 +449,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H if ( !chunk.isWrittenToDisk() ) { writeFileData( chunk.range.startOffset, chunk.range.getLength(), data ); } - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); if ( chunks.isComplete() ) { finishUploadInternal(); @@ -457,13 +463,13 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H break; } // A block finished, see if we can queue a new one - queueUnhashedChunk(); + queueUnhashedChunk( false ); } /** * Gets an unhashed chunk (if existent) and queues it for hashing */ - protected void queueUnhashedChunk() + protected void queueUnhashedChunk( boolean blocking ) { FileChunk chunk = chunks.getUnhashedComplete(); if ( chunk == null ) @@ -479,12 +485,12 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } if ( data == null ) { LOGGER.warn( "Cannot queue unhashed chunk: Will mark unloadable unhashed chunk as valid :-(" ); - chunks.markSuccessful( chunk ); + chunks.markCompleted( chunk, true ); chunkStatusChanged( chunk ); return; } try { - hashChecker.queue( chunk, data, this, true ); + hashChecker.queue( chunk, data, this, blocking ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } diff --git a/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java new file mode 100644 index 0000000..12deddb --- /dev/null +++ b/src/main/java/org/openslx/filetransfer/util/OutgoingTransferBase.java @@ -0,0 +1,121 @@ +package org.openslx.filetransfer.util; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.filetransfer.Uploader; + +public abstract class OutgoingTransferBase extends AbstractTransfer +{ + + /* + * Constants + */ + + private static final Logger LOGGER = Logger.getLogger( OutgoingTransferBase.class ); + + private static final long INACTIVITY_TIMEOUT = TimeUnit.MINUTES.toMillis( 5 ); + + /* + * Overridable constants + */ + + protected static int MAX_CONNECTIONS_PER_TRANSFER = 2; + + /* + * Class members + */ + + /** + * Remote peer is downloading, so we have Uploaders + */ + private final List<Uploader> uploads = new ArrayList<>(); + + /** + * File being uploaded + */ + private final File sourceFile; + + private final TransferInformation transferInformation; + + public OutgoingTransferBase( String transferId, File sourceFile, int plainPort, int sslPort ) + { + super( transferId ); + this.sourceFile = sourceFile; + this.transferInformation = new TransferInformation( transferId, plainPort, sslPort ); + } + + /** + * Add another connection for this file transfer. + * + * @param connection + * @return true if the connection is accepted, false if it should be + * discarded + */ + public synchronized boolean addConnection( final Uploader connection, ExecutorService pool ) + { + synchronized ( uploads ) { + if ( uploads.size() >= MAX_CONNECTIONS_PER_TRANSFER ) + return false; + uploads.add( connection ); + } + return runConnectionInternal( connection, pool ); + } + + protected boolean runConnectionInternal( final Uploader connection, ExecutorService pool ) + { + try { + pool.execute( new Runnable() { + @Override + public void run() + { + boolean ret = connection.upload( sourceFile.getAbsolutePath() ); + synchronized ( uploads ) { + uploads.remove( connection ); + } + if ( ret && uploads.isEmpty() && potentialFinishTime.get() == 0 ) { + potentialFinishTime.set( System.currentTimeMillis() ); + } + lastActivityTime.set( System.currentTimeMillis() ); + } + } ); + } catch ( Exception e ) { + LOGGER.warn( "threadpool rejected the incoming file transfer", e ); + synchronized ( uploads ) { + uploads.remove( connection ); + } + return false; + } + return true; + } + + @Override + public TransferInformation getTransferInfo() + { + return transferInformation; + } + + @Override + public final boolean isActive() + { + return uploads.size() > 0 || lastActivityTime.get() + INACTIVITY_TIMEOUT > System.currentTimeMillis(); + } + + @Override + public void cancel() + { + // Void + } + + @Override + public final int getActiveConnectionCount() + { + return uploads.size(); + } + +} diff --git a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java index 39accef..5cf0a10 100644 --- a/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java +++ b/src/main/java/org/openslx/thrifthelper/ImagePublishDataEx.java @@ -6,4 +6,5 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData; public class ImagePublishDataEx extends ImagePublishData { public String exImagePath; + public boolean exIsValid; } diff --git a/src/main/thrift/bwlp.thrift b/src/main/thrift/bwlp.thrift index b0e866a..1389008 100644 --- a/src/main/thrift/bwlp.thrift +++ b/src/main/thrift/bwlp.thrift @@ -228,6 +228,7 @@ struct ImagePublishData { 11: string virtId, 12: bool isTemplate, 13: UserInfo owner, + 14: binary machineDescription, } struct NetRule { |