Spring Cloud DataFlow:获取负载作为列表<Map>

Spring Cloud DataFlow: Getting payload as List<Map>

使用 Spring Cloud DataFlow 1.3.0.M2 和 Spring Cloud Stream Starters Celsius.M1。

我有两个处理器。首先产生一个 List<Map> 应该被另一个消耗。这是简化的代码。

// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: had Object instead of List<> as the return, hoped perhaps using a
// specific type would help, but no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
    final List<Map<String, Object>> results = worker.doWork(payload);
    LOG.debug("Returning " + results.size() + " objects");
    return results;
}

// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
    LOG.debug("Received " + payload.size() + " objects");
    final List<Map<String, Object>> results = worker.moreWork(payload);
    return results;
}

我正在使用 SCDF 在管道中部署这两个处理器 shell:

<source> | otherProcessors | processor1 | processor2 | log

处理器 1 的调试消息说它在列表中有 2 个对象。处理器 2 的调试消息说它收到了 40 个对象(每个映射有 20 个键=值对)——看起来这两个映射被压平成一个键=值对列表。

我为 org.spring.integration 启用了调试日志记录,消息似乎有一个地图格式列表(这是来自处理器 2):

preSend on channel 'input', message: GenericMessage 
  [payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
   {"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]

我希望处理器 2 接收处理器 1 生成的 2 个映射。我想知道这是否与泛型类型有关。有人可以指出实现这一目标的配置吗?

---- Artem 评论的更新----

处理器 1 在其 application.properties 文件中有这个:

spring.cloud.stream.bindings.output.content-type=application/json

我也试过像这样修改流定义,但似乎没有什么区别:

<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log

好的,您实际上是偶然发现了一个错误:)

这已在 2.0 分支上修复,考虑到它是一个快照,目前有点不稳定。

几天后我们发布后情况应该会更好。

团队正在讨论将修复程序向后移植到 1.3 行的前进道路。