diff options
6 files changed, 593 insertions, 157 deletions
diff --git a/src/main/java/fi/iki/elonen/ChunkedInputStream.java b/src/main/java/fi/iki/elonen/ChunkedInputStream.java new file mode 100644 index 0000000..1c0ba8d --- /dev/null +++ b/src/main/java/fi/iki/elonen/ChunkedInputStream.java @@ -0,0 +1,340 @@ +package fi.iki.elonen; + +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * <http://www.apache.org/>. + * + */ + +import java.io.IOException; +import java.io.InputStream; + +/** + * Implements chunked transfer coding. The content is received in small chunks. + * Entities transferred using this input stream can be of unlimited length. + * After the stream is read to the end, it provides access to the trailers, + * if any. + * <p> + * Note that this class NEVER closes the underlying stream, even when + * {@link #close()} gets called. Instead, it will read until the "end" of its + * chunking on close, which allows for the seamless execution of subsequent + * HTTP 1.1 requests, while not requiring the client to remember to read the + * entire contents of the response. + * + * + * @since 4.0 + * + */ +public class ChunkedInputStream extends InputStream +{ + + private enum State + { + CHUNK_LEN, CHUNK_DATA, CHUNK_CRLF, CHUNK_INVALID + } + + private static final int BUFFER_SIZE = 2048; + + /** The session input buffer */ + private final InputStream inputStream; + + private State state; + + /** The chunk size */ + private long chunkSize; + + /** The current position within the current chunk */ + private long pos; + + /** True if we've reached the end of stream */ + private boolean eof; + + /** True if this stream is closed */ + private boolean closed; + + /** + * Default constructor. + * + * @param buffer Session input buffer + * @param inputStream Input stream + * @param http1Config Message http1Config. If {@code null} {@link Http1Config#DEFAULT} will be + * used. + * + * @since 4.4 + */ + public ChunkedInputStream( final InputStream inputStream ) + { + super(); + this.inputStream = inputStream; + this.pos = 0L; + this.state = State.CHUNK_LEN; + } + + @Override + public int available() throws IOException + { + final int len = this.inputStream.available(); + return (int)Math.min( len, this.chunkSize - this.pos ); + } + + /** + * <p> + * Returns all the data in a chunked stream in coalesced form. A chunk + * is followed by a CRLF. The method returns -1 as soon as a chunksize of 0 + * is detected. + * </p> + * + * <p> + * Trailer headers are read automatically at the end of the stream and + * can be obtained with the getResponseFooters() method. + * </p> + * + * @return -1 of the end of the stream has been reached or the next data + * byte + * @throws IOException in case of an I/O error + */ + @Override + public int read() throws IOException + { + if ( this.closed ) { + throw new StreamClosedException( "Already closed" ); + } + if ( this.eof ) { + return -1; + } + if ( state != State.CHUNK_DATA ) { + nextChunk(); + if ( this.eof ) { + return -1; + } + } + final int b = inputStream.read(); + if ( b != -1 ) { + pos++; + if ( pos >= chunkSize ) { + state = State.CHUNK_CRLF; + } + return b; + } + throw new MalformedChunkCodingException( "Truncated chunk (expected size: " + chunkSize + "; actual size: " + pos + ")" ); + } + + /** + * Read some bytes from the stream. + * + * @param b The byte array that will hold the contents from the stream. + * @param off The offset into the byte array at which bytes will start to be + * placed. + * @param len the maximum number of bytes that can be returned. + * @return The number of bytes returned or -1 if the end of stream has been + * reached. + * @throws IOException in case of an I/O error + */ + @Override + public int read( final byte[] b, final int off, final int len ) throws IOException + { + if ( closed ) { + throw new StreamClosedException( "Already closed" ); + } + + if ( eof ) { + return -1; + } + if ( state != State.CHUNK_DATA ) { + nextChunk(); + if ( eof ) { + return -1; + } + } + int bytesRead = inputStream.read( b, off, (int)Math.min( len, chunkSize - pos ) ); + if ( bytesRead != -1 ) { + pos += bytesRead; + if ( pos >= chunkSize ) { + state = State.CHUNK_CRLF; + } + return bytesRead; + } + eof = true; + throw new MalformedChunkCodingException( "Truncated chunk (expected size: " + chunkSize + "; actual size: " + pos + ")" ); + } + + /** + * Read some bytes from the stream. + * + * @param b The byte array that will hold the contents from the stream. + * @return The number of bytes returned or -1 if the end of stream has been + * reached. + * @throws IOException in case of an I/O error + */ + @Override + public int read( final byte[] b ) throws IOException + { + return read( b, 0, b.length ); + } + + /** + * Read the next chunk. + * + * @throws IOException in case of an I/O error + */ + private void nextChunk() throws IOException + { + if ( state == State.CHUNK_INVALID ) { + throw new MalformedChunkCodingException( "Corrupt data stream" ); + } + try { + chunkSize = getChunkSize(); + if ( chunkSize < 0L ) { + throw new MalformedChunkCodingException( "Negative chunk size" ); + } + state = State.CHUNK_DATA; + pos = 0L; + if ( chunkSize == 0L ) { + eof = true; + try { + inputStream.read(); + inputStream.read(); + } catch ( IOException e ) { + throw new MalformedChunkCodingException( "No CRLF after final zero chunk" ); + } + } + } catch ( final MalformedChunkCodingException ex ) { + state = State.CHUNK_INVALID; + throw ex; + } + } + + /** + * Expects the stream to start with a chunksize in hex with optional + * comments after a semicolon. The line must end with a CRLF: "a3; some + * comment\r\n" Positions the stream at the start of the next line. + */ + private long getChunkSize() throws IOException + { + int ch; + int prevCh; + boolean hadSemi; + final State st = this.state; + switch ( st ) { + case CHUNK_CRLF: + ch = inputStream.read(); + ch = ( ch << 8 ) | inputStream.read(); + if ( ch < 0 ) { + throw new MalformedChunkCodingException( + "CRLF expected at end of chunk" ); + } + if ( ch != 3338 ) { + throw new MalformedChunkCodingException( + "Unexpected content at the end of chunk" ); + } + state = State.CHUNK_LEN; + //$FALL-THROUGH$ + case CHUNK_LEN: + prevCh = -1; + hadSemi = false; + StringBuilder sb = new StringBuilder( 8 ); + for ( ;; ) { + ch = inputStream.read(); + if ( ch == -1 ) + break; + if ( prevCh == 13 && ch == 10 ) { + break; + } + prevCh = ch; + if ( ch == 13 ) + continue; + if ( ch == ';' ) { + hadSemi = true; + } + if ( hadSemi ) + continue; + sb.append( (char)ch ); + } + if ( prevCh != 13 || ch != 10 ) { + throw new StreamClosedException( "Premature end of chunk coded message body" ); + } + final String s = sb.toString(); + try { + return Long.parseLong( s, 16 ); + } catch ( final NumberFormatException e ) { + throw new MalformedChunkCodingException( "Invalid hex-length in chunk header: " + s ); + } + default: + throw new IllegalStateException( "Inconsistent codec state" ); + } + } + + /** + * Reads the remainder of the chunked message, leaving the underlying + * stream at a position to start reading the next response without + * scanning. But does NOT close the underlying stream. + * + * @throws IOException in case of an I/O error + */ + @Override + public void close() throws IOException + { + if ( !closed ) { + try { + if ( !eof && state != State.CHUNK_INVALID ) { + // Optimistically check if the content has been fully read + // when there's no data remaining in the current chunk. + // This is common when self-terminating content (e.g. JSON) + // is parsed from response streams. + if ( chunkSize == pos && chunkSize > 0 && read() == -1 ) { + return; + } + // read and discard the remainder of the message + final byte[] buff = new byte[ BUFFER_SIZE ]; + while ( read( buff ) >= 0 ) { + } + } + } finally { + eof = true; + closed = true; + } + } + } + + public static class MalformedChunkCodingException extends IOException + { + private static final long serialVersionUID = 7092137465179737109L; + + public MalformedChunkCodingException( String msg ) + { + super( msg ); + } + } + + public static class StreamClosedException extends IOException + { + private static final long serialVersionUID = -5871567871867283867L; + + public StreamClosedException( String msg ) + { + super( msg ); + } + } + +} diff --git a/src/main/java/fi/iki/elonen/NanoHTTPD.java b/src/main/java/fi/iki/elonen/NanoHTTPD.java index 319164a..395eca7 100644 --- a/src/main/java/fi/iki/elonen/NanoHTTPD.java +++ b/src/main/java/fi/iki/elonen/NanoHTTPD.java @@ -65,6 +65,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.input.BoundedInputStream; import org.apache.commons.io.output.ByteArrayOutputStream; import org.openslx.util.GrowingThreadPoolExecutor; import org.openslx.util.PrioThreadFactory; @@ -203,40 +204,40 @@ public abstract class NanoHTTPD implements Runnable do { try { - final Socket finalAccept = myServerSocket.accept(); - registerConnection( finalAccept ); - finalAccept.setSoTimeout( SOCKET_READ_TIMEOUT ); - final InputStream inputStream = finalAccept.getInputStream(); - asyncRunner.execute( new Runnable() { - @Override - public void run() - { - OutputStream outputStream = null; - try { - outputStream = finalAccept.getOutputStream(); - HTTPSession session = new HTTPSession( inputStream, outputStream, - finalAccept.getInetAddress() ); - while ( !finalAccept.isClosed() && !finalAccept.isInputShutdown() ) { - session.execute(); - } - } catch ( RejectedExecutionException e ) { + final Socket sck = myServerSocket.accept(); + registerConnection( sck ); + sck.setSoTimeout( SOCKET_READ_TIMEOUT ); + final InputStream inputStream = sck.getInputStream(); + try { + asyncRunner.execute( new Runnable() { + @Override + public void run() + { + OutputStream outputStream = null; try { - outputStream.write( "HTTP/1.1 503 Overloaded\r\nConnection: Close\r\n\r\n".getBytes() ); - } catch ( Exception e2 ) { - } - } catch ( Exception e ) { - // When the socket is closed by the client, we throw our own SocketException - // to break the "keep alive" loop above. - if ( ! ( e instanceof SocketTimeoutException ) - && ! ( e instanceof SocketException && "NanoHttpd Shutdown".equals( e.getMessage() ) ) ) { - e.printStackTrace(); + outputStream = sck.getOutputStream(); + HTTPSession session = new HTTPSession( inputStream, outputStream, + sck.getInetAddress() ); + while ( !sck.isClosed() && !sck.isInputShutdown() && !sck.isOutputShutdown() ) { + session.execute(); + } + } catch ( Exception e ) { + // When the socket is closed by the client, we throw our own SocketException + // to break the "keep alive" loop above. + if ( ! ( e instanceof SocketTimeoutException ) + && ! ( e instanceof SocketException && "NanoHttpd Shutdown".equals( e.getMessage() ) ) ) { + e.printStackTrace(); + } + } finally { + Util.safeClose( outputStream, inputStream, sck ); + unRegisterConnection( sck ); } - } finally { - Util.safeClose( outputStream, inputStream, finalAccept ); - unRegisterConnection( finalAccept ); } - } - } ); + } ); + } catch ( RejectedExecutionException e ) { + Util.safeClose( sck, inputStream ); + unRegisterConnection( sck ); + } } catch ( IOException e ) { } } while ( !myServerSocket.isClosed() ); @@ -339,11 +340,10 @@ public abstract class NanoHTTPD implements Runnable */ public Response serve( IHTTPSession session ) { - Map<String, String> files = new HashMap<String, String>(); Method method = session.getMethod(); if ( Method.PUT.equals( method ) || Method.POST.equals( method ) ) { try { - session.parseBody( files ); + session.parseBody(); } catch ( IOException ioe ) { return new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() ); @@ -354,7 +354,7 @@ public abstract class NanoHTTPD implements Runnable Map<String, String> parms = session.getParms(); parms.put( QUERY_STRING_PARAMETER, session.getQueryParameterString() ); - return serve( session.getUri(), method, session.getHeaders(), parms, files ); + return serve( session.getUri(), method, session.getHeaders(), parms, null ); } /** @@ -565,7 +565,7 @@ public abstract class NanoHTTPD implements Runnable } sb.append( "HTTP/1.1 " ); sb.append( status.getDescription() ); - sb.append( " \r\n" ); + sb.append( "\r\n" ); if ( mime != null ) { sb.append( "Content-Type: " ); @@ -759,7 +759,8 @@ public abstract class NanoHTTPD implements Runnable NOT_FOUND( 404, "Not Found" ), METHOD_NOT_ALLOWED( 405, "Method Not Allowed" ), RANGE_NOT_SATISFIABLE( 416, "Requested Range Not Satisfiable" ), - INTERNAL_ERROR( 500, "Internal Server Error" ); + INTERNAL_ERROR( 500, "Internal Server Error" ), + SERVICE_UNAVAILABLE( 503, "Service Unavailable" ); private final int requestStatus; private final String description; @@ -835,7 +836,7 @@ public abstract class NanoHTTPD implements Runnable * * @param files map to modify */ - void parseBody( Map<String, String> files ) throws IOException, ResponseException; + void parseBody() throws IOException, ResponseException; } protected class HTTPSession implements IHTTPSession @@ -843,20 +844,15 @@ public abstract class NanoHTTPD implements Runnable public static final int BUFSIZE = 8192; private final OutputStream outputStream; private PushbackInputStream inputStream; + private InputStream wrappedInput; private int splitbyte; private int rlen; - private String uri; private Method method; private Map<String, String> parms; private Map<String, String> headers; private String queryParameterString; private String remoteIp; - - public HTTPSession( InputStream inputStream, OutputStream outputStream ) - { - this.inputStream = new PushbackInputStream( inputStream, BUFSIZE ); - this.outputStream = outputStream; - } + private boolean doClose; public HTTPSession( InputStream inputStream, OutputStream outputStream, InetAddress inetAddress ) { @@ -865,12 +861,13 @@ public abstract class NanoHTTPD implements Runnable remoteIp = inetAddress.isLoopbackAddress() || inetAddress.isAnyLocalAddress() ? "127.0.0.1" : inetAddress.getHostAddress().toString(); headers = new HashMap<String, String>(); + parms = new HashMap<String, String>(); } @Override public void execute() throws IOException { - boolean close = true; + doClose = true; try { // Read the first 8192 bytes. // The full header should fit in here. @@ -893,8 +890,9 @@ public abstract class NanoHTTPD implements Runnable while ( read > 0 ) { rlen += read; splitbyte = findHeaderEnd( buf, rlen ); - if ( splitbyte > 0 ) + if ( splitbyte > 0 || rlen >= BUFSIZE ) break; + // Try to keep reading as long as we've got less than 8kb read = inputStream.read( buf, rlen, BUFSIZE - rlen ); if ( maxRequestSize != 0 && rlen > maxRequestSize ) throw new SocketException( "Request too large" ); @@ -908,12 +906,8 @@ public abstract class NanoHTTPD implements Runnable inputStream.unread( buf, splitbyte, rlen - splitbyte ); } - parms = new HashMap<String, String>(); - if ( null == headers ) { - headers = new HashMap<String, String>(); - } else { - headers.clear(); - } + parms.clear(); + headers.clear(); if ( null != remoteIp ) { headers.put( "remote-addr", remoteIp ); @@ -922,19 +916,25 @@ public abstract class NanoHTTPD implements Runnable // Create a BufferedReader for parsing the header. BufferedReader hin = new BufferedReader( new InputStreamReader( new ByteArrayInputStream( buf, - 0, rlen ) ) ); + 0, splitbyte ) ) ); // Decode the header into parms and header java properties - Map<String, String> pre = new HashMap<String, String>(); - decodeHeader( hin, pre, parms, headers ); + decodeHeader( hin, parms, headers ); - method = Method.lookup( pre.get( "method" ) ); + if ( !Util.isEmptyString( headers.get( "trailer" ) ) ) { + throw new ResponseException( Response.Status.BAD_REQUEST, "BAD REQUEST: Trailers not supported." ); + } + + method = Method.lookup( headers.get( "http-method" ) ); if ( method == null ) { throw new ResponseException( Response.Status.BAD_REQUEST, "BAD REQUEST: Syntax error." ); } - uri = pre.get( "uri" ); - close = "close".equalsIgnoreCase( headers.get( "connection" ) ); + doClose = "close".equalsIgnoreCase( headers.get( "connection" ) ) || "1.0".equals( headers.get( "http-version" ) ); + if ( !doClose && method != Method.GET + && !headers.containsKey( "content-length" ) && !"chunked".equals( headers.get( "transfer-encoding" ) ) ) { + doClose = true; + } // Ok, now do the serve() Response r = serve( this ); @@ -943,10 +943,16 @@ public abstract class NanoHTTPD implements Runnable "SERVER INTERNAL ERROR: Serve() returned a null response." ); } else { r.setRequestMethod( method ); - r.send( outputStream, close ); + r.send( outputStream, doClose ); } - if ( close ) { - Util.safeClose( outputStream ); + if ( doClose ) { + Util.safeClose( outputStream, inputStream ); + } else { + InputStream is = getInputStream(); + if ( is != null ) { + while ( is.read( buf ) > 0 ) { + } + } } } catch ( SocketException e ) { // throw it out to close socket object (finalAccept) @@ -956,86 +962,85 @@ public abstract class NanoHTTPD implements Runnable } catch ( IOException ioe ) { Response r = new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() ); - r.send( outputStream, close ); + r.send( outputStream, doClose ); Util.safeClose( outputStream ); } catch ( ResponseException re ) { Response r = new Response( re.getStatus(), MIME_PLAINTEXT, re.getMessage() ); - r.send( outputStream, close ); + r.send( outputStream, doClose ); Util.safeClose( outputStream ); } } @Override - public void parseBody( Map<String, String> files ) throws IOException, ResponseException + public void parseBody() throws IOException, ResponseException { + // If the method is POST, there may be parameters + // in data section, too, read it: + if ( method != Method.POST ) + return; + long size; if ( headers.containsKey( "content-length" ) ) { - size = Integer.parseInt( headers.get( "content-length" ) ); - } else if ( splitbyte < rlen ) { - size = rlen - splitbyte; + size = Util.parseInt( headers.get( "content-length" ), -1 ); } else { - size = 0; + size = -1; } - // If the method is POST, there may be parameters - // in data section, too, read it: - if ( Method.POST.equals( method ) ) { - String contentType = null; - String contentEncoding = null; - String contentTypeHeader = headers.get( "content-type" ); - - StringTokenizer st = null; - if ( contentTypeHeader != null ) { - st = new StringTokenizer( contentTypeHeader, "," ); - if ( st.hasMoreTokens() ) { - String part[] = st.nextToken().split( ";\\s*", 2 ); - contentType = part[0]; - if ( part.length == 2 ) { - contentEncoding = part[1]; - } + String contentType = null; + String contentEncoding = null; + String contentTypeHeader = headers.get( "content-type" ); + + StringTokenizer st = null; + if ( contentTypeHeader != null ) { + st = new StringTokenizer( contentTypeHeader, "," ); + if ( st.hasMoreTokens() ) { + String part[] = st.nextToken().split( ";\\s*", 2 ); + contentType = part[0].trim(); + if ( part.length == 2 ) { + contentEncoding = part[1]; } } - Charset cs = StandardCharsets.ISO_8859_1; - if ( contentEncoding != null ) { - try { - cs = Charset.forName( contentEncoding ); - } catch ( Exception e ) { - } + } + Charset cs = StandardCharsets.UTF_8; + if ( contentEncoding != null ) { + try { + cs = Charset.forName( contentEncoding ); + } catch ( Exception e ) { } - //LOGGER.debug("Content type is '" + contentType + "', encoding '" + cs.name() + "'"); + } - if ( "multipart/form-data".equalsIgnoreCase( contentType ) ) { - throw new ResponseException( Response.Status.BAD_REQUEST, - "BAD REQUEST: Content type is multipart/form-data, which is not supported" ); - } else { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte pbuf[] = new byte[ 1000 ]; - while ( size > 0 ) { - int ret = inputStream.read( pbuf, 0, (int)Math.min( size, pbuf.length ) ); - if ( ret <= 0 ) - break; - if ( ret >= 2 && pbuf[ret - 1] == '\n' && pbuf[ret - 2] == '\r' ) - break; - size -= ret; - baos.write( pbuf, 0, ret ); - } - String postLine = new String( baos.toByteArray(), cs ); - baos.close(); - // Handle application/x-www-form-urlencoded - if ( "application/x-www-form-urlencoded".equalsIgnoreCase( contentType ) ) { - decodeParms( postLine, parms ); - } else if ( files != null && postLine.length() != 0 ) { - // Special case for raw POST data => create a special files entry "postData" with raw content data - files.put( "postData", postLine ); - } + // Use method here so we get the limited/chunked stream if applicable + InputStream is = getInputStream(); + if ( is == null ) + return; + + if ( "multipart/form-data".equalsIgnoreCase( contentType ) ) { + throw new ResponseException( Response.Status.BAD_REQUEST, + "BAD REQUEST: Content type is multipart/form-data, which is not supported" ); + } else if ( "application/x-www-form-urlencoded".equalsIgnoreCase( contentType ) ) { + // Handle application/x-www-form-urlencoded + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte pbuf[] = new byte[ 2000 ]; + while ( size > 0 ) { + int ret = is.read( pbuf, 0, (int)Math.min( size, pbuf.length ) ); + if ( ret <= 0 ) + break; + if ( ret >= 2 && pbuf[ret - 1] == '\n' && pbuf[ret - 2] == '\r' ) + break; + size -= ret; + baos.write( pbuf, 0, ret ); } - } + String postLine = new String( baos.toByteArray(), cs ); + baos.close(); + + decodeParms( postLine, parms ); + } // Otherwise leave stream untouched so app can deal with it } /** * Decodes the sent headers and loads the data into Key/value pairs */ - private void decodeHeader( BufferedReader in, Map<String, String> pre, Map<String, String> parms, + private void decodeHeader( BufferedReader in, Map<String, String> parms, Map<String, String> headers ) throws ResponseException { @@ -1052,7 +1057,7 @@ public abstract class NanoHTTPD implements Runnable "BAD REQUEST: Syntax error. Usage: GET /example/file.html" ); } - pre.put( "method", st.nextToken() ); + String strMethod = st.nextToken(); if ( !st.hasMoreTokens() ) { throw new ResponseException( Response.Status.BAD_REQUEST, @@ -1071,21 +1076,28 @@ public abstract class NanoHTTPD implements Runnable } // If there's another token, its protocol version, - // followed by HTTP headers. Ignore version but parse headers. + // followed by HTTP headers. // NOTE: this now forces header names lower case since they are // case insensitive and vary by client. if ( st.hasMoreTokens() ) { + String strVersion = st.nextToken(); String line = in.readLine(); - while ( line != null && line.trim().length() > 0 ) { + while ( line != null && line.length() > 0 ) { int p = line.indexOf( ':' ); if ( p >= 0 ) headers.put( line.substring( 0, p ).trim().toLowerCase( Locale.US ), line.substring( p + 1 ).trim() ); line = in.readLine(); } + // Add version after, to override headers + int sl = strVersion.indexOf( '/' ); + if ( sl != -1 ) { + headers.put( "http-version", strVersion.substring( sl + 1 ) ); + } } - pre.put( "uri", uri ); + headers.put( "http-uri", uri ); + headers.put( "http-method", strMethod ); } catch ( IOException ioe ) { throw new ResponseException( Response.Status.INTERNAL_ERROR, "SERVER INTERNAL ERROR: IOException: " + ioe.getMessage(), ioe ); @@ -1093,9 +1105,8 @@ public abstract class NanoHTTPD implements Runnable } /** - * Find byte index separating header from body. It must be the last byte - * of the first two - * sequential new lines. + * Find byte index separating header from body. Return value will point + * to first byte after the final \r\n\r\n. */ private int findHeaderEnd( final byte[] buf, int rlen ) { @@ -1107,7 +1118,7 @@ public abstract class NanoHTTPD implements Runnable } splitbyte++; } - return 0; + return -1; } /** @@ -1157,7 +1168,7 @@ public abstract class NanoHTTPD implements Runnable @Override public final String getUri() { - return uri; + return headers.get( "http-uri" ); } @Override @@ -1166,9 +1177,36 @@ public abstract class NanoHTTPD implements Runnable return method; } + @SuppressWarnings( "deprecation" ) @Override public final InputStream getInputStream() { + if ( method == Method.GET ) + return null; + if ( wrappedInput == null ) { + String s = headers.get( "content-length" ); + if ( s != null ) { + long cl = Util.parseLong( s, -1 ); + if ( cl == 0 ) + return null; + if ( cl < 0 ) { + doClose = true; + return null; + } + BoundedInputStream bis = new BoundedInputStream( inputStream, cl ); + bis.setPropagateClose( false ); + return wrappedInput = bis; + } + s = headers.get( "transfer-encoding" ); + if ( s != null ) { + if ( "chunked".equalsIgnoreCase( s.trim() ) ) + return wrappedInput = new ChunkedInputStream( inputStream ); + } + } else { + return wrappedInput; + } + if ( !doClose ) + return null; return inputStream; } } diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java index bd927b1..27f8e8c 100644 --- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java +++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java @@ -1,5 +1,6 @@ package org.openslx.filetransfer.util; +import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -92,7 +93,7 @@ public class ChunkList * Get CRC32 list in DNBD3 format. All checksums are little * endian and prefixed by the crc32 sum of the list itself. */ - public synchronized byte[] getDnbd3Crc32List() throws IOException + public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException { byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master long nextChunkOffset = 0; @@ -143,7 +144,6 @@ public class ChunkList * Returns true if this list contains a chunk with state MISSING, * which means the chunk doesn't have a sha1 known to exist in * another image. - * @return */ public synchronized boolean hasLocallyMissingChunk() { @@ -521,4 +521,21 @@ public class ChunkList return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum ); } + /** + * Write DNBD3 CRC32 list to given file. + * + * @throws IllegalStateException + * @throws IOException + */ + public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException + { + byte[] dnbd3Crc32List = null; + dnbd3Crc32List = getDnbd3Crc32List(); + if ( dnbd3Crc32List != null ) { + try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) { + fos.write( dnbd3Crc32List ); + } + } + } + } diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java index 41bd05a..abbcd35 100644 --- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java +++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java @@ -15,9 +15,11 @@ import org.apache.logging.log4j.Logger; public class HashChecker { public static final int BLOCKING = 1; - public static final int CALC_HASH = 2; + public static final int CHECK_SHA1 = 2; public static final int CALC_CRC32 = 4; - + public static final int CALC_SHA1 = 8; + public static final int NO_SLOW_WARN = 16; + private static final Logger LOGGER = LogManager.getLogger( HashChecker.class ); private final BlockingQueue<HashTask> queue; @@ -27,7 +29,7 @@ public class HashChecker private final String algorithm; private boolean invalid = false; - + private final int queueCapacity; public HashChecker( String algorithm ) throws NoSuchAlgorithmException @@ -97,11 +99,12 @@ public class HashChecker public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException { boolean blocking = ( flags & BLOCKING ) != 0; - boolean doHash = ( flags & CALC_HASH ) != 0; - boolean doCrc32 = ( flags & CALC_CRC32 ) != 0; - if ( doHash && chunk.getSha1Sum() == null ) + boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0; + boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0; + boolean calcSha1 = ( flags & CALC_SHA1 ) != 0; + if ( checkSha1 && chunk.getSha1Sum() == null ) throw new NullPointerException( "Chunk has no sha1 hash" ); - HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 ); + HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 ); synchronized ( threads ) { if ( invalid ) { execCallback( task, HashResult.FAILURE ); @@ -133,15 +136,17 @@ public class HashChecker } } } - if ( doHash ) { + if ( checkSha1 ) { chunk.setStatus( ChunkStatus.HASHING ); } if ( blocking ) { long pre = System.currentTimeMillis(); queue.put( task ); - long duration = System.currentTimeMillis() - pre; - if ( duration > 1000 ) { - LOGGER.warn( "HashChecker.queue() took " + duration + "ms" ); + if ( ( flags & NO_SLOW_WARN ) == 0 ) { + long duration = System.currentTimeMillis() - pre; + if ( duration > 1000 ) { + LOGGER.warn( "HashChecker.queue() took " + duration + "ms" ); + } } } else { if ( !queue.offer( task ) ) { @@ -158,7 +163,7 @@ public class HashChecker { return queue.size(); } - + public int getQueueCapacity() { return queueCapacity; @@ -208,15 +213,19 @@ public class HashChecker break; } HashResult result = HashResult.NONE; - if ( task.doHash ) { + if ( task.checkSha1 || task.calcSha1 ) { // Calculate digest - md.update( task.data, 0, task.chunk.range.getLength() ); - byte[] digest = md.digest(); - result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + md.update( task.data, 0, task.chunk.range.getLength() ); + byte[] digest = md.digest(); + if ( task.checkSha1 ) { + result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID; + } else { + task.chunk.setSha1Sum( digest ); + } } - if ( task.doCrc32 ) { - // Calculate CRC32 - task.chunk.calculateDnbd3Crc32( task.data ); + if ( task.calcCrc32 ) { + // Calculate CRC32 + task.chunk.calculateDnbd3Crc32( task.data ); } execCallback( task, result ); } @@ -240,16 +249,18 @@ public class HashChecker public final byte[] data; public final FileChunk chunk; public final HashCheckCallback callback; - public final boolean doHash; - public final boolean doCrc32; + public final boolean checkSha1; + public final boolean calcCrc32; + public final boolean calcSha1; - public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 ) + public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 ) { this.data = data; this.chunk = chunk; this.callback = callback; - this.doHash = doHash; - this.doCrc32 = doCrc32; + this.checkSha1 = checkSha1; + this.calcCrc32 = calcCrc32; + this.calcSha1 = calcSha1; } } diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java index 8e68dc2..5cca7b8 100644 --- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java +++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java @@ -252,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H continue; } try { - if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop + if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop chunks.markCompleted( chunk, false ); break; } @@ -435,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H InterruptedException passEx = null; if ( hashChecker != null && currentChunk.getSha1Sum() != null ) { try { - hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH ); + hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 ); return true; } catch ( InterruptedException e ) { passEx = e; @@ -650,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H return; } try { - int flags = HashChecker.CALC_HASH; + int flags = HashChecker.CHECK_SHA1; if ( blocking ) { flags |= HashChecker.BLOCKING; } @@ -686,7 +686,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H } } - protected HashChecker getHashChecker() + public static HashChecker getHashChecker() { return hashChecker; } diff --git a/src/main/java/org/openslx/util/Util.java b/src/main/java/org/openslx/util/Util.java index 8593f80..5300cfa 100644 --- a/src/main/java/org/openslx/util/Util.java +++ b/src/main/java/org/openslx/util/Util.java @@ -74,7 +74,6 @@ public class Util * * @param value string representation to parse to an int * @param defaultValue fallback value if given string can't be parsed - * @return */ public static int parseInt( String value, int defaultValue ) { @@ -85,6 +84,23 @@ public class Util } } + /** + * Parse the given String as a base10 long. + * If the string does not represent a valid long, return the given + * default value. + * + * @param value string representation to parse to a long + * @param defaultValue fallback value if given string can't be parsed + */ + public static long parseLong( String value, long defaultValue ) + { + try { + return Long.parseLong( value ); + } catch ( Exception e ) { + return defaultValue; + } + } + public static void safeClose( AutoCloseable... closeable ) { for ( AutoCloseable c : closeable ) { @@ -134,4 +150,18 @@ public class Util return System.nanoTime() / 1000; } + private static final String[] UNITS = new String[] { "B", "KB", "MB", "GB", "TB", "PB", "???" }; + + public static String formatBytes( double val ) + { + int unit = 0; + while ( val > 1024 ) { + val /= 1024; + unit++; + if (unit >= UNITS.length) + break; + } + return String.format( "%.1f %s", val, UNITS[unit] ); + } + } |