使用 Beam SQL 查询 Avro 架构
Query Avro Schema using Beam SQL
我正在尝试使用 Apache Beam 读取 avro 文件并使用 Beam SQL 转换数据。
我在 Beam 和 Java 中还是新手。这是我的简单代码:
public class BeamSQLReadAvro {
@SuppressWarnings("serial")
public static void main(String[] args) throws IOException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
/* Schema definition */
Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));
/* Create record/row */
PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));
/* SQL Transform */
records.apply("SQL Transform 01",SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10"))
/* Print output */
.apply("Output",
MapElements.via(
new SimpleFunction<Row, Row>() {
@Override
public Row apply(Row input) {
System.out.println("PCOLLECTION: " + input.getValues());
return input;
}
}
)
);
p.run().waitUntilFinish();
}
}
它给我错误
Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema
我不明白,我定义了一个名为 schema 的变量。这里有什么指示吗?
实际上,您的管道中有两种模式 - Avro 模式和 Beam 模式。 Avro 模式用于解析您的 Avro 输入记录,但对于 SQL 转换,您应该使用具有 Beam 模式的行。为此,AvroIO
提供了一个选项 withBeamSchemas(boolean)
,在您的情况下应将其设置为 true
,例如:
AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")
我正在尝试使用 Apache Beam 读取 avro 文件并使用 Beam SQL 转换数据。
我在 Beam 和 Java 中还是新手。这是我的简单代码:
public class BeamSQLReadAvro {
@SuppressWarnings("serial")
public static void main(String[] args) throws IOException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);
/* Schema definition */
Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc"));
/* Create record/row */
PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro"));
/* SQL Transform */
records.apply("SQL Transform 01",SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10"))
/* Print output */
.apply("Output",
MapElements.via(
new SimpleFunction<Row, Row>() {
@Override
public Row apply(Row input) {
System.out.println("PCOLLECTION: " + input.getValues());
return input;
}
}
)
);
p.run().waitUntilFinish();
}
}
它给我错误
Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema
我不明白,我定义了一个名为 schema 的变量。这里有什么指示吗?
实际上,您的管道中有两种模式 - Avro 模式和 Beam 模式。 Avro 模式用于解析您的 Avro 输入记录,但对于 SQL 转换,您应该使用具有 Beam 模式的行。为此,AvroIO
提供了一个选项 withBeamSchemas(boolean)
,在您的情况下应将其设置为 true
,例如:
AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")