Kafka Connect S3 接收器在加载 Avro 时抛出 IllegalArgumentException
Kafka Connect S3 sink throws IllegalArgumentException when loading Avro
我正在使用 qubole's S3 sink 以 Parquet 格式将 Avro 数据加载到 S3。
在我的 Java 应用程序中,我创建了一个生产者
Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);
然后将一个GenericRecord
转换成byte[]
格式:
GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);
for (Map.Entry<String, ?> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
avroRecord.put(key, value);
}
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);
我在 Kafka Connect 属性中使用以下值:
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
以及我的文件接收器属性中的以下配置选项:
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
当我 运行 连接器时,我收到以下错误消息:'java.lang.IllegalArgumentException: Avro schema must be a record'。
我是 Kafka Connect 的新手,我知道可以设置 Schema Registry 服务器——但我不明白接收器是否需要注册表来将 Avro 数据转换为 Parquet,或者是否这是我这边的某种格式或配置问题。 "record" 在此错误的上下文中指的是哪种数据格式?任何指导或帮助将不胜感激。
ByteArrayConverter
不会进行任何数据转换:它不会实际进行任何 serialization/deserialization,而是假设连接器知道如何处理原始 byte[]
数据。但是,ParquetFormat
(事实上大多数格式)不能只处理原始数据。相反,他们希望数据被反序列化并构造为记录(您可以将其视为 C 结构、POJO 等)。
请注意,qubole streamx 自述文件指出 ByteArrayConverter
在您可以安全地直接复制数据的情况下很有用。例如,如果您的数据为 JSON 或 CSV。这些不需要反序列化,因为每个 Kafka 记录值的字节可以简单地复制到输出文件中。在这些情况下这是一个很好的优化,但并不普遍适用于所有输出文件格式。
我正在使用 qubole's S3 sink 以 Parquet 格式将 Avro 数据加载到 S3。
在我的 Java 应用程序中,我创建了一个生产者
Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);
然后将一个GenericRecord
转换成byte[]
格式:
GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);
for (Map.Entry<String, ?> entry : map.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
avroRecord.put(key, value);
}
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);
我在 Kafka Connect 属性中使用以下值:
key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
以及我的文件接收器属性中的以下配置选项:
connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
当我 运行 连接器时,我收到以下错误消息:'java.lang.IllegalArgumentException: Avro schema must be a record'。
我是 Kafka Connect 的新手,我知道可以设置 Schema Registry 服务器——但我不明白接收器是否需要注册表来将 Avro 数据转换为 Parquet,或者是否这是我这边的某种格式或配置问题。 "record" 在此错误的上下文中指的是哪种数据格式?任何指导或帮助将不胜感激。
ByteArrayConverter
不会进行任何数据转换:它不会实际进行任何 serialization/deserialization,而是假设连接器知道如何处理原始 byte[]
数据。但是,ParquetFormat
(事实上大多数格式)不能只处理原始数据。相反,他们希望数据被反序列化并构造为记录(您可以将其视为 C 结构、POJO 等)。
请注意,qubole streamx 自述文件指出 ByteArrayConverter
在您可以安全地直接复制数据的情况下很有用。例如,如果您的数据为 JSON 或 CSV。这些不需要反序列化,因为每个 Kafka 记录值的字节可以简单地复制到输出文件中。在这些情况下这是一个很好的优化,但并不普遍适用于所有输出文件格式。