summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSimon Rettberg2021-04-26 11:59:43 +0200
committerSimon Rettberg2021-04-26 11:59:43 +0200
commit71a9deeb716158d9df72fe66dcd815dd69dad8a5 (patch)
tree53f7f9934db1d603013934a9ef09dc427eb0b1ea
parentAdd ipxe version selection, use bwlp config for ipxe (diff)
downloadtmlite-bwlp-71a9deeb716158d9df72fe66dcd815dd69dad8a5.tar.gz
tmlite-bwlp-71a9deeb716158d9df72fe66dcd815dd69dad8a5.tar.xz
tmlite-bwlp-71a9deeb716158d9df72fe66dcd815dd69dad8a5.zip
[RemoteExec] Greatly speed up reading of stdout/err from client
-rw-r--r--src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java114
1 files changed, 80 insertions, 34 deletions
diff --git a/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java b/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java
index 3bd0954..ca3dea2 100644
--- a/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java
+++ b/src/main/java/org/openslx/taskmanager/tasks/RemoteExec.java
@@ -2,6 +2,13 @@ package org.openslx.taskmanager.tasks;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -9,6 +16,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
import org.openslx.satserver.util.Util;
import org.openslx.taskmanager.api.AbstractTask;
@@ -20,6 +28,10 @@ import com.jcraft.jsch.Session;
public class RemoteExec extends AbstractTask
{
+
+ private static final Logger LOGGER = Logger.getLogger( RemoteExec.class );
+
+ protected final static int MAX_OUTPUT_PER_CLIENT = 400000;
@Expose
private Client[] clients;
@@ -141,7 +153,7 @@ public class RemoteExec extends AbstractTask
tp.shutdown();
try {
- tp.awaitTermination( clients.length * 5, TimeUnit.SECONDS );
+ tp.awaitTermination( clients.length * this.timeoutSeconds + 5, TimeUnit.SECONDS );
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return false;
@@ -149,27 +161,6 @@ public class RemoteExec extends AbstractTask
return true;
}
-
- private void copy( char[] cbuf, InputStreamReader in, StringBuffer out, StringBuffer other )
- {
- try {
- while ( in.ready() ) {
- int nb = in.read( cbuf );
- if ( nb == -1 )
- break;
- out.append( cbuf, 0, nb );
- if (out.length() + other.length() > 40000 ) {
- int trunc = 40000 - other.length();
- if ( trunc > 0 && trunc < out.length() ) {
- out.setLength( trunc );
- }
- break;
- }
- }
- } catch ( IOException e ) {
- e.printStackTrace();
- }
- }
private void execCommand( ChannelExec channel, Client client, Result result ) throws JSchException, IOException
{
@@ -183,14 +174,15 @@ public class RemoteExec extends AbstractTask
}
cmd += command;
channel.setCommand( cmd );
- InputStreamReader stdout = new InputStreamReader( channel.getInputStream(), StandardCharsets.UTF_8 );
- InputStreamReader stderr = new InputStreamReader( channel.getErrStream(), StandardCharsets.UTF_8 );
+ LimitedBufferStream lbsOut = new LimitedBufferStream( result.stdout );
+ LimitedBufferStream lbsErr = new LimitedBufferStream( result.stderr );
+ channel.setOutputStream( lbsOut );
+ channel.setErrStream( lbsErr );
result.state = State.EXEC;
channel.connect( Math.max( client.timeoutLeft - 1000, 500 ) );
long now = System.currentTimeMillis();
client.timeoutLeft -= now - st;
// Read as long as we got time
- char[] cbuf = new char[2000];
while ( client.timeoutLeft > 0 && !channel.isClosed() ) {
st = now;
try {
@@ -199,21 +191,13 @@ public class RemoteExec extends AbstractTask
Thread.currentThread().interrupt();
break;
} catch ( Exception ee ) {
+ LOGGER.warn( "Cannot sleep", ee );
break;
} finally {
now = System.currentTimeMillis();
client.timeoutLeft -= now - st;
}
- copy( cbuf, stdout, result.stdout, result.stderr );
- copy( cbuf, stderr, result.stderr, result.stdout );
- // Check for reasonable output size
- if ( result.stdout.length() + result.stderr.length() > 40000 ) {
- status.addError( "Truncating output of client " + client.clientip );
- break;
- }
}
- copy( cbuf, stdout, result.stdout, result.stderr );
- copy( cbuf, stderr, result.stderr, result.stdout );
if ( channel.isClosed() ) {
result.state = State.DONE;
} else {
@@ -228,6 +212,68 @@ public class RemoteExec extends AbstractTask
}
}
+ static class LimitedBufferStream extends OutputStream
+ {
+
+ private final StringBuffer sb;
+ private final CharBuffer cb = CharBuffer.allocate( 200 );
+ private final ByteBuffer bb = ByteBuffer.allocate( 200 );
+ private final CharsetDecoder decoder;
+
+ public LimitedBufferStream(StringBuffer sb)
+ {
+ this.sb = sb;
+ decoder = StandardCharsets.UTF_8.newDecoder();
+ decoder.onMalformedInput( CodingErrorAction.REPLACE );
+ decoder.onUnmappableCharacter( CodingErrorAction.REPLACE );
+ decoder.reset();
+ }
+
+ @Override
+ public void write( int b ) throws IOException
+ {
+ if ( sb.length() >= RemoteExec.MAX_OUTPUT_PER_CLIENT )
+ return;
+ bb.put( (byte) ( b & 0xff ) );
+ decode();
+ }
+
+ @Override
+ public void write( byte[] b, int off, int len ) throws IOException
+ {
+ if ( sb.length() >= RemoteExec.MAX_OUTPUT_PER_CLIENT )
+ return;
+ while ( len > 0 ) {
+ int nlen = Math.min( bb.remaining(), len );
+ if ( nlen <= 0 )
+ throw new RuntimeException( "Empty buffer" );
+ bb.put( b, off, nlen );
+ off += nlen;
+ len -= nlen;
+ decode();
+ }
+ }
+
+ private void decode()
+ {
+ if ( bb.position() == 0 )
+ return;
+ ( (Buffer)bb ).limit( bb.position() );
+ ( (Buffer)bb ).position( 0 );
+ try {
+ decoder.decode( bb, cb, false );
+ } catch ( Throwable t ) {
+ LOGGER.warn( "Cannot convert data to UTF8", t );
+ }
+ bb.compact();
+ ( (Buffer)cb ).limit( cb.position() );
+ ( (Buffer)cb ).position( 0 );
+ sb.append( cb.toString() );
+ ( (Buffer)cb ).clear();
+ }
+
+ }
+
/**
* Output - contains additional status data of this task
*/