Spring Cloud DataFlow:获取负载作为列表<Map>
Spring Cloud DataFlow: Getting payload as List<Map>
使用 Spring Cloud DataFlow 1.3.0.M2 和 Spring Cloud Stream Starters Celsius.M1。
我有两个处理器。首先产生一个 List<Map>
应该被另一个消耗。这是简化的代码。
// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: had Object instead of List<> as the return, hoped perhaps using a
// specific type would help, but no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
final List<Map<String, Object>> results = worker.doWork(payload);
LOG.debug("Returning " + results.size() + " objects");
return results;
}
// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
LOG.debug("Received " + payload.size() + " objects");
final List<Map<String, Object>> results = worker.moreWork(payload);
return results;
}
我正在使用 SCDF 在管道中部署这两个处理器 shell:
<source> | otherProcessors | processor1 | processor2 | log
处理器 1 的调试消息说它在列表中有 2 个对象。处理器 2 的调试消息说它收到了 40 个对象(每个映射有 20 个键=值对)——看起来这两个映射被压平成一个键=值对列表。
我为 org.spring.integration
启用了调试日志记录,消息似乎有一个地图格式列表(这是来自处理器 2):
preSend on channel 'input', message: GenericMessage
[payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
{"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]
我希望处理器 2 接收处理器 1 生成的 2 个映射。我想知道这是否与泛型类型有关。有人可以指出实现这一目标的配置吗?
---- Artem 评论的更新----
处理器 1 在其 application.properties
文件中有这个:
spring.cloud.stream.bindings.output.content-type=application/json
我也试过像这样修改流定义,但似乎没有什么区别:
<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
好的,您实际上是偶然发现了一个错误:)
这已在 2.0 分支上修复,考虑到它是一个快照,目前有点不稳定。
几天后我们发布后情况应该会更好。
团队正在讨论将修复程序向后移植到 1.3 行的前进道路。
使用 Spring Cloud DataFlow 1.3.0.M2 和 Spring Cloud Stream Starters Celsius.M1。
我有两个处理器。首先产生一个 List<Map>
应该被另一个消耗。这是简化的代码。
// Processor 1
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
// Note: had Object instead of List<> as the return, hoped perhaps using a
// specific type would help, but no difference.
public List<Map<String, Object>> process(final @Payload MyPojo payload) {
final List<Map<String, Object>> results = worker.doWork(payload);
LOG.debug("Returning " + results.size() + " objects");
return results;
}
// Processor 2
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object process(final @Payload List<Map<String, Object>> payload) {
LOG.debug("Received " + payload.size() + " objects");
final List<Map<String, Object>> results = worker.moreWork(payload);
return results;
}
我正在使用 SCDF 在管道中部署这两个处理器 shell:
<source> | otherProcessors | processor1 | processor2 | log
处理器 1 的调试消息说它在列表中有 2 个对象。处理器 2 的调试消息说它收到了 40 个对象(每个映射有 20 个键=值对)——看起来这两个映射被压平成一个键=值对列表。
我为 org.spring.integration
启用了调试日志记录,消息似乎有一个地图格式列表(这是来自处理器 2):
preSend on channel 'input', message: GenericMessage
[payload=[{"m1key1":"val1","m1key2":"val2",...,"m1key20":"val20"},
{"m2key1":"val1","m2key2":"val2",...,"m2key20":"val20"}], headers={..}]
我希望处理器 2 接收处理器 1 生成的 2 个映射。我想知道这是否与泛型类型有关。有人可以指出实现这一目标的配置吗?
---- Artem 评论的更新----
处理器 1 在其 application.properties
文件中有这个:
spring.cloud.stream.bindings.output.content-type=application/json
我也试过像这样修改流定义,但似乎没有什么区别:
<source> | otherProcessors | processor1 --outputType=application/json | processor2 --inputType=application/json | log
好的,您实际上是偶然发现了一个错误:)
这已在 2.0 分支上修复,考虑到它是一个快照,目前有点不稳定。
几天后我们发布后情况应该会更好。
团队正在讨论将修复程序向后移植到 1.3 行的前进道路。