Providing 'query' parameter to 'beam.io.BigQuerySource' at runtime in Dataflow (Python)
TLDR: I want to run beam.io.BigQuerySource with a different query every month, using the Dataflow API and templates. If that is not possible, can I pass the query to beam.io.BigQuerySource at runtime while still using the Dataflow API and templates?
I have a Dataflow 'batch' data pipeline that reads a BigQuery table like this:
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--pro_id',
        dest='pro_id',
        type=str,
        default='xxxxxxxxxx',
        help='project id')
    parser.add_argument(
        '--dataset',
        dest='dataset',
        type=str,
        default='xxxxxxxxxx',
        help='bigquery dataset to read data from')
    args, pipeline_args = parser.parse_known_args(argv)
    project_id = args.pro_id
    dataset_id = args.dataset
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(argv=pipeline_args) as p:
        companies = (
            p
            | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=query_bq(project_id, dataset_id),
                                                                          use_standard_sql=True))
        )
The query parameter of beam.io.BigQuerySource is computed by a function like this:
from datetime import datetime

def query_bq(project, dataset):
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    query = (
        f'SELECT * FROM `{project}.{dataset}.data_{month}_json` '
        f'LIMIT 10'
    )
    return query
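For example (with hypothetical project and dataset names, and assuming the code runs in June 2020), the function returns:

>>> query_bq('my-project', 'my_dataset')
'SELECT * FROM `my-project.my_dataset.data_2020_06_01_json` LIMIT 10'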
A few things to note here:
- I want to run this data pipeline once a day.
- The table ID changes every month. So, for example, this month's table ID is data_2020_06_01_json, next month's will be data_2020_07_01_json, and so on, all computed by def query_bq(project, dataset) above.
- I want to automate running this batch pipeline with the Dataflow API, using a Cloud Function, a Pub/Sub event, and Cloud Scheduler.
Here is the Cloud Function, triggered every day by Cloud Scheduler publishing an event to Pub/Sub:
import ast
import base64
from datetime import datetime

from googleapiclient.discovery import build


def run_dataflow(event, context):
    if 'data' in event:
        pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        pubsub_message_dict = ast.literal_eval(pubsub_message)
        event = pubsub_message_dict.get("eventName")
        now = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
        project = 'xxx-xxx-xxx'
        region = 'europe-west2'
        dataflow = build('dataflow', 'v1b3', cache_discovery=False)
        if event == "run_dataflow":
            job = f'dataflow-{now}'
            template = 'gs://xxxxx/templates/xxxxx'
            request = dataflow.projects().locations().templates().launch(
                projectId=project,
                gcsPath=template,
                location=region,
                body={
                    'jobName': job,
                }
            )
            response = request.execute()
            print(response)
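For what it's worth, run_dataflow only launches a job when the base64-decoded Pub/Sub payload parses (via ast.literal_eval) into a dict whose eventName is "run_dataflow". A minimal illustration of the payload it expects, with a hypothetical local call (actually executing this would attempt a real template launch):

import base64

# A message body Cloud Scheduler could publish; ast.literal_eval accepts this
# dict-literal text, so the function sees {'eventName': 'run_dataflow'}.
payload = b'{"eventName": "run_dataflow"}'
sample_event = {'data': base64.b64encode(payload)}

run_dataflow(sample_event, context=None)  # would call templates().launch(...)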
And here is the command I use to stage this data pipeline on Dataflow:
python main.py \
--setup_file ./setup.py \
--project xxx-xx-xxxx \
--pro_id xxx-xx-xxxx \
--dataset 'xx-xxx-xxx' \
--machine_type=n1-standard-4 \
--max_num_workers=5 \
--num_workers=1 \
--region europe-west2 \
--serviceAccount=xxx-xxx-xxx \
--runner DataflowRunner \
--staging_location gs://xx/xx \
--temp_location gs://xx/temp \
--subnetwork="xxxxxxxxxx" \
--template_location gs://xxxxx/templates/xxxxx
The problem I'm facing:
My query_bq function is called while the Dataflow template is compiled and created, and the template is then uploaded to GCS. query_bq is never called at runtime. So whenever my Cloud Function triggers a Dataflow job from the template, it always reads from the data_2020_06_01_json table, and the table in the query stays the same forever, even once we move into July, August, and so on. What I really want is for that query to change dynamically based on the query_bq function, so that in the future it reads data_2020_07_01_json, data_2020_08_01_json, and so on.
I also looked at the generated template file, and it looks like the query gets hard-coded into the template at compile time. Here is a snippet:
"name": "beamapp-xxxxx-0629014535-344920",
"steps": [
{
"kind": "ParallelRead",
"name": "s1",
"properties": {
"bigquery_export_format": "FORMAT_AVRO",
"bigquery_flatten_results": true,
"bigquery_query": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10",
"bigquery_use_legacy_sql": false,
"display_data": [
{
"key": "source",
"label": "Read Source",
"namespace": "apache_beam.runners.dataflow.ptransform_overrides.Read",
"shortValue": "BigQuerySource",
"type": "STRING",
"value": "apache_beam.io.gcp.bigquery.BigQuerySource"
},
{
"key": "query",
"label": "Query",
"namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
"type": "STRING",
"value": "SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10"
},
{
"key": "validation",
"label": "Validation Enabled",
"namespace": "apache_beam.io.gcp.bigquery.BigQuerySource",
"type": "BOOLEAN",
"value": false
}
],
"format": "bigquery",
"output_info": [
{
Alternatives I have tried
I also tried the ValueProvider approach described here: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#pipeline-io-and-runtime-parameters
I added this to my code:
class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--query_bq', type=str)

user_options = pipeline_options.view_as(UserOptions)

p | "Read from BigQuery" >> beam.io.Read(beam.io.BigQuerySource(query=user_options.query_bq,
                                                                use_standard_sql=True))
And when I run it I get this error:
WARNING:apache_beam.utils.retry:Retry with exponential backoff: waiting for 3.9023594566785924 seconds before retrying get_query_location because we caught exception: apitools.base.protorpclite.messages.ValidationError: Expected type <class 'str'> for field query, found SELECT * FROM `xxxx.xxxx.data_2020_06_01_json` LIMIT 10 (type <class 'apache_beam.options.value_provider.StaticValueProvider'>)
So I'm guessing beam.io.BigQuerySource does not accept ValueProviders.
You cannot use ValueProviders with BigQuerySource, but in more recent versions of Beam you can use beam.io.ReadFromBigQuery, which does support them. You would do:
result = (p
          | beam.io.ReadFromBigQuery(query=options.input_query,
                                     ....))
You can pass value providers to it, and it has a lot of other utilities as well. Check out its documentation.
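Putting the two pieces of the question together, a minimal sketch of what this could look like is below. It assumes a Beam version recent enough to ship beam.io.ReadFromBigQuery with ValueProvider support (roughly 2.25+), keeps the --query_bq parameter name from the question, and reuses the placeholder project/dataset/template values; treat it as an illustration, not the one canonical implementation.

# main.py -- sketch: the query becomes a runtime (ValueProvider) parameter.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Resolved when the template is launched, not when the template is built.
        parser.add_value_provider_argument('--query_bq', type=str)


def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    user_options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        companies = (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=user_options.query_bq,
                use_standard_sql=True)
        )

ReadFromBigQuery runs a BigQuery export to GCS under the hood, so it needs a temp_location (the staging command in the question already sets one). In the Cloud Function, the monthly query can then be computed at launch time and handed to the template through the parameters field of the launch body; the query_bq helper is the one from the question, and the project/dataset placeholders are again hypothetical:

# Inside run_dataflow(), replacing the launch() call shown in the question.
from datetime import datetime

def query_bq(project, dataset):
    month = datetime.today().replace(day=1).strftime("%Y_%m_%d")
    return f'SELECT * FROM `{project}.{dataset}.data_{month}_json` LIMIT 10'

request = dataflow.projects().locations().templates().launch(
    projectId=project,
    gcsPath=template,
    location=region,
    body={
        'jobName': job,
        # Template parameters map onto the pipeline's ValueProvider options,
        # so the query is now chosen on every daily run.
        'parameters': {
            'query_bq': query_bq('xxx-xx-xxxx', 'xx-xxx-xxx'),
        },
    },
)
response = request.execute()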