具有动态目的地的 WriteToBigQuery

WriteToBigQuery with dynamic destinations

我正在开发一个 Apache Beam 管道,它从 pub/sub 中读取一堆事件,然后根据事件类型将它们写入单独的 BigQuery tables。

我知道 WriteToBigQuery 支持动态目标,但我的问题是目标是从事件中读取的数据派生的。例如: 一个事件看起来像

{
 "object_id": 123,
 ... some metadata,
 "object_data": {object related info}
}

应写入 BigQuery table 的数据位于事件的 object_data 键下,但是,table 名称源自元数据中的其他字段。 我尝试使用侧面输入参数,但问题是因为每个事件都可以有不同的目的地,所以侧面输入不会相应更新。代码如下:

class DumpToBigQuery(PTransform):

    def _choose_table(self, element, table_names):
        # table_names = {"table_name": "project_name.dataset.table_name}
        table_name = table_names["table_name"]
        return table_name

    def expand(self, pcoll):
        events = (
            pcoll
            | "GroupByObjectType" >> Map(lambda e: (e["object_type"], e))
            | "Window"
            >> WindowInto(
                windowfn=FixedWindows(self.window_interval_seconds)
            )
            | "GroupByKey" >> GroupByKey()
            | "KeepLastEventOnly" >> ParDo(WillTakeLatestEventForKey()
        )

        table_name = events | Map(lambda e: ["table_name", f"{self.project}:{self.dataset}.{e[0]}"])
        table_names_dct = AsDict(table_name)

        events_to_write = events | Map(lambda e: e[1]) | Map(self._drop_unwanted_fields)

        return events_to_write | "toBQ" >> WriteToBigQuery(
            table=self._choose_table,
            table_side_inputs=(table_names_dct,),
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )

您可以看到辅助输入取自管道 table_name 的另一个分支,该分支基本上是从事件中提取 table 名称。然后,将其作为 WriteToBigQuery 的输入。不幸的是,这在负载下并没有真正起作用,侧输入没有更新并且一些事件使用了错误的目的地。

在此特定情况下我可以使用哪些其他方法?所有文档都使用静态示例,并没有真正涵盖这种动态方法。

我尝试的另一件事是编写一个使用 HTTP BigQuery 客户端并插入行的自定义 DoFn,这里的问题是管道的速度,因为每秒插入大约 6-7 个事件.

我遇到了类似的问题,对此我有解决办法。

我看到您有 create_disposition=BigQueryDisposition.CREATE_NEVER,因此表列表在代码运行之前就已知了。也许它很笨拙,但它是众所周知的。我有一个 DoFn,其中 yeild 有很多 TaggedOutput 它的 process 方法。然后我的管道看起来像:

parser_outputs = ['my', 'list', 'of', 'tables']
with beam.Pipeline(options=PipelineOptions(), argv=args) as p:
    pipe = (
        p
        | "Start" >> beam.Create(["example row"])
        | "Split"
        >> beam.ParDo(MySplitFn()).with_outputs(*parser_outputs)
    )

    for output in parser_outputs:
        pipe[output] | "write {}".format(output) >> beam.io.WriteToBigQuery(
            bigquery.TableReference(
                projectId=options.projectId, datasetId=DATASET_ID, tableId=output
            ),
            schema=padl_shared.getBQSchema(parser.getSchemaForDataflow(rowTypeName=output)),
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )

    p.run().wait_until_finish()