
Providing 'query' parameter to 'beam.io.BigQuerySource' at runtime in dataflow python

TLDR: I want to run beam.io.BigQuerySource with a different query every month, using the Dataflow API and templates. If that is not possible, can I pass the query to beam.io.BigQuerySource at runtime while still using the Dataflow API and templates?

I have a Dataflow 'batch' data pipeline that reads a BigQuery table, as shown below

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--pro_id',
        dest='pro_id',
        type=str,
        default='xxxxxxxxxx',
        help='project id')
    parser.add_argument(
        '--dataset',
        dest='dataset',
        type=str,
        default='xxxxxxxxxx',
        help='bigquery dataset to read data from')

    args, pipeline_args = parser.parse_known_args(argv)
    project_id = args.pro_id
    dataset_id = args.dataset

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
    
        companies = (
                p
                | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query_bq(project_id, dataset_id),
                                                                              use_standard_sql=True))
        )

The query parameter for beam.io.BigQuerySource is computed by a function like this

from datetime import datetime
def query_bq(project, dataset):
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    query = (
        f'SELECT * FROM `{project}.{dataset}.data_{month}_json` '
        f'LIMIT 10'
    )
    return query
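
For illustration, in June 2020 this function would return the following query (with hypothetical project and dataset names):

print(query_bq('my-project', 'my_dataset'))
# SELECT * FROM `my-project.my_dataset.data_2020_06_01_json` LIMIT 10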

A few things to note here

  1. I want to run this data pipeline once a day.
  2. The table ID changes every month. For example, this month's table ID is data_2020_06_01_json, next month's will be data_2020_07_01_json, and all of this is computed by def query_bq(project, dataset) above.
  3. I want to automate running this batch pipeline with the Dataflow API, using a Cloud Function, a Pub/Sub event and Cloud Scheduler.

Here is the Cloud Function, which is triggered by Cloud Scheduler publishing an event to Pub/Sub every day

import ast
import base64
from datetime import datetime

from googleapiclient.discovery import build


def run_dataflow(event, context):
    if 'data' in event:
        pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        pubsub_message_dict = ast.literal_eval(pubsub_message)
        event = pubsub_message_dict.get("eventName")
        now = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
        project = 'xxx-xxx-xxx'
        region = 'europe-west2'
        dataflow = build('dataflow', 'v1b3', cache_discovery=False)
        if event == "run_dataflow":
            job = f'dataflow-{now}'
            template = 'gs://xxxxx/templates/xxxxx'
            request = dataflow.projects().locations().templates().launch(
                projectId=project,
                gcsPath=template,
                location=region,
                body={
                    'jobName': job,
                }
            )
            response = request.execute()
            print(response)
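
For reference, the templates.launch request body also accepts a 'parameters' field. A hedged sketch of what the launch call could look like once the template exposes a runtime parameter (here a hypothetical 'query_bq' option, assuming the query_bq helper from above is also importable in the Cloud Function):

# Hedged sketch: pass the monthly query as a template runtime parameter.
# Assumes the template defines a ValueProvider option named 'query_bq'.
request = dataflow.projects().locations().templates().launch(
    projectId=project,
    gcsPath=template,
    location=region,
    body={
        'jobName': job,
        'parameters': {
            'query_bq': query_bq(project, 'my_dataset'),  # hypothetical dataset id
        },
    }
)
response = request.execute()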

And here is the command I use to launch this data pipeline on Dataflow

python main.py \
    --setup_file ./setup.py \
    --project xxx-xx-xxxx \
    --pro_id xxx-xx-xxxx \
    --dataset 'xx-xxx-xxx' \
    --machine_type=n1-standard-4 \
    --max_num_workers=5 \
    --num_workers=1 \
    --region europe-west2  \
    --serviceAccount=xxx-xxx-xxx \
    --runner DataflowRunner \
    --staging_location gs://xx/xx \
    --temp_location gs://xx/temp \
    --subnetwork="xxxxxxxxxx" \
    --template_location gs://xxxxx/templates/xxxxx

The problem I am facing:

My query_bq function is called while the Dataflow template is compiled and created, and the template is then uploaded to GCS. The query_bq function is not called at runtime. So whenever my Cloud Function launches a Dataflow job, it always reads from the data_2020_06_01_json table, and the table in the query stays the same forever, even as we move into July, August and so on. What I really want is for that query to change dynamically based on the query_bq function, so that in the future it reads data_2020_07_01_json, data_2020_08_01_json and so on.

I also looked at the generated template file, and it looks like the query is hardcoded into the template after compilation. Here is a snippet

 "name": "beamapp-xxxxx-0629014535-344920",
  "steps": [
    {
      "kind": "ParallelRead",
      "name": "s1",
      "properties": {
        "bigquery_export_format": "FORMAT_AVRO",
        "bigquery_flatten_results": true,
        "bigquery_query": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10",
        "bigquery_use_legacy_sql": false,
        "display_data": [
          {
            "key": "source",
            "label": "Read Source",
            "namespace": "apache_beam.runners.dataflow.ptransform_overrides.Read",
            "shortValue": "BigQuerySource",
            "type": "STRING",
            "value": "apache_beam.io.gcp.bigquery.BigQuerySource"
          },
          {
            "key": "query",
            "label": "Query",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "STRING",
            "value": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10"
          },
          {
            "key": "validation",
            "label": "Validation Enabled",
            "namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
            "type": "BOOLEAN",
            "value": false
          }
        ],
        "format": "bigquery",
        "output_info": [
          {

Alternatives I have tried

I also tried the ValueProvider approach described here: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#pipeline-io-and-runtime-parameters

and added this to my code

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--query_bq', type=str)

user_options = pipeline_options.view_as(UserOptions)
p | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query_bq,
                                                                              use_standard_sql=True))

When I run it, I get this error

WARNING:apache_beam.utils.retry:Retry with exponential backoff: waiting for 3.9023594566785924 seconds before retrying get_query_location because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'str'> for field query, found SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10 (type <class 'apache_beam.options.value_provider.StaticValueProvider'>)

So I am guessing that beam.io.BigQuerySource does not accept ValueProviders.

You cannot use ValueProviders with BigQuerySource, but in more recent versions of Beam you can use beam.io.ReadFromBigQuery, which does support them.

You would do:

result = (p 
          | beam.io.ReadFromBigQuery(query=options.input_query,
                                     ....))

You can pass a value provider to it, and it has many other utilities as well. Check out its documentation.
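
Putting it together, a minimal sketch of what the pipeline code behind the template could look like (reusing the UserOptions class and the --query_bq option name from the question; an illustration, not the exact final code):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Runtime parameter: resolved when the template is launched,
        # not when the template is built.
        parser.add_value_provider_argument('--query_bq', type=str)


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    user_options = pipeline_options.view_as(UserOptions)
    with beam.Pipeline(options=pipeline_options) as p:
        companies = (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=user_options.query_bq,
                use_standard_sql=True)
        )

The Cloud Function can then compute the monthly query (for example with the query_bq function from the question) and pass it in the 'parameters' field of the templates.launch request, as sketched earlier in the post.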