将 JSON 字符串序列化为 AVRO Apache Beam KafkaIO

Serialize JSON String to AVRO Apache Beam KafkaIO

我有一个 JSON 字符串需要序列化为 AVRO 格式,以便我可以将其发布到具有 Avro 模式并对其进行模式验证的 Kafka 主题。下面是我到目前为止尝试过的方法,但它给出了验证错误。

@Override     
public PDone expand(PCollection<Product> input) {
    return input.apply("Write to Kafka Topic", KafkaIO.<Void, String>write().
                withBootstrapServers(kafkaUrl)
                .withTopic(kafkaTopic)
//                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)

我得到的错误是

send failed : 'This record has failed the validation on broker and hence be rejected.'

这是序列化字符串。 UTF-8 字符串不是二进制 Avro。

您需要将 ParDo, or MapElements 从 Strings/JSON 应用到 input 到 Avro 记录类型,例如 GenericRecord,或生成的 SpecificRecord class.

例如,

// Product is defined by an AVSC file and generated from avro-maven-plugin
pipeline
.apply(MapElements.via(new SimpleFunction<JSONProduct, Product>() {
  @Override
  public Product apply(JSONProduct input) {
    try {
      return AvroConverterFactory.convertProduct(input);  // TODO: Implement this yourself
    } catch (Exception e) {
      log.error("Converter Error", e);
      return null;
    }
  }
}))
.apply(Filter.by(Objects::nonNull)) // assuming you don't want nulls downstream

那么你应该可以使用这样的东西

注意:它似乎可以编译,但确实会抛出一个运行时错误,指出 class 无法转换...似乎是一个错误。

  @Override
  public POutput expand(PCollection<Product> input) {
    //noinspection unchecked
    return input.apply("Write to Kafka", KafkaIO.<Void, Product>write()
        .withBootstrapServers("localhost:29092")
        .withTopic("foobar")
        .withValueSerializer((Class<? extends Serializer<Product>>) KafkaAvroSerializer.class)
        .withProducerConfigUpdates(ImmutableMap.of(
            SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"
        )).values());
  }

(不幸的)解决方法是定义您自己的包装器 class

  public static class ProductSerializer implements Serializer<Product> {

    private final KafkaAvroSerializer inner;

    public ProductSerializer() {
      inner = new KafkaAvroSerializer();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
      inner.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, Product product) {
      return inner.serialize(topic, product);
    }

  }

这样就可以了

  @Override
  public POutput expand(PCollection<Product> input) {
    return input.apply("Write to Kafka", KafkaIO.<Void, Product>write()
        .withBootstrapServers("localhost:9092")
        .withTopic("foobar")
        .withValueSerializer(ProductSerializer.class)
        .withProducerConfigUpdates(ImmutableMap.of(
            SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"
        )).values());
  }

我使用了一个非常简单的 Avro 模式,但是

$ kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic foobar --from-beginning
{"name":"Hello"}
{"name":"World"}