diff options
author | Simon Rettberg | 2016-06-13 15:20:16 +0200 |
---|---|---|
committer | Simon Rettberg | 2016-06-13 15:20:16 +0200 |
commit | 83273ac89e48b41881d15168d47be658cf6aeebc (patch) | |
tree | 2b985fef430593ac9d27a54ceebbcd3f8f5306ff /src | |
parent | Cleanup, features, everything, nonsense commitmsg (diff) | |
download | dnbd3-status-83273ac89e48b41881d15168d47be658cf6aeebc.tar.gz dnbd3-status-83273ac89e48b41881d15168d47be658cf6aeebc.tar.xz dnbd3-status-83273ac89e48b41881d15168d47be658cf6aeebc.zip |
Fix CPU spinning in NanoHTTPd
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/fi/iki/elonen/NanoHTTPD.java | 305 | ||||
-rw-r--r-- | src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java | 85 |
2 files changed, 279 insertions, 111 deletions
diff --git a/src/main/java/fi/iki/elonen/NanoHTTPD.java b/src/main/java/fi/iki/elonen/NanoHTTPD.java index 75026ee..bb882ee 100644 --- a/src/main/java/fi/iki/elonen/NanoHTTPD.java +++ b/src/main/java/fi/iki/elonen/NanoHTTPD.java @@ -35,14 +35,13 @@ package fi.iki.elonen; import java.io.BufferedReader; import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.PrintWriter; import java.io.PushbackInputStream; -import java.io.Reader; import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -51,23 +50,26 @@ import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.URLDecoder; -import java.text.SimpleDateFormat; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.StringTokenizer; -import java.util.TimeZone; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.openslx.util.GrowingThreadPoolExecutor; + /** * A simple, tiny, nicely embeddable HTTP server in Java * <p/> @@ -117,12 +119,13 @@ import java.util.concurrent.TimeUnit; */ public abstract class NanoHTTPD implements Runnable { + /** * Maximum time to wait on Socket.getInputStream().read() (in milliseconds) * This is required as the Keep-Alive HTTP connections would otherwise * block the socket reading thread forever (or as long the browser is open). */ - public static final int SOCKET_READ_TIMEOUT = 15000; + public static final int SOCKET_READ_TIMEOUT = 10000; /** * Common MIME type for dynamic content: plain text */ @@ -132,7 +135,8 @@ public abstract class NanoHTTPD implements Runnable */ public static final String MIME_HTML = "text/html"; /** - * Pseudo-Parameter to use to store the actual query string in the parameters map for later + * Pseudo-Parameter to use to store the actual query string in the + * parameters map for later * re-processing. */ private static final String QUERY_STRING_PARAMETER = "NanoHttpd.QUERY_STRING"; @@ -145,6 +149,8 @@ public abstract class NanoHTTPD implements Runnable */ private AsyncRunner asyncRunner; + protected int maxRequestSize = 0; + /** * Constructs an HTTP server on given port. */ @@ -184,7 +190,8 @@ public abstract class NanoHTTPD implements Runnable try { myServerSocket = new ServerSocket(); myServerSocket.setReuseAddress( true ); - myServerSocket.bind( ( hostname != null ) ? new InetSocketAddress( hostname, myPort ) : new InetSocketAddress( myPort ) ); + myServerSocket.bind( ( hostname != null ) ? new InetSocketAddress( hostname, myPort ) + : new InetSocketAddress( myPort ) ); } catch ( Exception e ) { throw new RuntimeException( e ); } @@ -202,14 +209,16 @@ public abstract class NanoHTTPD implements Runnable OutputStream outputStream = null; try { outputStream = finalAccept.getOutputStream(); - HTTPSession session = new HTTPSession( inputStream, outputStream, finalAccept.getInetAddress() ); - while ( !finalAccept.isClosed() ) { + HTTPSession session = new HTTPSession( inputStream, outputStream, + finalAccept.getInetAddress() ); + while ( !finalAccept.isClosed() && !finalAccept.isInputShutdown() ) { session.execute(); } } catch ( Exception e ) { // When the socket is closed by the client, we throw our own SocketException // to break the "keep alive" loop above. - if ( ! ( e instanceof SocketException && "NanoHttpd Shutdown".equals( e.getMessage() ) ) ) { + if ( ! ( e instanceof SocketTimeoutException ) + && ! ( e instanceof SocketException && "NanoHttpd Shutdown".equals( e.getMessage() ) ) ) { e.printStackTrace(); } } finally { @@ -290,9 +299,11 @@ public abstract class NanoHTTPD implements Runnable * <p/> * (By default, this returns a 404 "Not Found" plain text error response.) * - * @param uri Percent-decoded URI without parameters, for example "/index.cgi" + * @param uri Percent-decoded URI without parameters, for example + * "/index.cgi" * @param method "GET", "POST" etc. - * @param parms Parsed, percent decoded parameters from URI and, in case of POST, data. + * @param parms Parsed, percent decoded parameters from URI and, in case of + * POST, data. * @param headers Header entries, percent decoded * @return HTTP response, see class Response for details */ @@ -320,7 +331,8 @@ public abstract class NanoHTTPD implements Runnable try { session.parseBody( files ); } catch ( IOException ioe ) { - return new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() ); + return new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT, + "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() ); } catch ( ResponseException re ) { return new Response( re.getStatus(), MIME_PLAINTEXT, re.getMessage() ); } @@ -335,7 +347,8 @@ public abstract class NanoHTTPD implements Runnable * Decode percent encoded <code>String</code> values. * * @param str the percent encoded <code>String</code> - * @return expanded form of the input, for example "foo%20bar" becomes "foo bar" + * @return expanded form of the input, for example "foo%20bar" becomes + * "foo bar" */ protected String decodePercent( String str ) { @@ -348,13 +361,15 @@ public abstract class NanoHTTPD implements Runnable } /** - * Decode parameters from a URL, handing the case where a single parameter name might have been - * supplied several times, by return lists of values. In general these lists will contain a + * Decode parameters from a URL, handing the case where a single parameter + * name might have been + * supplied several times, by return lists of values. In general these lists + * will contain a * single * element. * - * @param parms original <b>NanoHTTPD</b> parameters values, as passed to the - * <code>serve()</code> method. + * @param parms original <b>NanoHTTPD</b> parameters values, as passed to + * the <code>serve()</code> method. * @return a map of <code>String</code> (parameter name) to <code>List<String></code> (a * list of the values supplied). */ @@ -364,8 +379,10 @@ public abstract class NanoHTTPD implements Runnable } /** - * Decode parameters from a URL, handing the case where a single parameter name might have been - * supplied several times, by return lists of values. In general these lists will contain a + * Decode parameters from a URL, handing the case where a single parameter + * name might have been + * supplied several times, by return lists of values. In general these lists + * will contain a * single * element. * @@ -381,7 +398,8 @@ public abstract class NanoHTTPD implements Runnable while ( st.hasMoreTokens() ) { String e = st.nextToken(); int sep = e.indexOf( '=' ); - String propertyName = ( sep >= 0 ) ? decodePercent( e.substring( 0, sep ) ).trim() : decodePercent( e ).trim(); + String propertyName = ( sep >= 0 ) ? decodePercent( e.substring( 0, sep ) ).trim() : decodePercent( + e ).trim(); if ( !parms.containsKey( propertyName ) ) { parms.put( propertyName, new ArrayList<String>() ); } @@ -415,7 +433,12 @@ public abstract class NanoHTTPD implements Runnable */ public enum Method { - GET, PUT, POST, DELETE, HEAD, OPTIONS; + GET, + PUT, + POST, + DELETE, + HEAD, + OPTIONS; static Method lookup( String method ) { @@ -442,25 +465,20 @@ public abstract class NanoHTTPD implements Runnable * Default threading strategy for NanoHTTPD. * <p/> * <p> - * By default, the server spawns a new Thread for every incoming request. These are set to - * <i>daemon</i> status, and named according to the request number. The name is useful when - * profiling the application. + * Uses a thread pool. * </p> */ public static class DefaultAsyncRunner implements AsyncRunner { - private long requestCount; - private ExecutorService pool = new ThreadPoolExecutor( 6, 32, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 1 ) ); + private ExecutorService pool = new GrowingThreadPoolExecutor( 2, 16, 1, TimeUnit.MINUTES, + new ArrayBlockingQueue<Runnable>( 4 ) ); @Override public void exec( Runnable code ) { try { - pool.submit( code ); - ++requestCount; - System.out.println( "NanoHttpd Request Processor (#" + requestCount + ")" ); + pool.execute( code ); } catch ( RejectedExecutionException e ) { - System.out.println( "Too many pending requests, dropping client..." ); } } } @@ -496,7 +514,8 @@ public abstract class NanoHTTPD implements Runnable private boolean chunkedTransfer; /** - * Default constructor: response = Status.OK, mime = MIME_HTML and your supplied message + * Default constructor: response = Status.OK, mime = MIME_HTML and your + * supplied message */ public Response( String msg ) { @@ -540,56 +559,72 @@ public abstract class NanoHTTPD implements Runnable return header.get( name ); } + private static final DateTimeFormatter headerDateFormatter = DateTimeFormat.forPattern( + "E, d MMM yyyy HH:mm:ss 'GMT'" ) + .withLocale( Locale.US ) + .withZoneUTC(); + /** * Sends given response to the socket. */ protected void send( OutputStream outputStream ) { String mime = mimeType; - SimpleDateFormat gmtFrmt = new SimpleDateFormat( "E, d MMM yyyy HH:mm:ss 'GMT'", Locale.US ); - gmtFrmt.setTimeZone( TimeZone.getTimeZone( "GMT" ) ); - - try { - if ( status == null ) { - throw new Error( "sendResponse(): Status can't be null." ); - } - PrintWriter pw = new PrintWriter( outputStream ); - pw.print( "HTTP/1.1 " + status.getDescription() + " \r\n" ); - if ( mime != null ) { - pw.print( "Content-Type: " + mime + "\r\n" ); - } + StringBuilder sb = new StringBuilder(); + if ( status == null ) { + throw new Error( "sendResponse(): Status can't be null." ); + } + sb.append( "HTTP/1.1 " ); + sb.append( status.getDescription() ); + sb.append( " \r\n" ); + + if ( mime != null ) { + sb.append( "Content-Type: " ); + sb.append( mime ); + sb.append( "\r\n" ); + } - if ( header == null || header.get( "Date" ) == null ) { - pw.print( "Date: " + gmtFrmt.format( new Date() ) + "\r\n" ); - } + if ( header == null || header.get( "Date" ) == null ) { + sb.append( "Date: " ); + sb.append( headerDateFormatter.print( System.currentTimeMillis() ) ); + sb.append( "\r\n" ); + } - if ( header != null ) { - for ( String key : header.keySet() ) { - String value = header.get( key ); - pw.print( key + ": " + value + "\r\n" ); - } + if ( header != null ) { + for ( Entry<String, String> item : header.entrySet() ) { + sb.append( item.getKey() ); + sb.append( ": " ); + sb.append( item.getValue() ); + sb.append( "\r\n" ); } + } - sendConnectionHeaderIfNotAlreadyPresent( pw, header ); + sendConnectionHeaderIfNotAlreadyPresent( sb, header ); + try { if ( requestMethod != Method.HEAD && chunkedTransfer ) { - sendAsChunked( outputStream, pw ); + sendAsChunked( outputStream, sb ); } else { int pending = data != null ? data.available() : 0; - pending = sendContentLengthHeaderIfNotAlreadyPresent( pw, header, pending ); - pw.print( "\r\n" ); - pw.flush(); + pending = sendContentLengthHeaderIfNotAlreadyPresent( sb, header, pending ); + sb.append( "\r\n" ); + outputStream.write( sb.toString().getBytes( StandardCharsets.UTF_8 ) ); + sb.setLength( 0 ); sendAsFixedLength( outputStream, pending ); } - outputStream.flush(); + + if ( sb.length() != 0 ) { + outputStream.write( sb.toString().getBytes( StandardCharsets.UTF_8 ) ); + } safeClose( data ); } catch ( IOException ioe ) { // Couldn't write? No can do. } } - protected int sendContentLengthHeaderIfNotAlreadyPresent( PrintWriter pw, Map<String, String> header, int size ) + protected int sendContentLengthHeaderIfNotAlreadyPresent( StringBuilder sb, + Map<String, String> header, int size ) { for ( String headerName : header.keySet() ) { if ( headerName.equalsIgnoreCase( "content-length" ) ) { @@ -601,33 +636,43 @@ public abstract class NanoHTTPD implements Runnable } } - pw.print( "Content-Length: " + size + "\r\n" ); + sb.append( "Content-Length: " ); + sb.append( size ); + sb.append( "\r\n" ); return size; } - protected void sendConnectionHeaderIfNotAlreadyPresent( PrintWriter pw, Map<String, String> header ) + protected void sendConnectionHeaderIfNotAlreadyPresent( StringBuilder sb, Map<String, String> header ) { if ( !headerAlreadySent( header, "connection" ) ) { - pw.print( "Connection: keep-alive\r\n" ); + sb.append( "Connection: keep-alive\r\n" ); + } + if ( !headerAlreadySent( header, "keep-alive" ) ) { + sb.append( "Keep-Alive: timeout=" ); + sb.append( SOCKET_READ_TIMEOUT / 1000 - 1 ); + sb.append( "\r\n" ); } } private boolean headerAlreadySent( Map<String, String> header, String name ) { - boolean alreadySent = false; for ( String headerName : header.keySet() ) { - alreadySent |= headerName.equalsIgnoreCase( name ); + if ( headerName.equalsIgnoreCase( name ) ) + return true; } - return alreadySent; + return false; } - private void sendAsChunked( OutputStream outputStream, PrintWriter pw ) throws IOException + private static final byte[] CRLF = "\r\n".getBytes(); + private static final byte[] CHUNKED_END = "0\r\n\r\n".getBytes(); + private static final int BUFFER_SIZE = 256 * 1024; + + private void sendAsChunked( OutputStream outputStream, StringBuilder sb ) throws IOException { - pw.print( "Transfer-Encoding: chunked\r\n" ); - pw.print( "\r\n" ); - pw.flush(); - int BUFFER_SIZE = 16 * 1024; - byte[] CRLF = "\r\n".getBytes(); + sb.append( "Transfer-Encoding: chunked\r\n" ); + sb.append( "\r\n" ); + outputStream.write( sb.toString().getBytes( StandardCharsets.UTF_8 ) ); + sb.setLength( 0 ); byte[] buff = new byte[ BUFFER_SIZE ]; int read; while ( ( read = data.read( buff ) ) > 0 ) { @@ -635,7 +680,7 @@ public abstract class NanoHTTPD implements Runnable outputStream.write( buff, 0, read ); outputStream.write( CRLF ); } - outputStream.write( String.format( "0\r\n\r\n" ).getBytes() ); + outputStream.write( CHUNKED_END ); } private void sendAsFixedLength( OutputStream outputStream, int pending ) throws IOException @@ -711,11 +756,21 @@ public abstract class NanoHTTPD implements Runnable */ public enum Status implements IStatus { - SWITCH_PROTOCOL( 101, "Switching Protocols" ), OK( 200, "OK" ), CREATED( 201, "Created" ), ACCEPTED( 202, "Accepted" ), NO_CONTENT( 204, "No Content" ), PARTIAL_CONTENT( - 206, "Partial Content" ), REDIRECT( 301, - "Moved Permanently" ), NOT_MODIFIED( 304, "Not Modified" ), BAD_REQUEST( 400, "Bad Request" ), UNAUTHORIZED( 401, - "Unauthorized" ), FORBIDDEN( 403, "Forbidden" ), NOT_FOUND( 404, "Not Found" ), METHOD_NOT_ALLOWED( 405, "Method Not Allowed" ), RANGE_NOT_SATISFIABLE( 416, - "Requested Range Not Satisfiable" ), INTERNAL_ERROR( 500, "Internal Server Error" ); + SWITCH_PROTOCOL( 101, "Switching Protocols" ), + OK( 200, "OK" ), + CREATED( 201, "Created" ), + ACCEPTED( 202, "Accepted" ), + NO_CONTENT( 204, "No Content" ), + PARTIAL_CONTENT( 206, "Partial Content" ), + REDIRECT( 301, "Moved Permanently" ), + NOT_MODIFIED( 304, "Not Modified" ), + BAD_REQUEST( 400, "Bad Request" ), + UNAUTHORIZED( 401, "Unauthorized" ), + FORBIDDEN( 403, "Forbidden" ), + NOT_FOUND( 404, "Not Found" ), + METHOD_NOT_ALLOWED( 405, "Method Not Allowed" ), + RANGE_NOT_SATISFIABLE( 416, "Requested Range Not Satisfiable" ), + INTERNAL_ERROR( 500, "Internal Server Error" ); private final int requestStatus; private final String description; @@ -763,7 +818,8 @@ public abstract class NanoHTTPD implements Runnable } /** - * Handles one session, i.e. parses the HTTP request and returns the response. + * Handles one session, i.e. parses the HTTP request and returns the + * response. */ public interface IHTTPSession { @@ -816,7 +872,8 @@ public abstract class NanoHTTPD implements Runnable { this.inputStream = new PushbackInputStream( inputStream, BUFSIZE ); this.outputStream = outputStream; - remoteIp = inetAddress.isLoopbackAddress() || inetAddress.isAnyLocalAddress() ? "127.0.0.1" : inetAddress.getHostAddress().toString(); + remoteIp = inetAddress.isLoopbackAddress() || inetAddress.isAnyLocalAddress() ? "127.0.0.1" + : inetAddress.getHostAddress().toString(); headers = new HashMap<String, String>(); } @@ -838,7 +895,7 @@ public abstract class NanoHTTPD implements Runnable } catch ( Exception e ) { safeClose( inputStream ); safeClose( outputStream ); - throw new SocketException( "NanoHttpd Shutdown" ); + throw e; } if ( read == -1 ) { // socket was been closed @@ -852,6 +909,8 @@ public abstract class NanoHTTPD implements Runnable if ( splitbyte > 0 ) break; read = inputStream.read( buf, rlen, BUFSIZE - rlen ); + if ( maxRequestSize != 0 && rlen > maxRequestSize ) + throw new SocketException( "Request too large" ); } } @@ -872,7 +931,8 @@ public abstract class NanoHTTPD implements Runnable } // Create a BufferedReader for parsing the header. - BufferedReader hin = new BufferedReader( new InputStreamReader( new ByteArrayInputStream( buf, 0, rlen ) ) ); + BufferedReader hin = new BufferedReader( new InputStreamReader( new ByteArrayInputStream( buf, + 0, rlen ) ) ); // Decode the header into parms and header java properties Map<String, String> pre = new HashMap<String, String>(); @@ -888,7 +948,8 @@ public abstract class NanoHTTPD implements Runnable // Ok, now do the serve() Response r = serve( this ); if ( r == null ) { - throw new ResponseException( Response.Status.INTERNAL_ERROR, "SERVER INTERNAL ERROR: Serve() returned a null response." ); + throw new ResponseException( Response.Status.INTERNAL_ERROR, + "SERVER INTERNAL ERROR: Serve() returned a null response." ); } else { r.setRequestMethod( method ); r.send( outputStream ); @@ -899,7 +960,8 @@ public abstract class NanoHTTPD implements Runnable } catch ( SocketTimeoutException ste ) { throw ste; } catch ( IOException ioe ) { - Response r = new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() ); + Response r = new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT, + "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() ); r.send( outputStream ); safeClose( outputStream ); } catch ( ResponseException re ) { @@ -912,7 +974,6 @@ public abstract class NanoHTTPD implements Runnable @Override public void parseBody( Map<String, String> files ) throws IOException, ResponseException { - final Reader in = new InputStreamReader( inputStream ); long size; if ( headers.containsKey( "content-length" ) ) { size = Integer.parseInt( headers.get( "content-length" ) ); @@ -925,35 +986,51 @@ public abstract class NanoHTTPD implements Runnable // If the method is POST, there may be parameters // in data section, too, read it: if ( Method.POST.equals( method ) ) { - String contentType = ""; + String contentType = null; + String contentEncoding = null; String contentTypeHeader = headers.get( "content-type" ); StringTokenizer st = null; if ( contentTypeHeader != null ) { - st = new StringTokenizer( contentTypeHeader, ",; " ); + st = new StringTokenizer( contentTypeHeader, "," ); if ( st.hasMoreTokens() ) { - contentType = st.nextToken(); + String part[] = st.nextToken().split( ";\\s*", 2 ); + contentType = part[0]; + if ( part.length == 2 ) { + contentEncoding = part[1]; + } + } + } + Charset cs = StandardCharsets.ISO_8859_1; + if ( contentEncoding != null ) { + try { + cs = Charset.forName( contentEncoding ); + } catch ( Exception e ) { } } + //LOGGER.debug("Content type is '" + contentType + "', encoding '" + cs.name() + "'"); if ( "multipart/form-data".equalsIgnoreCase( contentType ) ) { - throw new ResponseException( Response.Status.BAD_REQUEST, "BAD REQUEST: Content type is multipart/form-data, which is not supported" ); + throw new ResponseException( Response.Status.BAD_REQUEST, + "BAD REQUEST: Content type is multipart/form-data, which is not supported" ); } else { - String postLine = ""; - StringBuilder postLineBuffer = new StringBuilder(); - char pbuf[] = new char[ 512 ]; - while ( rlen >= 0 && size > 0 && !postLine.endsWith( "\r\n" ) ) { - rlen = in.read( pbuf, 0, (int)Math.min( size, 512 ) ); - if ( rlen <= 0 ) + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte pbuf[] = new byte[ 1000 ]; + while ( size > 0 ) { + int ret = inputStream.read( pbuf, 0, (int)Math.min( size, pbuf.length ) ); + if ( ret <= 0 ) + break; + if ( ret >= 2 && pbuf[ret - 1] == '\n' && pbuf[ret - 2] == '\r' ) break; - postLine = String.valueOf( pbuf, 0, rlen ); - postLineBuffer.append( postLine ); + size -= ret; + baos.write( pbuf, 0, ret ); } - postLine = postLineBuffer.toString().trim(); + String postLine = new String( baos.toByteArray(), cs ); + baos.close(); // Handle application/x-www-form-urlencoded if ( "application/x-www-form-urlencoded".equalsIgnoreCase( contentType ) ) { decodeParms( postLine, parms ); - } else if ( postLine.length() != 0 ) { + } else if ( files != null && postLine.length() != 0 ) { // Special case for raw POST data => create a special files entry "postData" with raw content data files.put( "postData", postLine ); } @@ -964,8 +1041,8 @@ public abstract class NanoHTTPD implements Runnable /** * Decodes the sent headers and loads the data into Key/value pairs */ - private void decodeHeader( BufferedReader in, Map<String, String> pre, Map<String, String> parms, Map<String, String> headers ) - throws ResponseException + private void decodeHeader( BufferedReader in, Map<String, String> pre, Map<String, String> parms, + Map<String, String> headers ) throws ResponseException { try { // Read the request line @@ -976,13 +1053,15 @@ public abstract class NanoHTTPD implements Runnable StringTokenizer st = new StringTokenizer( inLine ); if ( !st.hasMoreTokens() ) { - throw new ResponseException( Response.Status.BAD_REQUEST, "BAD REQUEST: Syntax error. Usage: GET /example/file.html" ); + throw new ResponseException( Response.Status.BAD_REQUEST, + "BAD REQUEST: Syntax error. Usage: GET /example/file.html" ); } pre.put( "method", st.nextToken() ); if ( !st.hasMoreTokens() ) { - throw new ResponseException( Response.Status.BAD_REQUEST, "BAD REQUEST: Missing URI. Usage: GET /example/file.html" ); + throw new ResponseException( Response.Status.BAD_REQUEST, + "BAD REQUEST: Missing URI. Usage: GET /example/file.html" ); } String uri = st.nextToken(); @@ -1005,26 +1084,30 @@ public abstract class NanoHTTPD implements Runnable while ( line != null && line.trim().length() > 0 ) { int p = line.indexOf( ':' ); if ( p >= 0 ) - headers.put( line.substring( 0, p ).trim().toLowerCase( Locale.US ), line.substring( p + 1 ).trim() ); + headers.put( line.substring( 0, p ).trim().toLowerCase( Locale.US ), + line.substring( p + 1 ).trim() ); line = in.readLine(); } } pre.put( "uri", uri ); } catch ( IOException ioe ) { - throw new ResponseException( Response.Status.INTERNAL_ERROR, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage(), ioe ); + throw new ResponseException( Response.Status.INTERNAL_ERROR, + "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage(), ioe ); } } /** - * Find byte index separating header from body. It must be the last byte of the first two + * Find byte index separating header from body. It must be the last byte + * of the first two * sequential new lines. */ private int findHeaderEnd( final byte[] buf, int rlen ) { int splitbyte = 0; while ( splitbyte + 3 < rlen ) { - if ( buf[splitbyte] == '\r' && buf[splitbyte + 1] == '\n' && buf[splitbyte + 2] == '\r' && buf[splitbyte + 3] == '\n' ) { + if ( buf[splitbyte] == '\r' && buf[splitbyte + 1] == '\n' && buf[splitbyte + 2] == '\r' + && buf[splitbyte + 3] == '\n' ) { return splitbyte + 4; } splitbyte++; @@ -1035,7 +1118,8 @@ public abstract class NanoHTTPD implements Runnable /** * Decodes parameters in percent-encoded URI-format ( e.g. * "name=Jack%20Daniels&pass=Single%20Malt" ) and - * adds them to given Map. NOTE: this doesn't support multiple identical keys due to the + * adds them to given Map. NOTE: this doesn't support multiple identical + * keys due to the * simplicity of Map. */ private void decodeParms( String parms, Map<String, String> p ) @@ -1051,8 +1135,7 @@ public abstract class NanoHTTPD implements Runnable String e = st.nextToken(); int sep = e.indexOf( '=' ); if ( sep >= 0 ) { - p.put( decodePercent( e.substring( 0, sep ) ).trim(), - decodePercent( e.substring( sep + 1 ) ) ); + p.put( decodePercent( e.substring( 0, sep ) ).trim(), decodePercent( e.substring( sep + 1 ) ) ); } else { p.put( decodePercent( e ).trim(), "" ); } diff --git a/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java b/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java new file mode 100644 index 0000000..7b0b2d9 --- /dev/null +++ b/src/main/java/org/openslx/util/GrowingThreadPoolExecutor.java @@ -0,0 +1,85 @@ +package org.openslx.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Grows to maximum pool size before queueing. See + * http://stackoverflow.com/a/20153234/2043481 + */ +public class GrowingThreadPoolExecutor extends ThreadPoolExecutor { + private int userSpecifiedCorePoolSize; + private int taskCount; + + /** + * The default rejected execution handler + */ + private static final RejectedExecutionHandler defaultHandler = + new AbortPolicy(); + + public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, + TimeUnit unit, BlockingQueue<Runnable> workQueue) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, defaultHandler); + } + + public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); + } + + public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), + handler); + } + + public GrowingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + userSpecifiedCorePoolSize = corePoolSize; + } + + @Override + public void execute(Runnable runnable) { + synchronized (this) { + taskCount++; + setCorePoolSizeToTaskCountWithinBounds(); + } + super.execute(runnable); + } + + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + super.afterExecute(runnable, throwable); + synchronized (this) { + taskCount--; + setCorePoolSizeToTaskCountWithinBounds(); + } + } + + private void setCorePoolSizeToTaskCountWithinBounds() { + int threads = taskCount; + if (threads < userSpecifiedCorePoolSize) + threads = userSpecifiedCorePoolSize; + if (threads > getMaximumPoolSize()) + threads = getMaximumPoolSize(); + super.setCorePoolSize(threads); + } + + public void setCorePoolSize(int corePoolSize) { + synchronized (this) { + userSpecifiedCorePoolSize = corePoolSize; + } + } + + @Override + public int getCorePoolSize() { + synchronized (this) { + return userSpecifiedCorePoolSize; + } + } +}
\ No newline at end of file |