From f5618c87e63deb99920710787f6dcd34d4b17425 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Wed, 13 Apr 2016 18:41:29 +0200 Subject: (WiP) Global image sync --- extras/database.sql | 236 ++++++++++++++++----- .../java/org/openslx/imagemaster/Constants.java | 8 + src/main/java/org/openslx/imagemaster/Globals.java | 17 ++ .../java/org/openslx/imagemaster/db/Database.java | 10 + .../openslx/imagemaster/db/mappers/DbImage.java | 87 +++++++- .../imagemaster/db/mappers/DbImageBlock.java | 112 ++++++++++ .../org/openslx/imagemaster/db/mappers/DbUser.java | 35 +-- .../openslx/imagemaster/db/models/LocalUser.java | 6 + .../serverconnection/AbstractTransfer.java | 92 -------- .../serverconnection/ConnectionHandler.java | 172 +++++++-------- .../serverconnection/IncomingTransfer.java | 81 +++++++ .../serverconnection/PrioThreadFactory.java | 24 +++ .../openslx/imagemaster/session/Authenticator.java | 2 +- .../imagemaster/session/SessionManager.java | 2 +- .../imagemaster/thrift/server/BinaryListener.java | 2 +- .../thrift/server/MasterServerHandler.java | 117 +++++++++- .../java/org/openslx/imagemaster/util/Util.java | 29 ++- 17 files changed, 769 insertions(+), 263 deletions(-) create mode 100644 src/main/java/org/openslx/imagemaster/Constants.java create mode 100644 src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java delete mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java create mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java create mode 100644 src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java diff --git a/extras/database.sql b/extras/database.sql index aec8e17..54d582b 100644 --- a/extras/database.sql +++ b/extras/database.sql @@ -1,8 +1,8 @@ --- MySQL dump 10.13 Distrib 5.5.40, for debian-linux-gnu (x86_64) +-- MySQL dump 10.13 Distrib 5.5.47, for debian-linux-gnu (i686) -- --- Host: localhost Database: master-test +-- Host: localhost Database: bwlp -- ------------------------------------------------------ --- Server version 5.5.40-0ubuntu0.12.04.1 +-- Server version 5.5.47-0+deb7u1 /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; /*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; @@ -16,63 +16,154 @@ /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; -- --- Table structure for table `image` +-- Table structure for table `imagebase` -- -DROP TABLE IF EXISTS `image`; +DROP TABLE IF EXISTS `imagebase`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; -CREATE TABLE `image` ( - `uuid` varchar(36) COLLATE utf8_unicode_ci NOT NULL, - `revision` int(10) unsigned NOT NULL, - `title` varchar(200) COLLATE utf8_unicode_ci NOT NULL, - `path` varchar(255) COLLATE utf8_unicode_ci NOT NULL, - `createtime` int(10) unsigned NOT NULL, - `updatetime` int(10) unsigned NOT NULL, - `ownerid` int(10) unsigned NOT NULL, - `operatingsystem` int(10) unsigned NOT NULL, +CREATE TABLE `imagebase` ( + `imagebaseid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `latestversionid` char(36) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL, + `displayname` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL, + `description` text COLLATE utf8mb4_unicode_ci, + `osid` int(11) DEFAULT NULL, + `virtid` varchar(10) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `createtime` bigint(20) NOT NULL, + `updatetime` bigint(20) NOT NULL, + `ownerid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `updaterid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `istemplate` tinyint(1) NOT NULL, + PRIMARY KEY (`imagebaseid`), + KEY `owner` (`ownerid`), + KEY `fk_imagebase_1_idx` (`osid`), + KEY `fk_imagebase_updater_idx` (`updaterid`), + KEY `fk_imagebase_1_idx1` (`virtid`), + KEY `latestversion_idx` (`latestversionid`), + CONSTRAINT `imagebase_ibfk_1` FOREIGN KEY (`virtid`) REFERENCES `virtualizer` (`virtid`), + CONSTRAINT `imagebase_ibfk_2` FOREIGN KEY (`osid`) REFERENCES `operatingsystem` (`osid`), + CONSTRAINT `imagebase_ibfk_3` FOREIGN KEY (`ownerid`) REFERENCES `user` (`userid`), + CONSTRAINT `imagebase_ibfk_4` FOREIGN KEY (`updaterid`) REFERENCES `user` (`userid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `imageversion` +-- + +DROP TABLE IF EXISTS `imageversion`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `imageversion` ( + `imageversionid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `imagebaseid` char(36) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL, + `createtime` bigint(20) NOT NULL, + `expiretime` bigint(20) NOT NULL, + `filesize` bigint(20) NOT NULL, + `filepath` varchar(200) COLLATE utf8mb4_unicode_ci NOT NULL, + `uploaderid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, `isvalid` tinyint(1) NOT NULL, - `isdeleted` tinyint(1) unsigned NOT NULL, - `description` text COLLATE utf8_unicode_ci NOT NULL, - `filesize` bigint(20) unsigned NOT NULL, - `missingblocks` text COLLATE utf8_unicode_ci NOT NULL, - PRIMARY KEY (`uuid`,`revision`), - KEY `ownerid` (`ownerid`), - CONSTRAINT `image_ibfk_1` FOREIGN KEY (`ownerid`) REFERENCES `user` (`userid`) ON UPDATE CASCADE -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci; + `isprocessed` tinyint(1) NOT NULL, + `mastersha1` binary(20) DEFAULT NULL, + `virtualizerconfig` blob COMMENT 'Specific configuration of the virtualizer for this image. For vmware, this is basically a dump of the *.vmx.', + PRIMARY KEY (`imageversionid`), + KEY `version_access` (`imagebaseid`,`isvalid`,`createtime`), + KEY `fk_imageversion_2_idx` (`uploaderid`), + KEY `expire_index` (`expiretime`), + CONSTRAINT `imageversion_ibfk_1` FOREIGN KEY (`imagebaseid`) REFERENCES `imagebase` (`imagebaseid`), + CONSTRAINT `imageversion_ibfk_2` FOREIGN KEY (`uploaderid`) REFERENCES `user` (`userid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- --- Table structure for table `satellite` +-- Table structure for table `operatingsystem` -- -DROP TABLE IF EXISTS `satellite`; +DROP TABLE IF EXISTS `operatingsystem`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; -CREATE TABLE `satellite` ( - `organizationid` varchar(32) NOT NULL, - `address` varchar(64) NOT NULL, - `name` varchar(255) NOT NULL, - `authmethod` varchar(255) NOT NULL, - `publickey` text, +CREATE TABLE `operatingsystem` ( + `osid` int(11) NOT NULL AUTO_INCREMENT, + `displayname` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL, + `architecture` varchar(14) COLLATE utf8mb4_unicode_ci NOT NULL, + `maxmem` int(11) NOT NULL DEFAULT '0', + `maxcpu` int(11) NOT NULL DEFAULT '0', + PRIMARY KEY (`osid`) +) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `organization` +-- + +DROP TABLE IF EXISTS `organization`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `organization` ( + `organizationid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, + `authmethod` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL, + `publickey` text COLLATE utf8mb4_unicode_ci, PRIMARY KEY (`organizationid`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8; +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- --- Table structure for table `satellite_suffix` +-- Table structure for table `organization_suffix` -- -DROP TABLE IF EXISTS `satellite_suffix`; +DROP TABLE IF EXISTS `organization_suffix`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; -CREATE TABLE `satellite_suffix` ( - `organizationid` varchar(32) NOT NULL, - `suffix` varchar(32) NOT NULL, +CREATE TABLE `organization_suffix` ( + `organizationid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `suffix` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL, PRIMARY KEY (`suffix`), KEY `organizationid` (`organizationid`), - CONSTRAINT `satellite_suffix_ibfk_1` FOREIGN KEY (`organizationid`) REFERENCES `satellite` (`organizationid`) ON UPDATE CASCADE -) ENGINE=InnoDB DEFAULT CHARSET=utf8; + CONSTRAINT `organization_suffix_ibfk_1` FOREIGN KEY (`organizationid`) REFERENCES `organization` (`organizationid`) ON DELETE CASCADE ON UPDATE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `os_x_virt` +-- + +DROP TABLE IF EXISTS `os_x_virt`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `os_x_virt` ( + `osid` int(11) NOT NULL, + `virtid` varchar(10) COLLATE utf8mb4_unicode_ci NOT NULL, + `virtoskeyword` varchar(30) COLLATE utf8mb4_unicode_ci NOT NULL, + PRIMARY KEY (`osid`,`virtid`), + KEY `virtoskeyword` (`virtoskeyword`), + KEY `virtid` (`virtid`), + CONSTRAINT `os_x_virt_ibfk_2` FOREIGN KEY (`virtid`) REFERENCES `virtualizer` (`virtid`) ON DELETE CASCADE, + CONSTRAINT `os_x_virt_ibfk_3` FOREIGN KEY (`osid`) REFERENCES `operatingsystem` (`osid`) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `satellite` +-- + +DROP TABLE IF EXISTS `satellite`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `satellite` ( + `satelliteid` int(11) NOT NULL AUTO_INCREMENT, + `organizationid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `satellitename` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL, + `addresses` varchar(1000) COLLATE utf8mb4_unicode_ci NOT NULL, + `certsha256` binary(32) DEFAULT NULL, + `publickey` text COLLATE utf8mb4_unicode_ci NOT NULL, + `dateline` bigint(20) NOT NULL, + `userid` varchar(36) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL, + PRIMARY KEY (`satelliteid`), + UNIQUE KEY `organizationid` (`organizationid`,`satellitename`), + KEY `dateline` (`dateline`), + CONSTRAINT `satellite_ibfk_1` FOREIGN KEY (`organizationid`) REFERENCES `organization` (`organizationid`) +) ENGINE=InnoDB AUTO_INCREMENT=24 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; /*!40101 SET character_set_client = @saved_cs_client */; -- @@ -83,19 +174,68 @@ DROP TABLE IF EXISTS `user`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */; CREATE TABLE `user` ( - `userid` int(10) unsigned NOT NULL AUTO_INCREMENT, - `login` varchar(48) DEFAULT NULL, + `userid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `shibid` varchar(36) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL, `password` varchar(255) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, - `organizationid` varchar(32) NOT NULL, - `firstname` varchar(32) NOT NULL, - `lastname` varchar(32) NOT NULL, - `email` varchar(48) NOT NULL, + `organizationid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `firstname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL, + `lastname` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL, + `email` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL, + `fixedname` tinyint(1) NOT NULL, PRIMARY KEY (`userid`), - UNIQUE KEY `username` (`organizationid`,`login`), - CONSTRAINT `user_ibfk_1` FOREIGN KEY (`organizationid`) REFERENCES `satellite` (`organizationid`) ON UPDATE CASCADE -) ENGINE=InnoDB DEFAULT CHARSET=utf8; + UNIQUE KEY `username` (`organizationid`,`userid`), + UNIQUE KEY `shibid` (`shibid`), + CONSTRAINT `user_ibfk_1` FOREIGN KEY (`organizationid`) REFERENCES `organization` (`organizationid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `virtualizer` +-- + +DROP TABLE IF EXISTS `virtualizer`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `virtualizer` ( + `virtid` varchar(10) COLLATE utf8mb4_unicode_ci NOT NULL, + `virtname` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL, + PRIMARY KEY (`virtid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; /*!40101 SET character_set_client = @saved_cs_client */; +-- +-- Table structure for table `websession` +-- + +DROP TABLE IF EXISTS `websession`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `websession` ( + `sid` char(40) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `dateline` bigint(11) NOT NULL, + `data` varchar(3000) COLLATE utf8mb4_unicode_ci NOT NULL, + PRIMARY KEY (`sid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +/*!40101 SET character_set_client = @saved_cs_client */; + +-- +-- Table structure for table `imageblock` +-- + +CREATE TABLE `imageblock` ( + `imageversionid` char(36) CHARACTER SET ascii COLLATE ascii_bin NOT NULL, + `startbyte` bigint(20) NOT NULL, + `blocksize` int(11) NOT NULL, + `blocksha1` binary(20) DEFAULT NULL, + `ismissing` tinyint(1) NOT NULL COMMENT 'true if this block is missing from the file, either because it was not transferred to the server yet, or because it failed an integrity check.', + PRIMARY KEY (`imageversionid`,`startbyte`,`blocksize`), + KEY `checksums` (`blocksha1`,`blocksize`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; + +ALTER TABLE `imageblock` + ADD CONSTRAINT `fk_imageblocksha1_1` FOREIGN KEY (`imageversionid`) REFERENCES `imageversion` (`imageversionid`) ON DELETE CASCADE ON UPDATE CASCADE; + + /*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; /*!40101 SET SQL_MODE=@OLD_SQL_MODE */; @@ -106,4 +246,4 @@ CREATE TABLE `user` ( /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; --- Dump completed on 2014-10-20 15:11:09 +-- Dump completed on 2016-04-04 11:35:20 diff --git a/src/main/java/org/openslx/imagemaster/Constants.java b/src/main/java/org/openslx/imagemaster/Constants.java new file mode 100644 index 0000000..1d2973b --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/Constants.java @@ -0,0 +1,8 @@ +package org.openslx.imagemaster; + +public class Constants +{ + + public static final int HASHCHECK_QUEUE_LEN = 6; + +} diff --git a/src/main/java/org/openslx/imagemaster/Globals.java b/src/main/java/org/openslx/imagemaster/Globals.java index 852d8dd..aebc198 100644 --- a/src/main/java/org/openslx/imagemaster/Globals.java +++ b/src/main/java/org/openslx/imagemaster/Globals.java @@ -1,6 +1,7 @@ package org.openslx.imagemaster; import java.io.BufferedInputStream; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Properties; @@ -16,6 +17,7 @@ public class Globals private static Logger LOGGER = Logger.getLogger( Globals.class ); private static final Properties properties = new Properties(); + private static File imgPath = null; /* CONSTANTS */ /** @@ -118,6 +120,12 @@ public class Globals return Util.tryToParseInt( properties.getProperty( "thrift.port.plain" ) ); } + public static long getImageValiditySeconds() + { + // TODO Auto-generated method stub + return 86400l * 500; + } + /* STRINGS */ public static String getImageDir() @@ -139,4 +147,13 @@ public class Globals { return properties.getProperty( "ssl.keystore.password" ); } + + public static File getImagePath() + { + if ( imgPath == null ) { + imgPath = new File( getImageDir() ); + } + return imgPath; + } + } diff --git a/src/main/java/org/openslx/imagemaster/db/Database.java b/src/main/java/org/openslx/imagemaster/db/Database.java index 76c44fc..765f782 100644 --- a/src/main/java/org/openslx/imagemaster/db/Database.java +++ b/src/main/java/org/openslx/imagemaster/db/Database.java @@ -144,6 +144,16 @@ public class Database pool.add( connection ); } + /** + * Return true if the given sql exception is "duplicate entry XXXX for key YYYY. + */ + public static boolean isDuplicateKeyException( SQLException e ) + { + return e != null && e.getErrorCode() == 1062; + } + + // + public static void printCharsetInformation() { LOGGER.info( "MySQL charset related variables:" ); diff --git a/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java b/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java index 2f5394c..f4c3ddc 100644 --- a/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java +++ b/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java @@ -1,7 +1,17 @@ package org.openslx.imagemaster.db.mappers; -import org.openslx.bwlp.thrift.iface.ImagePublishData; +import java.sql.ResultSet; +import java.sql.SQLException; +import org.apache.log4j.Logger; +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.InvocationError; +import org.openslx.bwlp.thrift.iface.TInvocationException; +import org.openslx.imagemaster.Globals; +import org.openslx.imagemaster.db.Database; +import org.openslx.imagemaster.db.MysqlConnection; +import org.openslx.imagemaster.db.MysqlStatement; +import org.openslx.util.Util; /** * Representing an image in the database. @@ -10,10 +20,83 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData; public class DbImage { + private static final Logger LOGGER = Logger.getLogger( DbImage.class ); + public static ImagePublishData getImageVersion( String imageVersionId ) { - // TODO Auto-generated method stub return null; } + public static void createImageBase( ImagePublishData img ) throws TInvocationException + { + // Input seems valid + try ( MysqlConnection connection = Database.getConnection() ) { + MysqlStatement stmt = connection.prepareStatement( "SELECT virtid FROM imagebase WHERE imagebaseid = :baseid" ); + stmt.setString( "baseid", img.imageBaseId ); + ResultSet rs = stmt.executeQuery(); + if ( rs.next() ) { + if ( !img.virtId.equals( rs.getString( "virtid" ) ) ) { + throw new TInvocationException( InvocationError.INVALID_DATA, "Virtualizer id mismatch" ); + } + MysqlStatement stmt2 = connection.prepareStatement( "UPDATE imagebase SET" + + " displayname = :displayname, updaterid = :updaterid," + + " description = :description, osid = :osid, updatetime = UNIX_TIMESTAMP()," + + " istemplate = :istemplate WHERE imagebaseid = :baseid" ); + stmt2.setString( "baseid", img.imageBaseId ); + stmt2.setString( "displayname", img.imageName ); + stmt2.setString( "updaterid", img.user.userId ); + stmt2.setString( "description", img.description ); + stmt2.setInt( "osid", img.osId ); + stmt2.setBoolean( "istemplate", img.isTemplate ); + stmt2.executeUpdate(); + } else { + MysqlStatement stmt2 = connection.prepareStatement( "INSERT INTO imagebase" + + " (imagebaseid, latestversionid, displayname, description, osid," + + " virtid, createtime, updatetime, ownerid, updaterid, istemplate)" + + " VALUES " + + " (:imagebaseid, NULL, :displayname, :description, :osid," + + " :virtid, :createtime, UNIX_TIMESTAMP(), :ownerid, :updaterid, :istemplate)" ); + stmt2.setString( "imagebaseid", img.imageBaseId ); + stmt2.setString( "displayname", img.imageName ); + stmt2.setString( "description", img.description ); + stmt2.setInt( "osid", img.osId ); + stmt2.setString( "virtid", img.virtId ); + stmt2.setLong( "createtime", img.createTime ); + stmt2.setString( "ownerid", img.user.userId ); + stmt2.setString( "updaterid", img.user.userId ); + stmt2.setBoolean( "istemplate", img.isTemplate ); + stmt2.executeUpdate(); + } + connection.commit(); + } catch ( SQLException e ) { + LOGGER.error( "Query failed in DbImage.createImageBase()", e ); + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database boo-boo" ); + } + } + + public static void createImageVersion( ImagePublishData img, String relLocalPath ) throws SQLException + { + try ( MysqlConnection connection = Database.getConnection() ) { + // Insert version + MysqlStatement verStmt = connection.prepareStatement( "INSERT INTO imageversion" + + " (imageversionid, imagebaseid, createtime, expiretime, filesize," + + " filepath, uploaderid, isvalid, isprocessed, mastersha1, virtualizerconfig)" + + " VALUES " + + " (:imageversionid, :imagebaseid, :createtime, :expiretime, :filesize," + + " :filepath, :uploaderid, 0, 0, NULL, NULL)" ); + verStmt.setString( "imageversionid", img.imageVersionId ); + verStmt.setString( "imagebaseid", img.imageBaseId ); + verStmt.setLong( "createtime", img.createTime ); + verStmt.setLong( "expiretime", Util.unixTime() + Globals.getImageValiditySeconds() ); + verStmt.setLong( "filesize", img.fileSize ); + verStmt.setString( "filepath", relLocalPath ); + verStmt.setString( "uploaderid", img.user.userId ); + verStmt.execute(); + connection.commit(); + } catch ( SQLException e ) { + LOGGER.error( "Query failed in DbImage.createImageVersion()", e ); + throw e; + } + } + } diff --git a/src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java b/src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java new file mode 100644 index 0000000..7986d87 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java @@ -0,0 +1,112 @@ +package org.openslx.imagemaster.db.mappers; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; + +import org.apache.log4j.Logger; +import org.openslx.filetransfer.FileRange; +import org.openslx.filetransfer.util.ChunkStatus; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.imagemaster.db.Database; +import org.openslx.imagemaster.db.MysqlConnection; +import org.openslx.imagemaster.db.MysqlStatement; + +public class DbImageBlock +{ + + private static final Logger LOGGER = Logger.getLogger( DbImageBlock.class ); + + private static AsyncThread asyncBlockUpdate = null; + + private static synchronized void initAsyncThread() + { + if ( asyncBlockUpdate == null ) { + asyncBlockUpdate = new AsyncThread(); + asyncBlockUpdate.start(); + } + } + + public static void asyncUpdate( String imageVersionId, FileChunk chunk ) throws InterruptedException + { + initAsyncThread(); + asyncBlockUpdate.put( new ChunkUpdate( imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE ) ); + } + + private static class AsyncThread extends Thread + { + private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>( 100 ); + + public void put( ChunkUpdate chunk ) throws InterruptedException + { + queue.put( chunk ); + } + + @Override + public void run() + { + try { + while ( !interrupted() ) { + ChunkUpdate chunk = queue.take(); + Thread.sleep( 100 ); + try ( MysqlConnection connection = Database.getConnection() ) { + MysqlStatement stmt = connection.prepareStatement( "UPDATE imageblock SET ismissing = :ismissing" + + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize" ); + do { + stmt.setBoolean( "ismissing", chunk.isMissing ); + stmt.setString( "imageversionid", chunk.imageVersionId ); + stmt.setLong( "startbyte", chunk.range.startOffset ); + stmt.setInt( "blocksize", chunk.range.getLength() ); + stmt.executeUpdate(); + chunk = queue.poll(); + } while ( chunk != null ); + connection.commit(); + } catch ( SQLException e ) { + LOGGER.error( "Query failed in DbImageBlock.AsyncThread.run()", e ); + continue; + } + Thread.sleep( 2000 ); + } + } catch ( InterruptedException e ) { + LOGGER.debug( "async thread interrupted" ); + interrupt(); + } + } + } + + private static class ChunkUpdate + { + public final String imageVersionId; + public final FileRange range; + public final boolean isMissing; + + public ChunkUpdate( String imageVersionId, FileRange range, boolean isMissing ) + { + this.imageVersionId = imageVersionId; + this.range = range; + this.isMissing = isMissing; + } + } + + public static void insertChunkList( String imageVersionId, List all, boolean missing ) throws SQLException + { + try ( MysqlConnection connection = Database.getConnection() ) { + MysqlStatement stmt = connection.prepareStatement( "INSERT IGNORE INTO imageblock" + + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES" + + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)" ); + stmt.setString( "imageversionid", imageVersionId ); + stmt.setBoolean( "ismissing", missing ); + for ( FileChunk chunk : all ) { + stmt.setLong( "startbyte", chunk.range.startOffset ); + stmt.setInt( "blocksize", chunk.range.getLength() ); + stmt.setBinary( "blocksha1", chunk.getSha1Sum() ); + stmt.executeUpdate(); + } + connection.commit(); + } catch ( SQLException e ) { + LOGGER.error( "Query failed in DbImageBlock.insertChunkList()", e ); + throw e; + } + } + +} diff --git a/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java b/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java index 9cde273..b6040e7 100644 --- a/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java +++ b/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java @@ -35,14 +35,14 @@ public class DbUser } /** - * Query database for user with given login + * Query database for user with given user id * * @param login (global user-id, login@org for test-accounts) * @return instance of DbUser for matching entry from DB, or null if not * found * @throws SQLException if the query fails */ - public static LocalUser forLogin( final String login ) throws SQLException + public static LocalUser forUserId( final String login ) throws SQLException { try ( MysqlConnection connection = Database.getConnection() ) { MysqlStatement stmt = connection.prepareStatement( localUserSql @@ -58,22 +58,22 @@ public class DbUser } } - public static UserInfo getUserInfo( final String login ) throws SQLException, TNotFoundException - { - LocalUser user = forLogin( login ); - if ( user == null ) - throw new TNotFoundException(); - return user.toUserInfo(); - } - - public static LocalUser forLogin( String login, String password ) throws SQLException + public static LocalUser forUserId( String login, String password ) throws SQLException { - LocalUser user = forLogin( login ); + LocalUser user = forUserId( login ); if ( user == null || !Sha512Crypt.verifyPassword( password, user.password ) ) return null; return user; } + public static UserInfo getUserInfo( final String login ) throws SQLException, TNotFoundException + { + LocalUser user = forUserId( login ); + if ( user == null ) + throw new TNotFoundException(); + return user.toUserInfo(); + } + public static List findUser( String organizationId, String searchTerm ) { // TODO Implement @@ -81,18 +81,23 @@ public class DbUser } public static boolean exists( UserInfo user ) + { + return exists( user, false ); + } + + public static boolean exists( UserInfo user, boolean withIdentity ) { if ( user == null ) return false; - return exists( user.userId ); + return exists( user.userId, withIdentity ); } - private static boolean exists( String userId ) + private static boolean exists( String userId, boolean withIdentitiy ) { if ( userId == null ) return false; try { - return forLogin( userId ) != null; + return forUserId( userId ) != null; } catch ( SQLException e ) { return false; } diff --git a/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java b/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java index 644373b..bc9289a 100644 --- a/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java +++ b/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java @@ -2,6 +2,7 @@ package org.openslx.imagemaster.db.models; import org.openslx.bwlp.thrift.iface.Role; import org.openslx.bwlp.thrift.iface.UserInfo; +import org.openslx.imagemaster.util.Util; /** * Represents a user. Should be extended and given an according static method to @@ -54,5 +55,10 @@ public class LocalUser { return login; } + + public boolean isAnonymous() + { + return firstName == null || Util.isEmpty( lastName ) || Util.isEmpty( eMail ); + } } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java deleted file mode 100644 index 3acac5b..0000000 --- a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java +++ /dev/null @@ -1,92 +0,0 @@ -package org.openslx.imagemaster.serverconnection; - -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - -public abstract class AbstractTransfer { - - /** - * How long to keep this transfer information when the transfer is - * (potentially) done - */ - private static final long FINISH_TIMEOUT = TimeUnit.MINUTES.toMillis(5); - - /** - * How long to keep this transfer information when there are no active - * connections and the transfer seems unfinished - */ - private static final long IDLE_TIMEOUT = TimeUnit.HOURS.toMillis(4); - - /** - * Time stamp of when (we think) the transfer finished. Clients can/might - * not tell us they're done, and simply taking "no active connection" as a - * sign the download is done might have unwanted effects if the user's - * connection drops for a minute. If this time stamp (plus a FINISH_TIMEOUT) - * passed, - * we consider the download done and flag it for removal. - * If set to zero, the transfer is not finished, or not assumed to have - * finished. - */ - protected final AtomicLong potentialFinishTime = new AtomicLong(0); - - /** - * Time of last activity on this transfer. - */ - protected final AtomicLong lastActivityTime = new AtomicLong(System.currentTimeMillis()); - - private final String transferId; - - public AbstractTransfer(String transferId) { - this.transferId = transferId; - } - - /** - * Returns true if the transfer is considered completed. - * - * @param now pass System.currentTimeMillis() - * @return true if the transfer is considered completed - */ - public boolean isComplete(long now) { - long val = potentialFinishTime.get(); - return val != 0 && val + FINISH_TIMEOUT < now; - } - - /** - * Returns true if there has been no activity on this transfer for a certain - * amount of time. - * - * @param now pass System.currentTimeMillis() - * @return true if the transfer reached its idle timeout - */ - public final boolean hasReachedIdleTimeout(long now) { - return getActiveConnectionCount() == 0 && lastActivityTime.get() + IDLE_TIMEOUT < now; - } - - public final String getId() { - return transferId; - } - - /** - * Returns true if this transfer would potentially accept new connections. - * This should NOT return false if there are too many concurrent - * connections, as this is used to signal the client whether to keep trying - * to connect. - * - * @return true if this transfer would potentially accept new connections - */ - public abstract boolean isActive(); - - /** - * Cancel this transfer, aborting all active connections and rejecting - * further incoming ones. - */ - public abstract void cancel(); - - /** - * Returns number of active transfer connections. - * - * @return number of active transfer connections - */ - public abstract int getActiveConnectionCount(); - -} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java index 44c8e16..141e17f 100644 --- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java +++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java @@ -1,13 +1,15 @@ package org.openslx.imagemaster.serverconnection; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.KeyStore; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.net.ssl.KeyManager; @@ -19,17 +21,13 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData; import org.openslx.bwlp.thrift.iface.InvocationError; import org.openslx.bwlp.thrift.iface.TInvocationException; import org.openslx.bwlp.thrift.iface.TTransferRejectedException; -import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.filetransfer.Downloader; import org.openslx.filetransfer.IncomingEvent; import org.openslx.filetransfer.Listener; import org.openslx.filetransfer.Uploader; +import org.openslx.filetransfer.util.AbstractTransfer; import org.openslx.imagemaster.Globals; -import org.openslx.imagemaster.crcchecker.CrcFile; -import org.openslx.imagemaster.db.mappers.DbOsVirt; -import org.openslx.imagemaster.db.mappers.DbUser; -import org.openslx.imagemaster.util.RandomString; -import org.openslx.imagemaster.util.Util; +import org.openslx.util.GrowingThreadPoolExecutor; /** * Class to handle all incoming and outgoing connections. @@ -38,19 +36,24 @@ import org.openslx.imagemaster.util.Util; public class ConnectionHandler implements IncomingEvent { - private static Logger log = Logger.getLogger( ConnectionHandler.class ); - private static SSLContext sslContext; + private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class ); - private static Map pendingIncomingUploads = new ConcurrentHashMap<>(); - private static Map pendingIncomingDownloads = new ConcurrentHashMap<>(); + private static final int MAX_TRANSFERS = 12; + + private static Map incomingTransfers = new ConcurrentHashMap<>(); + private static Map outgoingTransfers = new ConcurrentHashMap<>(); private static IncomingEvent eventHandler = new ConnectionHandler(); - private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue() ); - private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue() ); + private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES, + new SynchronousQueue(), + new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) ); - private static Listener listener; + private static final Listener plainListener; + private static final Listener sslListener; static { - log.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); + LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() ); + Listener ssl = null; + Listener plain = null; try { String pathToKeyStore = Globals.getSslKeystoreFile(); char[] passphrase = Globals.getSslKeystorePassword().toCharArray(); @@ -58,94 +61,55 @@ public class ConnectionHandler implements IncomingEvent keystore.load( new FileInputStream( pathToKeyStore ), passphrase ); KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() ); kmf.init( keystore, passphrase ); - sslContext = SSLContext.getInstance( "TLSv1.2" ); + SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" ); KeyManager[] keyManagers = kmf.getKeyManagers(); sslContext.init( keyManagers, null, null ); - listener = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); - listener.start(); + ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 ); + ssl.start(); + plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 ); + plain.start(); } catch ( Exception e ) { - log.error( "Initialization failed.", e ); + LOGGER.error( "Initialization failed.", e ); System.exit( 2 ); } + sslListener = ssl; + plainListener = plain; } - /** - * Checks if this image is already uploading and returns a new list with missing blocks if so. - * Puts the new image into processing list else. - * - * @param serverSessionId The uploading server - * @param imageData The data of the image - * @return - * @throws UploadException If some error occurred during the process - */ - public static TransferInformation getUploadInfos( ImagePublishData imageData, List crcSums ) + public static IncomingTransfer registerUpload( ImagePublishData img, List blockHashes ) throws TTransferRejectedException, TInvocationException { - // check image data - if ( Util.isEmpty( imageData.imageName ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" ); - if ( !DbUser.exists( imageData.user ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Invalid or missing image owner" ); - if ( DbOsVirt.osExists( imageData.osId ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" ); - if ( DbOsVirt.virtExists( imageData.virtId ) ) - throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" ); - if ( imageData.fileSize <= 0 ) - throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" ); - - log.debug( "A satellite is submitting " + imageData.imageVersionId ); - - final String uuid = imageData.imageVersionId; - final String filepathRelative; - final CrcFile crcFile; - if ( crcSums == null ) { - crcFile = null; - } else { - crcFile = new CrcFile( crcSums ); - } - ImagePublishData image; - - synchronized ( pendingIncomingUploads ) { - /* - // check if image is already uploading - if ( ( image = uploadingImages.get( uuid ) ) == null ) { - // TODO insert new image to DB - uploadingImages.put( uuid, image ); + IncomingTransfer transfer; + synchronized ( incomingTransfers ) { + transfer = incomingTransfers.get( img.imageVersionId ); + if ( transfer == null ) { + if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { + throw new TTransferRejectedException( "Too many active transfers" ); + } + try { + transfer = new IncomingTransfer( img, blockHashes ); + } catch ( FileNotFoundException e ) { + LOGGER.warn( "Cannot init download", e ); + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" ); + } + incomingTransfers.put( transfer.getId(), transfer ); + incomingTransfers.put( img.imageVersionId, transfer ); } - */ } - - final String token = RandomString.generate( 50, false ); - - // TODO addUpload( token, image ); - // TODO Set crc file on image - if there is already a crc file assigned, this does nothing - return new TransferInformation( token, Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() ); - } - - /** - * Add a new allowed incoming upload connection - * for the given token and image. - * - * @param token The unique token - * @param image Image being uploaded - */ - public static void addUpload( String token, AbstractTransfer image ) - { - pendingIncomingUploads.put( token, image ); - log.debug( "Added upload" ); + return transfer; } - /** - * Add a new allowed incoming download connection - * for the given token and image. - * - * @param token The unique token - * @param image Image being uploaded - */ - public static void addDownload( String token, AbstractTransfer image ) + public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List crcSums ) + throws TTransferRejectedException { - pendingIncomingDownloads.put( token, image ); - log.debug( "Added download" ); + IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId ); + if ( transfer == null ) + return null; + if ( transfer.getFileSize() != imageData.fileSize ) + throw new TTransferRejectedException( "File size mismatch" ); + if ( !transfer.hashesEqual( crcSums ) ) + throw new TTransferRejectedException( "Block hashes mismatch" ); + return transfer; } /** @@ -165,8 +129,32 @@ public class ConnectionHandler implements IncomingEvent @Override public void incomingUploadRequest( final Downloader downloader ) throws IOException { - // TODO - downloader.sendErrorCode( "Too many concurrent downloads." ); - downloader.cancel(); + IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() ); + if ( transfer == null ) { + downloader.sendErrorCode( "Unknown upload token." ); + downloader.cancel(); + return; + } + if ( getUploadConnectionCount() >= MAX_TRANSFERS ) { + downloader.sendErrorCode( "Too many concurrent uploads." ); + downloader.cancel(); + return; + } + if ( !transfer.addConnection( downloader, transferPool ) ) { + downloader.cancel(); + } } + + public static int getUploadConnectionCount() + { + final long now = System.currentTimeMillis(); + int active = 0; + for ( IncomingTransfer t : incomingTransfers.values() ) { + if ( t.countsTowardsConnectionLimit( now ) ) { + active += t.getActiveConnectionCount(); + } + } + return active; + } + } diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java new file mode 100644 index 0000000..bfc65e1 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java @@ -0,0 +1,81 @@ +package org.openslx.imagemaster.serverconnection; + +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.UUID; + +import org.openslx.bwlp.thrift.iface.ImagePublishData; +import org.openslx.bwlp.thrift.iface.TInvocationException; +import org.openslx.bwlp.thrift.iface.TransferInformation; +import org.openslx.filetransfer.util.ChunkStatus; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.filetransfer.util.IncomingTransferBase; +import org.openslx.imagemaster.Globals; +import org.openslx.imagemaster.db.mappers.DbImageBlock; +import org.openslx.imagemaster.util.Util; +import org.openslx.util.ThriftUtil; + +public class IncomingTransfer extends IncomingTransferBase +{ + + private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * 10; + + private final String imageVersionId; + + public IncomingTransfer( ImagePublishData img, List blockHashes ) + throws TInvocationException, FileNotFoundException + { + super( UUID.randomUUID().toString(), new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId ), + img.fileSize, ThriftUtil.unwrapByteBufferList( blockHashes ) ); + this.imageVersionId = img.imageVersionId; + } + + @Override + public String getRelativePath() + { + return Util.getRelativePath( getTmpFileName(), new File( Globals.getImageDir() ) ); + } + + @Override + public synchronized void cancel() + { + super.cancel(); + getTmpFileName().delete(); + } + + @Override + protected boolean hasEnoughFreeSpace() + { + long space = Globals.getImagePath().getUsableSpace(); + return space > MIN_FREE_SPACE_BYTES; + } + + @Override + protected boolean finishIncomingTransfer() + { + potentialFinishTime.set( System.currentTimeMillis() ); + return true; + } + + @Override + public TransferInformation getTransferInfo() + { + return new TransferInformation( getId(), Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() ); + } + + @Override + protected void chunkStatusChanged( FileChunk chunk ) + { + ChunkStatus status = chunk.getStatus(); + if ( status == ChunkStatus.MISSING || status == ChunkStatus.COMPLETE ) { + try { + DbImageBlock.asyncUpdate( imageVersionId, chunk ); + } catch ( InterruptedException e ) { + e.printStackTrace(); + } + } + } + +} diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java new file mode 100644 index 0000000..5fa9da4 --- /dev/null +++ b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java @@ -0,0 +1,24 @@ +package org.openslx.imagemaster.serverconnection; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class PrioThreadFactory implements ThreadFactory { + + private final AtomicInteger counter = new AtomicInteger(); + private final String name; + private final int priority; + + public PrioThreadFactory(String name, int priority) { + this.name = name; + this.priority = priority; + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, name + "-" + counter.incrementAndGet()); + thread.setPriority(priority); + return thread; + } + +} diff --git a/src/main/java/org/openslx/imagemaster/session/Authenticator.java b/src/main/java/org/openslx/imagemaster/session/Authenticator.java index ea7e581..c11e597 100644 --- a/src/main/java/org/openslx/imagemaster/session/Authenticator.java +++ b/src/main/java/org/openslx/imagemaster/session/Authenticator.java @@ -36,7 +36,7 @@ public class Authenticator LocalUser user; try { - user = DbUser.forLogin( login, password ); + user = DbUser.forUserId( login, password ); } catch ( SQLException e ) { throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Could not connect to database" ); } // throws exception if credentials are invalid diff --git a/src/main/java/org/openslx/imagemaster/session/SessionManager.java b/src/main/java/org/openslx/imagemaster/session/SessionManager.java index c12334a..c141d24 100644 --- a/src/main/java/org/openslx/imagemaster/session/SessionManager.java +++ b/src/main/java/org/openslx/imagemaster/session/SessionManager.java @@ -81,7 +81,7 @@ public class SessionManager }, 123, TimeUnit.MINUTES.toMillis( 13 ) ); } - public static Object getSessionFromSessionId( String sessionId ) + public static Session getSessionFromSessionId( String sessionId ) { if ( sessionId == null || sessionId.length() != 64 ) { log.debug( "invalid sessionid format: " + sessionId ); diff --git a/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java b/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java index a17f216..bf015a1 100644 --- a/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java +++ b/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java @@ -102,7 +102,7 @@ public class BinaryListener implements Runnable THsHaServer.Args args = new THsHaServer.Args( serverTransport ); args.protocolFactory( protFactory ); args.processor( processor ); - args.workerThreads( 8 ); + args.minWorkerThreads( 2 ).maxWorkerThreads( 6 ); args.maxReadBufferBytes = MAX_MSG_LEN; return new THsHaServer( args ); } diff --git a/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java b/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java index ed1db56..60f4ccb 100644 --- a/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java +++ b/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java @@ -1,5 +1,6 @@ package org.openslx.imagemaster.thrift.server; +import java.io.File; import java.nio.ByteBuffer; import java.security.Key; import java.security.NoSuchAlgorithmException; @@ -9,6 +10,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.log4j.Logger; +import org.apache.thrift.TException; import org.openslx.bwlp.thrift.iface.AuthorizationError; import org.openslx.bwlp.thrift.iface.ClientSessionData; import org.openslx.bwlp.thrift.iface.ImagePublishData; @@ -30,12 +32,20 @@ import org.openslx.bwlp.thrift.iface.TransferInformation; import org.openslx.bwlp.thrift.iface.UserInfo; import org.openslx.bwlp.thrift.iface.Virtualizer; import org.openslx.encryption.AsymKeyHolder; +import org.openslx.filetransfer.util.FileChunk; +import org.openslx.imagemaster.Globals; +import org.openslx.imagemaster.db.Database; +import org.openslx.imagemaster.db.mappers.DbImage; +import org.openslx.imagemaster.db.mappers.DbImageBlock; import org.openslx.imagemaster.db.mappers.DbOrganization; import org.openslx.imagemaster.db.mappers.DbOsVirt; import org.openslx.imagemaster.db.mappers.DbPendingSatellite; import org.openslx.imagemaster.db.mappers.DbSatellite; import org.openslx.imagemaster.db.mappers.DbUser; import org.openslx.imagemaster.db.models.LocalSatellite; +import org.openslx.imagemaster.db.models.LocalUser; +import org.openslx.imagemaster.serverconnection.ConnectionHandler; +import org.openslx.imagemaster.serverconnection.IncomingTransfer; import org.openslx.imagemaster.serversession.ServerAuthenticator; import org.openslx.imagemaster.serversession.ServerSession; import org.openslx.imagemaster.serversession.ServerSessionManager; @@ -95,6 +105,18 @@ public class MasterServerHandler implements MasterServer.Iface return SessionManager.addSession( session ); } + /** + * User tells us which satellite they connected to. + */ + @Override + public void setUsedSatellite( String sessionId, String satelliteName ) + { + Session session = SessionManager.getSessionFromSessionId( sessionId ); + if ( session == null ) + return; + //session.setUsedSatellite( satelliteName ); + } + @Override public List findUser( String sessionId, String organizationId, String searchTerm ) throws TAuthorizationException, TInvocationException @@ -197,11 +219,90 @@ public class MasterServerHandler implements MasterServer.Iface } @Override - public TransferInformation submitImage( String serverSessionId, ImagePublishData imageDescription, List blockHashes ) + public TransferInformation submitImage( String userToken, ImagePublishData img, List blockHashes ) throws TAuthorizationException, TInvocationException, TTransferRejectedException { - // TODO Auto-generated method stub - return null; + // Valid submit session? + Session session = SessionManager.getSessionFromToken( userToken ); + if ( session == null ) + throw new TAuthorizationException( AuthorizationError.INVALID_TOKEN, "Given user token not known to the server" ); + // check image data + if ( Util.isEmpty( img.imageName ) ) + throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" ); + if ( img.fileSize <= 0 ) + throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" ); + if ( !Util.isUUID( img.imageBaseId ) ) + throw new TInvocationException( InvocationError.MISSING_DATA, "ImagePublishData has invalid imageBaseId" ); + if ( !Util.isUUID( img.imageVersionId ) ) + throw new TInvocationException( InvocationError.MISSING_DATA, "ImagePublishData has invalid imageVersionId" ); + if ( img.user == null || img.user.userId == null ) + throw new TInvocationException( InvocationError.MISSING_DATA, "Missing user id" ); + // check for complete block hash list + boolean listComplete = false; + if ( blockHashes != null && blockHashes.size() == FileChunk.fileSizeToChunkCount( img.fileSize ) ) { + listComplete = true; + for ( ByteBuffer bb : blockHashes ) { + if ( bb == null || bb.remaining() != FileChunk.SHA1_LENGTH ) { + listComplete = false; + break; + } + } + } + if ( !listComplete ) + throw new TInvocationException( InvocationError.INVALID_DATA, "Chunk hash list missing or incomplete" ); + // Check if an upload is already assigned + IncomingTransfer existingUpload = ConnectionHandler.getExistingUpload( img, blockHashes ); + if ( existingUpload != null ) { + return existingUpload.getTransferInfo(); + } + // No existing upload - create new one + // checks that hit the db + if ( !DbOsVirt.osExists( img.osId ) ) + throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" ); + if ( !DbOsVirt.virtExists( img.virtId ) ) + throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" ); + try { + LocalUser user = DbUser.forUserId( img.user.userId ); + if ( user == null ) { + user = DbUser.forUserId( session.getUserInfo().userId ); + if ( user != null ) { + img.user = user.toUserInfo(); + } + } + if ( user == null ) + throw new TInvocationException( InvocationError.UNKNOWN_USER, "Unknown user id " + img.user.userId ); + if ( user.isAnonymous() ) + throw new TInvocationException( InvocationError.UNKNOWN_USER, "The owner of the image does not participate in image exchange" ); + } catch ( SQLException e ) { + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" ); + } + // Make sure we have a destination to write to + if ( !new File( Globals.getImageDir() ).isDirectory() ) + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Storage offline" ); + // Try to register an upload + IncomingTransfer transfer = ConnectionHandler.registerUpload( img, blockHashes ); + try { + DbImage.createImageBase( img ); + } catch ( TException t ) { + transfer.cancel(); + throw t; + } + try { + DbImage.createImageVersion( img, transfer.getRelativePath() ); + } catch ( SQLException e1 ) { + transfer.cancel(); + if ( Database.isDuplicateKeyException( e1 ) ) { + throw new TInvocationException( InvocationError.INVALID_DATA, "The image already exists on the server" ); + } else { + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" ); + } + } + try { + DbImageBlock.insertChunkList( img.imageVersionId, transfer.getChunks().getAll(), true ); + } catch ( SQLException e ) { + LOGGER.warn( "Could not insert block hashes of image " + img.imageVersionId + " to db" ); + } + return transfer.getTransferInfo(); } @Override @@ -218,7 +319,7 @@ public class MasterServerHandler implements MasterServer.Iface try { return DbOrganization.getAll(); } catch ( SQLException e ) { - throw new TInvocationException(); + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" ); } } @@ -228,7 +329,7 @@ public class MasterServerHandler implements MasterServer.Iface try { return DbOsVirt.getOsList(); } catch ( SQLException e ) { - throw new TInvocationException(); + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" ); } } @@ -238,7 +339,7 @@ public class MasterServerHandler implements MasterServer.Iface try { return DbOsVirt.getVirtualizerList(); } catch ( SQLException e ) { - throw new TInvocationException(); + throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" ); } } @@ -273,10 +374,6 @@ public class MasterServerHandler implements MasterServer.Iface LOGGER.warn( "Invalid public key in registerOrganization for " + organizationId + " (By " + session.getLogin() + ")", e ); throw new TInvocationException( InvocationError.INVALID_DATA, "Cannot reconstruct public key" ); } - if ( newKey == null ) { - LOGGER.warn( "Uninstantiable public key in registerOrganization for " + organizationId + " (By " + session.getLogin() + ")" ); - throw new TInvocationException( InvocationError.INVALID_DATA, "Cannot reconstruct public key" ); - } LocalSatellite existing = DbSatellite.get( organizationId, displayName ); if ( existing != null ) { Key existingKey = existing.getPubkey(); diff --git a/src/main/java/org/openslx/imagemaster/util/Util.java b/src/main/java/org/openslx/imagemaster/util/Util.java index e9146e0..f8d9248 100644 --- a/src/main/java/org/openslx/imagemaster/util/Util.java +++ b/src/main/java/org/openslx/imagemaster/util/Util.java @@ -7,6 +7,7 @@ import java.security.interfaces.RSAPrivateKey; import java.security.interfaces.RSAPublicKey; import java.util.Arrays; import java.util.Random; +import java.util.UUID; import org.apache.log4j.Logger; @@ -120,7 +121,7 @@ public class Util public static boolean isEmpty( String str ) { - return str != null && !str.isEmpty(); + return str == null || str.isEmpty(); } public static boolean isEmpty( String str, String message, Logger logger ) @@ -217,4 +218,30 @@ public class Util return Arrays.equals( k1.getEncoded(), k2.getEncoded() ); } + public static String getRelativePath( File absolutePath, File parentDir ) + { + String file; + String dir; + try { + file = absolutePath.getCanonicalPath(); + dir = parentDir.getCanonicalPath() + File.separator; + } catch ( Exception e ) { + LOGGER.error( "Could not get relative path for " + absolutePath.toString(), e ); + return null; + } + if ( !file.startsWith( dir ) ) + return null; + return file.substring( dir.length() ); + } + + public static boolean isUUID( String id ) + { + try { + UUID.fromString( id ); + } catch ( Exception e ) { + return false; + } + return true; + } + } -- cgit v1.2.3-55-g7522