Apache Beam:将具有对象列表的对象转换为多个 TableRow 以写入 BigQuery

Apache Beam : Transform an objects having a list of objects to multiple TableRows to write to BigQuery

我正在使用 Beam 管道处理 json 并将其写入 bigquery。 JSON 是这样的。

{
"message": [{
    "name": "abc",
    "itemId": "2123",
    "itemName": "test"

}, {
    "name": "vfg",
    "itemId": "56457",
    "itemName": "Chicken"
}],
"publishDate": "2017-10-26T04:54:16.207Z"

}

我使用 Jackson 将其解析为以下结构。

class Feed{
List<Message> messages; 
TimeStamp  publishDate;

}

public class Message implements Serializable{

/**
 * 
 */
private static final long serialVersionUID = 1L;
private String key;
private String value;

private Map<String, String> eventItemMap = new HashMap<>();
this property translate the list of map as a single map with all the key-value pair together. because,  the messages property will be parsed as list of HashMap objets for each key/value. This will be translated to a single map. 

现在在我的管道中,我会将集合转换为

PCollection<KV<String, Feed>>

根据 class 中的 属性 将其写入不同的 table。我写了一个转换来做到这一点。 需求是根据消息对象的数量创建多个 TableRow。我在 JSON 中还有一些属性以及将添加到 tableRow 和每个消息属性的 publishDate。 所以 table 将如下所示。

id, name, field1, field2, message1.property1, message1.property2...

id, name, field1, field2, message2.property1, message2.property2...

我尝试创建以下转换。但是,不确定它将如何根据消息列表输出多行。

private class BuildRowListFn extends DoFn<KV<String, Feed>, List<TableRow>> {

    @ProcessElement
    public void processElement(ProcessContext context) {
        Feed feed = context.element().getValue();

        List<Message> messages = feed.getMessage();
        List<TableRow> rows = new ArrayList<>();
        messages.forEach((message) -> {
            TableRow row = new TableRow();
            row.set("column1", feed.getPublishDate());
            row.set("column2", message.getEventItemMap().get("key1"));
            row.set("column3", message.getEventItemMap().get("key2"));
            rows.add(row);
        }

        );

    }

但是,这也将是一个列表,我将无法应用 BigQueryIO.write 转换。


根据 "Eugene" aka @jkff

的评论更新

谢谢@jkff。现在,我已经按照您在第二段中提到的那样更改了代码。 context.output(行)在 messages.forEach 内,将 table 行设置为

之后
List<Message> messages = feed.getMessage();
        messages.forEach((message) -> {
            TableRow row = new TableRow();
            row.set("column2", message.getEventItemMap().get("key1"));
            context.output(row);
            }

现在,当我尝试将此集合写入 BigQuery 时,

rows.apply(BigQueryIO.writeTableRows().to(getTable(projectId, datasetId, tableName)).withSchema(getSchema())
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND));

我收到以下异常。

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:331)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:301)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:200)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:63)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
at com.chefd.gcloud.analytics.pipeline.MyPipeline.main(MyPipeline.java:284)


Caused by: java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:759)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)

请帮忙。

谢谢。

您似乎假设 DoFn 每个元素只能输出一个值。事实并非如此:它可以为每个元素输出任意数量的值——无值、一个值、多个值等。DoFn 甚至可以 output values to multiple PCollection's.

在您的情况下,您只需为 @ProcessElement 方法中的每一行调用 c.output(row),例如:rows.forEach(c::output)。当然,您还需要将 DoFn 的类型更改为 DoFn<KV<String, Feed>, TableRow>,因为其输出 PCollection 中的元素类型是 TableRow,而不是 List<TableRow> - 您只是为每个输入元素将多个元素生成到集合中,但这不会改变类型。

另一种方法是做您当前所做的,也做 c.output(rows) 然后应用 Flatten.iterables()PCollection<List<TableRow>> 展平为 PCollection<TableRow>(您可能需要将 List 替换为 Iterable 才能使其正常工作)。但另一种方法更简单。