Kafka HDFS Sink Connector Protobuf 未被写入
Kafka HDFS Sink Connector Protobuf not being written
我正在尝试使用 Kafka HDFS 3 接收器连接器将 protobuf 二进制文件写入 HDFS。但是,连接器一直在写入 avro 文件。
我已经使用以下配置设置了我的接收器连接器
{
"name": "hdfs3-connector-test",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "testproto",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
}
}
如您所见,我正在使用 ProtobufConverter 作为值转换器,并且插件已安装。 (ProtobufConverter 是否正在转换为 Avro 文件格式?)。
我还注册了我的架构并使用以下 Java 文件将数据发送到主题:
package app;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import test.Test.*;
public class App
{
public static void main( String[] args )
{
try {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.8.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.class.getName());
props.put("schema.registry.url", "http://10.8.0.1:8081");
KafkaProducer<String, MyMsg> producer = new KafkaProducer<String, MyMsg>(props);
String topic = "testproto";
String key = "testkey";
MyMsg m = MyMsg.newBuilder().setF1("testing").build();
ProducerRecord<String, MyMsg> record = new ProducerRecord<String, MyMsg>(topic, key, m);
producer.send(record).get();
producer.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
这是我的原型文件
syntax = "proto3";
package test;
message MyMsg {
string f1 = 1;
}
所以我的问题是,这是正确的吗?我只能使用此连接器将 Avro 文件写入 HDFS 吗?还是我的配置不正确,我应该期待 HDFS 中的 protobuf 文件?
您需要设置format.class
配置,
format.class is
The format class to use when writing data to the store. Format classes implement the io.confluent.connect.storage.format.Format interface.
Type: class
Default: io.confluent.connect.hdfs3.avro.AvroFormat
Importance: high
These classes are available by default:
io.confluent.connect.hdfs3.avro.AvroFormat
io.confluent.connect.hdfs3.json.JsonFormat
io.confluent.connect.hdfs3.parquet.ParquetFormat
io.confluent.connect.hdfs3.string.StringFormat
我正在尝试使用 Kafka HDFS 3 接收器连接器将 protobuf 二进制文件写入 HDFS。但是,连接器一直在写入 avro 文件。
我已经使用以下配置设置了我的接收器连接器
{
"name": "hdfs3-connector-test",
"config": {
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"tasks.max": "1",
"topics": "testproto",
"hdfs.url": "hdfs://10.8.0.1:9000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://10.8.0.1:8081",
"confluent.topic.bootstrap.servers": "10.8.0.1:9092",
"confluent.topic.replication.factor": "1",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
}
}
如您所见,我正在使用 ProtobufConverter 作为值转换器,并且插件已安装。 (ProtobufConverter 是否正在转换为 Avro 文件格式?)。
我还注册了我的架构并使用以下 Java 文件将数据发送到主题:
package app;
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import test.Test.*;
public class App
{
public static void main( String[] args )
{
try {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.8.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer.class.getName());
props.put("schema.registry.url", "http://10.8.0.1:8081");
KafkaProducer<String, MyMsg> producer = new KafkaProducer<String, MyMsg>(props);
String topic = "testproto";
String key = "testkey";
MyMsg m = MyMsg.newBuilder().setF1("testing").build();
ProducerRecord<String, MyMsg> record = new ProducerRecord<String, MyMsg>(topic, key, m);
producer.send(record).get();
producer.close();
} catch (Exception e) {
System.out.println(e.toString());
}
}
}
这是我的原型文件
syntax = "proto3";
package test;
message MyMsg {
string f1 = 1;
}
所以我的问题是,这是正确的吗?我只能使用此连接器将 Avro 文件写入 HDFS 吗?还是我的配置不正确,我应该期待 HDFS 中的 protobuf 文件?
您需要设置format.class
配置,
format.class is The format class to use when writing data to the store. Format classes implement the io.confluent.connect.storage.format.Format interface.
Type: class Default: io.confluent.connect.hdfs3.avro.AvroFormat Importance: high These classes are available by default:
io.confluent.connect.hdfs3.avro.AvroFormat io.confluent.connect.hdfs3.json.JsonFormat io.confluent.connect.hdfs3.parquet.ParquetFormat io.confluent.connect.hdfs3.string.StringFormat