package org.openslx.taskmanager.tasks;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.logging.log4j.util.Strings;
import org.openslx.satserver.util.MessageSink;
import org.openslx.satserver.util.Util;
import org.openslx.satserver.util.WakeOnLanExecutor;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.tasks.RemoteExec.Output;
import org.openslx.taskmanager.tasks.RemoteExec.Result;
import com.google.gson.annotations.Expose;
public class WakeOnLan extends AbstractTask
{
/**
* List of clients to wake up
*/
@Expose
private List<Machine> clients;
/**
* SHH data (key, command) for waking via another host
*/
@Expose
private Map<String, SshData> ssh;
/**
* UDP port to send WOL packets to
*/
@Expose
private int port;
private StatusObject status;
@Override
protected boolean initTask()
{
status = new StatusObject();
this.setStatusObject( status );
if ( clients == null ) {
status.addMsg( "Clients null" );
} else {
// Clean
for ( Iterator<Machine> it = this.clients.iterator(); it.hasNext(); ) {
Machine client = it.next();
if ( client == null || Util.isEmpty( client.mac ) || Util.isEmpty( client.ip )
|| client.methods == null || client.methods.isEmpty() ) {
it.remove();
}
}
if ( clients.isEmpty() ) {
status.addMsg( "Clients empty" );
}
}
if ( ssh == null ) {
ssh = new HashMap<>( 1 );
}
if ( !Util.isEmpty( status.messages ) )
return false;
return true;
}
@Override
protected boolean execute()
{
// Loop over clients until they all were handled (i.e. methods is empty)
ExecutorService tp = Executors.newFixedThreadPool( Math.max( 1, Math.min( 4, ssh.size() ) ) );
Map<String, ArrayList<String>> byMethod;
int jobIdCounter = 0;
do {
byMethod = new HashMap<>();
// Fetch next method for all clients
for ( Machine client : clients ) {
if ( client.methods.isEmpty() )
continue;
// Group by method and destination IP address
String method = client.methods.remove( 0 ) + "//" + client.ip
+ "//" + ( client.password == null ? "" : client.password );
ArrayList<String> list = byMethod.get( method );
if ( list == null ) {
list = new ArrayList<>();
byMethod.put( method, list );
}
list.add( client.mac );
}
if ( byMethod.isEmpty() )
break;
status.addMsg( "Starting job distribution..." );
// Execute
List<Future<?>> waitList = new ArrayList<>();
for ( Entry<String, ArrayList<String>> it : byMethod.entrySet() ) {
final String[] parts = it.getKey().split( "//" );
final String method = parts[0];
final List<String> macs = it.getValue();
final String macString = Strings.join( macs.iterator(), ' ' );
final String broadcastIp = parts[1];
final String password = parts.length >= 3 ? parts[2] : "";
final int jobId = ++jobIdCounter;
if ( method.equalsIgnoreCase( "DIRECT" ) ) {
// Directly from server
waitList.add( tp.submit( () -> {
status.addMsg( jobId + ": Waking directly from server: " + macString );
WakeOnLanExecutor wol = new WakeOnLanExecutor( password );
String[] success = wol.execute( status, macs, broadcastIp, this.port );
markSuccess( Arrays.asList( success ) );
if ( success.length == macs.size() ) {
status.addMsg( jobId + ": Done" );
} else {
status.addMsg( jobId + ": Some failed" );
}
} ) );
} else if ( this.ssh.containsKey( method ) ) {
// Via SSH
waitList.add( tp.submit( () -> {
SshData sshData = this.ssh.get( method );
status.addMsg( jobId + ": Waking via SSH from " + sshData.ip + " to " + broadcastIp + ": " + macString );
String command = sshData.command.replace( "%MACS%", macString )
.replace( "%IP%", broadcastIp )
.replace( "%PASSWORD%", password );
RemoteExec.Client c = new RemoteExec.Client( null, sshData.ip, sshData.port, sshData.username );
RemoteExec task = new RemoteExec( new RemoteExec.Client[] { c }, sshData.sshkey, sshData.port, command, 5 );
task.execute();
Output s = task.getStatusObject();
if ( s == null ) {
status.addMsg( jobId + ": Task status is null, considering failed" );
} else {
if ( !Util.isEmpty( s.error ) ) {
status.addMsg( s.error );
}
if ( s.result != null && s.result.containsKey( sshData.ip ) && s.result.get( sshData.ip ).exitCode == 0 ) {
markSuccess( macs );
status.addMsg( jobId + ": Done" );
} else {
if ( s.result != null ) {
for ( Result sshClient : s.result.values() ) {
if ( sshClient == null )
continue;
status.addMsg( jobId + ": stdout: " + sshClient.stdout + "\n"
+ jobId + ": stderr: " + sshClient.stderr );
}
}
status.addMsg( jobId + ": Non-zero exit code, considering failed" );
}
}
} ) );
} else {
status.addMsg( "Ignoring unknown SSH method '" + method + "'" );
}
}
// Wait for all jobs
for ( Future<?> f : waitList ) {
try {
f.get();
} catch ( InterruptedException e ) {
status.addMsg( "Threadpool got interrupted" );
Thread.currentThread().interrupt();
return false;
} catch ( ExecutionException e ) {
status.addMsg( "A treadpool job threw an exception" );
e.printStackTrace();
}
}
} while ( true );
status.addMsg( "Mainloop done" );
if ( this.clients.isEmpty() )
return true;
status.addMsg( "Failed clients:" );
for ( Machine c : clients ) {
status.addMsg( c.ip );
}
return false;
}
private void markSuccess( Collection<String> macs )
{
synchronized ( this.clients ) {
for ( Iterator<Machine> it = this.clients.iterator(); it.hasNext(); ) {
Machine client = it.next();
if ( macs.contains( client.mac ) ) {
it.remove();
}
}
}
}
static class Machine
{
/**
* Destination IP (for directed broadcast)
*/
@Expose
String ip;
/**
* Destination MAC
*/
@Expose
String mac;
/**
* Desired WOL modes, from most to least preferred. either a key to the top level ssh map,
* or "DIRECT" for sending a WOL packet directly from the server
*/
@Expose
List<String> methods;
/**
* WOL password
*/
@Expose
private String password;
}
static class SshData
{
@Expose
String username;
@Expose
String sshkey;
@Expose
String ip;
@Expose
int port;
@Expose
String command;
}
/*
*
*/
/**
* Return any messages about progress and status
*/
static class StatusObject implements MessageSink
{
private String messages = "";
public synchronized void addMsg( String str )
{
messages = messages + "\n" + str;
}
}
}