Spring DefaultTuple 不可序列化
Spring DefaultTuple not serializable
我正在使用 spring-xd 1.3.0 版本来处理元组消息。
在使用一些点击来丰富消息后,我制作了自己的聚合器来重新组合生成的消息。
现在我想使用 postgreSql 消息存储,以在节点崩溃时保持持久性。
所以我粗略地复制粘贴了原spring-xd聚合器的原xml configuration file
然后我构建并部署了以下流:
stream create aggregate --definition "queue:scoring > scoring-aggregator --store=jdbc --username=${spring.datasource.username} --password=${spring.datasource.password} --driverClassName=${spring.datasource.driverClassName} --url=${spring.datasource.url} --initializeDatabase=false > queue:endAggr"
但是当我向这个流发送我通常的元组消息时,它已被内存存储正确处理,我得到:
xd_container_2 | Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.xd.tuple.DefaultTuple
xd_container_2 | at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:68) ~[spring-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
xd_container_2 | at org.springframework.integration.jdbc.JdbcMessageStore.addMessage(JdbcMessageStore.java:345) ~[spring-integration-jdbc-4.2.4.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.jdbc.JdbcMessageStore.addMessageToGroup(JdbcMessageStore.java:386) ~[spring-integration-jdbc-4.2.4.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:604) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:400) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
现在...好吧,我卡住了,不知道如何继续。
感谢任何提示。
元组不是 Serializable
- 我不确定为什么 - 但 XD 在内部使用 Kryo 进行序列化(默认情况下);您可以添加 kryo transformers before/after 聚合器作为解决方法。
编辑
另一种选择是将 Tuple
转换为 json,在 aggregator
上使用 --inputType=application/json
。
参见 type conversion。
聚合器输出将是 JSON 个字符串的集合,将它们返回到 Tuple 将取决于您在聚合器的下游做什么。
我正在使用 spring-xd 1.3.0 版本来处理元组消息。
在使用一些点击来丰富消息后,我制作了自己的聚合器来重新组合生成的消息。
现在我想使用 postgreSql 消息存储,以在节点崩溃时保持持久性。
所以我粗略地复制粘贴了原spring-xd聚合器的原xml configuration file
然后我构建并部署了以下流:
stream create aggregate --definition "queue:scoring > scoring-aggregator --store=jdbc --username=${spring.datasource.username} --password=${spring.datasource.password} --driverClassName=${spring.datasource.driverClassName} --url=${spring.datasource.url} --initializeDatabase=false > queue:endAggr"
但是当我向这个流发送我通常的元组消息时,它已被内存存储正确处理,我得到:
xd_container_2 | Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.xd.tuple.DefaultTuple
xd_container_2 | at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:68) ~[spring-core-4.2.2.RELEASE.jar:4.2.2.RELEASE]
xd_container_2 | at org.springframework.integration.jdbc.JdbcMessageStore.addMessage(JdbcMessageStore.java:345) ~[spring-integration-jdbc-4.2.4.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.jdbc.JdbcMessageStore.addMessageToGroup(JdbcMessageStore.java:386) ~[spring-integration-jdbc-4.2.4.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:604) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:400) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
xd_container_2 | at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
现在...好吧,我卡住了,不知道如何继续。
感谢任何提示。
元组不是 Serializable
- 我不确定为什么 - 但 XD 在内部使用 Kryo 进行序列化(默认情况下);您可以添加 kryo transformers before/after 聚合器作为解决方法。
编辑
另一种选择是将 Tuple
转换为 json,在 aggregator
上使用 --inputType=application/json
。
参见 type conversion。
聚合器输出将是 JSON 个字符串的集合,将它们返回到 Tuple 将取决于您在聚合器的下游做什么。