Flink: how to create a table with the schema inferred from Avro input data
I have loaded an Avro file into a Flink DataSet:
AvroInputFormat<GenericRecord> test = new AvroInputFormat<GenericRecord>(
        new Path("PathToAvroFile"), GenericRecord.class);
DataSet<GenericRecord> DS = env.createInput(test);
DS.print();
Here is the result of printing DS:
{"N_NATIONKEY": 14, "N_NAME": "KENYA", "N_REGIONKEY": 0, "N_COMMENT": " pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t"}
{"N_NATIONKEY": 15, "N_NAME": "MOROCCO", "N_REGIONKEY": 0, "N_COMMENT": "rns. blithely bold courts among the closely regular packages use furiously bold platelets?"}
{"N_NATIONKEY": 16, "N_NAME": "MOZAMBIQUE", "N_REGIONKEY": 0, "N_COMMENT": "s. ironic, unusual asymptotes wake blithely r"}
{"N_NATIONKEY": 17, "N_NAME": "PERU", "N_REGIONKEY": 1, "N_COMMENT": "platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun"}
{"N_NATIONKEY": 18, "N_NAME": "CHINA", "N_REGIONKEY": 2, "N_COMMENT": "c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos"}
{"N_NATIONKEY": 19, "N_NAME": "ROMANIA", "N_REGIONKEY": 3, "N_COMMENT": "ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account"}
{"N_NATIONKEY": 20, "N_NAME": "SAUDI ARABIA", "N_REGIONKEY": 4, "N_COMMENT": "ts. silent requests haggle. closely express packages sleep across the blithely"}
Now I want to create a table from the DS DataSet with exactly the same schema as the Avro file, i.e. the columns should be N_NATIONKEY, N_NAME, N_REGIONKEY, and N_COMMENT.
I know that with this line:
tableEnv.registerDataSet("tbTest", DS, "field1, field2, ...");
I can create a table and set the columns myself, but I would like the columns to be inferred automatically from the data. Is that possible?
Also, I tried
tableEnv.registerDataSet("tbTest", DS);
but it creates a table with the schema:
root
|-- f0: GenericType<org.apache.avro.generic.GenericRecord>
GenericRecord is a black box for the Table & SQL API runtime, because the number of fields and their data types are undefined. I would recommend using Avro-generated classes that extend SpecificRecord. Those specific types are also recognized by Flink's type system, so you can process the individual fields properly with appropriate data types.
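For example (a minimal sketch, assuming the nation schema has been compiled with the Avro code generator into a hypothetical class Nation that extends SpecificRecord):

// Read the file as specific records instead of GenericRecord.
AvroInputFormat<Nation> format = new AvroInputFormat<>(
        new Path("PathToAvroFile"), Nation.class);
DataSet<Nation> nations = env.createInput(format);

// Because Nation is a specific record, Flink can inspect its fields,
// so registering the DataSet without an explicit field list should
// yield the named columns instead of a single opaque f0 column.
tableEnv.registerDataSet("tbTest", nations);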
Alternatively, you can implement custom UDFs that extract fields with the appropriate data type, e.g. getAvroInt(f0, "myField"), getAvroString(f0, "myField"), etc.
Some pseudocode:
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.functions.ScalarFunction;

public class AvroStringFieldExtract extends ScalarFunction {
    // Read the named field from the record and return it as a String.
    public String eval(GenericRecord r, String fieldName) {
        return r.get(fieldName).toString();
    }
}

tableEnv.registerFunction("getAvroFieldString", new AvroStringFieldExtract());
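The registered function can then be used in a query, for instance (a sketch; f0 is the single GenericRecord column from the registration above):

Table result = tableEnv.sqlQuery(
    "SELECT getAvroFieldString(f0, 'N_NAME') FROM tbTest");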