如何序列化来自kafka主题的消息

How to serialize message from kafka topic

我将 axon 与 mongodb 和 kafka 一起使用。 我收到了问题。

如何解决这个问题?

com.thoughtworks.xstream.converters.ConversionException: 
---- Debugging information ----
cause-exception     : com.thoughtworks.xstream.security.ForbiddenClassException
cause-message       : org.apache.kafka.common.TopicPartition
class               : java.util.HashMap
required-type       : java.util.HashMap
converter-type      : com.thoughtworks.xstream.converters.collections.MapConverter
path                : /org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken/positions/m/entry/org.apache.kafka.common.TopicPartition
line number         : 1
class[1]            : java.util.Collections$UnmodifiableMap
required-type[1]    : java.util.Collections$UnmodifiableMap
converter-type[1]   : com.thoughtworks.xstream.converters.reflection.ReflectionConverter
class[2]            : org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken
required-type[2]    : org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaTrackingToken
version             : 1.4.18
-------------------------------

如果我使用这个

axon:
  serializer:
    general: jackson
    events: jackson
    messages: jackson

将错误更改为 org.axonframework.serialization.SerializationException:反序列化对象时出错

如何解决这个问题?

@newoneim,您可能遇到了这个问题,因为您使用的是更新版本的 Java。

最近在 XStream 中发现了一些 CVE,要求其更改其序列化方法。 以前,它只是 de-/serialized 通过反射检查所有内容,假设它可以自由读取所有内容,现在你需要告诉 XStream 它可以通过反射检查哪些 类。

来自 AxonIQ 论坛的

This post 更详细地解释了这种困境。

现在,你能做些什么来解决它? 那么,使用 JacksonSerializer 将是一个简单的步骤。 奇怪的是,我希望您的配置会发现您已经为所有序列化程序设置了 JacksonSerializer

我的直觉是 Mongo 和 Axon Framework 的 Kafka 扩展的组合没有正确设置 MongoTokenStore 上的 JacksonSerializer。请注意,这是一种预感,因为我们缺少整个堆栈跟踪。然而,TokenStore 处理 TrackingTokens。而且,是 KafkaTrackingTokens 未能正确序列化。

请注意,来自 Axon 框架的 Mongo 扩展 没有 具有 Spring 启动自动配置。因此,您在 application.properties 文件中使用的设置不会被您构建的任何 Mongo-extension-specific 类 自动获取。除非你自己给它们接线,否则。