Spring DefaultTuple 不可序列化

Spring DefaultTuple not serializable

我正在使用 spring-xd 1.3.0 版本来处理元组消息。

在使用一些点击来丰富消息后,我制作了自己的聚合器来重新组合生成的消息。

现在我想使用 postgreSql 消息存储,以在节点崩溃时保持持久性。

所以我粗略地复制粘贴了原spring-xd聚合器的原xml configuration file

然后我构建并部署了以下流:

stream create aggregate --definition "queue:scoring > scoring-aggregator --store=jdbc --username=${spring.datasource.username} --password=${spring.datasource.password} --driverClassName=${spring.datasource.driverClassName} --url=${spring.datasource.url} --initializeDatabase=false > queue:endAggr"

但是当我向这个流发送我通常的元组消息时,它已被内存存储正确处理,我得到:

xd_container_2 | Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.xd.tuple.DefaultTuple
xd_container_2 |        at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:68) ~[spring-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
xd_container_2 |        at org.springframework.integration.jdbc.JdbcMessageStore.addMessage(JdbcMessageStore.java:345) ~[spring-integration-jdbc-4.2.4.RELEASE.jar:na]
xd_container_2 |        at org.springframework.integration.jdbc.JdbcMessageStore.addMessageToGroup(JdbcMessageStore.java:386) ~[spring-integration-jdbc-4.2.4.RELEASE.jar:na]
xd_container_2 |        at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:604) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
xd_container_2 |        at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:400) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
xd_container_2 |        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]

现在...好吧,我卡住了,不知道如何继续。

感谢任何提示。

元组不是 Serializable - 我不确定为什么 - 但 XD 在内部使用 Kryo 进行序列化(默认情况下);您可以添加 kryo transformers before/after 聚合器作为解决方法。

编辑

另一种选择是将 Tuple 转换为 json,在 aggregator 上使用 --inputType=application/json

参见 type conversion

聚合器输出将是 JSON 个字符串的集合,将它们返回到 Tuple 将取决于您在聚合器的下游做什么。