package fileserv;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
import org.openslx.filetransfer.DataReceivedCallback;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.FileRange;
import org.openslx.filetransfer.WantRangeCallback;
public class ActiveUpload {
	private static final Logger LOGGER = Logger.getLogger(ActiveUpload.class);

	/**
	 * This is an active upload, so on our end, we have a Downloader.
	 */
	private Downloader download = null;

	/** Path of the file we write incoming data to (kept for log messages). */
	private final String destinationFile;

	/** Open handle for writing received chunks; guarded by synchronizing on itself. */
	private final RandomAccessFile outFile;

	/** Chunks that still need to be transferred. Thread safe so multiple
	 * connections could pull work from it concurrently later. */
	private final ConcurrentLinkedQueue<FileChunk> chunks = new ConcurrentLinkedQueue<>();

	// TODO: Hashlist for verification

	/**
	 * Create a new active upload that will write the transferred file to the
	 * given destination, splitting the expected file size into chunks.
	 *
	 * @param destinationFile local path the received data is written to
	 * @param fileSize expected size of the complete file in bytes
	 * @param sha1Sums per-chunk SHA-1 sums for later verification (may be used
	 *            by FileChunk.createChunkList; verification itself is TODO)
	 * @throws FileNotFoundException if the destination cannot be opened for writing
	 */
	public ActiveUpload(String destinationFile, long fileSize, List<byte[]> sha1Sums)
			throws FileNotFoundException {
		this.destinationFile = destinationFile;
		outFile = new RandomAccessFile(destinationFile, "rw");
		FileChunk.createChunkList(chunks, fileSize, sha1Sums);
	}

	/**
	 * Add another connection for this file transfer. Currently only one
	 * connection is allowed, but this might change in the future.
	 * Note: once a connection has been accepted, all further connections are
	 * rejected, even if the accepted one failed - the field is never reset.
	 *
	 * @param connection the Downloader handling the incoming data
	 * @param pool executor the transfer task is submitted to
	 * @return true if the connection is accepted, false if it should be
	 *         discarded
	 */
	public synchronized boolean addConnection(Downloader connection, ThreadPoolExecutor pool) {
		if (download != null)
			return false;
		download = connection;
		pool.execute(new Runnable() {
			@Override
			public void run() {
				CbHandler cbh = new CbHandler();
				if (!download.download(cbh, cbh) && cbh.currentChunk != null) {
					// If the download failed and we have a current chunk, put it back into
					// the queue, so it will be handled again later...
					chunks.add(cbh.currentChunk);
				}
			}
		});
		return true;
	}

	/**
	 * Release the underlying file handle. Call when the transfer is finished
	 * or aborted; without this the RandomAccessFile opened in the constructor
	 * would leak. Safe to call multiple times.
	 */
	public void close() {
		synchronized (outFile) {
			try {
				outFile.close();
			} catch (IOException e) {
				LOGGER.warn("Could not close '" + destinationFile + "'", e);
			}
		}
	}

	/**
	 * Write some data to the local file. Thread safe so we could
	 * have multiple concurrent connections later.
	 *
	 * @param fileOffset absolute offset in the destination file to write at
	 * @param dataLength number of valid bytes in data
	 * @param data buffer holding the received bytes, starting at index 0
	 * @return true if the data was written, false on any I/O error
	 */
	private boolean writeFileData(long fileOffset, int dataLength, byte[] data) {
		synchronized (outFile) {
			try {
				outFile.seek(fileOffset);
				outFile.write(data, 0, dataLength);
			} catch (IOException e) {
				LOGGER.error("Cannot write to '" + destinationFile
						+ "'. Disk full, network storage error, bad permissions, ...?", e);
				return false;
			}
		}
		return true;
	}

	/**
	 * Callback class for an instance of the Downloader, which supplies
	 * the Downloader with wanted file ranges, and handles incoming data.
	 */
	private class CbHandler implements WantRangeCallback, DataReceivedCallback {
		/**
		 * The current chunk being transfered.
		 */
		public FileChunk currentChunk = null;

		@Override
		public boolean dataReceived(long fileOffset, int dataLength, byte[] data) {
			// TODO: Maybe cache in RAM and write full CHUNK_SIZE blocks at a time?
			// Would probably help with slower storage, especially if it's using
			// rotating disks and we're running multiple uploads.
			// Also we wouldn't have to re-read a block form disk for sha1 checking.
			return writeFileData(fileOffset, dataLength, data);
		}

		@Override
		public FileRange get() {
			if (currentChunk != null) {
				// TODO: A chunk was requested before, check hash and requeue if not matching
				// This needs to be async (own thread) so will be a little complicated
			}
			// Get next missing chunk
			currentChunk = chunks.poll();
			if (currentChunk == null)
				return null; // No more chunks, returning null tells the Downloader we're done.
			return currentChunk.range;
		}
	}
}