package org.openslx.bwlp.sat.database.mappers; import java.nio.ByteBuffer; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import org.apache.log4j.Logger; import org.openslx.bwlp.sat.database.Database; import org.openslx.bwlp.sat.database.MysqlConnection; import org.openslx.bwlp.sat.database.MysqlStatement; import org.openslx.bwlp.sat.util.FileSystem; import org.openslx.filetransfer.FileRange; import org.openslx.filetransfer.LocalChunkSource.ChunkSource; import org.openslx.filetransfer.util.ChunkStatus; import org.openslx.filetransfer.util.FileChunk; public class DbImageBlock { private static final Logger LOGGER = Logger.getLogger(DbImageBlock.class); private static AsyncThread asyncBlockUpdate = null; private static synchronized void initAsyncThread() { if (asyncBlockUpdate == null) { asyncBlockUpdate = new AsyncThread(); asyncBlockUpdate.start(); } } public static void asyncUpdate(String imageVersionId, FileChunk chunk) throws InterruptedException { initAsyncThread(); asyncBlockUpdate.put(new ChunkUpdate(imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE)); } private static class AsyncThread extends Thread { private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(100); public AsyncThread() { super("DbBlockUpdater"); } public void put(ChunkUpdate chunk) throws InterruptedException { queue.put(chunk); } @Override public void run() { try { while (!interrupted()) { ChunkUpdate chunk = queue.take(); Thread.sleep(100); try (MysqlConnection connection = Database.getConnection()) { MysqlStatement stmt = connection.prepareStatement("UPDATE imageblock SET ismissing = :ismissing" + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize"); do { stmt.setBoolean("ismissing", chunk.isMissing); stmt.setString("imageversionid", chunk.imageVersionId); stmt.setLong("startbyte", chunk.range.startOffset); stmt.setInt("blocksize", chunk.range.getLength()); stmt.executeUpdate(); chunk = queue.poll(); } while (chunk != null); connection.commit(); } catch (SQLException e) { LOGGER.error("Query failed in DbImageBlock.AsyncThread.run()", e); continue; } Thread.sleep(2000); } } catch (InterruptedException e) { LOGGER.debug("async thread interrupted"); interrupt(); } } } private static class ChunkUpdate { public final String imageVersionId; public final FileRange range; public final boolean isMissing; public ChunkUpdate(String imageVersionId, FileRange range, boolean isMissing) { this.imageVersionId = imageVersionId; this.range = range; this.isMissing = isMissing; } } public static void insertChunkList(String imageVersionId, List all, boolean missing) throws SQLException { try (MysqlConnection connection = Database.getConnection()) { MysqlStatement stmt = connection.prepareStatement("INSERT IGNORE INTO imageblock" + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES" + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)"); stmt.setString("imageversionid", imageVersionId); stmt.setBoolean("ismissing", missing); for (FileChunk chunk : all) { stmt.setLong("startbyte", chunk.range.startOffset); stmt.setInt("blocksize", chunk.range.getLength()); stmt.setBinary("blocksha1", chunk.getSha1Sum()); stmt.executeUpdate(); } connection.commit(); } catch (SQLException e) { LOGGER.error("Query failed in DbImageBlock.insertChunkList()", e); throw e; } } /** * Get list of block hashes for an image version id. The hashes, as usual, * refer to 16MiB blocks. If hashes are missing, nulls will be inserted into * the list, since otherwise there is no way to reconstruct the offset of * the block in the file. Note however that missing hashes at the end of the * list will not be added as nulls, so there still could be less hashes in * the list than blocks in the file. */ static List getBlockHashes(MysqlConnection connection, String imageVersionId) throws SQLException { MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, blocksha1 FROM imageblock" + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC"); stmt.setString("imageversionid", imageVersionId); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); long expectedOffset = 0; while (rs.next()) { long currentOffset = rs.getLong("startbyte"); if (currentOffset < expectedOffset) continue; while (currentOffset > expectedOffset) { list.add(null); expectedOffset += FileChunk.CHUNK_SIZE; } if (currentOffset == expectedOffset) { list.add(ByteBuffer.wrap(rs.getBytes("blocksha1"))); expectedOffset += FileChunk.CHUNK_SIZE; } } return list; } public static List getBlockHashes(String imageVersionId) throws SQLException { try (MysqlConnection connection = Database.getConnection()) { return getBlockHashes(connection, imageVersionId); } catch (SQLException e) { LOGGER.error("Query failed in DbImage.getBlockHashes()", e); throw e; } } public static List getMissingStatusList(String imageVersionId) throws SQLException { try (MysqlConnection connection = Database.getConnection()) { MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, ismissing FROM imageblock" + " WHERE imageversionid = :imageversionid ORDER BY startbyte ASC"); stmt.setString("imageversionid", imageVersionId); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); long expectedOffset = 0; while (rs.next()) { long currentOffset = rs.getLong("startbyte"); if (currentOffset < expectedOffset) continue; while (currentOffset > expectedOffset) { list.add(Boolean.TRUE); expectedOffset += FileChunk.CHUNK_SIZE; } if (currentOffset == expectedOffset) { list.add(rs.getBoolean("ismissing")); expectedOffset += FileChunk.CHUNK_SIZE; } } return list; } catch (SQLException e) { LOGGER.error("Query failed in DbImageBlock.getBlockStatuses()", e); throw e; } } public static List getBlocksWithHash(List sums) throws SQLException { List list = null; try (MysqlConnection connection = Database.getConnection()) { MysqlStatement stmt = connection.prepareStatement("SELECT startbyte, blocksize, filepath FROM imageblock" + " INNER JOIN imageversion USING (imageversionid)" + " WHERE blocksha1 = :sha1 GROUP BY imageversionid"); for (byte[] sha1 : sums) { stmt.setBinary("sha1", sha1); ResultSet rs = stmt.executeQuery(); if (!rs.next()) continue; ChunkSource cs = new ChunkSource(sha1); do { cs.addFile(FileSystem.composeAbsolutePath(rs.getString("filepath")).getAbsolutePath(), rs.getLong("startbyte"), rs.getInt("blocksize")); } while (rs.next()); if (list == null) { list = new ArrayList<>(); } list.add(cs); } } catch (SQLException e) { LOGGER.error("Query failed in DbImageBlock.getBlocksWithHash()", e); throw e; } return list; } }