Kafka HDFS Sink Connector Protobuf 未被写入

Kafka HDFS Sink Connector Protobuf not being written

我正在尝试使用 Kafka HDFS 3 接收器连接器将 protobuf 二进制文件写入 HDFS。但是,连接器一直在写入 avro 文件。

我已经使用以下配置设置了我的接收器连接器

{
    "name": "hdfs3-connector-test",
    "config": {
        "connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
        "tasks.max": "1",
        "topics": "testproto",
        "hdfs.url": "hdfs://10.8.0.1:9000",
        "flush.size": "3",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://10.8.0.1:8081",
        "confluent.topic.bootstrap.servers": "10.8.0.1:9092",
        "confluent.topic.replication.factor": "1",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true"
    }
}

如您所见,我正在使用 ProtobufConverter 作为值转换器,并且插件已安装。 (ProtobufConverter 是否正在转换为 Avro 文件格式?)。

我还注册了我的架构并使用以下 Java 文件将数据发送到主题:

package app;

import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import test.Test.*;

public class App 
{
    public static void main( String[] args )
    {
        try { 
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.8.0.1:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.class.getName());
            props.put("schema.registry.url", "http://10.8.0.1:8081");

            KafkaProducer<String, MyMsg> producer = new KafkaProducer<String, MyMsg>(props);
            String topic = "testproto";
            String key = "testkey";
            MyMsg m = MyMsg.newBuilder().setF1("testing").build();
            ProducerRecord<String, MyMsg> record = new ProducerRecord<String, MyMsg>(topic, key, m);
            producer.send(record).get();
            producer.close();
        } catch (Exception e) {
            System.out.println(e.toString());
        }
    }
}

这是我的原型文件

syntax = "proto3";
package test;

message MyMsg {
    string f1 = 1;
}

所以我的问题是,这是正确的吗?我只能使用此连接器将 Avro 文件写入 HDFS 吗?还是我的配置不正确,我应该期待 HDFS 中的 protobuf 文件?

您需要设置format.class配置,

format.class is The format class to use when writing data to the store. Format classes implement the io.confluent.connect.storage.format.Format interface.

Type: class Default: io.confluent.connect.hdfs3.avro.AvroFormat Importance: high These classes are available by default:

io.confluent.connect.hdfs3.avro.AvroFormat io.confluent.connect.hdfs3.json.JsonFormat io.confluent.connect.hdfs3.parquet.ParquetFormat io.confluent.connect.hdfs3.string.StringFormat

https://docs.confluent.io/kafka-connect-hdfs3-sink/current/configuration_options.html#hdfs3-config-options