Kafka Avro To BigQuery using Apache Beam in Java
The scenario is as follows:
Kafka to BigQuery using Apache Beam. This is an alternative to using Kafka Connect's BigQuerySinkConnector [WePay].
I am able to read Avro messages from a Kafka topic, and I can print their contents to the console correctly. I am looking for help with writing these KafkaRecords to a BigQuery table.
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

// Customer is a class auto-generated from the Avro schema using the Eclipse Avro Maven plugin
// Read from the Kafka topic and get KafkaRecords
@SuppressWarnings("unchecked")
PTransform<PBegin, PCollection<KafkaRecord<String, Customer>>> input = KafkaIO.<String, Customer>read()
        .withBootstrapServers("server1:9092") // host:port list, without an http:// scheme
        .withTopic("test-avro")
        .withConsumerConfigUpdates(ImmutableMap.<String, Object>of(
                "specific.avro.reader", "true",
                "auto.offset.reset", "earliest",
                "schema.registry.url", "http://server2:8181"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(Customer.class));
// Print Kafka records to the console log
pipeline.apply(input)
        .apply("ExtractRecord", ParDo.of(new DoFn<KafkaRecord<String, Customer>, KafkaRecord<String, Customer>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KafkaRecord<String, Customer> record = c.element();
                KV<String, Customer> log = record.getKV();
                System.out.println("Key Obtained: " + log.getKey());
                System.out.println("Value Obtained: " + log.getValue().toString());
                c.output(record);
            }
        }));
// Write each record to a BigQuery table
// The table already exists in BigQuery, so the create disposition would be CREATE_NEVER
// Records are to be appended to the table, so the write disposition would be WRITE_APPEND
// All fields in the Customer object have corresponding column names and datatypes, so it is a one-to-one mapping
// The connection to BigQuery is through a service account JSON file, set as an environment variable in the Eclipse project's run configuration
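// (typically GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json; the path here is illustrative)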
// Set the table specification for BigQuery ("project:dataset.table" format)
String bqTable = "my-project:my-dataset.my-table";
The examples currently available show how to set the schema manually and assign values field by field. I am looking for an automated way to infer the schema from the Customer Avro object and map its fields directly to columns, without that manual field-by-field assignment.
Is this possible?
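For context, this is the kind of manual mapping the existing examples show (a sketch with hypothetical field names id and name, and a hypothetical PCollection<Customer> called customers):

// Manual approach: build the TableSchema and one TableRow per element by hand
TableSchema manualSchema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("id").setType("STRING"),
        new TableFieldSchema().setName("name").setType("STRING")));
customers.apply(BigQueryIO.<Customer>write()
        .to(bqTable)
        .withSchema(manualSchema)
        .withFormatFunction(customer -> new TableRow()
                .set("id", customer.getId())
                .set("name", customer.getName()))
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));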
After some trial and error, I got the following working.
I welcome comments that point out problems or suggest better solutions.
SchemaRegistryClient registryClient = new CachedSchemaRegistryClient("http://server2:8181", 10);
SchemaMetadata latestSchemaMetadata;
Schema avroSchema = null;
try {
    // getLatestSchemaMetadata takes the subject name, which is in topic-value format, i.e. "-value" suffixed to the topic
    // so if the topic is "test-avro" then the subject is "test-avro-value"
    latestSchemaMetadata = registryClient.getLatestSchemaMetadata("test-avro-value");
    avroSchema = new Schema.Parser().parse(latestSchemaMetadata.getSchema());
} catch (IOException e) {
    System.out.println("IO Exception while obtaining registry data");
    e.printStackTrace();
} catch (RestClientException e) {
    System.out.println("Client Exception while obtaining registry data");
    e.printStackTrace();
}
// Printing avro schema obtained
System.out.println("---------------- Avro schema ----------- " + avroSchema.toString());
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// Read from the Kafka topic and get KafkaRecords
// Create KafkaIO.Read with the Avro schema deserializer
@SuppressWarnings("unchecked")
KafkaIO.Read<String, GenericRecord> read = KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers("server1:9092")
        .withTopic(KafkaConfig.getInputTopic())
        .withConsumerConfigUpdates(ImmutableMap.<String, Object>of(
                "schema.registry.url", "http://server2:8181",
                "specific.avro.reader", "true",
                "auto.offset.reset", "earliest"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(avroSchema));
// Set Beam Schema
org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
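// For illustration only (hypothetical two-field schema): toBeamSchema maps Avro
// field types to Beam field types automatically, e.g. Avro "string" -> STRING,
// so nothing has to be mapped field by field:
//   Schema demo = new Schema.Parser().parse(
//       "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
//       + "{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}");
//   System.out.println(AvroUtils.toBeamSchema(demo)); // Fields: id STRING, amount DOUBLE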
// Print Kafka records to the console log
// Write each record to a BigQuery table
// The table already exists in BigQuery, so the create disposition would be CREATE_NEVER
// Records are to be appended to the table, so the write disposition would be WRITE_APPEND
// All fields in the Customer object have corresponding column names and datatypes, so it is a one-to-one mapping
// The connection to BigQuery is through a service account JSON file, set as an environment variable in the Eclipse project's run configuration
// Set the table specification for BigQuery ("project:dataset.table" format)
String bqTable = "my-project:my-dataset.my-table";
pipeline.apply(read)
        .apply("ExtractRecord", ParDo.of(new DoFn<KafkaRecord<String, GenericRecord>, KV<String, GenericRecord>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) {
                KafkaRecord<String, GenericRecord> record = c.element();
                KV<String, GenericRecord> log = record.getKV();
                System.out.println("Key Obtained: " + log.getKey());
                System.out.println("Value Obtained: " + log.getValue().toString());
                c.output(log);
            }
        }))
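        // Attach the Beam schema and the Avro<->Row conversion functions so that
        // useBeamSchema() below can derive the BigQuery table schema automatically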
        .apply(Values.<GenericRecord>create())
        .setSchema(beamSchema,
                TypeDescriptor.of(GenericRecord.class),
                AvroUtils.getToRowFunction(GenericRecord.class, avroSchema),
                AvroUtils.getFromRowFunction(GenericRecord.class))
        .apply(BigQueryIO.<GenericRecord>write()
                .to(bqTable)
                .useBeamSchema()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
pipeline.run().waitUntilFinish();
The above also works with CREATE_IF_NEEDED.
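If you would rather not attach a Beam schema to the PCollection, a minimal alternative sketch (untested; assumes the same beamSchema and bqTable as above, and a hypothetical PCollection<Row> named rows, e.g. from Convert.toRows()) converts the schema and rows explicitly with BigQueryUtils:

// Alternative sketch: derive the BigQuery TableSchema from the Beam schema and
// format each Row as a TableRow explicitly, instead of calling useBeamSchema()
TableSchema tableSchema = BigQueryUtils.toTableSchema(beamSchema);
rows.apply(BigQueryIO.<Row>write()
        .to(bqTable)
        .withSchema(tableSchema)
        .withFormatFunction(BigQueryUtils.toTableRow())
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));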