如何使用数据流在嵌套数组中加载数据
How to load data in nested array using dataflow
我正在尝试将数据加载到下面 table。我能够在“array_data”中加载数据。
但是如何在嵌套数组“inside_array”中加载数据。我已经尝试将注释部分的数据加载到 inside_array 数组中,但它没有用。
enter image description here
这是我的代码。-
管道 p = Pipeline.create(选项);
org.apache.beam.sdk.values.PCollection<TableRow> output = p.apply(org.apache.beam.sdk.transforms.Create.of("temp"))
.apply("O/P",ParDo.of(new DoFn<String, TableRow>() {
/**
*
*/
private static final long serialVersionUID = 307542945272055650L;
@ProcessElement
public void processElemet(ProcessContext c) {
TableRow row = new TableRow();
row.set("name","Jack");
row.set("phone","9874563210");
TableRow ip = new TableRow().set("address", "M G Road").set("email","abc@gmail.com");
TableRow ip1 = new TableRow().set("address","F C Road").set("email","xyz@gmail.com");
java.util.List<TableRow> metadata = new ArrayList<TableRow>();
metadata.add(ip);
metadata.add(ip1);
row.set("array_data",metadata);
LOG.info("O/P:"+row);
c.output(row);
}}));
output.apply("Write to table",BigQueryIO.writeTableRows().withoutValidation().to("AA.nested_array")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run();
任何人都有任何线索或提前suggestion.Thanks。
要使用数据流处理嵌套数组,请创建一个单独的列表并将其添加到 table 行的主数组中。
这里我尝试了这种方式,得到了预期的输出。
管道 p = Pipeline.create(选项);
org.apache.beam.sdk.values.PCollection 输出 = p.apply(org.apache.beam.sdk.transforms.Create.of("temp"))
.apply("O/P",ParDo.of(new DoFn() {
@ProcessElement
public void processElemet(ProcessContext c) {
TableRow row = new TableRow();
row.set("name","Jack");
row.set("phone","9874563210");
List<TableRow> listDest = new ArrayList<>();
TableRow t=new TableRow().set("detail1","one" ).set("detail2", "two");
TableRow t1=new TableRow().set("detail1","three" ).set("detail2", "four");
listDest.add(t);
listDest.add(t1);
TableRow ip = new TableRow().set("address", "M G Road").set("email","abc@gmail.com").set("inside_array", listDest);
TableRow ip1 = new TableRow().set("address","F C Road").set("email","xyz@gmail.com").set("inside_array", listDest);
java.util.List<TableRow> metadata = new ArrayList<TableRow>();
metadata.add(ip);
metadata.add(ip1);
row.set("array_data",metadata);
LOG.info("O/P:"+row);
c.output(row);
}}));
同时添加 table 的图像和数据。
希望如果有人正在研究同类 table。
会有所帮助
我正在尝试将数据加载到下面 table。我能够在“array_data”中加载数据。 但是如何在嵌套数组“inside_array”中加载数据。我已经尝试将注释部分的数据加载到 inside_array 数组中,但它没有用。 enter image description here
这是我的代码。- 管道 p = Pipeline.create(选项);
org.apache.beam.sdk.values.PCollection<TableRow> output = p.apply(org.apache.beam.sdk.transforms.Create.of("temp"))
.apply("O/P",ParDo.of(new DoFn<String, TableRow>() {
/**
*
*/
private static final long serialVersionUID = 307542945272055650L;
@ProcessElement
public void processElemet(ProcessContext c) {
TableRow row = new TableRow();
row.set("name","Jack");
row.set("phone","9874563210");
TableRow ip = new TableRow().set("address", "M G Road").set("email","abc@gmail.com");
TableRow ip1 = new TableRow().set("address","F C Road").set("email","xyz@gmail.com");
java.util.List<TableRow> metadata = new ArrayList<TableRow>();
metadata.add(ip);
metadata.add(ip1);
row.set("array_data",metadata);
LOG.info("O/P:"+row);
c.output(row);
}}));
output.apply("Write to table",BigQueryIO.writeTableRows().withoutValidation().to("AA.nested_array")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run();
任何人都有任何线索或提前suggestion.Thanks。
要使用数据流处理嵌套数组,请创建一个单独的列表并将其添加到 table 行的主数组中。
这里我尝试了这种方式,得到了预期的输出。
管道 p = Pipeline.create(选项);
org.apache.beam.sdk.values.PCollection 输出 = p.apply(org.apache.beam.sdk.transforms.Create.of("temp"))
.apply("O/P",ParDo.of(new DoFn
@ProcessElement
public void processElemet(ProcessContext c) {
TableRow row = new TableRow();
row.set("name","Jack");
row.set("phone","9874563210");
List<TableRow> listDest = new ArrayList<>();
TableRow t=new TableRow().set("detail1","one" ).set("detail2", "two");
TableRow t1=new TableRow().set("detail1","three" ).set("detail2", "four");
listDest.add(t);
listDest.add(t1);
TableRow ip = new TableRow().set("address", "M G Road").set("email","abc@gmail.com").set("inside_array", listDest);
TableRow ip1 = new TableRow().set("address","F C Road").set("email","xyz@gmail.com").set("inside_array", listDest);
java.util.List<TableRow> metadata = new ArrayList<TableRow>();
metadata.add(ip);
metadata.add(ip1);
row.set("array_data",metadata);
LOG.info("O/P:"+row);
c.output(row);
}}));
同时添加 table 的图像和数据。
希望如果有人正在研究同类 table。
会有所帮助