无法使用 Nifi 和 Schema Registry 使用 Kafka Avro 记录
Unable to consume Kafka Avro records using Nifi and Schema Registry
我正在尝试使用 Nifi 从 Kafka 使用 Avro 记录。我有 3 个主题来自 Amazon Lambda 和 2 个 Spark Streaming 作业,所有这些作业都使用 HortonWorks Schema Registry 来获取 Avro 模式。
我尝试使用 ConsumeKafkaRecord_0_10 和 ConsumeKafkaRecord_2_0 并得到相同的错误:
我尝试使用内部使用纯文本模式的 AvroReader 来确定正在使用的那个,但得到了同样的错误。
当我将 AvroReader 与 Horton Schema Registry 参数一起使用时,出现此错误:
这可能有意义,因为它将记录的第一个字节视为模式的版本参数,第一个字节是 3。但这并不能解释为什么我在放置模式时得到 ArrayIndexOutOfBound纯文本格式。
最后,我可以使用 Spark Streaming 和 Schema Registry 很好地使用这些主题。难道没有人在使用 Kafka 时遇到过 NiFi 和 AvroReader 之间的这种问题吗?
堆栈:Horton Works HDP 3.4.1 // Nifi 1.9.0 // Spark 2.3 // 架构注册表 0.7
此问题与 Nifi 如何解释您的 Avro 消息的第一个字节有关。这些字节包含有关的信息:
- 协议 ID - 1 个字节
- 架构元数据 ID - 8 个字节
- 架构版本 - 4 字节
通过 HortonWork Schema Registry 的代码,我们可以发现不同的协议 ID 可用于使用 AvroSerDe 序列化您的消息。
public static final byte CONFLUENT_VERSION_PROTOCOL = 0x0;
public static final byte METADATA_ID_VERSION_PROTOCOL = 0x1;
public static final byte VERSION_ID_AS_LONG_PROTOCOL = 0x2;
public static final byte VERSION_ID_AS_INT_PROTOCOL = 0x3;
public static final byte CURRENT_PROTOCOL = VERSION_ID_AS_INT_PROTOCOL;
默认使用的是 VERSION_ID_AS_INT_PROTOCOL,这意味着 Avro 消息的第一个字节将是 03。
当通过 Nifi 代码时,我们看到它实际上只使用 METADATA_ID_VERSION_PROTOCOL,期望 01 而没有使用考虑其他任何事情。
创建 SchemaRegistryConfig 时必须强制 Spark 使用 METADATA_ID_VERSION_PROTOCOL。
val config = Map[String, Object](
"schema.registry.url" -> ConfigManager.config.getProperty("schemaregistry.default.url"),
AbstractAvroSnapshotSerializer.SERDES_PROTOCOL_VERSION -> SerDesProtocolHandlerRegistry.METADATA_ID_VERSION_PROTOCOL.asInstanceOf[Object]
)
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
我正在尝试使用 Nifi 从 Kafka 使用 Avro 记录。我有 3 个主题来自 Amazon Lambda 和 2 个 Spark Streaming 作业,所有这些作业都使用 HortonWorks Schema Registry 来获取 Avro 模式。
我尝试使用 ConsumeKafkaRecord_0_10 和 ConsumeKafkaRecord_2_0 并得到相同的错误:
我尝试使用内部使用纯文本模式的 AvroReader 来确定正在使用的那个,但得到了同样的错误。
当我将 AvroReader 与 Horton Schema Registry 参数一起使用时,出现此错误:
这可能有意义,因为它将记录的第一个字节视为模式的版本参数,第一个字节是 3。但这并不能解释为什么我在放置模式时得到 ArrayIndexOutOfBound纯文本格式。
最后,我可以使用 Spark Streaming 和 Schema Registry 很好地使用这些主题。难道没有人在使用 Kafka 时遇到过 NiFi 和 AvroReader 之间的这种问题吗?
堆栈:Horton Works HDP 3.4.1 // Nifi 1.9.0 // Spark 2.3 // 架构注册表 0.7
此问题与 Nifi 如何解释您的 Avro 消息的第一个字节有关。这些字节包含有关的信息:
- 协议 ID - 1 个字节
- 架构元数据 ID - 8 个字节
- 架构版本 - 4 字节
通过 HortonWork Schema Registry 的代码,我们可以发现不同的协议 ID 可用于使用 AvroSerDe 序列化您的消息。
public static final byte CONFLUENT_VERSION_PROTOCOL = 0x0;
public static final byte METADATA_ID_VERSION_PROTOCOL = 0x1;
public static final byte VERSION_ID_AS_LONG_PROTOCOL = 0x2;
public static final byte VERSION_ID_AS_INT_PROTOCOL = 0x3;
public static final byte CURRENT_PROTOCOL = VERSION_ID_AS_INT_PROTOCOL;
默认使用的是 VERSION_ID_AS_INT_PROTOCOL,这意味着 Avro 消息的第一个字节将是 03。
当通过 Nifi 代码时,我们看到它实际上只使用 METADATA_ID_VERSION_PROTOCOL,期望 01 而没有使用考虑其他任何事情。
创建 SchemaRegistryConfig 时必须强制 Spark 使用 METADATA_ID_VERSION_PROTOCOL。
val config = Map[String, Object](
"schema.registry.url" -> ConfigManager.config.getProperty("schemaregistry.default.url"),
AbstractAvroSnapshotSerializer.SERDES_PROTOCOL_VERSION -> SerDesProtocolHandlerRegistry.METADATA_ID_VERSION_PROTOCOL.asInstanceOf[Object]
)
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)