How to use a Filter in Spring Cloud Data Flow?

I want to add a filter to Spring Cloud Data Flow (SCDF) so that I can debug it and see how it behaves when it receives requests.

I have tried implementing javax.servlet.Filter and overriding the doFilter method, but it does not seem to work, because some of SCDF's default filters are started before my filter class.

Is there any way to write a filter for SCDF? Would it work if I applied this link for this purpose?

Here is my filter class:

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map.Entry;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.util.ContentCachingResponseWrapper;

@Component
public class CustomFilter implements Filter {

    private static final Logger logger = LoggerFactory.getLogger(CustomFilter.class);

    @Override
    public void destroy() {
        logger.info("destroy filter");
    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) request;
        // Wrap both sides so the bodies can be logged and still reach their destination.
        ContentCachingResponseWrapper responseWrapper = new ContentCachingResponseWrapper(
                (HttpServletResponse) response);
        ResettableStreamHttpServletRequest wrappedRequest = new ResettableStreamHttpServletRequest(req);

        logger.info(">>>>>link: " + req.getServletPath() + "?");
        for (Entry<String, String[]> entry : req.getParameterMap().entrySet()) {
            logger.info(">>>>>param: " + entry.getKey() + ":" + entry.getValue()[0]);
        }

        // Read the request body for logging, then rewind it for the rest of the chain.
        String bodyRequest = IOUtils.toString(wrappedRequest.getReader());
        logger.info("Request: " + bodyRequest);
        wrappedRequest.resetInputStream();

        try {
            chain.doFilter(wrappedRequest, responseWrapper);
            // Decoding as UTF-8 is an assumption; adjust if the response uses another charset.
            logger.info("Response: "
                    + IOUtils.toString(responseWrapper.getContentInputStream(), StandardCharsets.UTF_8));
        } finally {
            // Always copy the cached body back, otherwise the client receives an empty response.
            responseWrapper.copyBodyToResponse();
        }
    }

    @Override
    public void init(FilterConfig arg0) throws ServletException {
        System.out.println("init filter " + arg0);
    }

    private static class ResettableStreamHttpServletRequest extends HttpServletRequestWrapper {

        private byte[] rawData;
        private HttpServletRequest request;
        private ResettableServletInputStream servletStream;

        public ResettableStreamHttpServletRequest(HttpServletRequest request) {
            super(request);
            this.request = request;
            this.servletStream = new ResettableServletInputStream();
        }

        public void resetInputStream() {
            servletStream.stream = new ByteArrayInputStream(rawData);
        }

        @Override
        public ServletInputStream getInputStream() throws IOException {
            cacheBody();
            return servletStream;
        }

        @Override
        public BufferedReader getReader() throws IOException {
            cacheBody();
            return new BufferedReader(new InputStreamReader(servletStream));
        }

        // Read the body once as raw bytes (not through a Reader, which would
        // re-encode it) so that it can be replayed after logging.
        private void cacheBody() throws IOException {
            if (rawData == null) {
                rawData = IOUtils.toByteArray(this.request.getInputStream());
                servletStream.stream = new ByteArrayInputStream(rawData);
            }
        }

        private class ResettableServletInputStream extends ServletInputStream {

            private InputStream stream;

            @Override
            public int read() throws IOException {
                return stream.read();
            }

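            @Override
            public boolean isFinished() {
                try {
                    // Finished once the cached body has been fully consumed.
                    return stream == null || stream.available() == 0;
                } catch (IOException e) {
                    return true;
                }
            }

            @Override
            public boolean isReady() {
                // The body is held in memory, so reads never block.
                return true;
            }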

            @Override
            public void setReadListener(ReadListener arg0) {

            }
        }
    }
}

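The Spring Cloud Data Flow server is essentially a Spring Boot application, so it allows you to inject specific configuration beans to extend or customize the Data Flow configuration.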

In your case, you can create a bean that extends javax.servlet.http.HttpFilter or implements javax.servlet.Filter, and inject that bean into the SCDF configuration.
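As a rough sketch of what that could look like, the configuration below registers a small logging filter through Spring Boot's FilterRegistrationBean and gives it an explicit order. The class names DebugFilterConfiguration and RequestLoggingFilter are made up for illustration, and the sketch assumes a custom SCDF server build (Spring Boot 2.x with the javax.servlet 4.0 API) in which this class is on the classpath and picked up as configuration. Setting the order to Ordered.HIGHEST_PRECEDENCE should place the filter ahead of the other registered filters, which may help with the problem of the default filters running first, but I have not verified this against a specific SCDF version.

```java
import java.io.IOException;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;

// Hypothetical configuration class; the name is an assumption, not part of SCDF.
@Configuration
public class DebugFilterConfiguration {

    // Hypothetical filter that logs each request line and the response status.
    static class RequestLoggingFilter extends HttpFilter {

        private static final Logger logger = LoggerFactory.getLogger(RequestLoggingFilter.class);

        @Override
        protected void doFilter(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
                throws IOException, ServletException {
            logger.info("Request: {} {}", request.getMethod(), request.getRequestURI());
            // Hand the request on to the rest of the chain.
            chain.doFilter(request, response);
            logger.info("Response status: {}", response.getStatus());
        }
    }

    @Bean
    public FilterRegistrationBean<RequestLoggingFilter> requestLoggingFilter() {
        FilterRegistrationBean<RequestLoggingFilter> registration =
                new FilterRegistrationBean<>(new RequestLoggingFilter());
        // Apply the filter to every request path.
        registration.addUrlPatterns("/*");
        // Lower values run earlier; this asks for the earliest possible position.
        registration.setOrder(Ordered.HIGHEST_PRECEDENCE);
        return registration;
    }
}
```

A filter declared only with @Component is also registered automatically by Spring Boot, but without an explicit order it tends to run late in the chain, so the registration bean is used here to make the ordering explicit.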