Jetty Embedded - PUT Verb - 在 body 到达之前处理 headers

Jetty Embedded - PUT Verb - Process headers before the body arrives

我正在使用 Jetty 9,我正在尝试在所有 body 到达服务器之前处理 PUT 请求的 header。这是我所做的:

Server.java:

public class SimplestServer
{
    public static void main(String[] args) throws Exception
    {
        Server server = new Server(9080);

        ServletHandler handler = new ServletHandler();
        server.setHandler(handler);

        handler.addServletWithMapping(HelloServlet.class, "/*");
        handler.addFilterWithMapping(HelloPrintingFilter.class, "/*", EnumSet.of(DispatcherType.REQUEST));

        server.start();
        server.dumpStdErr();
        server.join();
    }

    public static class HelloServlet extends HttpServlet {
        private static final long serialVersionUID = 1L;

        @Override
        protected void doGet(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
            System.out.println(System.currentTimeMillis() + ": Hello from HelloServlet GET");
        }

        @Override
        protected void doPut(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
            System.out.println(System.currentTimeMillis() + ": Hello from HelloServlet PUT");
        }
    }

    public static class HelloPrintingFilter implements Filter {
        @Override
        public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
            System.out.println(System.currentTimeMillis() + ": Hello from filter");
            chain.doFilter(request, response);
        }

        @Override
        public void init(FilterConfig arg0) throws ServletException {
            System.out.println(System.currentTimeMillis() + ": Init from filter");
        }

        @Override
        public void destroy() {
            System.out.println(System.currentTimeMillis() + ": Destroy from filter");
        }
    }
}

Client.java

public class SimplestClient
{
    public static void main(String[] args) throws Exception
    {
        URL url = new URL("http://localhost:9080/resource");
        HttpURLConnection httpCon = (HttpURLConnection) url.openConnection();
        httpCon.setDoOutput(true);
        httpCon.setRequestMethod("PUT");
        OutputStream out = httpCon.getOutputStream();
        byte[] b = new byte[65536];
        Random r = new Random();
        r.nextBytes(b);
        for (int i = 0; i < 1024; i++) {
            out.write(b);
        }
        System.out.println(System.currentTimeMillis() + ": Data sent. Waiting 5 seconds...");

        try {
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        out.close();
        System.out.println(System.currentTimeMillis() + ": Done!");
        httpCon.getInputStream();
    }
}

简而言之,服务器程序侦听端口 9080 上的连接,当请求到达时过滤器 HelloPrintingFilter 被执行,然后请求由 HelloServlet 处理。相反,客户端连接到服务器,发送一堆数据,然后休眠 5 秒,最后关闭与服务器的连接。

两个程序的 运行 产生以下结果:

客户:

1469613522350: Data sent. Waiting 5 seconds...
1469613527351: Done!

服务器:

1469613527373: Hello from filter
1469613527373: Hello from HelloServlet PUT

查看时间戳,我只能在所有 body 到达后才能执行我的过滤器代码。谁能解释我该怎么做?一个典型的用例是:客户端尝试上传一个 5GB 的文件。 header 到达后,我想检查它们是否正常(例如,通过检查 Content-MD5 header 或我需要检查的任何自定义 header 是否存在).如果请求正常,则开始处理 body。如果请求不正确,则关闭连接。

谢谢。

使用多个请求。例如第一个请求包含自定义 header,后续请求用于上传 5GB 文件。

您没有在您的 HelloServlet.doPut() 中执行任何操作,所以您基本上是在告诉 Servlet 容器(又名 Jetty)您已完成该请求的处理。

Jetty 中的请求处理由来自网络的一系列缓冲区处理。

您的 PUT header 和您的 body 内容的开头可能适合单个缓冲区。

Jetty 将解析出 headers,然后开始将请求分派到 Servlet 链,命中您的 HelloFilter,然后您的过滤器将它与 chain.doFilter(request, response);

到达HelloServlet.doPut()的时间点,header已经处理完毕,body的内容还没有开始,等待你在doPut() 调用 HttpServletRequest.getInputStream() 并开始处理它,此时 Jetty 可以自由地开始从网络读取更多缓冲区。

Note: if your servlet exits without reading the request input stream, and the response hasn't indicated a Connection: close, then Jetty will be forced to read the entire request to completion looking for the next request after it (known as a persistent connection in HTTP/1.1 spec)

您最接近达到拒绝请求 body 内容的既定目标是使用您在 HTTP/1.1 规范中可用的内容(假设这是 HTTP/1.1 请求)。即正确的响应状态代码和服务器发起的 Connection: close 响应 header.

这是一个完整的例子:

package jetty;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Uptime;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class PutRejectExample
{
    public static class RejectServlet extends HttpServlet
    {
        @Override
        protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
        {
            timedLog("doPut() - enter");
            if (req.getHeader("X-Key") == null)
            {
                resp.setHeader("Connection", "close");
                resp.sendError(HttpServletResponse.SC_FORBIDDEN);
                timedLog("doPut() - rejected");
                return;
            }
            
            File output = File.createTempFile("reject-", ".dat");
            try (FileOutputStream out = new FileOutputStream(output))
            {
                IO.copy(req.getInputStream(), out);
            }
            resp.setStatus(HttpServletResponse.SC_OK);
            resp.setHeader("Connection", "close"); // be a good HTTP/1.1 citizen
            timedLog("doPut() - exit");
        }
    }
    
    private static Server server;
    private static int port;
    
    private static void timedLog(String format, Object... args)
    {
        System.out.printf(Uptime.getUptime() + "ms " + format + "%n", args);
    }
    
    @BeforeClass
    public static void startServer() throws Exception
    {
        server = new Server();
        ServerConnector connector = new ServerConnector(server);
        connector.setPort(0);
        server.addConnector(connector);
        
        // collection for handlers
        HandlerCollection handlers = new HandlerCollection();
        server.setHandler(handlers);
        
        // servlet context
        ServletContextHandler context = new ServletContextHandler();
        context.addServlet(RejectServlet.class, "/reject");
        handlers.addHandler(context);
        
        // default handler
        handlers.addHandler(new DefaultHandler());
        
        // start server
        server.start();
        
        // grab port
        port = connector.getLocalPort();
    }
    
    @AfterClass
    public static void stopServer() throws Exception
    {
        server.stop();
    }
    
    private void performPUT(int requestSize, String... extraRequestHeaders) throws IOException
    {
        StringBuilder req = new StringBuilder();
        req.append("PUT /reject HTTP/1.1\r\n");
        req.append("Host: localhost:").append(port).append("\r\n");
        req.append("Content-Length: ").append(requestSize).append("\r\n");
        for (String extraHeader : extraRequestHeaders)
        {
            req.append(extraHeader);
        }
        req.append("\r\n");
        
        timedLog("client open connection");
        try (Socket socket = new Socket())
        {
            socket.connect(new InetSocketAddress("localhost", port));
            
            try (OutputStream out = socket.getOutputStream();
                 InputStream in = socket.getInputStream();
                 InputStreamReader reader = new InputStreamReader(in))
            {
                timedLog("client send request (headers + body)");
                try
                {
                    // write request line + headers
                    byte headerBytes[] = req.toString().getBytes(StandardCharsets.UTF_8);
                    out.write(headerBytes);
                    out.flush();
                    
                    // write put body content
                    int bufSize = 65535;
                    byte[] buf = new byte[bufSize];
                    int sizeLeft = requestSize;
                    while (sizeLeft > 0)
                    {
                        int writeSize = Math.min(sizeLeft, bufSize);
                        ThreadLocalRandom.current().nextBytes(buf);
                        out.write(buf, 0, writeSize);
                        out.flush();
                        sizeLeft -= writeSize;
                        try
                        {
                            // simulate a slower connection
                            TimeUnit.MILLISECONDS.sleep(10);
                        }
                        catch (InterruptedException ignore)
                        {
                            // ignore
                        }
                    }
                }
                catch (IOException e)
                {
                    timedLog("client request send exception");
                    e.printStackTrace(System.out);
                }
                timedLog("client send request complete");
                
                timedLog("client read response");
                try
                {
                    StringWriter respStream = new StringWriter();
                    IO.copy(reader, respStream);
                    
                    timedLog("client response: %s", respStream.toString());
                }
                catch (IOException e)
                {
                    timedLog("client read response exception");
                    e.printStackTrace(System.out);
                }
            }
        }
        timedLog("client connection complete");
    }
    
    @Test
    public void testBadPost() throws IOException
    {
        timedLog("---- testBadPost()");
        performPUT(1024 * 1024 * 10);
    }
    
    @Test
    public void testGoodPost() throws IOException
    {
        timedLog("---- testGoodPost()");
        performPUT(1024 * 1024 * 10, "X-Key: foo\r\n");
    }
}

这使用原始 Socket 和原始流来避免被 HttpUrlConnection 中存在的所有缓冲混淆。

你将看到的正常/快乐情况的输出是这样的...

416ms ---- testGoodPost()
416ms client open connection
2016-07-27 06:40:22.180:INFO:oejs.AbstractConnector:main: Started ServerConnector@55f3ddb1{HTTP/1.1,[http/1.1]}{0.0.0.0:46748}
2016-07-27 06:40:22.181:INFO:oejs.Server:main: Started @414ms
421ms client send request (headers + body)
494ms doPut() - enter
2084ms doPut() - exit
2093ms client send request complete
2093ms client read response
2094ms client response: HTTP/1.1 200 OK
Date: Wed, 27 Jul 2016 13:40:22 GMT
Connection: close
Server: Jetty(9.3.11.v20160721)
2094ms client connection complete

被拒绝案例的输出将如下所示...

2095ms ---- testBadPost()
2095ms client open connection
2096ms client send request (headers + body)
2096ms doPut() - enter
2101ms doPut() - rejected
2107ms client request send exception
java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at jetty.PutRejectExample.performPUT(PutRejectExample.java:137)
    at jetty.PutRejectExample.testBadPost(PutRejectExample.java:180)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
2109ms client send request complete
2109ms client read response
2109ms client response: HTTP/1.1 403 Forbidden
Date: Wed, 27 Jul 2016 13:40:23 GMT
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: text/html;charset=iso-8859-1
Content-Length: 322
Connection: close
Server: Jetty(9.3.11.v20160721)

<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 403 </title>
</head>
<body>
<h2>HTTP ERROR: 403</h2>
<p>Problem accessing /reject. Reason:
<pre>    Forbidden</pre></p>
<hr /><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.3.11-SNAPSHOT</a><hr/>
</body>
</html>

2109ms client connection complete

明白了!问题不在服务器端,而在客户端,这只是一个存根。具体来说,问题是 HttpUrlConnection.

中的缓冲

回顾我的 client.java,我有:

    for (int i = 0; i < 1024; i++) {
        out.write(b);
    }

如果我将循环更改为

    for (int i = 0; i < 1024*1024; i++) {
        out.write(b);
    }

我立即得到一个OutOfMemoryError异常,在服务器端什么也得不到,表明没有传输单个字节。当然这是对的,因为在将 header 放在电线上之前,HttpUrlConnection 需要知道 body 长度,因为它必须发出 Content-Length header.将客户端实现更改为原始套接字,有效控制字节何时传输,解决了该问题。

附带说明一下,可以通过删除过滤器 class 进一步简化服务器代码。完整的server-side代码是:

server.java:

public class SimplestServer
{
    public static void main(String[] args) throws Exception
    {
        Server server = new Server(9080);

        ServletHandler handler = new ServletHandler();
        server.setHandler(handler);

        handler.addServletWithMapping(HelloServlet.class, "/*");

        server.start();
        server.dumpStdErr();
        server.join();
    }

    public static class HelloServlet extends HttpServlet {
        private static final long serialVersionUID = 1L;

        @Override
        protected void doGet(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
            System.out.println(System.currentTimeMillis() + ": Hello from HelloServlet GET");
        }

        @Override
        protected void doPut(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
            System.out.println(System.currentTimeMillis() + ": Hello from HelloServlet PUT");

            // Perform some checks here
            if (request.getHeader("X-Key") == null)
            {
                response.setHeader("Connection", "close");
                response.sendError(HttpServletResponse.SC_FORBIDDEN);
                System.out.println(System.currentTimeMillis() + ": Filter --> X-Key failed!");
                return;
            }

            // Everything OK! Read the stream.
            System.out.println(System.currentTimeMillis() + ": Proceded!!");
            InputStream body = request.getInputStream();
            long bytesReadSoFar = 0;
            byte[] data = new byte[65536];
            while (true) {
                int bytesRead = body.read(data);
                if (bytesRead < 0)
                    break;
                bytesReadSoFar += bytesRead;
            }
            System.out.println(System.currentTimeMillis() + ": Finished! Read " + bytesReadSoFar + " bytes.");
            response.setHeader("Connection", "close");
            response.setStatus(HttpServletResponse.SC_OK);
        }
    }
}