package org.openslx.imagemaster.db.mappers; import java.sql.SQLException; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import org.apache.log4j.Logger; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.util.ChunkStatus; import org.openslx.filetransfer.util.FileChunk; import org.openslx.imagemaster.db.Database; import org.openslx.imagemaster.db.MysqlConnection; import org.openslx.imagemaster.db.MysqlStatement; public class DbImageBlock { private static final Logger LOGGER = Logger.getLogger( DbImageBlock.class ); private static AsyncThread asyncBlockUpdate = null; private static synchronized void initAsyncThread() { if ( asyncBlockUpdate == null ) { asyncBlockUpdate = new AsyncThread(); asyncBlockUpdate.start(); } } public static void asyncUpdate( String imageVersionId, FileChunk chunk ) throws InterruptedException { initAsyncThread(); asyncBlockUpdate.put( new ChunkUpdate( imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE ) ); } private static class AsyncThread extends Thread { private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 100 ); public void put( ChunkUpdate chunk ) throws InterruptedException { queue.put( chunk ); } @Override public void run() { try { while ( !interrupted() ) { ChunkUpdate chunk = queue.take(); Thread.sleep( 100 ); try ( MysqlConnection connection = Database.getConnection() ) { MysqlStatement stmt = connection.prepareStatement( "UPDATE imageblock SET ismissing = :ismissing" + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize" ); do { stmt.setBoolean( "ismissing", chunk.isMissing ); stmt.setString( "imageversionid", chunk.imageVersionId ); stmt.setLong( "startbyte", chunk.range.startOffset ); stmt.setInt( "blocksize", chunk.range.getLength() ); stmt.executeUpdate(); chunk = queue.poll(); } while ( chunk != null ); connection.commit(); } catch ( SQLException e ) { LOGGER.error( "Query failed in DbImageBlock.AsyncThread.run()", e ); continue; } Thread.sleep( 2000 ); } } catch ( InterruptedException e ) { LOGGER.debug( "async thread interrupted" ); interrupt(); } } } private static class ChunkUpdate { public final String imageVersionId; public final FileRange range; public final boolean isMissing; public ChunkUpdate( String imageVersionId, FileRange range, boolean isMissing ) { this.imageVersionId = imageVersionId; this.range = range; this.isMissing = isMissing; } } public static void insertChunkList( String imageVersionId, List all, boolean missing ) throws SQLException { try ( MysqlConnection connection = Database.getConnection() ) { MysqlStatement stmt = connection.prepareStatement( "INSERT IGNORE INTO imageblock" + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES" + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)" ); stmt.setString( "imageversionid", imageVersionId ); stmt.setBoolean( "ismissing", missing ); for ( FileChunk chunk : all ) { stmt.setLong( "startbyte", chunk.range.startOffset ); stmt.setInt( "blocksize", chunk.range.getLength() ); stmt.setBinary( "blocksha1", chunk.getSha1Sum() ); stmt.executeUpdate(); } connection.commit(); } catch ( SQLException e ) { LOGGER.error( "Query failed in DbImageBlock.insertChunkList()", e ); throw e; } } }