使用 KSQLDB 时忽略 JSON 前面的字节顺序标记或二进制垃圾

Ignoring Byte Order Mark or binary trash in front of JSON when using KSQLDB

我正在使用 io.confluent.connect.json.JsonSchemaConverter 作为我的值转换器来生成关于模式注册表中模式的主题的有效负载。

这导致在我尝试创建 ksqldb 流时出现错误消息。日志条目显示:

Invalid UTF-32 character 0x17a2249 (above 0x0010ffff) at char #1, byte #7)

消息的前几个十六进制值如下:

请记住,我有一堆消息,屏幕截图和错误消息来自两个不同的偏移量。

ksqldb 似乎认为这可能是 BOM,因为错误消息提到 0x0010fff。 JSON (RFC4627) 不允许 BOM,对我来说这看起来就像明显的二进制垃圾。 我认为 JsonSchemaConverter 可能正在做的是在数据前面加上写入架构注册表的架构版本,在本例中为 1

有没有办法告诉 ksqldb 尝试跳过 JSON 消息的某些部分。

或者我做的根本错误与我在二进制级别上看到的内容无关?

编辑: 似乎值得一提的是我想要一个架构。我通过使用 VALUE_FORMAT='JSON_SR' 向 ksqldb 表明了这一点,这似乎可行,而且我还使用了 JsonSchemaConverter,因此编写了模式。

这不是 JSON 记录。这是一个架构注册表消息,包括一个 JSON 架构和一个值。 Magic Byte (0x0) + 4 byte int (Schema Id 1),然后是剩余的数据。

您的 Connect 属性需要 value.converter.schemas.enabled=false 才能删除它。

否则,如果你想保留一个schema,AvroConverter在兼容性方面会更好。然后您将在 KSQL 中自动拥有一个模式,而不需要解析出各个字段。

或者,我认为 FORMAT="JSON" 不是您在 KSQL 中想要的,因为它假定没有模式的记录。

请参阅此处注释:https://docs.ksqldb.io/en/latest/developer-guide/serialization/#json