summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/main/java/fi/iki/elonen/ChunkedInputStream.java340
-rw-r--r--src/main/java/fi/iki/elonen/NanoHTTPD.java290
-rw-r--r--src/main/java/org/openslx/filetransfer/util/ChunkList.java21
-rw-r--r--src/main/java/org/openslx/filetransfer/util/HashChecker.java59
-rw-r--r--src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java8
-rw-r--r--src/main/java/org/openslx/util/Util.java32
6 files changed, 593 insertions, 157 deletions
diff --git a/src/main/java/fi/iki/elonen/ChunkedInputStream.java b/src/main/java/fi/iki/elonen/ChunkedInputStream.java
new file mode 100644
index 0000000..1c0ba8d
--- /dev/null
+++ b/src/main/java/fi/iki/elonen/ChunkedInputStream.java
@@ -0,0 +1,340 @@
+package fi.iki.elonen;
+
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation. For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Implements chunked transfer coding. The content is received in small chunks.
+ * Entities transferred using this input stream can be of unlimited length.
+ * After the stream is read to the end, it provides access to the trailers,
+ * if any.
+ * <p>
+ * Note that this class NEVER closes the underlying stream, even when
+ * {@link #close()} gets called. Instead, it will read until the "end" of its
+ * chunking on close, which allows for the seamless execution of subsequent
+ * HTTP 1.1 requests, while not requiring the client to remember to read the
+ * entire contents of the response.
+ *
+ *
+ * @since 4.0
+ *
+ */
+public class ChunkedInputStream extends InputStream
+{
+
+ private enum State
+ {
+ CHUNK_LEN, CHUNK_DATA, CHUNK_CRLF, CHUNK_INVALID
+ }
+
+ private static final int BUFFER_SIZE = 2048;
+
+ /** The session input buffer */
+ private final InputStream inputStream;
+
+ private State state;
+
+ /** The chunk size */
+ private long chunkSize;
+
+ /** The current position within the current chunk */
+ private long pos;
+
+ /** True if we've reached the end of stream */
+ private boolean eof;
+
+ /** True if this stream is closed */
+ private boolean closed;
+
+ /**
+ * Default constructor.
+ *
+ * @param buffer Session input buffer
+ * @param inputStream Input stream
+ * @param http1Config Message http1Config. If {@code null} {@link Http1Config#DEFAULT} will be
+ * used.
+ *
+ * @since 4.4
+ */
+ public ChunkedInputStream( final InputStream inputStream )
+ {
+ super();
+ this.inputStream = inputStream;
+ this.pos = 0L;
+ this.state = State.CHUNK_LEN;
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ final int len = this.inputStream.available();
+ return (int)Math.min( len, this.chunkSize - this.pos );
+ }
+
+ /**
+ * <p>
+ * Returns all the data in a chunked stream in coalesced form. A chunk
+ * is followed by a CRLF. The method returns -1 as soon as a chunksize of 0
+ * is detected.
+ * </p>
+ *
+ * <p>
+ * Trailer headers are read automatically at the end of the stream and
+ * can be obtained with the getResponseFooters() method.
+ * </p>
+ *
+ * @return -1 of the end of the stream has been reached or the next data
+ * byte
+ * @throws IOException in case of an I/O error
+ */
+ @Override
+ public int read() throws IOException
+ {
+ if ( this.closed ) {
+ throw new StreamClosedException( "Already closed" );
+ }
+ if ( this.eof ) {
+ return -1;
+ }
+ if ( state != State.CHUNK_DATA ) {
+ nextChunk();
+ if ( this.eof ) {
+ return -1;
+ }
+ }
+ final int b = inputStream.read();
+ if ( b != -1 ) {
+ pos++;
+ if ( pos >= chunkSize ) {
+ state = State.CHUNK_CRLF;
+ }
+ return b;
+ }
+ throw new MalformedChunkCodingException( "Truncated chunk (expected size: " + chunkSize + "; actual size: " + pos + ")" );
+ }
+
+ /**
+ * Read some bytes from the stream.
+ *
+ * @param b The byte array that will hold the contents from the stream.
+ * @param off The offset into the byte array at which bytes will start to be
+ * placed.
+ * @param len the maximum number of bytes that can be returned.
+ * @return The number of bytes returned or -1 if the end of stream has been
+ * reached.
+ * @throws IOException in case of an I/O error
+ */
+ @Override
+ public int read( final byte[] b, final int off, final int len ) throws IOException
+ {
+ if ( closed ) {
+ throw new StreamClosedException( "Already closed" );
+ }
+
+ if ( eof ) {
+ return -1;
+ }
+ if ( state != State.CHUNK_DATA ) {
+ nextChunk();
+ if ( eof ) {
+ return -1;
+ }
+ }
+ int bytesRead = inputStream.read( b, off, (int)Math.min( len, chunkSize - pos ) );
+ if ( bytesRead != -1 ) {
+ pos += bytesRead;
+ if ( pos >= chunkSize ) {
+ state = State.CHUNK_CRLF;
+ }
+ return bytesRead;
+ }
+ eof = true;
+ throw new MalformedChunkCodingException( "Truncated chunk (expected size: " + chunkSize + "; actual size: " + pos + ")" );
+ }
+
+ /**
+ * Read some bytes from the stream.
+ *
+ * @param b The byte array that will hold the contents from the stream.
+ * @return The number of bytes returned or -1 if the end of stream has been
+ * reached.
+ * @throws IOException in case of an I/O error
+ */
+ @Override
+ public int read( final byte[] b ) throws IOException
+ {
+ return read( b, 0, b.length );
+ }
+
+ /**
+ * Read the next chunk.
+ *
+ * @throws IOException in case of an I/O error
+ */
+ private void nextChunk() throws IOException
+ {
+ if ( state == State.CHUNK_INVALID ) {
+ throw new MalformedChunkCodingException( "Corrupt data stream" );
+ }
+ try {
+ chunkSize = getChunkSize();
+ if ( chunkSize < 0L ) {
+ throw new MalformedChunkCodingException( "Negative chunk size" );
+ }
+ state = State.CHUNK_DATA;
+ pos = 0L;
+ if ( chunkSize == 0L ) {
+ eof = true;
+ try {
+ inputStream.read();
+ inputStream.read();
+ } catch ( IOException e ) {
+ throw new MalformedChunkCodingException( "No CRLF after final zero chunk" );
+ }
+ }
+ } catch ( final MalformedChunkCodingException ex ) {
+ state = State.CHUNK_INVALID;
+ throw ex;
+ }
+ }
+
+ /**
+ * Expects the stream to start with a chunksize in hex with optional
+ * comments after a semicolon. The line must end with a CRLF: "a3; some
+ * comment\r\n" Positions the stream at the start of the next line.
+ */
+ private long getChunkSize() throws IOException
+ {
+ int ch;
+ int prevCh;
+ boolean hadSemi;
+ final State st = this.state;
+ switch ( st ) {
+ case CHUNK_CRLF:
+ ch = inputStream.read();
+ ch = ( ch << 8 ) | inputStream.read();
+ if ( ch < 0 ) {
+ throw new MalformedChunkCodingException(
+ "CRLF expected at end of chunk" );
+ }
+ if ( ch != 3338 ) {
+ throw new MalformedChunkCodingException(
+ "Unexpected content at the end of chunk" );
+ }
+ state = State.CHUNK_LEN;
+ //$FALL-THROUGH$
+ case CHUNK_LEN:
+ prevCh = -1;
+ hadSemi = false;
+ StringBuilder sb = new StringBuilder( 8 );
+ for ( ;; ) {
+ ch = inputStream.read();
+ if ( ch == -1 )
+ break;
+ if ( prevCh == 13 && ch == 10 ) {
+ break;
+ }
+ prevCh = ch;
+ if ( ch == 13 )
+ continue;
+ if ( ch == ';' ) {
+ hadSemi = true;
+ }
+ if ( hadSemi )
+ continue;
+ sb.append( (char)ch );
+ }
+ if ( prevCh != 13 || ch != 10 ) {
+ throw new StreamClosedException( "Premature end of chunk coded message body" );
+ }
+ final String s = sb.toString();
+ try {
+ return Long.parseLong( s, 16 );
+ } catch ( final NumberFormatException e ) {
+ throw new MalformedChunkCodingException( "Invalid hex-length in chunk header: " + s );
+ }
+ default:
+ throw new IllegalStateException( "Inconsistent codec state" );
+ }
+ }
+
+ /**
+ * Reads the remainder of the chunked message, leaving the underlying
+ * stream at a position to start reading the next response without
+ * scanning. But does NOT close the underlying stream.
+ *
+ * @throws IOException in case of an I/O error
+ */
+ @Override
+ public void close() throws IOException
+ {
+ if ( !closed ) {
+ try {
+ if ( !eof && state != State.CHUNK_INVALID ) {
+ // Optimistically check if the content has been fully read
+ // when there's no data remaining in the current chunk.
+ // This is common when self-terminating content (e.g. JSON)
+ // is parsed from response streams.
+ if ( chunkSize == pos && chunkSize > 0 && read() == -1 ) {
+ return;
+ }
+ // read and discard the remainder of the message
+ final byte[] buff = new byte[ BUFFER_SIZE ];
+ while ( read( buff ) >= 0 ) {
+ }
+ }
+ } finally {
+ eof = true;
+ closed = true;
+ }
+ }
+ }
+
+ public static class MalformedChunkCodingException extends IOException
+ {
+ private static final long serialVersionUID = 7092137465179737109L;
+
+ public MalformedChunkCodingException( String msg )
+ {
+ super( msg );
+ }
+ }
+
+ public static class StreamClosedException extends IOException
+ {
+ private static final long serialVersionUID = -5871567871867283867L;
+
+ public StreamClosedException( String msg )
+ {
+ super( msg );
+ }
+ }
+
+}
diff --git a/src/main/java/fi/iki/elonen/NanoHTTPD.java b/src/main/java/fi/iki/elonen/NanoHTTPD.java
index 319164a..395eca7 100644
--- a/src/main/java/fi/iki/elonen/NanoHTTPD.java
+++ b/src/main/java/fi/iki/elonen/NanoHTTPD.java
@@ -65,6 +65,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.openslx.util.GrowingThreadPoolExecutor;
import org.openslx.util.PrioThreadFactory;
@@ -203,40 +204,40 @@ public abstract class NanoHTTPD implements Runnable
do {
try {
- final Socket finalAccept = myServerSocket.accept();
- registerConnection( finalAccept );
- finalAccept.setSoTimeout( SOCKET_READ_TIMEOUT );
- final InputStream inputStream = finalAccept.getInputStream();
- asyncRunner.execute( new Runnable() {
- @Override
- public void run()
- {
- OutputStream outputStream = null;
- try {
- outputStream = finalAccept.getOutputStream();
- HTTPSession session = new HTTPSession( inputStream, outputStream,
- finalAccept.getInetAddress() );
- while ( !finalAccept.isClosed() && !finalAccept.isInputShutdown() ) {
- session.execute();
- }
- } catch ( RejectedExecutionException e ) {
+ final Socket sck = myServerSocket.accept();
+ registerConnection( sck );
+ sck.setSoTimeout( SOCKET_READ_TIMEOUT );
+ final InputStream inputStream = sck.getInputStream();
+ try {
+ asyncRunner.execute( new Runnable() {
+ @Override
+ public void run()
+ {
+ OutputStream outputStream = null;
try {
- outputStream.write( "HTTP/1.1 503 Overloaded\r\nConnection: Close\r\n\r\n".getBytes() );
- } catch ( Exception e2 ) {
- }
- } catch ( Exception e ) {
- // When the socket is closed by the client, we throw our own SocketException
- // to break the "keep alive" loop above.
- if ( ! ( e instanceof SocketTimeoutException )
- && ! ( e instanceof SocketException && "NanoHttpd Shutdown".equals( e.getMessage() ) ) ) {
- e.printStackTrace();
+ outputStream = sck.getOutputStream();
+ HTTPSession session = new HTTPSession( inputStream, outputStream,
+ sck.getInetAddress() );
+ while ( !sck.isClosed() && !sck.isInputShutdown() && !sck.isOutputShutdown() ) {
+ session.execute();
+ }
+ } catch ( Exception e ) {
+ // When the socket is closed by the client, we throw our own SocketException
+ // to break the "keep alive" loop above.
+ if ( ! ( e instanceof SocketTimeoutException )
+ && ! ( e instanceof SocketException && "NanoHttpd Shutdown".equals( e.getMessage() ) ) ) {
+ e.printStackTrace();
+ }
+ } finally {
+ Util.safeClose( outputStream, inputStream, sck );
+ unRegisterConnection( sck );
}
- } finally {
- Util.safeClose( outputStream, inputStream, finalAccept );
- unRegisterConnection( finalAccept );
}
- }
- } );
+ } );
+ } catch ( RejectedExecutionException e ) {
+ Util.safeClose( sck, inputStream );
+ unRegisterConnection( sck );
+ }
} catch ( IOException e ) {
}
} while ( !myServerSocket.isClosed() );
@@ -339,11 +340,10 @@ public abstract class NanoHTTPD implements Runnable
*/
public Response serve( IHTTPSession session )
{
- Map<String, String> files = new HashMap<String, String>();
Method method = session.getMethod();
if ( Method.PUT.equals( method ) || Method.POST.equals( method ) ) {
try {
- session.parseBody( files );
+ session.parseBody();
} catch ( IOException ioe ) {
return new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT,
"SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() );
@@ -354,7 +354,7 @@ public abstract class NanoHTTPD implements Runnable
Map<String, String> parms = session.getParms();
parms.put( QUERY_STRING_PARAMETER, session.getQueryParameterString() );
- return serve( session.getUri(), method, session.getHeaders(), parms, files );
+ return serve( session.getUri(), method, session.getHeaders(), parms, null );
}
/**
@@ -565,7 +565,7 @@ public abstract class NanoHTTPD implements Runnable
}
sb.append( "HTTP/1.1 " );
sb.append( status.getDescription() );
- sb.append( " \r\n" );
+ sb.append( "\r\n" );
if ( mime != null ) {
sb.append( "Content-Type: " );
@@ -759,7 +759,8 @@ public abstract class NanoHTTPD implements Runnable
NOT_FOUND( 404, "Not Found" ),
METHOD_NOT_ALLOWED( 405, "Method Not Allowed" ),
RANGE_NOT_SATISFIABLE( 416, "Requested Range Not Satisfiable" ),
- INTERNAL_ERROR( 500, "Internal Server Error" );
+ INTERNAL_ERROR( 500, "Internal Server Error" ),
+ SERVICE_UNAVAILABLE( 503, "Service Unavailable" );
private final int requestStatus;
private final String description;
@@ -835,7 +836,7 @@ public abstract class NanoHTTPD implements Runnable
*
* @param files map to modify
*/
- void parseBody( Map<String, String> files ) throws IOException, ResponseException;
+ void parseBody() throws IOException, ResponseException;
}
protected class HTTPSession implements IHTTPSession
@@ -843,20 +844,15 @@ public abstract class NanoHTTPD implements Runnable
public static final int BUFSIZE = 8192;
private final OutputStream outputStream;
private PushbackInputStream inputStream;
+ private InputStream wrappedInput;
private int splitbyte;
private int rlen;
- private String uri;
private Method method;
private Map<String, String> parms;
private Map<String, String> headers;
private String queryParameterString;
private String remoteIp;
-
- public HTTPSession( InputStream inputStream, OutputStream outputStream )
- {
- this.inputStream = new PushbackInputStream( inputStream, BUFSIZE );
- this.outputStream = outputStream;
- }
+ private boolean doClose;
public HTTPSession( InputStream inputStream, OutputStream outputStream, InetAddress inetAddress )
{
@@ -865,12 +861,13 @@ public abstract class NanoHTTPD implements Runnable
remoteIp = inetAddress.isLoopbackAddress() || inetAddress.isAnyLocalAddress() ? "127.0.0.1"
: inetAddress.getHostAddress().toString();
headers = new HashMap<String, String>();
+ parms = new HashMap<String, String>();
}
@Override
public void execute() throws IOException
{
- boolean close = true;
+ doClose = true;
try {
// Read the first 8192 bytes.
// The full header should fit in here.
@@ -893,8 +890,9 @@ public abstract class NanoHTTPD implements Runnable
while ( read > 0 ) {
rlen += read;
splitbyte = findHeaderEnd( buf, rlen );
- if ( splitbyte > 0 )
+ if ( splitbyte > 0 || rlen >= BUFSIZE )
break;
+ // Try to keep reading as long as we've got less than 8kb
read = inputStream.read( buf, rlen, BUFSIZE - rlen );
if ( maxRequestSize != 0 && rlen > maxRequestSize )
throw new SocketException( "Request too large" );
@@ -908,12 +906,8 @@ public abstract class NanoHTTPD implements Runnable
inputStream.unread( buf, splitbyte, rlen - splitbyte );
}
- parms = new HashMap<String, String>();
- if ( null == headers ) {
- headers = new HashMap<String, String>();
- } else {
- headers.clear();
- }
+ parms.clear();
+ headers.clear();
if ( null != remoteIp ) {
headers.put( "remote-addr", remoteIp );
@@ -922,19 +916,25 @@ public abstract class NanoHTTPD implements Runnable
// Create a BufferedReader for parsing the header.
BufferedReader hin = new BufferedReader( new InputStreamReader( new ByteArrayInputStream( buf,
- 0, rlen ) ) );
+ 0, splitbyte ) ) );
// Decode the header into parms and header java properties
- Map<String, String> pre = new HashMap<String, String>();
- decodeHeader( hin, pre, parms, headers );
+ decodeHeader( hin, parms, headers );
- method = Method.lookup( pre.get( "method" ) );
+ if ( !Util.isEmptyString( headers.get( "trailer" ) ) ) {
+ throw new ResponseException( Response.Status.BAD_REQUEST, "BAD REQUEST: Trailers not supported." );
+ }
+
+ method = Method.lookup( headers.get( "http-method" ) );
if ( method == null ) {
throw new ResponseException( Response.Status.BAD_REQUEST, "BAD REQUEST: Syntax error." );
}
- uri = pre.get( "uri" );
- close = "close".equalsIgnoreCase( headers.get( "connection" ) );
+ doClose = "close".equalsIgnoreCase( headers.get( "connection" ) ) || "1.0".equals( headers.get( "http-version" ) );
+ if ( !doClose && method != Method.GET
+ && !headers.containsKey( "content-length" ) && !"chunked".equals( headers.get( "transfer-encoding" ) ) ) {
+ doClose = true;
+ }
// Ok, now do the serve()
Response r = serve( this );
@@ -943,10 +943,16 @@ public abstract class NanoHTTPD implements Runnable
"SERVER INTERNAL ERROR: Serve() returned a null response." );
} else {
r.setRequestMethod( method );
- r.send( outputStream, close );
+ r.send( outputStream, doClose );
}
- if ( close ) {
- Util.safeClose( outputStream );
+ if ( doClose ) {
+ Util.safeClose( outputStream, inputStream );
+ } else {
+ InputStream is = getInputStream();
+ if ( is != null ) {
+ while ( is.read( buf ) > 0 ) {
+ }
+ }
}
} catch ( SocketException e ) {
// throw it out to close socket object (finalAccept)
@@ -956,86 +962,85 @@ public abstract class NanoHTTPD implements Runnable
} catch ( IOException ioe ) {
Response r = new Response( Response.Status.INTERNAL_ERROR, MIME_PLAINTEXT,
"SERVER INTERNAL ERROR: IOException: " + ioe.getMessage() );
- r.send( outputStream, close );
+ r.send( outputStream, doClose );
Util.safeClose( outputStream );
} catch ( ResponseException re ) {
Response r = new Response( re.getStatus(), MIME_PLAINTEXT, re.getMessage() );
- r.send( outputStream, close );
+ r.send( outputStream, doClose );
Util.safeClose( outputStream );
}
}
@Override
- public void parseBody( Map<String, String> files ) throws IOException, ResponseException
+ public void parseBody() throws IOException, ResponseException
{
+ // If the method is POST, there may be parameters
+ // in data section, too, read it:
+ if ( method != Method.POST )
+ return;
+
long size;
if ( headers.containsKey( "content-length" ) ) {
- size = Integer.parseInt( headers.get( "content-length" ) );
- } else if ( splitbyte < rlen ) {
- size = rlen - splitbyte;
+ size = Util.parseInt( headers.get( "content-length" ), -1 );
} else {
- size = 0;
+ size = -1;
}
- // If the method is POST, there may be parameters
- // in data section, too, read it:
- if ( Method.POST.equals( method ) ) {
- String contentType = null;
- String contentEncoding = null;
- String contentTypeHeader = headers.get( "content-type" );
-
- StringTokenizer st = null;
- if ( contentTypeHeader != null ) {
- st = new StringTokenizer( contentTypeHeader, "," );
- if ( st.hasMoreTokens() ) {
- String part[] = st.nextToken().split( ";\\s*", 2 );
- contentType = part[0];
- if ( part.length == 2 ) {
- contentEncoding = part[1];
- }
+ String contentType = null;
+ String contentEncoding = null;
+ String contentTypeHeader = headers.get( "content-type" );
+
+ StringTokenizer st = null;
+ if ( contentTypeHeader != null ) {
+ st = new StringTokenizer( contentTypeHeader, "," );
+ if ( st.hasMoreTokens() ) {
+ String part[] = st.nextToken().split( ";\\s*", 2 );
+ contentType = part[0].trim();
+ if ( part.length == 2 ) {
+ contentEncoding = part[1];
}
}
- Charset cs = StandardCharsets.ISO_8859_1;
- if ( contentEncoding != null ) {
- try {
- cs = Charset.forName( contentEncoding );
- } catch ( Exception e ) {
- }
+ }
+ Charset cs = StandardCharsets.UTF_8;
+ if ( contentEncoding != null ) {
+ try {
+ cs = Charset.forName( contentEncoding );
+ } catch ( Exception e ) {
}
- //LOGGER.debug("Content type is '" + contentType + "', encoding '" + cs.name() + "'");
+ }
- if ( "multipart/form-data".equalsIgnoreCase( contentType ) ) {
- throw new ResponseException( Response.Status.BAD_REQUEST,
- "BAD REQUEST: Content type is multipart/form-data, which is not supported" );
- } else {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- byte pbuf[] = new byte[ 1000 ];
- while ( size > 0 ) {
- int ret = inputStream.read( pbuf, 0, (int)Math.min( size, pbuf.length ) );
- if ( ret <= 0 )
- break;
- if ( ret >= 2 && pbuf[ret - 1] == '\n' && pbuf[ret - 2] == '\r' )
- break;
- size -= ret;
- baos.write( pbuf, 0, ret );
- }
- String postLine = new String( baos.toByteArray(), cs );
- baos.close();
- // Handle application/x-www-form-urlencoded
- if ( "application/x-www-form-urlencoded".equalsIgnoreCase( contentType ) ) {
- decodeParms( postLine, parms );
- } else if ( files != null && postLine.length() != 0 ) {
- // Special case for raw POST data => create a special files entry "postData" with raw content data
- files.put( "postData", postLine );
- }
+ // Use method here so we get the limited/chunked stream if applicable
+ InputStream is = getInputStream();
+ if ( is == null )
+ return;
+
+ if ( "multipart/form-data".equalsIgnoreCase( contentType ) ) {
+ throw new ResponseException( Response.Status.BAD_REQUEST,
+ "BAD REQUEST: Content type is multipart/form-data, which is not supported" );
+ } else if ( "application/x-www-form-urlencoded".equalsIgnoreCase( contentType ) ) {
+ // Handle application/x-www-form-urlencoded
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ byte pbuf[] = new byte[ 2000 ];
+ while ( size > 0 ) {
+ int ret = is.read( pbuf, 0, (int)Math.min( size, pbuf.length ) );
+ if ( ret <= 0 )
+ break;
+ if ( ret >= 2 && pbuf[ret - 1] == '\n' && pbuf[ret - 2] == '\r' )
+ break;
+ size -= ret;
+ baos.write( pbuf, 0, ret );
}
- }
+ String postLine = new String( baos.toByteArray(), cs );
+ baos.close();
+
+ decodeParms( postLine, parms );
+ } // Otherwise leave stream untouched so app can deal with it
}
/**
* Decodes the sent headers and loads the data into Key/value pairs
*/
- private void decodeHeader( BufferedReader in, Map<String, String> pre, Map<String, String> parms,
+ private void decodeHeader( BufferedReader in, Map<String, String> parms,
Map<String, String> headers )
throws ResponseException
{
@@ -1052,7 +1057,7 @@ public abstract class NanoHTTPD implements Runnable
"BAD REQUEST: Syntax error. Usage: GET /example/file.html" );
}
- pre.put( "method", st.nextToken() );
+ String strMethod = st.nextToken();
if ( !st.hasMoreTokens() ) {
throw new ResponseException( Response.Status.BAD_REQUEST,
@@ -1071,21 +1076,28 @@ public abstract class NanoHTTPD implements Runnable
}
// If there's another token, its protocol version,
- // followed by HTTP headers. Ignore version but parse headers.
+ // followed by HTTP headers.
// NOTE: this now forces header names lower case since they are
// case insensitive and vary by client.
if ( st.hasMoreTokens() ) {
+ String strVersion = st.nextToken();
String line = in.readLine();
- while ( line != null && line.trim().length() > 0 ) {
+ while ( line != null && line.length() > 0 ) {
int p = line.indexOf( ':' );
if ( p >= 0 )
headers.put( line.substring( 0, p ).trim().toLowerCase( Locale.US ),
line.substring( p + 1 ).trim() );
line = in.readLine();
}
+ // Add version after, to override headers
+ int sl = strVersion.indexOf( '/' );
+ if ( sl != -1 ) {
+ headers.put( "http-version", strVersion.substring( sl + 1 ) );
+ }
}
- pre.put( "uri", uri );
+ headers.put( "http-uri", uri );
+ headers.put( "http-method", strMethod );
} catch ( IOException ioe ) {
throw new ResponseException( Response.Status.INTERNAL_ERROR,
"SERVER INTERNAL ERROR: IOException: " + ioe.getMessage(), ioe );
@@ -1093,9 +1105,8 @@ public abstract class NanoHTTPD implements Runnable
}
/**
- * Find byte index separating header from body. It must be the last byte
- * of the first two
- * sequential new lines.
+ * Find byte index separating header from body. Return value will point
+ * to first byte after the final \r\n\r\n.
*/
private int findHeaderEnd( final byte[] buf, int rlen )
{
@@ -1107,7 +1118,7 @@ public abstract class NanoHTTPD implements Runnable
}
splitbyte++;
}
- return 0;
+ return -1;
}
/**
@@ -1157,7 +1168,7 @@ public abstract class NanoHTTPD implements Runnable
@Override
public final String getUri()
{
- return uri;
+ return headers.get( "http-uri" );
}
@Override
@@ -1166,9 +1177,36 @@ public abstract class NanoHTTPD implements Runnable
return method;
}
+ @SuppressWarnings( "deprecation" )
@Override
public final InputStream getInputStream()
{
+ if ( method == Method.GET )
+ return null;
+ if ( wrappedInput == null ) {
+ String s = headers.get( "content-length" );
+ if ( s != null ) {
+ long cl = Util.parseLong( s, -1 );
+ if ( cl == 0 )
+ return null;
+ if ( cl < 0 ) {
+ doClose = true;
+ return null;
+ }
+ BoundedInputStream bis = new BoundedInputStream( inputStream, cl );
+ bis.setPropagateClose( false );
+ return wrappedInput = bis;
+ }
+ s = headers.get( "transfer-encoding" );
+ if ( s != null ) {
+ if ( "chunked".equalsIgnoreCase( s.trim() ) )
+ return wrappedInput = new ChunkedInputStream( inputStream );
+ }
+ } else {
+ return wrappedInput;
+ }
+ if ( !doClose )
+ return null;
return inputStream;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/ChunkList.java b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
index bd927b1..27f8e8c 100644
--- a/src/main/java/org/openslx/filetransfer/util/ChunkList.java
+++ b/src/main/java/org/openslx/filetransfer/util/ChunkList.java
@@ -1,5 +1,6 @@
package org.openslx.filetransfer.util;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -92,7 +93,7 @@ public class ChunkList
* Get CRC32 list in DNBD3 format. All checksums are little
* endian and prefixed by the crc32 sum of the list itself.
*/
- public synchronized byte[] getDnbd3Crc32List() throws IOException
+ public synchronized byte[] getDnbd3Crc32List() throws IllegalStateException
{
byte buffer[] = new byte[ allChunks.size() * 4 + 4 ]; // 4 byte per chunk plus master
long nextChunkOffset = 0;
@@ -143,7 +144,6 @@ public class ChunkList
* Returns true if this list contains a chunk with state MISSING,
* which means the chunk doesn't have a sha1 known to exist in
* another image.
- * @return
*/
public synchronized boolean hasLocallyMissingChunk()
{
@@ -521,4 +521,21 @@ public class ChunkList
return chunk.sha1sum != null && Arrays.equals( FileChunk.NULL_BLOCK_SHA1, chunk.sha1sum );
}
+ /**
+ * Write DNBD3 CRC32 list to given file.
+ *
+ * @throws IllegalStateException
+ * @throws IOException
+ */
+ public void writeCrc32List( String crcfile ) throws IllegalStateException, IOException
+ {
+ byte[] dnbd3Crc32List = null;
+ dnbd3Crc32List = getDnbd3Crc32List();
+ if ( dnbd3Crc32List != null ) {
+ try ( FileOutputStream fos = new FileOutputStream( crcfile ) ) {
+ fos.write( dnbd3Crc32List );
+ }
+ }
+ }
+
}
diff --git a/src/main/java/org/openslx/filetransfer/util/HashChecker.java b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
index 41bd05a..abbcd35 100644
--- a/src/main/java/org/openslx/filetransfer/util/HashChecker.java
+++ b/src/main/java/org/openslx/filetransfer/util/HashChecker.java
@@ -15,9 +15,11 @@ import org.apache.logging.log4j.Logger;
public class HashChecker
{
public static final int BLOCKING = 1;
- public static final int CALC_HASH = 2;
+ public static final int CHECK_SHA1 = 2;
public static final int CALC_CRC32 = 4;
-
+ public static final int CALC_SHA1 = 8;
+ public static final int NO_SLOW_WARN = 16;
+
private static final Logger LOGGER = LogManager.getLogger( HashChecker.class );
private final BlockingQueue<HashTask> queue;
@@ -27,7 +29,7 @@ public class HashChecker
private final String algorithm;
private boolean invalid = false;
-
+
private final int queueCapacity;
public HashChecker( String algorithm ) throws NoSuchAlgorithmException
@@ -97,11 +99,12 @@ public class HashChecker
public boolean queue( FileChunk chunk, byte[] data, HashCheckCallback callback, int flags ) throws InterruptedException
{
boolean blocking = ( flags & BLOCKING ) != 0;
- boolean doHash = ( flags & CALC_HASH ) != 0;
- boolean doCrc32 = ( flags & CALC_CRC32 ) != 0;
- if ( doHash && chunk.getSha1Sum() == null )
+ boolean checkSha1 = ( flags & CHECK_SHA1 ) != 0;
+ boolean calcCrc32 = ( flags & CALC_CRC32 ) != 0;
+ boolean calcSha1 = ( flags & CALC_SHA1 ) != 0;
+ if ( checkSha1 && chunk.getSha1Sum() == null )
throw new NullPointerException( "Chunk has no sha1 hash" );
- HashTask task = new HashTask( data, chunk, callback, doHash, doCrc32 );
+ HashTask task = new HashTask( data, chunk, callback, checkSha1, calcCrc32, calcSha1 );
synchronized ( threads ) {
if ( invalid ) {
execCallback( task, HashResult.FAILURE );
@@ -133,15 +136,17 @@ public class HashChecker
}
}
}
- if ( doHash ) {
+ if ( checkSha1 ) {
chunk.setStatus( ChunkStatus.HASHING );
}
if ( blocking ) {
long pre = System.currentTimeMillis();
queue.put( task );
- long duration = System.currentTimeMillis() - pre;
- if ( duration > 1000 ) {
- LOGGER.warn( "HashChecker.queue() took " + duration + "ms" );
+ if ( ( flags & NO_SLOW_WARN ) == 0 ) {
+ long duration = System.currentTimeMillis() - pre;
+ if ( duration > 1000 ) {
+ LOGGER.warn( "HashChecker.queue() took " + duration + "ms" );
+ }
}
} else {
if ( !queue.offer( task ) ) {
@@ -158,7 +163,7 @@ public class HashChecker
{
return queue.size();
}
-
+
public int getQueueCapacity()
{
return queueCapacity;
@@ -208,15 +213,19 @@ public class HashChecker
break;
}
HashResult result = HashResult.NONE;
- if ( task.doHash ) {
+ if ( task.checkSha1 || task.calcSha1 ) {
// Calculate digest
- md.update( task.data, 0, task.chunk.range.getLength() );
- byte[] digest = md.digest();
- result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ md.update( task.data, 0, task.chunk.range.getLength() );
+ byte[] digest = md.digest();
+ if ( task.checkSha1 ) {
+ result = Arrays.equals( digest, task.chunk.getSha1Sum() ) ? HashResult.VALID : HashResult.INVALID;
+ } else {
+ task.chunk.setSha1Sum( digest );
+ }
}
- if ( task.doCrc32 ) {
- // Calculate CRC32
- task.chunk.calculateDnbd3Crc32( task.data );
+ if ( task.calcCrc32 ) {
+ // Calculate CRC32
+ task.chunk.calculateDnbd3Crc32( task.data );
}
execCallback( task, result );
}
@@ -240,16 +249,18 @@ public class HashChecker
public final byte[] data;
public final FileChunk chunk;
public final HashCheckCallback callback;
- public final boolean doHash;
- public final boolean doCrc32;
+ public final boolean checkSha1;
+ public final boolean calcCrc32;
+ public final boolean calcSha1;
- public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean doHash, boolean doCrc32 )
+ public HashTask( byte[] data, FileChunk chunk, HashCheckCallback callback, boolean checkSha1, boolean calcCrc32, boolean calcSha1 )
{
this.data = data;
this.chunk = chunk;
this.callback = callback;
- this.doHash = doHash;
- this.doCrc32 = doCrc32;
+ this.checkSha1 = checkSha1;
+ this.calcCrc32 = calcCrc32;
+ this.calcSha1 = calcSha1;
}
}
diff --git a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
index 8e68dc2..5cca7b8 100644
--- a/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
+++ b/src/main/java/org/openslx/filetransfer/util/IncomingTransferBase.java
@@ -252,7 +252,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
continue;
}
try {
- if ( !hashChecker.queue( chunk, data, this, HashChecker.CALC_HASH ) ) { // false == queue full, stop
+ if ( !hashChecker.queue( chunk, data, this, HashChecker.CHECK_SHA1 ) ) { // false == queue full, stop
chunks.markCompleted( chunk, false );
break;
}
@@ -435,7 +435,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
InterruptedException passEx = null;
if ( hashChecker != null && currentChunk.getSha1Sum() != null ) {
try {
- hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CALC_HASH );
+ hashChecker.queue( currentChunk, buffer, IncomingTransferBase.this, HashChecker.BLOCKING | HashChecker.CHECK_SHA1 );
return true;
} catch ( InterruptedException e ) {
passEx = e;
@@ -650,7 +650,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
return;
}
try {
- int flags = HashChecker.CALC_HASH;
+ int flags = HashChecker.CHECK_SHA1;
if ( blocking ) {
flags |= HashChecker.BLOCKING;
}
@@ -686,7 +686,7 @@ public abstract class IncomingTransferBase extends AbstractTransfer implements H
}
}
- protected HashChecker getHashChecker()
+ public static HashChecker getHashChecker()
{
return hashChecker;
}
diff --git a/src/main/java/org/openslx/util/Util.java b/src/main/java/org/openslx/util/Util.java
index 8593f80..5300cfa 100644
--- a/src/main/java/org/openslx/util/Util.java
+++ b/src/main/java/org/openslx/util/Util.java
@@ -74,7 +74,6 @@ public class Util
*
* @param value string representation to parse to an int
* @param defaultValue fallback value if given string can't be parsed
- * @return
*/
public static int parseInt( String value, int defaultValue )
{
@@ -85,6 +84,23 @@ public class Util
}
}
+ /**
+ * Parse the given String as a base10 long.
+ * If the string does not represent a valid long, return the given
+ * default value.
+ *
+ * @param value string representation to parse to a long
+ * @param defaultValue fallback value if given string can't be parsed
+ */
+ public static long parseLong( String value, long defaultValue )
+ {
+ try {
+ return Long.parseLong( value );
+ } catch ( Exception e ) {
+ return defaultValue;
+ }
+ }
+
public static void safeClose( AutoCloseable... closeable )
{
for ( AutoCloseable c : closeable ) {
@@ -134,4 +150,18 @@ public class Util
return System.nanoTime() / 1000;
}
+ private static final String[] UNITS = new String[] { "B", "KB", "MB", "GB", "TB", "PB", "???" };
+
+ public static String formatBytes( double val )
+ {
+ int unit = 0;
+ while ( val > 1024 ) {
+ val /= 1024;
+ unit++;
+ if (unit >= UNITS.length)
+ break;
+ }
+ return String.format( "%.1f %s", val, UNITS[unit] );
+ }
+
}