summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSimon Rettberg2016-04-13 18:41:29 +0200
committerSimon Rettberg2016-04-13 18:41:29 +0200
commitf5618c87e63deb99920710787f6dcd34d4b17425 (patch)
treeafc5fb21b632921094d0576f1e57dbcff240ae6d /src
parentFix getUSerInfo call on Session class (diff)
downloadmasterserver-f5618c87e63deb99920710787f6dcd34d4b17425.tar.gz
masterserver-f5618c87e63deb99920710787f6dcd34d4b17425.tar.xz
masterserver-f5618c87e63deb99920710787f6dcd34d4b17425.zip
(WiP) Global image sync
Diffstat (limited to 'src')
-rw-r--r--src/main/java/org/openslx/imagemaster/Constants.java8
-rw-r--r--src/main/java/org/openslx/imagemaster/Globals.java17
-rw-r--r--src/main/java/org/openslx/imagemaster/db/Database.java10
-rw-r--r--src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java87
-rw-r--r--src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java112
-rw-r--r--src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java35
-rw-r--r--src/main/java/org/openslx/imagemaster/db/models/LocalUser.java6
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java92
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java172
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java81
-rw-r--r--src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java24
-rw-r--r--src/main/java/org/openslx/imagemaster/session/Authenticator.java2
-rw-r--r--src/main/java/org/openslx/imagemaster/session/SessionManager.java2
-rw-r--r--src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java2
-rw-r--r--src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java117
-rw-r--r--src/main/java/org/openslx/imagemaster/util/Util.java29
16 files changed, 581 insertions, 215 deletions
diff --git a/src/main/java/org/openslx/imagemaster/Constants.java b/src/main/java/org/openslx/imagemaster/Constants.java
new file mode 100644
index 0000000..1d2973b
--- /dev/null
+++ b/src/main/java/org/openslx/imagemaster/Constants.java
@@ -0,0 +1,8 @@
+package org.openslx.imagemaster;
+
+public class Constants
+{
+
+ public static final int HASHCHECK_QUEUE_LEN = 6;
+
+}
diff --git a/src/main/java/org/openslx/imagemaster/Globals.java b/src/main/java/org/openslx/imagemaster/Globals.java
index 852d8dd..aebc198 100644
--- a/src/main/java/org/openslx/imagemaster/Globals.java
+++ b/src/main/java/org/openslx/imagemaster/Globals.java
@@ -1,6 +1,7 @@
package org.openslx.imagemaster;
import java.io.BufferedInputStream;
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
@@ -16,6 +17,7 @@ public class Globals
private static Logger LOGGER = Logger.getLogger( Globals.class );
private static final Properties properties = new Properties();
+ private static File imgPath = null;
/* CONSTANTS */
/**
@@ -118,6 +120,12 @@ public class Globals
return Util.tryToParseInt( properties.getProperty( "thrift.port.plain" ) );
}
+ public static long getImageValiditySeconds()
+ {
+ // TODO Auto-generated method stub
+ return 86400l * 500;
+ }
+
/* STRINGS */
public static String getImageDir()
@@ -139,4 +147,13 @@ public class Globals
{
return properties.getProperty( "ssl.keystore.password" );
}
+
+ public static File getImagePath()
+ {
+ if ( imgPath == null ) {
+ imgPath = new File( getImageDir() );
+ }
+ return imgPath;
+ }
+
}
diff --git a/src/main/java/org/openslx/imagemaster/db/Database.java b/src/main/java/org/openslx/imagemaster/db/Database.java
index 76c44fc..765f782 100644
--- a/src/main/java/org/openslx/imagemaster/db/Database.java
+++ b/src/main/java/org/openslx/imagemaster/db/Database.java
@@ -144,6 +144,16 @@ public class Database
pool.add( connection );
}
+ /**
+ * Return true if the given sql exception is "duplicate entry XXXX for key YYYY.
+ */
+ public static boolean isDuplicateKeyException( SQLException e )
+ {
+ return e != null && e.getErrorCode() == 1062;
+ }
+
+ //
+
public static void printCharsetInformation()
{
LOGGER.info( "MySQL charset related variables:" );
diff --git a/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java b/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java
index 2f5394c..f4c3ddc 100644
--- a/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java
+++ b/src/main/java/org/openslx/imagemaster/db/mappers/DbImage.java
@@ -1,7 +1,17 @@
package org.openslx.imagemaster.db.mappers;
-import org.openslx.bwlp.thrift.iface.ImagePublishData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.log4j.Logger;
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
+import org.openslx.bwlp.thrift.iface.InvocationError;
+import org.openslx.bwlp.thrift.iface.TInvocationException;
+import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.Database;
+import org.openslx.imagemaster.db.MysqlConnection;
+import org.openslx.imagemaster.db.MysqlStatement;
+import org.openslx.util.Util;
/**
* Representing an image in the database.
@@ -10,10 +20,83 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData;
public class DbImage
{
+ private static final Logger LOGGER = Logger.getLogger( DbImage.class );
+
public static ImagePublishData getImageVersion( String imageVersionId )
{
- // TODO Auto-generated method stub
return null;
}
+ public static void createImageBase( ImagePublishData img ) throws TInvocationException
+ {
+ // Input seems valid
+ try ( MysqlConnection connection = Database.getConnection() ) {
+ MysqlStatement stmt = connection.prepareStatement( "SELECT virtid FROM imagebase WHERE imagebaseid = :baseid" );
+ stmt.setString( "baseid", img.imageBaseId );
+ ResultSet rs = stmt.executeQuery();
+ if ( rs.next() ) {
+ if ( !img.virtId.equals( rs.getString( "virtid" ) ) ) {
+ throw new TInvocationException( InvocationError.INVALID_DATA, "Virtualizer id mismatch" );
+ }
+ MysqlStatement stmt2 = connection.prepareStatement( "UPDATE imagebase SET"
+ + " displayname = :displayname, updaterid = :updaterid,"
+ + " description = :description, osid = :osid, updatetime = UNIX_TIMESTAMP(),"
+ + " istemplate = :istemplate WHERE imagebaseid = :baseid" );
+ stmt2.setString( "baseid", img.imageBaseId );
+ stmt2.setString( "displayname", img.imageName );
+ stmt2.setString( "updaterid", img.user.userId );
+ stmt2.setString( "description", img.description );
+ stmt2.setInt( "osid", img.osId );
+ stmt2.setBoolean( "istemplate", img.isTemplate );
+ stmt2.executeUpdate();
+ } else {
+ MysqlStatement stmt2 = connection.prepareStatement( "INSERT INTO imagebase"
+ + " (imagebaseid, latestversionid, displayname, description, osid,"
+ + " virtid, createtime, updatetime, ownerid, updaterid, istemplate)"
+ + " VALUES "
+ + " (:imagebaseid, NULL, :displayname, :description, :osid,"
+ + " :virtid, :createtime, UNIX_TIMESTAMP(), :ownerid, :updaterid, :istemplate)" );
+ stmt2.setString( "imagebaseid", img.imageBaseId );
+ stmt2.setString( "displayname", img.imageName );
+ stmt2.setString( "description", img.description );
+ stmt2.setInt( "osid", img.osId );
+ stmt2.setString( "virtid", img.virtId );
+ stmt2.setLong( "createtime", img.createTime );
+ stmt2.setString( "ownerid", img.user.userId );
+ stmt2.setString( "updaterid", img.user.userId );
+ stmt2.setBoolean( "istemplate", img.isTemplate );
+ stmt2.executeUpdate();
+ }
+ connection.commit();
+ } catch ( SQLException e ) {
+ LOGGER.error( "Query failed in DbImage.createImageBase()", e );
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database boo-boo" );
+ }
+ }
+
+ public static void createImageVersion( ImagePublishData img, String relLocalPath ) throws SQLException
+ {
+ try ( MysqlConnection connection = Database.getConnection() ) {
+ // Insert version
+ MysqlStatement verStmt = connection.prepareStatement( "INSERT INTO imageversion"
+ + " (imageversionid, imagebaseid, createtime, expiretime, filesize,"
+ + " filepath, uploaderid, isvalid, isprocessed, mastersha1, virtualizerconfig)"
+ + " VALUES "
+ + " (:imageversionid, :imagebaseid, :createtime, :expiretime, :filesize,"
+ + " :filepath, :uploaderid, 0, 0, NULL, NULL)" );
+ verStmt.setString( "imageversionid", img.imageVersionId );
+ verStmt.setString( "imagebaseid", img.imageBaseId );
+ verStmt.setLong( "createtime", img.createTime );
+ verStmt.setLong( "expiretime", Util.unixTime() + Globals.getImageValiditySeconds() );
+ verStmt.setLong( "filesize", img.fileSize );
+ verStmt.setString( "filepath", relLocalPath );
+ verStmt.setString( "uploaderid", img.user.userId );
+ verStmt.execute();
+ connection.commit();
+ } catch ( SQLException e ) {
+ LOGGER.error( "Query failed in DbImage.createImageVersion()", e );
+ throw e;
+ }
+ }
+
}
diff --git a/src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java b/src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java
new file mode 100644
index 0000000..7986d87
--- /dev/null
+++ b/src/main/java/org/openslx/imagemaster/db/mappers/DbImageBlock.java
@@ -0,0 +1,112 @@
+package org.openslx.imagemaster.db.mappers;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.log4j.Logger;
+import org.openslx.filetransfer.FileRange;
+import org.openslx.filetransfer.util.ChunkStatus;
+import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.imagemaster.db.Database;
+import org.openslx.imagemaster.db.MysqlConnection;
+import org.openslx.imagemaster.db.MysqlStatement;
+
+public class DbImageBlock
+{
+
+ private static final Logger LOGGER = Logger.getLogger( DbImageBlock.class );
+
+ private static AsyncThread asyncBlockUpdate = null;
+
+ private static synchronized void initAsyncThread()
+ {
+ if ( asyncBlockUpdate == null ) {
+ asyncBlockUpdate = new AsyncThread();
+ asyncBlockUpdate.start();
+ }
+ }
+
+ public static void asyncUpdate( String imageVersionId, FileChunk chunk ) throws InterruptedException
+ {
+ initAsyncThread();
+ asyncBlockUpdate.put( new ChunkUpdate( imageVersionId, chunk.range, chunk.getStatus() != ChunkStatus.COMPLETE ) );
+ }
+
+ private static class AsyncThread extends Thread
+ {
+ private final ArrayBlockingQueue<ChunkUpdate> queue = new ArrayBlockingQueue<>( 100 );
+
+ public void put( ChunkUpdate chunk ) throws InterruptedException
+ {
+ queue.put( chunk );
+ }
+
+ @Override
+ public void run()
+ {
+ try {
+ while ( !interrupted() ) {
+ ChunkUpdate chunk = queue.take();
+ Thread.sleep( 100 );
+ try ( MysqlConnection connection = Database.getConnection() ) {
+ MysqlStatement stmt = connection.prepareStatement( "UPDATE imageblock SET ismissing = :ismissing"
+ + " WHERE imageversionid = :imageversionid AND startbyte = :startbyte AND blocksize = :blocksize" );
+ do {
+ stmt.setBoolean( "ismissing", chunk.isMissing );
+ stmt.setString( "imageversionid", chunk.imageVersionId );
+ stmt.setLong( "startbyte", chunk.range.startOffset );
+ stmt.setInt( "blocksize", chunk.range.getLength() );
+ stmt.executeUpdate();
+ chunk = queue.poll();
+ } while ( chunk != null );
+ connection.commit();
+ } catch ( SQLException e ) {
+ LOGGER.error( "Query failed in DbImageBlock.AsyncThread.run()", e );
+ continue;
+ }
+ Thread.sleep( 2000 );
+ }
+ } catch ( InterruptedException e ) {
+ LOGGER.debug( "async thread interrupted" );
+ interrupt();
+ }
+ }
+ }
+
+ private static class ChunkUpdate
+ {
+ public final String imageVersionId;
+ public final FileRange range;
+ public final boolean isMissing;
+
+ public ChunkUpdate( String imageVersionId, FileRange range, boolean isMissing )
+ {
+ this.imageVersionId = imageVersionId;
+ this.range = range;
+ this.isMissing = isMissing;
+ }
+ }
+
+ public static void insertChunkList( String imageVersionId, List<FileChunk> all, boolean missing ) throws SQLException
+ {
+ try ( MysqlConnection connection = Database.getConnection() ) {
+ MysqlStatement stmt = connection.prepareStatement( "INSERT IGNORE INTO imageblock"
+ + " (imageversionid, startbyte, blocksize, blocksha1, ismissing) VALUES"
+ + " (:imageversionid, :startbyte, :blocksize, :blocksha1, :ismissing)" );
+ stmt.setString( "imageversionid", imageVersionId );
+ stmt.setBoolean( "ismissing", missing );
+ for ( FileChunk chunk : all ) {
+ stmt.setLong( "startbyte", chunk.range.startOffset );
+ stmt.setInt( "blocksize", chunk.range.getLength() );
+ stmt.setBinary( "blocksha1", chunk.getSha1Sum() );
+ stmt.executeUpdate();
+ }
+ connection.commit();
+ } catch ( SQLException e ) {
+ LOGGER.error( "Query failed in DbImageBlock.insertChunkList()", e );
+ throw e;
+ }
+ }
+
+}
diff --git a/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java b/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java
index 9cde273..b6040e7 100644
--- a/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java
+++ b/src/main/java/org/openslx/imagemaster/db/mappers/DbUser.java
@@ -35,14 +35,14 @@ public class DbUser
}
/**
- * Query database for user with given login
+ * Query database for user with given user id
*
* @param login (global user-id, login@org for test-accounts)
* @return instance of DbUser for matching entry from DB, or null if not
* found
* @throws SQLException if the query fails
*/
- public static LocalUser forLogin( final String login ) throws SQLException
+ public static LocalUser forUserId( final String login ) throws SQLException
{
try ( MysqlConnection connection = Database.getConnection() ) {
MysqlStatement stmt = connection.prepareStatement( localUserSql
@@ -58,22 +58,22 @@ public class DbUser
}
}
- public static UserInfo getUserInfo( final String login ) throws SQLException, TNotFoundException
- {
- LocalUser user = forLogin( login );
- if ( user == null )
- throw new TNotFoundException();
- return user.toUserInfo();
- }
-
- public static LocalUser forLogin( String login, String password ) throws SQLException
+ public static LocalUser forUserId( String login, String password ) throws SQLException
{
- LocalUser user = forLogin( login );
+ LocalUser user = forUserId( login );
if ( user == null || !Sha512Crypt.verifyPassword( password, user.password ) )
return null;
return user;
}
+ public static UserInfo getUserInfo( final String login ) throws SQLException, TNotFoundException
+ {
+ LocalUser user = forUserId( login );
+ if ( user == null )
+ throw new TNotFoundException();
+ return user.toUserInfo();
+ }
+
public static List<UserInfo> findUser( String organizationId, String searchTerm )
{
// TODO Implement
@@ -82,17 +82,22 @@ public class DbUser
public static boolean exists( UserInfo user )
{
+ return exists( user, false );
+ }
+
+ public static boolean exists( UserInfo user, boolean withIdentity )
+ {
if ( user == null )
return false;
- return exists( user.userId );
+ return exists( user.userId, withIdentity );
}
- private static boolean exists( String userId )
+ private static boolean exists( String userId, boolean withIdentitiy )
{
if ( userId == null )
return false;
try {
- return forLogin( userId ) != null;
+ return forUserId( userId ) != null;
} catch ( SQLException e ) {
return false;
}
diff --git a/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java b/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java
index 644373b..bc9289a 100644
--- a/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java
+++ b/src/main/java/org/openslx/imagemaster/db/models/LocalUser.java
@@ -2,6 +2,7 @@ package org.openslx.imagemaster.db.models;
import org.openslx.bwlp.thrift.iface.Role;
import org.openslx.bwlp.thrift.iface.UserInfo;
+import org.openslx.imagemaster.util.Util;
/**
* Represents a user. Should be extended and given an according static method to
@@ -54,5 +55,10 @@ public class LocalUser
{
return login;
}
+
+ public boolean isAnonymous()
+ {
+ return firstName == null || Util.isEmpty( lastName ) || Util.isEmpty( eMail );
+ }
}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java
deleted file mode 100644
index 3acac5b..0000000
--- a/src/main/java/org/openslx/imagemaster/serverconnection/AbstractTransfer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.openslx.imagemaster.serverconnection;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-public abstract class AbstractTransfer {
-
- /**
- * How long to keep this transfer information when the transfer is
- * (potentially) done
- */
- private static final long FINISH_TIMEOUT = TimeUnit.MINUTES.toMillis(5);
-
- /**
- * How long to keep this transfer information when there are no active
- * connections and the transfer seems unfinished
- */
- private static final long IDLE_TIMEOUT = TimeUnit.HOURS.toMillis(4);
-
- /**
- * Time stamp of when (we think) the transfer finished. Clients can/might
- * not tell us they're done, and simply taking "no active connection" as a
- * sign the download is done might have unwanted effects if the user's
- * connection drops for a minute. If this time stamp (plus a FINISH_TIMEOUT)
- * passed,
- * we consider the download done and flag it for removal.
- * If set to zero, the transfer is not finished, or not assumed to have
- * finished.
- */
- protected final AtomicLong potentialFinishTime = new AtomicLong(0);
-
- /**
- * Time of last activity on this transfer.
- */
- protected final AtomicLong lastActivityTime = new AtomicLong(System.currentTimeMillis());
-
- private final String transferId;
-
- public AbstractTransfer(String transferId) {
- this.transferId = transferId;
- }
-
- /**
- * Returns true if the transfer is considered completed.
- *
- * @param now pass System.currentTimeMillis()
- * @return true if the transfer is considered completed
- */
- public boolean isComplete(long now) {
- long val = potentialFinishTime.get();
- return val != 0 && val + FINISH_TIMEOUT < now;
- }
-
- /**
- * Returns true if there has been no activity on this transfer for a certain
- * amount of time.
- *
- * @param now pass System.currentTimeMillis()
- * @return true if the transfer reached its idle timeout
- */
- public final boolean hasReachedIdleTimeout(long now) {
- return getActiveConnectionCount() == 0 && lastActivityTime.get() + IDLE_TIMEOUT < now;
- }
-
- public final String getId() {
- return transferId;
- }
-
- /**
- * Returns true if this transfer would potentially accept new connections.
- * This should NOT return false if there are too many concurrent
- * connections, as this is used to signal the client whether to keep trying
- * to connect.
- *
- * @return true if this transfer would potentially accept new connections
- */
- public abstract boolean isActive();
-
- /**
- * Cancel this transfer, aborting all active connections and rejecting
- * further incoming ones.
- */
- public abstract void cancel();
-
- /**
- * Returns number of active transfer connections.
- *
- * @return number of active transfer connections
- */
- public abstract int getActiveConnectionCount();
-
-}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
index 44c8e16..141e17f 100644
--- a/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/ConnectionHandler.java
@@ -1,13 +1,15 @@
package org.openslx.imagemaster.serverconnection;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.KeyStore;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManager;
@@ -19,17 +21,13 @@ import org.openslx.bwlp.thrift.iface.ImagePublishData;
import org.openslx.bwlp.thrift.iface.InvocationError;
import org.openslx.bwlp.thrift.iface.TInvocationException;
import org.openslx.bwlp.thrift.iface.TTransferRejectedException;
-import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.filetransfer.Downloader;
import org.openslx.filetransfer.IncomingEvent;
import org.openslx.filetransfer.Listener;
import org.openslx.filetransfer.Uploader;
+import org.openslx.filetransfer.util.AbstractTransfer;
import org.openslx.imagemaster.Globals;
-import org.openslx.imagemaster.crcchecker.CrcFile;
-import org.openslx.imagemaster.db.mappers.DbOsVirt;
-import org.openslx.imagemaster.db.mappers.DbUser;
-import org.openslx.imagemaster.util.RandomString;
-import org.openslx.imagemaster.util.Util;
+import org.openslx.util.GrowingThreadPoolExecutor;
/**
* Class to handle all incoming and outgoing connections.
@@ -38,19 +36,24 @@ import org.openslx.imagemaster.util.Util;
public class ConnectionHandler implements IncomingEvent
{
- private static Logger log = Logger.getLogger( ConnectionHandler.class );
- private static SSLContext sslContext;
+ private static final Logger LOGGER = Logger.getLogger( ConnectionHandler.class );
- private static Map<String, AbstractTransfer> pendingIncomingUploads = new ConcurrentHashMap<>();
- private static Map<String, AbstractTransfer> pendingIncomingDownloads = new ConcurrentHashMap<>();
+ private static final int MAX_TRANSFERS = 12;
+
+ private static Map<String, IncomingTransfer> incomingTransfers = new ConcurrentHashMap<>();
+ private static Map<String, AbstractTransfer> outgoingTransfers = new ConcurrentHashMap<>();
private static IncomingEvent eventHandler = new ConnectionHandler();
- private static ThreadPoolExecutor uploadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
- private static ThreadPoolExecutor downloadPool = new ThreadPoolExecutor( 0, 5, 6, TimeUnit.MINUTES, new SynchronousQueue<Runnable>() );
+ private final ExecutorService transferPool = new GrowingThreadPoolExecutor( 1, MAX_TRANSFERS * 2, 1, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>(),
+ new PrioThreadFactory( "TransferPool", Thread.NORM_PRIORITY - 2 ) );
- private static Listener listener;
+ private static final Listener plainListener;
+ private static final Listener sslListener;
static {
- log.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ LOGGER.debug( "Starting listener on port " + Globals.getFiletransferPortSsl() );
+ Listener ssl = null;
+ Listener plain = null;
try {
String pathToKeyStore = Globals.getSslKeystoreFile();
char[] passphrase = Globals.getSslKeystorePassword().toCharArray();
@@ -58,94 +61,55 @@ public class ConnectionHandler implements IncomingEvent
keystore.load( new FileInputStream( pathToKeyStore ), passphrase );
KeyManagerFactory kmf = KeyManagerFactory.getInstance( KeyManagerFactory.getDefaultAlgorithm() );
kmf.init( keystore, passphrase );
- sslContext = SSLContext.getInstance( "TLSv1.2" );
+ SSLContext sslContext = SSLContext.getInstance( "TLSv1.2" );
KeyManager[] keyManagers = kmf.getKeyManagers();
sslContext.init( keyManagers, null, null );
- listener = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
- listener.start();
+ ssl = new Listener( eventHandler, sslContext, Globals.getFiletransferPortSsl(), Globals.getFiletransferTimeout() * 1000 );
+ ssl.start();
+ plain = new Listener( eventHandler, null, Globals.getFiletransferPortPlain(), Globals.getFiletransferTimeout() * 1000 );
+ plain.start();
} catch ( Exception e ) {
- log.error( "Initialization failed.", e );
+ LOGGER.error( "Initialization failed.", e );
System.exit( 2 );
}
+ sslListener = ssl;
+ plainListener = plain;
}
- /**
- * Checks if this image is already uploading and returns a new list with missing blocks if so.
- * Puts the new image into processing list else.
- *
- * @param serverSessionId The uploading server
- * @param imageData The data of the image
- * @return
- * @throws UploadException If some error occurred during the process
- */
- public static TransferInformation getUploadInfos( ImagePublishData imageData, List<Integer> crcSums )
+ public static IncomingTransfer registerUpload( ImagePublishData img, List<ByteBuffer> blockHashes )
throws TTransferRejectedException, TInvocationException
{
- // check image data
- if ( Util.isEmpty( imageData.imageName ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" );
- if ( !DbUser.exists( imageData.user ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Invalid or missing image owner" );
- if ( DbOsVirt.osExists( imageData.osId ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" );
- if ( DbOsVirt.virtExists( imageData.virtId ) )
- throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" );
- if ( imageData.fileSize <= 0 )
- throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" );
-
- log.debug( "A satellite is submitting " + imageData.imageVersionId );
-
- final String uuid = imageData.imageVersionId;
- final String filepathRelative;
- final CrcFile crcFile;
- if ( crcSums == null ) {
- crcFile = null;
- } else {
- crcFile = new CrcFile( crcSums );
- }
- ImagePublishData image;
-
- synchronized ( pendingIncomingUploads ) {
- /*
- // check if image is already uploading
- if ( ( image = uploadingImages.get( uuid ) ) == null ) {
- // TODO insert new image to DB
- uploadingImages.put( uuid, image );
+ IncomingTransfer transfer;
+ synchronized ( incomingTransfers ) {
+ transfer = incomingTransfers.get( img.imageVersionId );
+ if ( transfer == null ) {
+ if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
+ throw new TTransferRejectedException( "Too many active transfers" );
+ }
+ try {
+ transfer = new IncomingTransfer( img, blockHashes );
+ } catch ( FileNotFoundException e ) {
+ LOGGER.warn( "Cannot init download", e );
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "File access error" );
+ }
+ incomingTransfers.put( transfer.getId(), transfer );
+ incomingTransfers.put( img.imageVersionId, transfer );
}
- */
}
-
- final String token = RandomString.generate( 50, false );
-
- // TODO addUpload( token, image );
- // TODO Set crc file on image - if there is already a crc file assigned, this does nothing
- return new TransferInformation( token, Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() );
- }
-
- /**
- * Add a new allowed incoming upload connection
- * for the given token and image.
- *
- * @param token The unique token
- * @param image Image being uploaded
- */
- public static void addUpload( String token, AbstractTransfer image )
- {
- pendingIncomingUploads.put( token, image );
- log.debug( "Added upload" );
+ return transfer;
}
- /**
- * Add a new allowed incoming download connection
- * for the given token and image.
- *
- * @param token The unique token
- * @param image Image being uploaded
- */
- public static void addDownload( String token, AbstractTransfer image )
+ public static IncomingTransfer getExistingUpload( ImagePublishData imageData, List<ByteBuffer> crcSums )
+ throws TTransferRejectedException
{
- pendingIncomingDownloads.put( token, image );
- log.debug( "Added download" );
+ IncomingTransfer transfer = incomingTransfers.get( imageData.imageVersionId );
+ if ( transfer == null )
+ return null;
+ if ( transfer.getFileSize() != imageData.fileSize )
+ throw new TTransferRejectedException( "File size mismatch" );
+ if ( !transfer.hashesEqual( crcSums ) )
+ throw new TTransferRejectedException( "Block hashes mismatch" );
+ return transfer;
}
/**
@@ -165,8 +129,32 @@ public class ConnectionHandler implements IncomingEvent
@Override
public void incomingUploadRequest( final Downloader downloader ) throws IOException
{
- // TODO
- downloader.sendErrorCode( "Too many concurrent downloads." );
- downloader.cancel();
+ IncomingTransfer transfer = incomingTransfers.get( downloader.getToken() );
+ if ( transfer == null ) {
+ downloader.sendErrorCode( "Unknown upload token." );
+ downloader.cancel();
+ return;
+ }
+ if ( getUploadConnectionCount() >= MAX_TRANSFERS ) {
+ downloader.sendErrorCode( "Too many concurrent uploads." );
+ downloader.cancel();
+ return;
+ }
+ if ( !transfer.addConnection( downloader, transferPool ) ) {
+ downloader.cancel();
+ }
}
+
+ public static int getUploadConnectionCount()
+ {
+ final long now = System.currentTimeMillis();
+ int active = 0;
+ for ( IncomingTransfer t : incomingTransfers.values() ) {
+ if ( t.countsTowardsConnectionLimit( now ) ) {
+ active += t.getActiveConnectionCount();
+ }
+ }
+ return active;
+ }
+
}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java
new file mode 100644
index 0000000..bfc65e1
--- /dev/null
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/IncomingTransfer.java
@@ -0,0 +1,81 @@
+package org.openslx.imagemaster.serverconnection;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+
+import org.openslx.bwlp.thrift.iface.ImagePublishData;
+import org.openslx.bwlp.thrift.iface.TInvocationException;
+import org.openslx.bwlp.thrift.iface.TransferInformation;
+import org.openslx.filetransfer.util.ChunkStatus;
+import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.filetransfer.util.IncomingTransferBase;
+import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.mappers.DbImageBlock;
+import org.openslx.imagemaster.util.Util;
+import org.openslx.util.ThriftUtil;
+
+public class IncomingTransfer extends IncomingTransferBase
+{
+
+ private static final long MIN_FREE_SPACE_BYTES = FileChunk.CHUNK_SIZE * 10;
+
+ private final String imageVersionId;
+
+ public IncomingTransfer( ImagePublishData img, List<ByteBuffer> blockHashes )
+ throws TInvocationException, FileNotFoundException
+ {
+ super( UUID.randomUUID().toString(), new File( new File( Globals.getImageDir(), img.imageBaseId ), img.imageVersionId ),
+ img.fileSize, ThriftUtil.unwrapByteBufferList( blockHashes ) );
+ this.imageVersionId = img.imageVersionId;
+ }
+
+ @Override
+ public String getRelativePath()
+ {
+ return Util.getRelativePath( getTmpFileName(), new File( Globals.getImageDir() ) );
+ }
+
+ @Override
+ public synchronized void cancel()
+ {
+ super.cancel();
+ getTmpFileName().delete();
+ }
+
+ @Override
+ protected boolean hasEnoughFreeSpace()
+ {
+ long space = Globals.getImagePath().getUsableSpace();
+ return space > MIN_FREE_SPACE_BYTES;
+ }
+
+ @Override
+ protected boolean finishIncomingTransfer()
+ {
+ potentialFinishTime.set( System.currentTimeMillis() );
+ return true;
+ }
+
+ @Override
+ public TransferInformation getTransferInfo()
+ {
+ return new TransferInformation( getId(), Globals.getFiletransferPortPlain(), Globals.getFiletransferPortSsl() );
+ }
+
+ @Override
+ protected void chunkStatusChanged( FileChunk chunk )
+ {
+ ChunkStatus status = chunk.getStatus();
+ if ( status == ChunkStatus.MISSING || status == ChunkStatus.COMPLETE ) {
+ try {
+ DbImageBlock.asyncUpdate( imageVersionId, chunk );
+ } catch ( InterruptedException e ) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java
new file mode 100644
index 0000000..5fa9da4
--- /dev/null
+++ b/src/main/java/org/openslx/imagemaster/serverconnection/PrioThreadFactory.java
@@ -0,0 +1,24 @@
+package org.openslx.imagemaster.serverconnection;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PrioThreadFactory implements ThreadFactory {
+
+ private final AtomicInteger counter = new AtomicInteger();
+ private final String name;
+ private final int priority;
+
+ public PrioThreadFactory(String name, int priority) {
+ this.name = name;
+ this.priority = priority;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, name + "-" + counter.incrementAndGet());
+ thread.setPriority(priority);
+ return thread;
+ }
+
+}
diff --git a/src/main/java/org/openslx/imagemaster/session/Authenticator.java b/src/main/java/org/openslx/imagemaster/session/Authenticator.java
index ea7e581..c11e597 100644
--- a/src/main/java/org/openslx/imagemaster/session/Authenticator.java
+++ b/src/main/java/org/openslx/imagemaster/session/Authenticator.java
@@ -36,7 +36,7 @@ public class Authenticator
LocalUser user;
try {
- user = DbUser.forLogin( login, password );
+ user = DbUser.forUserId( login, password );
} catch ( SQLException e ) {
throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Could not connect to database" );
} // throws exception if credentials are invalid
diff --git a/src/main/java/org/openslx/imagemaster/session/SessionManager.java b/src/main/java/org/openslx/imagemaster/session/SessionManager.java
index c12334a..c141d24 100644
--- a/src/main/java/org/openslx/imagemaster/session/SessionManager.java
+++ b/src/main/java/org/openslx/imagemaster/session/SessionManager.java
@@ -81,7 +81,7 @@ public class SessionManager
}, 123, TimeUnit.MINUTES.toMillis( 13 ) );
}
- public static Object getSessionFromSessionId( String sessionId )
+ public static Session getSessionFromSessionId( String sessionId )
{
if ( sessionId == null || sessionId.length() != 64 ) {
log.debug( "invalid sessionid format: " + sessionId );
diff --git a/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java b/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java
index a17f216..bf015a1 100644
--- a/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java
+++ b/src/main/java/org/openslx/imagemaster/thrift/server/BinaryListener.java
@@ -102,7 +102,7 @@ public class BinaryListener implements Runnable
THsHaServer.Args args = new THsHaServer.Args( serverTransport );
args.protocolFactory( protFactory );
args.processor( processor );
- args.workerThreads( 8 );
+ args.minWorkerThreads( 2 ).maxWorkerThreads( 6 );
args.maxReadBufferBytes = MAX_MSG_LEN;
return new THsHaServer( args );
}
diff --git a/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java b/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java
index ed1db56..60f4ccb 100644
--- a/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java
+++ b/src/main/java/org/openslx/imagemaster/thrift/server/MasterServerHandler.java
@@ -1,5 +1,6 @@
package org.openslx.imagemaster.thrift.server;
+import java.io.File;
import java.nio.ByteBuffer;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
@@ -9,6 +10,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
import org.openslx.bwlp.thrift.iface.AuthorizationError;
import org.openslx.bwlp.thrift.iface.ClientSessionData;
import org.openslx.bwlp.thrift.iface.ImagePublishData;
@@ -30,12 +32,20 @@ import org.openslx.bwlp.thrift.iface.TransferInformation;
import org.openslx.bwlp.thrift.iface.UserInfo;
import org.openslx.bwlp.thrift.iface.Virtualizer;
import org.openslx.encryption.AsymKeyHolder;
+import org.openslx.filetransfer.util.FileChunk;
+import org.openslx.imagemaster.Globals;
+import org.openslx.imagemaster.db.Database;
+import org.openslx.imagemaster.db.mappers.DbImage;
+import org.openslx.imagemaster.db.mappers.DbImageBlock;
import org.openslx.imagemaster.db.mappers.DbOrganization;
import org.openslx.imagemaster.db.mappers.DbOsVirt;
import org.openslx.imagemaster.db.mappers.DbPendingSatellite;
import org.openslx.imagemaster.db.mappers.DbSatellite;
import org.openslx.imagemaster.db.mappers.DbUser;
import org.openslx.imagemaster.db.models.LocalSatellite;
+import org.openslx.imagemaster.db.models.LocalUser;
+import org.openslx.imagemaster.serverconnection.ConnectionHandler;
+import org.openslx.imagemaster.serverconnection.IncomingTransfer;
import org.openslx.imagemaster.serversession.ServerAuthenticator;
import org.openslx.imagemaster.serversession.ServerSession;
import org.openslx.imagemaster.serversession.ServerSessionManager;
@@ -95,6 +105,18 @@ public class MasterServerHandler implements MasterServer.Iface
return SessionManager.addSession( session );
}
+ /**
+ * User tells us which satellite they connected to.
+ */
+ @Override
+ public void setUsedSatellite( String sessionId, String satelliteName )
+ {
+ Session session = SessionManager.getSessionFromSessionId( sessionId );
+ if ( session == null )
+ return;
+ //session.setUsedSatellite( satelliteName );
+ }
+
@Override
public List<UserInfo> findUser( String sessionId, String organizationId, String searchTerm )
throws TAuthorizationException, TInvocationException
@@ -197,11 +219,90 @@ public class MasterServerHandler implements MasterServer.Iface
}
@Override
- public TransferInformation submitImage( String serverSessionId, ImagePublishData imageDescription, List<ByteBuffer> blockHashes )
+ public TransferInformation submitImage( String userToken, ImagePublishData img, List<ByteBuffer> blockHashes )
throws TAuthorizationException, TInvocationException, TTransferRejectedException
{
- // TODO Auto-generated method stub
- return null;
+ // Valid submit session?
+ Session session = SessionManager.getSessionFromToken( userToken );
+ if ( session == null )
+ throw new TAuthorizationException( AuthorizationError.INVALID_TOKEN, "Given user token not known to the server" );
+ // check image data
+ if ( Util.isEmpty( img.imageName ) )
+ throw new TInvocationException( InvocationError.INVALID_DATA, "Image name not set" );
+ if ( img.fileSize <= 0 )
+ throw new TInvocationException( InvocationError.INVALID_DATA, "File size is too small" );
+ if ( !Util.isUUID( img.imageBaseId ) )
+ throw new TInvocationException( InvocationError.MISSING_DATA, "ImagePublishData has invalid imageBaseId" );
+ if ( !Util.isUUID( img.imageVersionId ) )
+ throw new TInvocationException( InvocationError.MISSING_DATA, "ImagePublishData has invalid imageVersionId" );
+ if ( img.user == null || img.user.userId == null )
+ throw new TInvocationException( InvocationError.MISSING_DATA, "Missing user id" );
+ // check for complete block hash list
+ boolean listComplete = false;
+ if ( blockHashes != null && blockHashes.size() == FileChunk.fileSizeToChunkCount( img.fileSize ) ) {
+ listComplete = true;
+ for ( ByteBuffer bb : blockHashes ) {
+ if ( bb == null || bb.remaining() != FileChunk.SHA1_LENGTH ) {
+ listComplete = false;
+ break;
+ }
+ }
+ }
+ if ( !listComplete )
+ throw new TInvocationException( InvocationError.INVALID_DATA, "Chunk hash list missing or incomplete" );
+ // Check if an upload is already assigned
+ IncomingTransfer existingUpload = ConnectionHandler.getExistingUpload( img, blockHashes );
+ if ( existingUpload != null ) {
+ return existingUpload.getTransferInfo();
+ }
+ // No existing upload - create new one
+ // checks that hit the db
+ if ( !DbOsVirt.osExists( img.osId ) )
+ throw new TInvocationException( InvocationError.INVALID_DATA, "Content operating system not set" );
+ if ( !DbOsVirt.virtExists( img.virtId ) )
+ throw new TInvocationException( InvocationError.INVALID_DATA, "Content virtualizer system not set" );
+ try {
+ LocalUser user = DbUser.forUserId( img.user.userId );
+ if ( user == null ) {
+ user = DbUser.forUserId( session.getUserInfo().userId );
+ if ( user != null ) {
+ img.user = user.toUserInfo();
+ }
+ }
+ if ( user == null )
+ throw new TInvocationException( InvocationError.UNKNOWN_USER, "Unknown user id " + img.user.userId );
+ if ( user.isAnonymous() )
+ throw new TInvocationException( InvocationError.UNKNOWN_USER, "The owner of the image does not participate in image exchange" );
+ } catch ( SQLException e ) {
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" );
+ }
+ // Make sure we have a destination to write to
+ if ( !new File( Globals.getImageDir() ).isDirectory() )
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Storage offline" );
+ // Try to register an upload
+ IncomingTransfer transfer = ConnectionHandler.registerUpload( img, blockHashes );
+ try {
+ DbImage.createImageBase( img );
+ } catch ( TException t ) {
+ transfer.cancel();
+ throw t;
+ }
+ try {
+ DbImage.createImageVersion( img, transfer.getRelativePath() );
+ } catch ( SQLException e1 ) {
+ transfer.cancel();
+ if ( Database.isDuplicateKeyException( e1 ) ) {
+ throw new TInvocationException( InvocationError.INVALID_DATA, "The image already exists on the server" );
+ } else {
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" );
+ }
+ }
+ try {
+ DbImageBlock.insertChunkList( img.imageVersionId, transfer.getChunks().getAll(), true );
+ } catch ( SQLException e ) {
+ LOGGER.warn( "Could not insert block hashes of image " + img.imageVersionId + " to db" );
+ }
+ return transfer.getTransferInfo();
}
@Override
@@ -218,7 +319,7 @@ public class MasterServerHandler implements MasterServer.Iface
try {
return DbOrganization.getAll();
} catch ( SQLException e ) {
- throw new TInvocationException();
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" );
}
}
@@ -228,7 +329,7 @@ public class MasterServerHandler implements MasterServer.Iface
try {
return DbOsVirt.getOsList();
} catch ( SQLException e ) {
- throw new TInvocationException();
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" );
}
}
@@ -238,7 +339,7 @@ public class MasterServerHandler implements MasterServer.Iface
try {
return DbOsVirt.getVirtualizerList();
} catch ( SQLException e ) {
- throw new TInvocationException();
+ throw new TInvocationException( InvocationError.INTERNAL_SERVER_ERROR, "Database error" );
}
}
@@ -273,10 +374,6 @@ public class MasterServerHandler implements MasterServer.Iface
LOGGER.warn( "Invalid public key in registerOrganization for " + organizationId + " (By " + session.getLogin() + ")", e );
throw new TInvocationException( InvocationError.INVALID_DATA, "Cannot reconstruct public key" );
}
- if ( newKey == null ) {
- LOGGER.warn( "Uninstantiable public key in registerOrganization for " + organizationId + " (By " + session.getLogin() + ")" );
- throw new TInvocationException( InvocationError.INVALID_DATA, "Cannot reconstruct public key" );
- }
LocalSatellite existing = DbSatellite.get( organizationId, displayName );
if ( existing != null ) {
Key existingKey = existing.getPubkey();
diff --git a/src/main/java/org/openslx/imagemaster/util/Util.java b/src/main/java/org/openslx/imagemaster/util/Util.java
index e9146e0..f8d9248 100644
--- a/src/main/java/org/openslx/imagemaster/util/Util.java
+++ b/src/main/java/org/openslx/imagemaster/util/Util.java
@@ -7,6 +7,7 @@ import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.util.Arrays;
import java.util.Random;
+import java.util.UUID;
import org.apache.log4j.Logger;
@@ -120,7 +121,7 @@ public class Util
public static boolean isEmpty( String str )
{
- return str != null && !str.isEmpty();
+ return str == null || str.isEmpty();
}
public static boolean isEmpty( String str, String message, Logger logger )
@@ -217,4 +218,30 @@ public class Util
return Arrays.equals( k1.getEncoded(), k2.getEncoded() );
}
+ public static String getRelativePath( File absolutePath, File parentDir )
+ {
+ String file;
+ String dir;
+ try {
+ file = absolutePath.getCanonicalPath();
+ dir = parentDir.getCanonicalPath() + File.separator;
+ } catch ( Exception e ) {
+ LOGGER.error( "Could not get relative path for " + absolutePath.toString(), e );
+ return null;
+ }
+ if ( !file.startsWith( dir ) )
+ return null;
+ return file.substring( dir.length() );
+ }
+
+ public static boolean isUUID( String id )
+ {
+ try {
+ UUID.fromString( id );
+ } catch ( Exception e ) {
+ return false;
+ }
+ return true;
+ }
+
}