package org.openslx.imagemaster.db.mappers; import java.nio.ByteBuffer; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import org.apache.log4j.Logger; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.util.ChunkStatus; import org.openslx.filetransfer.util.FileChunk; import org.openslx.imagemaster.db.Database; import org.openslx.imagemaster.db.MysqlConnection; import org.openslx.imagemaster.db.MysqlStatement; public class DbImageBlock { private static final Logger LOGGER = Logger.getLogger( DbImageBlock.class ); private static AsyncThread asyncBlockUpdate = null; private static synchronized void initAsyncThread() { if ( asyncBlockUpdate == null ) { asyncBlockUpdate = new AsyncThread(); asyncBlockUpdate.start(); } } public static void asyncUpdate( String imageVersionId, FileChunk chunk ) throws InterruptedException { initAsyncThread(); asyncBlockUpdate.put( new ChunkUpdate( imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE ) ); } private static class AsyncThread extends Thread { private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 100 ); public void put( ChunkUpdate chunk ) throws InterruptedException { queue.put( chunk ); } @Override public void run() { try { while ( !interrupted() ) { ChunkUpdate chunk = queue.take(); Thread.sleep( 100 ); try ( MysqlConnection connection = Database.getConnection() ) { MysqlStatement stmt = connection.prepareStatement( "UPDATE imageblock SET ismissing = :ismissing" + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize" ); do { stmt.setBoolean( "ismissing", chunk.isMissing ); stmt.setString( "imageversionid", chunk.imageVersionId ); stmt.setLong( "startbyte", chunk.range.startOffset ); stmt.setInt( "blocksize", chunk.range.getLength() ); stmt.executeUpdate(); chunk = queue.poll(); } while ( chunk != null ); connection.commit(); } catch ( SQLException e ) { LOGGER.error( "Query failed in DbImageBlock.AsyncThread.run()", e ); continue; } Thread.sleep( 2000 ); } } catch ( InterruptedException e ) { LOGGER.debug( "async thread interrupted" ); interrupt(); } } } private static class ChunkUpdate { public final String imageVersionId; public final FileRange range; public final boolean isMissing; public ChunkUpdate( String imageVersionId, FileRange range, boolean isMissing ) { this.imageVersionId = imageVersionId; this.range = range; this.isMissing = isMissing; } } public static void insertChunkList( String imageVersionId, List all, boolean missing ) throws SQLException { try ( MysqlConnection connection = Database.getConnection() ) { MysqlStatement stmt = connection.prepareStatement( "INSERT IGNORE INTO imageblock" + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES" + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)" ); stmt.setString( "imageversionid", imageVersionId ); stmt.setBoolean( "ismissing", missing ); for ( FileChunk chunk : all ) { stmt.setLong( "startbyte", chunk.range.startOffset ); stmt.setInt( "blocksize", chunk.range.getLength() ); stmt.setBinary( "blocksha1", chunk.getSha1Sum() ); stmt.executeUpdate(); } connection.commit(); } catch ( SQLException e ) { LOGGER.error( "Query failed in DbImageBlock.insertChunkList()", e ); throw e; } } public static List getMissingStatusList( String imageVersionId ) throws SQLException { try ( MysqlConnection connection = Database.getConnection() ) { MysqlStatement stmt = connection.prepareStatement( "SELECT startbyte, ismissing FROM imageblock" + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC" ); stmt.setString( "imageversionid", imageVersionId ); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); long expectedOffset = 0; while ( rs.next() ) { long currentOffset = rs.getLong( "startbyte" ); if ( currentOffset < expectedOffset ) continue; while ( currentOffset > expectedOffset ) { list.add( Boolean.TRUE ); expectedOffset += FileChunk.CHUNK_SIZE; } if ( currentOffset == expectedOffset ) { list.add( rs.getBoolean( "ismissing" ) ); expectedOffset += FileChunk.CHUNK_SIZE; } } return list; } catch ( SQLException e ) { LOGGER.error( "Query failed in DbImageBlock.getBlockStatuses()", e ); throw e; } } public static List getBlockHashes( String imageVersionId ) throws SQLException { try ( MysqlConnection connection = Database.getConnection() ) { return getBlockHashes( connection, imageVersionId ); } catch ( SQLException e ) { LOGGER.error( "Query failed in DbImageBlock.getBlockHashes()", e ); throw e; } } private static List getBlockHashes( MysqlConnection connection, String imageVersionId ) throws SQLException { MysqlStatement stmt = connection.prepareStatement( "SELECT startbyte, blocksha1 FROM imageblock" + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC" ); stmt.setString( "imageversionid", imageVersionId ); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); long expectedOffset = 0; while ( rs.next() ) { long currentOffset = rs.getLong( "startbyte" ); if ( currentOffset < expectedOffset ) continue; while ( currentOffset > expectedOffset ) { list.add( null ); expectedOffset += FileChunk.CHUNK_SIZE; } if ( currentOffset == expectedOffset ) { list.add( ByteBuffer.wrap( rs.getBytes( "blocksha1" ) ) ); expectedOffset += FileChunk.CHUNK_SIZE; } } return list; } }