package org.openslx.taskmanager.tasks; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.util.Strings; import org.openslx.satserver.util.MessageSink; import org.openslx.satserver.util.Util; import org.openslx.satserver.util.WakeOnLanExecutor; import org.openslx.taskmanager.api.AbstractTask; import org.openslx.taskmanager.tasks.RemoteExec.Output; import org.openslx.taskmanager.tasks.RemoteExec.Result; import org.openslx.util.PrioThreadFactory; import; public class WakeOnLan extends AbstractTask { /** * List of clients to wake up */ @Expose private List clients; /** * SHH data (key, command) for waking via another host */ @Expose private Map ssh; /** * UDP port to send WOL packets to */ @Expose private int port; private StatusObject status; @Override protected boolean initTask() { status = new StatusObject(); this.setStatusObject( status ); if ( clients == null ) { status.addMsg( "Clients null" ); } else { // Clean for ( Iterator it = this.clients.iterator(); it.hasNext(); ) { Machine client =; if ( client == null || Util.isEmpty( client.mac ) || Util.isEmpty( client.ip ) || client.methods == null || client.methods.isEmpty() ) { it.remove(); } } if ( clients.isEmpty() ) { status.addMsg( "Clients empty" ); } } if ( ssh == null ) { ssh = new HashMap<>( 1 ); } if ( !Util.isEmpty( status.messages ) ) return false; return true; } @Override protected boolean execute() { // Loop over clients until they all were handled (i.e. methods is empty) ExecutorService tp = Executors.newFixedThreadPool( Math.max( 1, Math.min( 4, ssh.size() ) ), new PrioThreadFactory( "WOL" ) ); Map> byMethod; int jobIdCounter = 0; do { byMethod = new HashMap<>(); // Fetch next method for all clients for ( Machine client : clients ) { if ( client.methods.isEmpty() ) continue; // Group by method and destination IP address String method = client.methods.remove( 0 ) + "//" + client.ip + "//" + ( client.password == null ? "" : client.password ); ArrayList list = byMethod.get( method ); if ( list == null ) { list = new ArrayList<>(); byMethod.put( method, list ); } list.add( client.mac ); } if ( byMethod.isEmpty() ) break; status.addMsg( "Starting job distribution..." ); // Execute List> waitList = new ArrayList<>(); for ( Entry> it : byMethod.entrySet() ) { final String[] parts = it.getKey().split( "//" ); final String method = parts[0]; final List macs = it.getValue(); final String macString = Strings.join( macs.iterator(), ' ' ); final String broadcastIp = parts[1]; final String password = parts.length >= 3 ? parts[2] : ""; final int jobId = ++jobIdCounter; if ( method.equalsIgnoreCase( "DIRECT" ) ) { // Directly from server waitList.add( tp.submit( () -> { status.addMsg( jobId + ": Waking directly from server: " + macString ); WakeOnLanExecutor wol = new WakeOnLanExecutor( password ); String[] success = wol.execute( status, macs, broadcastIp, this.port ); markSuccess( Arrays.asList( success ) ); if ( success.length == macs.size() ) { status.addMsg( jobId + ": Done" ); } else { status.addMsg( jobId + ": Some failed" ); } } ) ); } else if ( this.ssh.containsKey( method ) ) { // Via SSH waitList.add( tp.submit( () -> { SshData sshData = this.ssh.get( method ); status.addMsg( jobId + ": Waking via SSH from " + sshData.ip + " to " + broadcastIp + ": " + macString ); String command = sshData.command.replace( "%MACS%", macString ) .replace( "%IP%", broadcastIp ) .replace( "%PASSWORD%", password ); RemoteExec.Client c = new RemoteExec.Client( null, sshData.ip, sshData.port, sshData.username ); RemoteExec task = new RemoteExec( new RemoteExec.Client[] { c }, sshData.sshkey, sshData.port, command, 5 ); task.execute(); Output s = task.getStatusObject(); if ( s == null ) { status.addMsg( jobId + ": Task status is null, considering failed" ); } else { if ( !Util.isEmpty( s.error ) ) { status.addMsg( s.error ); } if ( s.result != null && s.result.containsKey( sshData.ip ) && s.result.get( sshData.ip ).exitCode == 0 ) { markSuccess( macs ); status.addMsg( jobId + ": Done" ); } else { if ( s.result != null ) { for ( Result sshClient : s.result.values() ) { if ( sshClient == null ) continue; status.addMsg( jobId + ": stdout: " + sshClient.stdout + "\n" + jobId + ": stderr: " + sshClient.stderr ); } } status.addMsg( jobId + ": Non-zero exit code, considering failed" ); } } } ) ); } else { status.addMsg( "Ignoring unknown SSH method '" + method + "'" ); } } // Wait for all jobs for ( Future f : waitList ) { try { f.get(); } catch ( InterruptedException e ) { status.addMsg( "Threadpool got interrupted" ); Thread.currentThread().interrupt(); return false; } catch ( ExecutionException e ) { status.addMsg( "A treadpool job threw an exception" ); e.printStackTrace(); } } } while ( true ); tp.shutdown(); try { tp.awaitTermination( 10, TimeUnit.SECONDS ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } status.addMsg( "Mainloop done" ); if ( this.clients.isEmpty() ) return true; status.addMsg( "Failed clients:" ); for ( Machine c : clients ) { status.addMsg( c.ip ); } return false; } private void markSuccess( Collection macs ) { synchronized ( this.clients ) { for ( Iterator it = this.clients.iterator(); it.hasNext(); ) { Machine client =; if ( macs.contains( client.mac ) ) { it.remove(); } } } } static class Machine { /** * Destination IP (for directed broadcast) */ @Expose String ip; /** * Destination MAC */ @Expose String mac; /** * Desired WOL modes, from most to least preferred. either a key to the top level ssh map, * or "DIRECT" for sending a WOL packet directly from the server */ @Expose List methods; /** * WOL password */ @Expose private String password; } static class SshData { @Expose String username; @Expose String sshkey; @Expose String ip; @Expose int port; @Expose String command; } /* * */ /** * Return any messages about progress and status */ static class StatusObject implements MessageSink { private String messages = ""; public synchronized void addMsg( String str ) { messages = messages + "\n" + str; } } }