Spring 集成聚合器基于最后修改的发布策略

Spring Integration aggregator's release strategy based on last modified

我正在尝试实现以下场景:

  1. 我得到一堆具有共同文件模式的文件,即 doc0001_page0001、doc0001_page0002、doc0001_page0003、doc0002_page0001(其中 doc0001 是一个包含以下内容的文档我需要合并 3 页,doc0002 只有 1 页)
  2. 我想以一种方式聚合它们,只有当收集了特定文档的所有文件时我才会发布一个组(拾取 3 个文件后的 doc0001,拾取 1 个文件后的 doc0002)

我的想法是按字母顺序读取文件,最后修改一个组后等待2秒发布(g.getLastModified()小于当前时间负2秒)

我尝试了以下但没有成功:

return IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE,
                                          FileReadingMessageSource.WatchEventType.MODIFY),
        e -> e.poller(Pollers.fixedDelay(100)
                             .errorChannel("filePollingErrorChannel")))
                       .enrichHeaders(h -> h.headerExpression("CORRELATION_PATTERN", "headers[" + FileHeaders.FILENAME + "].substring(0,7)")) // docxxxx.length()
                       .aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                                        .releaseStrategy(g -> g.getLastModified() < System.currentTimeMillis() - 2000))                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();

将发布策略更改为以下内容也不起作用:

.aggregate(a -> a.correlationExpression("headers['CORRELATION_PATTERN']")
                .releaseStrategy(g -> {
                    Stream<Message<?>> stream = g.getMessages()
                                                 .stream();
                    Long timestamp = (Long) stream.skip(stream.count() - 1)
                                                  .findFirst()
                                                  .get()
                                                  .getHeaders()
                                                  .get(MessageHeaders.TIMESTAMP);
                    System.out.println("Timestamp: " + timestamp);
                    return timestamp.longValue() < System.currentTimeMillis() - 2000;

                }))

我是不是误解了发布策略的概念?

此外,是否可以从 releaseStrategy 块中打印出一些内容?我想比较时间戳(参见 System.out.println("Timestamp: " + timestamp);

我用不同的方法找到了解决方案。我仍然不明白为什么上面的那个不起作用。

我还找到了一种更清晰的定义相关函数的方法。

IntegrationFlows.from(Files.inboundAdapter(tmpDir.getRoot())
                                  .patternFilter("*.json")
                                  .useWatchService(true)
                                  .watchEvents(FileReadingMessageSource.WatchEventType.CREATE, FileReadingMessageSource.WatchEventType.MODIFY), e -> e
        .poller(Pollers.fixedDelay(100)))
                       .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, m -> ((String) m
                               .getHeaders()
                               .get(FileHeaders.FILENAME)).substring(0, 17)))
                       .aggregate(a -> a.groupTimeout(2000)
                                        .sendPartialResultOnExpiry(true))
                       .channel(MessageChannels.queue("fileReadingResultChannel"))
                       .get();

是的,由于您不知道消息组的整个序列,您别无选择,除非使用 groupTimeout。常规 releaseStrategy 仅在消息到达聚合器时才起作用。由于在收到一条消息时您没有足够的信息来发布群组,它将永远保留在群组商店中。

聚合器中引入了 groupTimeout 选项,特别是针对此类用例,当我们肯定希望发布一个没有足够消息以正常分组的组时。

您可以考虑使用 groupTimeoutExpression 而不是基于常量的 groupTimeoutMessageGroup 是 SpEL 的根评估上下文对象,因此您将能够访问它所提到的 lastModified

.sendPartialResultOnExpiry(true) 是这里处理的正确选项。

在文档中查看更多信息:https://docs.spring.io/spring-integration/reference/html/#agg-and-group-to