具有动态目的地的 WriteToBigQuery
WriteToBigQuery with dynamic destinations
我正在开发一个 Apache Beam 管道,它从 pub/sub 中读取一堆事件,然后根据事件类型将它们写入单独的 BigQuery tables。
我知道 WriteToBigQuery
支持动态目标,但我的问题是目标是从事件中读取的数据派生的。例如:
一个事件看起来像
{
"object_id": 123,
... some metadata,
"object_data": {object related info}
}
应写入 BigQuery table 的数据位于事件的 object_data
键下,但是,table 名称源自元数据中的其他字段。
我尝试使用侧面输入参数,但问题是因为每个事件都可以有不同的目的地,所以侧面输入不会相应更新。代码如下:
class DumpToBigQuery(PTransform):
def _choose_table(self, element, table_names):
# table_names = {"table_name": "project_name.dataset.table_name}
table_name = table_names["table_name"]
return table_name
def expand(self, pcoll):
events = (
pcoll
| "GroupByObjectType" >> Map(lambda e: (e["object_type"], e))
| "Window"
>> WindowInto(
windowfn=FixedWindows(self.window_interval_seconds)
)
| "GroupByKey" >> GroupByKey()
| "KeepLastEventOnly" >> ParDo(WillTakeLatestEventForKey()
)
table_name = events | Map(lambda e: ["table_name", f"{self.project}:{self.dataset}.{e[0]}"])
table_names_dct = AsDict(table_name)
events_to_write = events | Map(lambda e: e[1]) | Map(self._drop_unwanted_fields)
return events_to_write | "toBQ" >> WriteToBigQuery(
table=self._choose_table,
table_side_inputs=(table_names_dct,),
create_disposition=BigQueryDisposition.CREATE_NEVER,
insert_retry_strategy=RetryStrategy.RETRY_NEVER,
)
您可以看到辅助输入取自管道 table_name
的另一个分支,该分支基本上是从事件中提取 table 名称。然后,将其作为 WriteToBigQuery
的输入。不幸的是,这在负载下并没有真正起作用,侧输入没有更新并且一些事件使用了错误的目的地。
在此特定情况下我可以使用哪些其他方法?所有文档都使用静态示例,并没有真正涵盖这种动态方法。
我尝试的另一件事是编写一个使用 HTTP BigQuery 客户端并插入行的自定义 DoFn
,这里的问题是管道的速度,因为每秒插入大约 6-7 个事件.
我遇到了类似的问题,对此我有解决办法。
我看到您有 create_disposition=BigQueryDisposition.CREATE_NEVER
,因此表列表在代码运行之前就已知了。也许它很笨拙,但它是众所周知的。我有一个 DoFn
,其中 yeild
有很多 TaggedOutput
它的 process
方法。然后我的管道看起来像:
parser_outputs = ['my', 'list', 'of', 'tables']
with beam.Pipeline(options=PipelineOptions(), argv=args) as p:
pipe = (
p
| "Start" >> beam.Create(["example row"])
| "Split"
>> beam.ParDo(MySplitFn()).with_outputs(*parser_outputs)
)
for output in parser_outputs:
pipe[output] | "write {}".format(output) >> beam.io.WriteToBigQuery(
bigquery.TableReference(
projectId=options.projectId, datasetId=DATASET_ID, tableId=output
),
schema=padl_shared.getBQSchema(parser.getSchemaForDataflow(rowTypeName=output)),
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)
p.run().wait_until_finish()
我正在开发一个 Apache Beam 管道,它从 pub/sub 中读取一堆事件,然后根据事件类型将它们写入单独的 BigQuery tables。
我知道 WriteToBigQuery
支持动态目标,但我的问题是目标是从事件中读取的数据派生的。例如:
一个事件看起来像
{
"object_id": 123,
... some metadata,
"object_data": {object related info}
}
应写入 BigQuery table 的数据位于事件的 object_data
键下,但是,table 名称源自元数据中的其他字段。
我尝试使用侧面输入参数,但问题是因为每个事件都可以有不同的目的地,所以侧面输入不会相应更新。代码如下:
class DumpToBigQuery(PTransform):
def _choose_table(self, element, table_names):
# table_names = {"table_name": "project_name.dataset.table_name}
table_name = table_names["table_name"]
return table_name
def expand(self, pcoll):
events = (
pcoll
| "GroupByObjectType" >> Map(lambda e: (e["object_type"], e))
| "Window"
>> WindowInto(
windowfn=FixedWindows(self.window_interval_seconds)
)
| "GroupByKey" >> GroupByKey()
| "KeepLastEventOnly" >> ParDo(WillTakeLatestEventForKey()
)
table_name = events | Map(lambda e: ["table_name", f"{self.project}:{self.dataset}.{e[0]}"])
table_names_dct = AsDict(table_name)
events_to_write = events | Map(lambda e: e[1]) | Map(self._drop_unwanted_fields)
return events_to_write | "toBQ" >> WriteToBigQuery(
table=self._choose_table,
table_side_inputs=(table_names_dct,),
create_disposition=BigQueryDisposition.CREATE_NEVER,
insert_retry_strategy=RetryStrategy.RETRY_NEVER,
)
您可以看到辅助输入取自管道 table_name
的另一个分支,该分支基本上是从事件中提取 table 名称。然后,将其作为 WriteToBigQuery
的输入。不幸的是,这在负载下并没有真正起作用,侧输入没有更新并且一些事件使用了错误的目的地。
在此特定情况下我可以使用哪些其他方法?所有文档都使用静态示例,并没有真正涵盖这种动态方法。
我尝试的另一件事是编写一个使用 HTTP BigQuery 客户端并插入行的自定义 DoFn
,这里的问题是管道的速度,因为每秒插入大约 6-7 个事件.
我遇到了类似的问题,对此我有解决办法。
我看到您有 create_disposition=BigQueryDisposition.CREATE_NEVER
,因此表列表在代码运行之前就已知了。也许它很笨拙,但它是众所周知的。我有一个 DoFn
,其中 yeild
有很多 TaggedOutput
它的 process
方法。然后我的管道看起来像:
parser_outputs = ['my', 'list', 'of', 'tables']
with beam.Pipeline(options=PipelineOptions(), argv=args) as p:
pipe = (
p
| "Start" >> beam.Create(["example row"])
| "Split"
>> beam.ParDo(MySplitFn()).with_outputs(*parser_outputs)
)
for output in parser_outputs:
pipe[output] | "write {}".format(output) >> beam.io.WriteToBigQuery(
bigquery.TableReference(
projectId=options.projectId, datasetId=DATASET_ID, tableId=output
),
schema=padl_shared.getBQSchema(parser.getSchemaForDataflow(rowTypeName=output)),
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
)
p.run().wait_until_finish()