Spring kstreams 无法让处理器工作 - class '[B' 不在受信任的包中
Spring kstreams cannot get processor to work - The class '[B' is not in the trusted packages
完整代码:https://github.com/BenedictWHD/kstreams-example
所以我有一个生产者(data-ingest
),一个处理器(external-message-processor
)和一个消费者(internal-message-processor
(一旦我开始工作,这将成为一个处理器,所以很抱歉目前的命名,但它是一个消费者))。
据我所知,data-ingest
的工作原理是它向主题 external_messages
发送消息。
external-message-processor
尝试读取该主题但失败并显示以下内容:
Caused by: java.lang.IllegalArgumentException: The class '[B' is not in the trusted packages: [java.util, java.lang, com.yetti.common.externalmessage, com.yetti.common.externalmessage.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
关于主题的消息示例:
Headers: __TypeId__: [B, contentType: application/json, spring_json_header_types: {"contentType":"java.lang.String"}"eyJpZCI6IjE4ZGQ2ODc4LWYwNWQtNDJiOC1iYTdlLTU2MDhmMTkzOWU3YyIsImV4dGVybmFsTWVzc2FnZVNvdXJjZSI6IlNNUyIsIm1lc3NhZ2VUeXBlIjoiVFJBTlNBQ1RJT04iLCJudW1iZXJGcm9tIjoiMSIsIm51bWJlclRvIjoiMiIsImNjeSI6Ik5UVEwiLCJxdWFudGl0eSI6IjIuNSJ9"
如您所见,TypeId 出于某种原因是“[B]”。
我已指定所有 3 个应用程序使用以下序列化器和反序列化器:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
当应用程序在生产者和消费者配置中启动时,只有 data-ingest
一个似乎真正使用了正确的序列化程序,我们有这个:
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
根据我在 application.yml 文件中看到的内容,一切都应该如此,所以我不知道为什么它没有使用我指定的 serializer/deserializer 以及为什么处理器是无法阅读主题的消息?
非常感谢您的帮助,因为我已经为此绞尽脑汁好几天了。
开头的回购包含所有配置文件、poms 和 docker-composer 文件 运行 this.
编辑 - 处理器配置:
spring.cloud.stream:
function:
definition: processExternalMessage
bindings:
processExternalMessage-in-0:
destination: external_messages
processExternalMessage-out-0:
destination: internal_messages
kafka:
bindings:
processExternalMessage-out-0:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
processExternalMessage-in-0:
consumer:
configuration:
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
生产者配置:
spring.cloud.stream:
function:
definition: externalMessageProducer
bindings:
externalMessageProducer-out-0:
destination: external_messages
kafka:
bindings:
externalMessageProducer-out-0:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
value.serializer
是平面配置 属性 名称。 value
不是具有 serializer
字段的嵌套对象,在 YAML 术语中
这就是另一个似乎有效的原因
还值得一提的是Kstreams uses serde
properties, not serializers directly
完整代码:https://github.com/BenedictWHD/kstreams-example
所以我有一个生产者(data-ingest
),一个处理器(external-message-processor
)和一个消费者(internal-message-processor
(一旦我开始工作,这将成为一个处理器,所以很抱歉目前的命名,但它是一个消费者))。
据我所知,data-ingest
的工作原理是它向主题 external_messages
发送消息。
external-message-processor
尝试读取该主题但失败并显示以下内容:
Caused by: java.lang.IllegalArgumentException: The class '[B' is not in the trusted packages: [java.util, java.lang, com.yetti.common.externalmessage, com.yetti.common.externalmessage.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
关于主题的消息示例:
Headers: __TypeId__: [B, contentType: application/json, spring_json_header_types: {"contentType":"java.lang.String"}"eyJpZCI6IjE4ZGQ2ODc4LWYwNWQtNDJiOC1iYTdlLTU2MDhmMTkzOWU3YyIsImV4dGVybmFsTWVzc2FnZVNvdXJjZSI6IlNNUyIsIm1lc3NhZ2VUeXBlIjoiVFJBTlNBQ1RJT04iLCJudW1iZXJGcm9tIjoiMSIsIm51bWJlclRvIjoiMiIsImNjeSI6Ik5UVEwiLCJxdWFudGl0eSI6IjIuNSJ9"
如您所见,TypeId 出于某种原因是“[B]”。
我已指定所有 3 个应用程序使用以下序列化器和反序列化器:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
当应用程序在生产者和消费者配置中启动时,只有 data-ingest
一个似乎真正使用了正确的序列化程序,我们有这个:
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
根据我在 application.yml 文件中看到的内容,一切都应该如此,所以我不知道为什么它没有使用我指定的 serializer/deserializer 以及为什么处理器是无法阅读主题的消息?
非常感谢您的帮助,因为我已经为此绞尽脑汁好几天了。
开头的回购包含所有配置文件、poms 和 docker-composer 文件 运行 this.
编辑 - 处理器配置:
spring.cloud.stream:
function:
definition: processExternalMessage
bindings:
processExternalMessage-in-0:
destination: external_messages
processExternalMessage-out-0:
destination: internal_messages
kafka:
bindings:
processExternalMessage-out-0:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
processExternalMessage-in-0:
consumer:
configuration:
value:
deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
生产者配置:
spring.cloud.stream:
function:
definition: externalMessageProducer
bindings:
externalMessageProducer-out-0:
destination: external_messages
kafka:
bindings:
externalMessageProducer-out-0:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
value.serializer
是平面配置 属性 名称。 value
不是具有 serializer
字段的嵌套对象,在 YAML 术语中
这就是另一个似乎有效的原因
还值得一提的是Kstreams uses serde
properties, not serializers directly