由于模式类型错误(JSON 而不是 AVRO),Pulsar 函数无法反序列化消息
Pulsar function fails to deserialize message because of wrong schema type (JSON instead of AVRO)
当 运行 Pulsar 在 docker 中独立时,我们在特定情况下反序列化消息时会遇到这个奇怪的问题。
我们正在使用版本 2.7.1.
我们有一个创建主题和函数的脚本,之后为类型为 JSON 的麻烦主题创建模式。整个模式是正确的,但类型不正确。这是发送任何消息之前的全部内容。
我们还启用了 set-is-allow-auto-update-schema
.
我们称它为 trouble-topic
,它由 2 个来源填充:ValidationFunction
和一个 Spring 引导微服务。
ValidationFunction
验证消息,如果消息有效,它会将映射的消息发送到由 Spring 引导微服务使用的主题,然后对其执行一些逻辑并将其发送到 trouble-topic
,但如果验证失败,它会直接向 trouble-topic
.
发送消息
当使用来自 Spring 引导微服务的 sendAsync
和以下生产者时,架构得到更新,具有 AVRO 作为类型,并且 TroubleFunction
读取 trouble-topic
工作正常之后:
pulsarClient
.newProducer(AvroSchema.of(OurClass.class))
.topic(troubleTopicName))
.create()
但是如果在此之前有些消息验证失败,并且在使用上述Producer之前将消息直接发送到trouble-topic
,我们会得到一个解析异常。我们通过以下方式从函数发送消息:
context.newOutputMessage(troubleTopicName, AvroSchema.of(OurClass.class))
.value(value)
.sendAsync();
由于某种原因,这不会更新模式类型,模式类型仍然是 JSON。我使用 pulsar admin CLI 在每个步骤上验证了模式类型。当这种情况发生在微服务生产者第一次更新模式类型之前时,TroubleFunction
读取 trouble-topic
失败并出现以下错误:
11:43:49.322 [tenant/namespace/TroubleFunction-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [tenant/namespace/TroubleFunction:0] Uncaught exception in Java Instance
org.apache.pulsar.client.api.SchemaSerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 2)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])avro-serialized-msg-i-have-to-hide Parsing exception: cvc-complex-type.2.4.a: Invalid content was found starting with element 'ElementName'. One of '{"foo:bar":ElementName}' is expected."; line: 1, column: 2]
所以我的问题是这两者之间有什么区别,为什么从函数发送消息不能正确更新架构类型?是不是下面用的是同一个Producer?还有没有办法解决这个问题,以便在初始化时设置模式类型,或者至少在从函数发送消息时更新模式类型?
首先,该功劳的功劳。我想有一天这会被很好地记录下来,但现在还没有。我很幸运有一本 EAP 版本的 Apache Pulsar in Action 书,其中这个示例存储库用于展示一些 Pulsar 功能:https://github.com/david-streamlio/GottaEat
我强烈推荐这本书,并为所有使用 Pulsar 的人浏览这些示例,在 pulsar slack 社区上有人提到它昨天从 MEAP 毕业,它应该很快就会有印刷版,所以检查一下出来。也可以考虑加入 Pulsar slack。
答案:
这段代码让我理解了它应该如何工作:
Map<String, ConsumerConfig> inputSpecs = new HashMap<String, ConsumerConfig> ();
inputSpecs.put("persistent://orders/inbound/food-orders",
ConsumerConfig.builder().schemaType("avro").build());
FunctionConfig functionConfig =
FunctionConfig.builder()
...
.inputSpecs(inputSpecs)
...
.build();
Java 代码可用于在使用 LocalRunner 时设置功能,但使用 pulsar admin cli(我们使用的)和 rest api 可以实现相同的配置。您也可以使用函数配置文件,并在配置 yaml 中按以下方式指定它:
inputSpecs:
$topicName:
schemaType: AVRO
$topicName
总是采用以下格式:persistent://tenant/namespace/topic
一旦您为 TroubleFunction
指定了输入规范,就会使用正确的模式类型有效地创建模式,并且反序列化也能完美地工作。
当 运行 Pulsar 在 docker 中独立时,我们在特定情况下反序列化消息时会遇到这个奇怪的问题。 我们正在使用版本 2.7.1.
我们有一个创建主题和函数的脚本,之后为类型为 JSON 的麻烦主题创建模式。整个模式是正确的,但类型不正确。这是发送任何消息之前的全部内容。
我们还启用了 set-is-allow-auto-update-schema
.
我们称它为 trouble-topic
,它由 2 个来源填充:ValidationFunction
和一个 Spring 引导微服务。
ValidationFunction
验证消息,如果消息有效,它会将映射的消息发送到由 Spring 引导微服务使用的主题,然后对其执行一些逻辑并将其发送到 trouble-topic
,但如果验证失败,它会直接向 trouble-topic
.
当使用来自 Spring 引导微服务的 sendAsync
和以下生产者时,架构得到更新,具有 AVRO 作为类型,并且 TroubleFunction
读取 trouble-topic
工作正常之后:
pulsarClient
.newProducer(AvroSchema.of(OurClass.class))
.topic(troubleTopicName))
.create()
但是如果在此之前有些消息验证失败,并且在使用上述Producer之前将消息直接发送到trouble-topic
,我们会得到一个解析异常。我们通过以下方式从函数发送消息:
context.newOutputMessage(troubleTopicName, AvroSchema.of(OurClass.class))
.value(value)
.sendAsync();
由于某种原因,这不会更新模式类型,模式类型仍然是 JSON。我使用 pulsar admin CLI 在每个步骤上验证了模式类型。当这种情况发生在微服务生产者第一次更新模式类型之前时,TroubleFunction
读取 trouble-topic
失败并出现以下错误:
11:43:49.322 [tenant/namespace/TroubleFunction-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [tenant/namespace/TroubleFunction:0] Uncaught exception in Java Instance
org.apache.pulsar.client.api.SchemaSerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 2)): only regular white space (\r, \n, \t) is allowed between tokens
at [Source: (byte[])avro-serialized-msg-i-have-to-hide Parsing exception: cvc-complex-type.2.4.a: Invalid content was found starting with element 'ElementName'. One of '{"foo:bar":ElementName}' is expected."; line: 1, column: 2]
所以我的问题是这两者之间有什么区别,为什么从函数发送消息不能正确更新架构类型?是不是下面用的是同一个Producer?还有没有办法解决这个问题,以便在初始化时设置模式类型,或者至少在从函数发送消息时更新模式类型?
首先,该功劳的功劳。我想有一天这会被很好地记录下来,但现在还没有。我很幸运有一本 EAP 版本的 Apache Pulsar in Action 书,其中这个示例存储库用于展示一些 Pulsar 功能:https://github.com/david-streamlio/GottaEat
我强烈推荐这本书,并为所有使用 Pulsar 的人浏览这些示例,在 pulsar slack 社区上有人提到它昨天从 MEAP 毕业,它应该很快就会有印刷版,所以检查一下出来。也可以考虑加入 Pulsar slack。
答案:
这段代码让我理解了它应该如何工作:
Map<String, ConsumerConfig> inputSpecs = new HashMap<String, ConsumerConfig> ();
inputSpecs.put("persistent://orders/inbound/food-orders",
ConsumerConfig.builder().schemaType("avro").build());
FunctionConfig functionConfig =
FunctionConfig.builder()
...
.inputSpecs(inputSpecs)
...
.build();
Java 代码可用于在使用 LocalRunner 时设置功能,但使用 pulsar admin cli(我们使用的)和 rest api 可以实现相同的配置。您也可以使用函数配置文件,并在配置 yaml 中按以下方式指定它:
inputSpecs:
$topicName:
schemaType: AVRO
$topicName
总是采用以下格式:persistent://tenant/namespace/topic
一旦您为 TroubleFunction
指定了输入规范,就会使用正确的模式类型有效地创建模式,并且反序列化也能完美地工作。