From c0f8e66247911138724607459143d8875bff5d69 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 24 Jan 2022 20:11:30 +0100 Subject: [WakeOnLan] Make it multi-staged, so failed clients can be retried Clients can now be specified with one or more ways to be woken up. The methods are tried in order, until one is found that succeeds. In case of waking a client via another client using SSH, this means that connection to the intermediate client was successful, and the provided command could be run on that client and returned 0. For directed broadcasts from the server, we consider a successful .send() on the UDP socket success, as we have no way of knowing whether it reached the destination subnet/client, but assuming the reachability data in slx-admin is accurate, this can be assumed to be true. --- .../org/openslx/satserver/util/MessageSink.java | 8 + .../openslx/satserver/util/WakeOnLanExecutor.java | 136 ++++++++++ .../org/openslx/taskmanager/tasks/RemoteExec.java | 36 ++- .../org/openslx/taskmanager/tasks/WakeOnLan.java | 281 ++++++++++++--------- 4 files changed, 345 insertions(+), 116 deletions(-) create mode 100644 src/main/java/org/openslx/satserver/util/MessageSink.java create mode 100644 src/main/java/org/openslx/satserver/util/WakeOnLanExecutor.java diff --git a/src/main/java/org/openslx/satserver/util/MessageSink.java b/src/main/java/org/openslx/satserver/util/MessageSink.java new file mode 100644 index 0000000..bda7410 --- /dev/null +++ b/src/main/java/org/openslx/satserver/util/MessageSink.java @@ -0,0 +1,8 @@ +package org.openslx.satserver.util; + +public interface MessageSink +{ + + public void addMsg( String str ); + +} diff --git a/src/main/java/org/openslx/satserver/util/WakeOnLanExecutor.java b/src/main/java/org/openslx/satserver/util/WakeOnLanExecutor.java new file mode 100644 index 0000000..1ee2f81 --- /dev/null +++ b/src/main/java/org/openslx/satserver/util/WakeOnLanExecutor.java @@ -0,0 +1,136 @@ +package org.openslx.satserver.util; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class WakeOnLanExecutor +{ + + private static final Pattern RE_IPv4 = Pattern.compile( "^(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$" ); + private static final Pattern RE_MAC = Pattern.compile( "^([0-9a-f]{2})[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)$", + Pattern.CASE_INSENSITIVE ); + + private byte[] buffer; + + public WakeOnLanExecutor( String password ) throws RuntimeException + { + Matcher m = null; + int pwlen = 0; + if ( !Util.isEmpty( password ) ) { + m = RE_IPv4.matcher( password ); + if ( m.matches() ) { + pwlen = 4; + } else { + m = RE_MAC.matcher( password ); + if ( m.matches() ) { + pwlen = 6; + } + } + if ( pwlen == 0 ) { + throw new RuntimeException( "Invalid password format: " + password ); + } + } + buffer = new byte[ 17 * 6 + pwlen ]; + // 6 Sync bytes + for ( int i = 0; i < 6; ++i ) { + buffer[i] = -1; + } + if ( pwlen != 0 ) { + // Append pw + try { + for ( int i = 0; i < pwlen; ++i ) { + int x = Integer.parseInt( m.group( i + 1 ), pwlen == 4 ? 10 : 16 ); + buffer[17 * 6 + i] = (byte)x; + } + } catch ( Throwable t ) { + throw new RuntimeException( "Invalid octet in password" ); + } + } + } + + /** + * Send out a bunch of WOL packets to the given list of MAC addresses. Every packet will be + * directed to given ip, i.e. all destination MACs have to be in the same subnet. + * + * @param log For adding log outout + * @param macs list of MAC addresses + * @param ip destination IP (broadcast or directed broadcast address) + * @param port destination port, usually 9 + * @return list of MACs the packet has successfully been sent to + */ + public String[] execute( MessageSink log, Collection macs, String ip, int port ) + { + if ( port == 0 ) { + port = 9; + } else if ( port > 65535 || port < 0 ) { + log.addMsg( "Invalid port: " + port + " for " + ip ); + return new String[ 0 ]; + } + + DatagramSocket sock = null; + InetAddress destAddr = null; + Set success = new HashSet<>(); + try { + try { + destAddr = InetAddress.getByName( ip ); + sock = new DatagramSocket(); + sock.setBroadcast( true ); + } catch ( SocketException e ) { + log.addMsg( "Cannot create UDP socket" ); + return new String[ 0 ]; + } catch ( UnknownHostException e ) { + log.addMsg( "Cannot resolve " + ip ); + return new String[ 0 ]; + } + + // Repeat three times + for ( int reps = 0; reps < 3; ++reps ) { + if ( reps != 0 ) { + Thread.sleep( 400 ); + } + // For each MAC, send WOL packet + for ( String mac : macs ) { + Matcher m = RE_MAC.matcher( mac ); + if ( !m.matches() ) { + log.addMsg( "Cannot parse MAC address " + mac ); + continue; + } + try { + // Patch in the 16 repetitions of the target mac address + for ( int i = 0; i < 6; ++i ) { + byte x = (byte)Integer.parseInt( m.group( i + 1 ), 16 ); + for ( int offset = 0; offset < 16; ++offset ) { + buffer[6 + offset * 6 + i] = x; + } + } + DatagramPacket dp = new DatagramPacket( buffer, buffer.length, destAddr, port ); + sock.send( dp ); + log.addMsg( "Sent packet to " + mac ); + success.add( mac ); + Thread.sleep( 10 ); + } catch ( NumberFormatException e ) { + log.addMsg( "Invalid octet in MAC address, skipping: " + mac ); + } catch ( IOException e ) { + log.addMsg( "Error sending UDP packet to " + ip + "/" + mac + ": " + e.toString() ); + } + } + } + } catch ( InterruptedException t ) { + Thread.currentThread().interrupt(); + } finally { + Util.multiClose( sock ); + } + + return success.toArray( new String[ success.size() ] ); + } + +} diff --git a/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java b/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java index de520a8..c533c27 100644 --- a/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java +++ b/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java @@ -50,6 +50,19 @@ public class RemoteExec extends AbstractTask private JSch sshClient; private Output status = new Output(); + + public RemoteExec() + { + } + + public RemoteExec(Client[] clients, String sshkey, int port, String command, int timeoutSeconds) + { + this.clients = clients; + this.sshkey = sshkey; + this.port = port; + this.command = command; + this.timeoutSeconds = timeoutSeconds; + } @Override protected boolean initTask() @@ -160,7 +173,12 @@ public class RemoteExec extends AbstractTask return true; } - + + public Output getStatusObject() + { + return this.status; + } + private void execCommand( ChannelExec channel, Client client, Result result ) throws JSchException, IOException { long st = System.currentTimeMillis(); @@ -279,9 +297,9 @@ public class RemoteExec extends AbstractTask static class Output { /** UUID -> Output */ - private Map result = new ConcurrentHashMap<>(); + Map result = new ConcurrentHashMap<>(); - private String error; + String error; private synchronized void addError( String e ) { @@ -305,6 +323,18 @@ public class RemoteExec extends AbstractTask private String username; /** How many ms of the given timeout are left */ private int timeoutLeft; + + public Client() + { + } + + public Client(String machineuuid, String clientip, int port, String username) + { + this.machineuuid = machineuuid; + this.clientip = clientip; + this.port = port; + this.username = username; + } } static enum State diff --git a/src/main/java/org/openslx/taskmanager/tasks/WakeOnLan.java b/src/main/java/org/openslx/taskmanager/tasks/WakeOnLan.java index 246749a..c92f3c9 100644 --- a/src/main/java/org/openslx/taskmanager/tasks/WakeOnLan.java +++ b/src/main/java/org/openslx/taskmanager/tasks/WakeOnLan.java @@ -1,163 +1,218 @@ package org.openslx.taskmanager.tasks; -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.logging.log4j.util.Strings; +import org.openslx.satserver.util.MessageSink; import org.openslx.satserver.util.Util; +import org.openslx.satserver.util.WakeOnLanExecutor; import org.openslx.taskmanager.api.AbstractTask; +import org.openslx.taskmanager.tasks.RemoteExec.Output; import com.google.gson.annotations.Expose; public class WakeOnLan extends AbstractTask { - private static final Pattern RE_IPv4 = - Pattern.compile( "^(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$" ); - private static final Pattern RE_MAC = - Pattern.compile( "^([0-9a-f]{2})[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)[:-]([0-9a-f]{2}+)$", - Pattern.CASE_INSENSITIVE ); - + /** + * List of clients to wake up + */ @Expose - private String[] macs; + private List clients; + /** + * SHH data (key, command) for waking via another host + */ @Expose - private String ip; + private Map ssh; - @Expose - private String password; - + /** + * UDP port to send WOL packets to + */ @Expose private int port; private StatusObject status; - private byte[] buffer; - @Override protected boolean initTask() { status = new StatusObject(); this.setStatusObject( status ); - if ( macs == null || macs.length == 0 ) { - status.addMsg( "Macs empty" ); - } - if ( Util.isEmpty( ip ) ) { - status.addMsg( "IP empty" ); - } - int pwlen = 0; - Matcher m = null; - if ( !Util.isEmpty( password ) ) { - m = RE_IPv4.matcher( password ); - if ( m.matches() ) { - pwlen = 4; - } else { - m = RE_MAC.matcher( password ); - if ( m.matches() ) { - pwlen = 6; + if ( clients == null ) { + status.addMsg( "Clients null" ); + } else { + // Clean + for ( Iterator it = this.clients.iterator(); it.hasNext(); ) { + Machine client = it.next(); + if ( client == null || Util.isEmpty( client.mac ) || Util.isEmpty( client.ip ) + || client.methods == null || client.methods.isEmpty() ) { + it.remove(); } } - if ( pwlen == 0 ) { - status.addMsg( "Invalid password format: " + password ); + if ( clients.isEmpty() ) { + status.addMsg( "Clients empty" ); } } if ( !Util.isEmpty( status.messages ) ) return false; - buffer = new byte[ 17 * 6 + pwlen ]; - if ( pwlen != 0 ) { - try { - for ( int i = 0; i < pwlen; ++i ) { - int x = Integer.parseInt( m.group( i + 1 ), pwlen == 4 ? 10 : 16 ); - buffer[17 * 6 + i] = (byte)x; - } - } catch ( Throwable t ) { - status.addMsg( "Invalid octet in password" ); - return false; - } - } - if ( this.port == 0 ) { - this.port = 9; - } else if ( this.port > 65535 || this.port < 0 ) { - status.addMsg( "Invalid port: " + this.port ); - return false; - } return true; } @Override protected boolean execute() { - DatagramSocket sock = null; - try { - try { - sock = new DatagramSocket(); - sock.setBroadcast( true ); - } catch ( SocketException e ) { - status.addMsg( "Cannot create UDP socket" ); - return false; - } - - // Sync bytes - for ( int i = 0; i < 6; ++i ) { - buffer[i] = -1; - } - - // Repeat three times - for ( int reps = 0; reps < 3; ++reps ) { - if ( reps != 0 ) { - try { - Thread.sleep( 600 ); - } catch ( InterruptedException t ) { - Thread.currentThread().interrupt(); - break; - } + // Loop over clients until they all were handled (i.e. methods is empty) + ExecutorService tp = Executors.newFixedThreadPool( ssh.size() > 4 ? 4 : ssh.size() ); + Map> byMethod; + do { + byMethod = new HashMap<>(); + // Fetch next method for all clients + for ( Machine client : clients ) { + if ( client.methods.isEmpty() ) + continue; + // Group by method and destination IP address + String method = client.methods.remove( 0 ) + "//" + client.ip; + ArrayList list = byMethod.get( method ); + if ( list == null ) { + list = new ArrayList<>(); + byMethod.put( method, list ); } - // For each MAC - for ( int mi = 0; mi < macs.length; ++mi ) { - Matcher m = RE_MAC.matcher( macs[mi] ); - if ( !m.matches() ) { - status.addMsg( "Cannot parse MAC address " + macs[mi] ); - continue; - } - try { - for ( int i = 0; i < 6; ++i ) { - byte x = (byte)Integer.parseInt( m.group( i + 1 ), 16 ); - for ( int offset = 0; offset < 16; ++offset ) { - buffer[6 + offset * 6 + i] = x; + list.add( client.mac ); + } + // Execute + List> waitList = new ArrayList<>(); + for ( Entry> it : byMethod.entrySet() ) { + String[] parts = it.getKey().split( "//" ); + String method = parts[0]; + String ip = parts[1]; + if ( method.equalsIgnoreCase( "DIRECT" ) ) { + // Directly from server + waitList.add( tp.submit( () -> { + WakeOnLanExecutor wol = new WakeOnLanExecutor( null ); + String[] success = wol.execute( status, it.getValue(), ip, this.port ); + markSuccess( Arrays.asList( success ) ); + } ) ); + } else if ( this.ssh.containsKey( method ) ) { + // Via SSH + waitList.add( tp.submit( () -> { + SshData sshData = this.ssh.get( method ); + String macs = Strings.join( it.getValue().iterator(), ' ' ); + String command = sshData.command.replace( "%MACS%", macs ).replace( "%IP%", ip ); + RemoteExec.Client c = new RemoteExec.Client( "x", sshData.ip, sshData.port, sshData.username ); + RemoteExec task = new RemoteExec( new RemoteExec.Client[] { c }, sshData.sshkey, sshData.port, command, 5 ); + task.execute(); + Output s = task.getStatusObject(); + if ( s != null ) { + if ( s.result != null && s.result.containsKey( "x" ) && s.result.get( "x" ).exitCode == 0 ) { + markSuccess( Arrays.asList( macs ) ); + } + if ( !Util.isEmpty( s.error ) ) { + status.addMsg( s.error ); } } - } catch ( NumberFormatException e ) { - status.addMsg( "Invalid octet in MAC address: " + macs[mi] ); - continue; - } - DatagramPacket dp = new DatagramPacket( buffer, buffer.length, InetAddress.getByName( ip ), this.port ); - try { - sock.send( dp ); - status.addMsg( "Sent packet to " + macs[mi] ); - } catch ( IOException e ) { - status.addMsg( "Error sending UDP packet to " + ip + "/" + macs[mi] + ": " + e.toString() ); - } + } ) ); + } else { + status.addMsg( "Ignoring unknown SSH method '" + method + "'" ); } } - } catch ( UnknownHostException e ) { - status.addMsg( "Cannot resolve " + ip ); - return false; - } finally { - Util.multiClose( sock ); + // Wait for all jobs + for ( Future f : waitList ) { + try { + f.get(); + } catch ( InterruptedException e ) { + status.addMsg( "Threadpool got interrupted" ); + Thread.currentThread().interrupt(); + return false; + } catch ( ExecutionException e ) { + status.addMsg( "A treadpool job threw an exception" ); + e.printStackTrace(); + } + } + } while ( !byMethod.isEmpty() ); + if ( this.clients.isEmpty() ) + return true; + status.addMsg( "Failed clients:" ); + for ( Machine c : clients ) { + status.addMsg( c.ip ); } - return true; + return false; } - class StatusObject + private void markSuccess( Collection macs ) + { + synchronized ( this.clients ) { + for ( Iterator it = this.clients.iterator(); it.hasNext(); ) { + Machine client = it.next(); + if ( macs.contains( client.mac ) ) { + it.remove(); + } + } + } + } + + static class Machine + { + /** + * Destination IP (for directed broadcast) + */ + @Expose + String ip; + /** + * Destination MAC + */ + @Expose + String mac; + /** + * Desired WOL modes, from most to least preferred. either a key to the top level ssh map, + * or "DIRECT" for sending a WOL packet directly from the server + */ + @Expose + List methods; + /** + * WOL password (currently unused by slx-admin) + */ + @Expose + private String password; + } + + static class SshData + { + @Expose + String username; + @Expose + String sshkey; + @Expose + String ip; + @Expose + int port; + @Expose + String command; + } + + /* + * + */ + + /** + * Return any messages about progress and status + */ + static class StatusObject implements MessageSink { private String messages = ""; - public void addMsg( String str ) + public synchronized void addMsg( String str ) { messages = messages + "\n" + str; } -- cgit v1.2.3-55-g7522