使用 Java DSL 的流程记录请求、响应和总时间

Log Request, Response and Total Time taken by the flow using Java DSL

我正在尝试构建一个以 Http.inboundGateway 开头的流程 -> 做几件事,例如将请求数据存储到数据库、Header 充实、发送到 AMQP 和 returns 带有流状态(成功/失败)。

我有一些事情正在苦苦挣扎,但我无法弄清楚。

1.) 记录请求和响应。

我已经设法记录了 Http.inboundGateway 收到的请求(见下文。不确定这是否是正确的方法,但它有效。请建议有更好的方法来做到这一点).也就是说,我无法获取发送给客户端的响应消息,而且我也不知道如何计算流的事务时间并将其记录到日志文件中。如果有一种方法可以让我在每次交易后打印统计信息,如 "Received : 5, Success : 4, Failed : 1, Average Transaction Time : 250ms ..etc"

,那将非常有用
@Bean
public IntegrationFlow httpInboundGateway()
{
    return IntegrationFlows.from(Http.inboundGateway("/httplistner")
                .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                .mappedRequestHeaders("*"))
            .transform(new ObjectToStringTransformer())
            .wireTap(flow -> flow.handle(message -> logger.info(">> Received Request from Caller.\nHeaders : "+message.getHeaders() + "\nPay Load : "+message.getPayload())))
            .channel(httpRequestChannel())
            .get();
}

2.) 如何将日志语句添加到 Spring DSL 流程?

我希望能够将日志语句(用于调试)添加到我的集成 DSL 定义中,这样我就可以查看日志文件并了解发生了什么以及出了什么问题。到目前为止,除了如上面的定义所示在流程中间添加“.wireTap”之外,我无法找到一种方法来做到这一点。请建议是否有更好/正确的方法。

3.) 自定义 "Http.inboundGateway" 发送的响应。

我不知道如何自定义 Http.inboundGateway 流程完成后发送回客户端的 HTTP 响应。我该怎么做,或者您能指点我可以阅读并理解如何做的文档吗?我希望使用 Spring DSL。

错误响应也是如此。如您所见,我没有在 Http.inboundGateway 中添加错误通道。因此,如果在其当前配置中现在发生错误,客户端将获得 500 和完整的堆栈跟踪。我如何获取错误消息并能够根据错误构建自定义响应并将其发送给客户端。示例:如果他们向我发送了 XML 有效负载并且 XML 格式错误,我希望能够向他们发送 HTTP 400,并在响应中提供一些详细信息,表明他们的请求数据格式不正确。

捕获输出的技巧是 log() 后跟一个“无处可及的桥”——之所以命名是因为它没有输出通道——因此框架将结果发送回网关。

给你...

@SpringBootApplication
public class So41990546Application {

    public static void main(String[] args) {
        SpringApplication.run(So41990546Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Http.inboundGateway("/foo")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class))
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .<String, String>transform(String::toUpperCase)
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Bridge.to.nowhere"))
                .get();
    }

    @Bean
    public IntegrationFlow errorsFlow() {
        return IntegrationFlows.from(Http.inboundGateway("/errors")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class)
                    .errorChannel("errors.input"))
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .transform("1 / 0")
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Another.bridge.to.nowhere"))
                .get();
    }

    @Bean
    public IntegrationFlow errors() {
        return f -> f.transform("'Error: ' + payload.cause.message")
                .enrichHeaders(b -> b.header(HttpHeaders.STATUS_CODE, 400))
                .log(Level.INFO) // log the whole message so we can see the status code
                .bridge(e -> e.id("Another.b.t.n"));
    }

}

.log 是在 1.2 中添加的,并且在下面使用了窃听器。

编辑

如果您使用命名频道...

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows.from(Http.inboundGateway("/foo")
                    .requestMapping(requestMapping -> requestMapping.methods(HttpMethod.POST))
                    .mappedRequestHeaders("*")
                    .requestPayloadType(String.class))
                .channel(namedChannel())
                .log(Level.INFO, m -> "Inbound: " + m.getPayload())
                .<String, String>transform(String::toUpperCase)
                .log(Level.INFO, m -> "Outbound: " + m.getPayload())
                .bridge(e -> e.id("Bridge.to.nowhere"))
                .get();
    }

    public MessageChannel namedChannel() {
        return new DirectChannel();
    }

并启用指标,如 the documentation you can get all kinds of stats from that channel which will include average elapsed time for the downstream flow 中所述。