How to use the AVRO serializer with Schema Registry from a Kafka Connect SourceTask

I have set up the Confluent platform and started developing a SourceConnector. In the corresponding SourceTask.poll() method I do the following (pseudo-Java code below):

    public List<SourceRecord> poll() throws InterruptedException {

....

    Envelope envelope = new Envelope();

    // Serialize the envelope to raw Avro bytes (no schema ID attached yet)
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Encoder enc = EncoderFactory.get().binaryEncoder(out, null);
    DatumWriter<Envelope> dw = new ReflectDatumWriter<Envelope>(Envelope.class);
    dw.write(envelope, enc);
    enc.flush();
    out.close();

    Map<String, String> sourcePartition = new HashMap<String, String>();
    sourcePartition.put("stream", streamName);
    Map<String, Integer> sourceOffset = new HashMap<String, Integer>();
    sourceOffset.put("position", Integer.parseInt(envelope.getTimestamp()));

    // The schema is BYTES_SCHEMA, so the record value has to be the serialized
    // byte[], not the Envelope object itself
    records.add(new SourceRecord(sourcePartition, sourceOffset, topic,
            org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, out.toByteArray()));

....

I want to use the Schema Registry so that the object being serialized is tagged with the schema ID from the registry, serialized, and then published to the Kafka topic via poll(). If the schema of a given object is not in the registry yet, I want it to be registered and the generated ID handed back to the serialization step, so that it becomes part of the serialized object and the object remains deserializable.

What do I need to change in the code above to achieve this?

To use the Schema Registry, you have to serialize/deserialize your data with the classes provided by Confluent:

  • io.confluent.kafka.serializers.KafkaAvroSerializer
  • io.confluent.kafka.serializers.KafkaAvroDeserializer

Those classes contain all the logic for registering schemas with the registry and fetching them back from it.

If you use Maven, you can add this dependency:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>2.0.1</version>
</dependency>
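
Since your SourceTask builds the serialized bytes itself, you can also invoke the serializer directly instead of going through a producer. Below is a minimal sketch of what that could look like inside poll(), assuming Envelope is an Avro-generated (SpecificRecord) class and the registry runs on localhost:8081; the variable names (topic, envelope, sourcePartition, sourceOffset, records) are the ones from your question:

    import java.util.Collections;
    import java.util.Map;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    // Configure once, e.g. in SourceTask.start()
    Map<String, String> serializerConfig =
            Collections.singletonMap("schema.registry.url", "http://localhost:8081");
    KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer();
    valueSerializer.configure(serializerConfig, false); // false = used for record values

    // In poll(): serialize() registers the schema if the registry does not know it
    // yet and prepends the returned schema ID to the Avro payload
    byte[] value = valueSerializer.serialize(topic, envelope);
    records.add(new SourceRecord(sourcePartition, sourceOffset, topic,
            org.apache.kafka.connect.data.Schema.BYTES_SCHEMA, value));

Note that KafkaAvroSerializer expects Avro records (SpecificRecord/GenericRecord) or primitives, so a plain POJO written for ReflectDatumWriter would first have to be turned into one of those.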

Check https://gist.github.com/avpatel257/0a88d20200661b31ab5f5df7adc42e6f for a sample implementation.

You will need the following Confluent dependencies to make it work.

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>common-config</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>common-utils</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>3.0.0</version>
    </dependency>
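
If you also want to talk to the registry yourself, e.g. to register a schema up front and get the generated ID back (as asked in the question), the kafka-schema-registry-client artifact above provides a client for that. A rough sketch, assuming the registry on localhost:8081 (the subject name you pass in is up to you; by convention the serializers use <topic>-value for record values):

    import java.io.IOException;
    import org.apache.avro.Schema;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

    public class SchemaRegistration {

        public static int registerSchema(String subject, Schema schema)
                throws IOException, RestClientException {
            // 100 = number of schema versions the client caches per subject
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient("http://localhost:8081", 100);
            // Registers the schema if it is not known yet and returns the
            // (new or already existing) schema ID
            return client.register(subject, schema);
        }
    }

Since your poll() already uses ReflectDatumWriter, you could obtain the Schema to register with ReflectData.get().getSchema(Envelope.class).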

According to the documentation:

In the POM:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.11.0.1-cp1</version>
    <scope>provided</scope>
</dependency>

In the application, create the producer:

Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Set any other properties
KafkaProducer<String, User> producer = new KafkaProducer<>(props);

Use the producer:

User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// "users" is just an example topic name
Future<RecordMetadata> resultFuture =
        producer.send(new ProducerRecord<String, User>("users", user1));

For this example, your registry needs a schema for "User".
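
(By default the serializer takes care of that step for you: when it sees a schema it has not registered yet, it registers it under the subject <topic>-value and uses the returned ID. This behaviour is controlled by the auto.register.schemas setting, which defaults to true.)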

Confluent also has a nice example on GitHub:

package io.confluent.examples.producer;

import JavaSessionize.avro.LogLine;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class AvroClicksProducer {

    public static void main(String[] args) throws InterruptedException {
        if (args.length != 1) {
            System.out.println("Please provide command line arguments: schemaRegistryUrl");
            System.exit(-1);
        }

        String schemaUrl = args[0];

        Properties props = new Properties();
        // hardcoding the Kafka server URI for this example
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", schemaUrl);

        // Hard coding topic too.
        String topic = "clicks";

        // Hard coding wait between events so demo experience will be uniformly nice
        int wait = 500;

        Producer<String, LogLine> producer = new KafkaProducer<String, LogLine>(props);

        // We keep producing new events and waiting between them until someone ctrl-c
        while (true) {
            LogLine event = EventGenerator.getNext();
            System.out.println("Generated event " + event.toString());

            // Using IP as key, so events from same IP will go to same partition
            ProducerRecord<String, LogLine> record = new ProducerRecord<String, LogLine>(topic, event.getIp().toString(), event);
            producer.send(record);
            Thread.sleep(wait);
        }
    }
}