将 JSON 字符串序列化为 AVRO Apache Beam KafkaIO
Serialize JSON String to AVRO Apache Beam KafkaIO
我有一个 JSON 字符串需要序列化为 AVRO 格式,以便我可以将其发布到具有 Avro 模式并对其进行模式验证的 Kafka 主题。下面是我到目前为止尝试过的方法,但它给出了验证错误。
@Override
public PDone expand(PCollection<Product> input) {
return input.apply("Write to Kafka Topic", KafkaIO.<Void, String>write().
withBootstrapServers(kafkaUrl)
.withTopic(kafkaTopic)
// .withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
我得到的错误是
send failed : 'This record has failed the validation on broker and hence be rejected.'
这是序列化字符串。 UTF-8 字符串不是二进制 Avro。
您需要将 ParDo
, or MapElements
从 Strings/JSON 应用到 input
到 Avro 记录类型,例如 GenericRecord
,或生成的 SpecificRecord
class.
例如,
// Product is defined by an AVSC file and generated from avro-maven-plugin
pipeline
.apply(MapElements.via(new SimpleFunction<JSONProduct, Product>() {
@Override
public Product apply(JSONProduct input) {
try {
return AvroConverterFactory.convertProduct(input); // TODO: Implement this yourself
} catch (Exception e) {
log.error("Converter Error", e);
return null;
}
}
}))
.apply(Filter.by(Objects::nonNull)) // assuming you don't want nulls downstream
那么你应该可以使用这样的东西
注意:它似乎可以编译,但确实会抛出一个运行时错误,指出 class 无法转换...似乎是一个错误。
@Override
public POutput expand(PCollection<Product> input) {
//noinspection unchecked
return input.apply("Write to Kafka", KafkaIO.<Void, Product>write()
.withBootstrapServers("localhost:29092")
.withTopic("foobar")
.withValueSerializer((Class<? extends Serializer<Product>>) KafkaAvroSerializer.class)
.withProducerConfigUpdates(ImmutableMap.of(
SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"
)).values());
}
(不幸的)解决方法是定义您自己的包装器 class
public static class ProductSerializer implements Serializer<Product> {
private final KafkaAvroSerializer inner;
public ProductSerializer() {
inner = new KafkaAvroSerializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(configs, isKey);
}
@Override
public byte[] serialize(String topic, Product product) {
return inner.serialize(topic, product);
}
}
这样就可以了
@Override
public POutput expand(PCollection<Product> input) {
return input.apply("Write to Kafka", KafkaIO.<Void, Product>write()
.withBootstrapServers("localhost:9092")
.withTopic("foobar")
.withValueSerializer(ProductSerializer.class)
.withProducerConfigUpdates(ImmutableMap.of(
SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"
)).values());
}
我使用了一个非常简单的 Avro 模式,但是
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic foobar --from-beginning
{"name":"Hello"}
{"name":"World"}
我有一个 JSON 字符串需要序列化为 AVRO 格式,以便我可以将其发布到具有 Avro 模式并对其进行模式验证的 Kafka 主题。下面是我到目前为止尝试过的方法,但它给出了验证错误。
@Override
public PDone expand(PCollection<Product> input) {
return input.apply("Write to Kafka Topic", KafkaIO.<Void, String>write().
withBootstrapServers(kafkaUrl)
.withTopic(kafkaTopic)
// .withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
我得到的错误是
send failed : 'This record has failed the validation on broker and hence be rejected.'
这是序列化字符串。 UTF-8 字符串不是二进制 Avro。
您需要将 ParDo
, or MapElements
从 Strings/JSON 应用到 input
到 Avro 记录类型,例如 GenericRecord
,或生成的 SpecificRecord
class.
例如,
// Product is defined by an AVSC file and generated from avro-maven-plugin
pipeline
.apply(MapElements.via(new SimpleFunction<JSONProduct, Product>() {
@Override
public Product apply(JSONProduct input) {
try {
return AvroConverterFactory.convertProduct(input); // TODO: Implement this yourself
} catch (Exception e) {
log.error("Converter Error", e);
return null;
}
}
}))
.apply(Filter.by(Objects::nonNull)) // assuming you don't want nulls downstream
那么你应该可以使用这样的东西
注意:它似乎可以编译,但确实会抛出一个运行时错误,指出 class 无法转换...似乎是一个错误。
@Override
public POutput expand(PCollection<Product> input) {
//noinspection unchecked
return input.apply("Write to Kafka", KafkaIO.<Void, Product>write()
.withBootstrapServers("localhost:29092")
.withTopic("foobar")
.withValueSerializer((Class<? extends Serializer<Product>>) KafkaAvroSerializer.class)
.withProducerConfigUpdates(ImmutableMap.of(
SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"
)).values());
}
(不幸的)解决方法是定义您自己的包装器 class
public static class ProductSerializer implements Serializer<Product> {
private final KafkaAvroSerializer inner;
public ProductSerializer() {
inner = new KafkaAvroSerializer();
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.configure(configs, isKey);
}
@Override
public byte[] serialize(String topic, Product product) {
return inner.serialize(topic, product);
}
}
这样就可以了
@Override
public POutput expand(PCollection<Product> input) {
return input.apply("Write to Kafka", KafkaIO.<Void, Product>write()
.withBootstrapServers("localhost:9092")
.withTopic("foobar")
.withValueSerializer(ProductSerializer.class)
.withProducerConfigUpdates(ImmutableMap.of(
SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"
)).values());
}
我使用了一个非常简单的 Avro 模式,但是
$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic foobar --from-beginning
{"name":"Hello"}
{"name":"World"}