From 71a9deeb716158d9df72fe66dcd815dd69dad8a5 Mon Sep 17 00:00:00 2001 From: Simon Rettberg Date: Mon, 26 Apr 2021 11:59:43 +0200 Subject: [RemoteExec] Greatly speed up reading of stdout/err from client --- .../org/openslx/taskmanager/tasks/RemoteExec.java | 114 +++++++++++++++------ 1 file changed, 80 insertions(+), 34 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java b/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java index 3bd0954..ca3dea2 100644 --- a/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java +++ b/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java @@ -2,6 +2,13 @@ package org.openslx.taskmanager.tasks; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -9,6 +16,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; import org.openslx.satserver.util.Util; import org.openslx.taskmanager.api.AbstractTask; @@ -20,6 +28,10 @@ import com.jcraft.jsch.Session; public class RemoteExec extends AbstractTask { + + private static final Logger LOGGER = Logger.getLogger( RemoteExec.class ); + + protected final static int MAX_OUTPUT_PER_CLIENT = 400000; @Expose private Client[] clients; @@ -141,7 +153,7 @@ public class RemoteExec extends AbstractTask tp.shutdown(); try { - tp.awaitTermination( clients.length * 5, TimeUnit.SECONDS ); + tp.awaitTermination( clients.length * this.timeoutSeconds + 5, TimeUnit.SECONDS ); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); return false; @@ -149,27 +161,6 @@ public class RemoteExec extends AbstractTask return true; } - - private void copy( char[] cbuf, InputStreamReader in, StringBuffer out, StringBuffer other ) - { - try { - while ( in.ready() ) { - int nb = in.read( cbuf ); - if ( nb == -1 ) - break; - out.append( cbuf, 0, nb ); - if (out.length() + other.length() > 40000 ) { - int trunc = 40000 - other.length(); - if ( trunc > 0 && trunc < out.length() ) { - out.setLength( trunc ); - } - break; - } - } - } catch ( IOException e ) { - e.printStackTrace(); - } - } private void execCommand( ChannelExec channel, Client client, Result result ) throws JSchException, IOException { @@ -183,14 +174,15 @@ public class RemoteExec extends AbstractTask } cmd += command; channel.setCommand( cmd ); - InputStreamReader stdout = new InputStreamReader( channel.getInputStream(), StandardCharsets.UTF_8 ); - InputStreamReader stderr = new InputStreamReader( channel.getErrStream(), StandardCharsets.UTF_8 ); + LimitedBufferStream lbsOut = new LimitedBufferStream( result.stdout ); + LimitedBufferStream lbsErr = new LimitedBufferStream( result.stderr ); + channel.setOutputStream( lbsOut ); + channel.setErrStream( lbsErr ); result.state = State.EXEC; channel.connect( Math.max( client.timeoutLeft - 1000, 500 ) ); long now = System.currentTimeMillis(); client.timeoutLeft -= now - st; // Read as long as we got time - char[] cbuf = new char[2000]; while ( client.timeoutLeft > 0 && !channel.isClosed() ) { st = now; try { @@ -199,21 +191,13 @@ public class RemoteExec extends AbstractTask Thread.currentThread().interrupt(); break; } catch ( Exception ee ) { + LOGGER.warn( "Cannot sleep", ee ); break; } finally { now = System.currentTimeMillis(); client.timeoutLeft -= now - st; } - copy( cbuf, stdout, result.stdout, result.stderr ); - copy( cbuf, stderr, result.stderr, result.stdout ); - // Check for reasonable output size - if ( result.stdout.length() + result.stderr.length() > 40000 ) { - status.addError( "Truncating output of client " + client.clientip ); - break; - } } - copy( cbuf, stdout, result.stdout, result.stderr ); - copy( cbuf, stderr, result.stderr, result.stdout ); if ( channel.isClosed() ) { result.state = State.DONE; } else { @@ -228,6 +212,68 @@ public class RemoteExec extends AbstractTask } } + static class LimitedBufferStream extends OutputStream + { + + private final StringBuffer sb; + private final CharBuffer cb = CharBuffer.allocate( 200 ); + private final ByteBuffer bb = ByteBuffer.allocate( 200 ); + private final CharsetDecoder decoder; + + public LimitedBufferStream(StringBuffer sb) + { + this.sb = sb; + decoder = StandardCharsets.UTF_8.newDecoder(); + decoder.onMalformedInput( CodingErrorAction.REPLACE ); + decoder.onUnmappableCharacter( CodingErrorAction.REPLACE ); + decoder.reset(); + } + + @Override + public void write( int b ) throws IOException + { + if ( sb.length() >= RemoteExec.MAX_OUTPUT_PER_CLIENT ) + return; + bb.put( (byte) ( b & 0xff ) ); + decode(); + } + + @Override + public void write( byte[] b, int off, int len ) throws IOException + { + if ( sb.length() >= RemoteExec.MAX_OUTPUT_PER_CLIENT ) + return; + while ( len > 0 ) { + int nlen = Math.min( bb.remaining(), len ); + if ( nlen <= 0 ) + throw new RuntimeException( "Empty buffer" ); + bb.put( b, off, nlen ); + off += nlen; + len -= nlen; + decode(); + } + } + + private void decode() + { + if ( bb.position() == 0 ) + return; + ( (Buffer)bb ).limit( bb.position() ); + ( (Buffer)bb ).position( 0 ); + try { + decoder.decode( bb, cb, false ); + } catch ( Throwable t ) { + LOGGER.warn( "Cannot convert data to UTF8", t ); + } + bb.compact(); + ( (Buffer)cb ).limit( cb.position() ); + ( (Buffer)cb ).position( 0 ); + sb.append( cb.toString() ); + ( (Buffer)cb ).clear(); + } + + } + /** * Output - contains additional status data of this task */ -- cgit v1.2.3-55-g7522