Google 数据流:运行 在 Python 中使用 BigQuery+Pub/Sub 进行动态查询
Google Dataflow: running dynamic query with BigQuery+Pub/Sub in Python
我想在管道中做的事情:
- 从 pub/sub 读取(完成)
- 将此数据转换为字典(完成)
- 从字典中取指定键的值(完成)
运行 来自 BigQuery 的 parametrized/dynamic 查询,其中 where 部分应该是这样的:
SELECT field1 FROM Table where field2 = @valueFromP/S
流水线
| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription='')
| 'String to dictionary' >> beam.Map(lambda s:data_ingestion.parse_method(s))
| 'BigQuery' >> <Here is where I'm not sure how to do it>
从 BQ 读取的正常方式如下:
| 'Read' >> beam.io.Read(beam.io.BigQuerySource(
query="SELECT field1 FROM table where field2='string'", use_standard_sql=True))
我读过有关参数化的内容 queries 但我不确定这是否适用于 apache beam。
可以使用侧输入来完成吗?
哪种方法最好?
我尝试过的:
def parse_methodBQ(input):
query=''SELECT field1 FROM table WHERE field1=\'%s\' AND field2=True' % (input['field1'])'
return query
class ReadFromBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'FormatQuery' >> beam.Map(parse_methodBQ)
| 'Read' >> beam.Map(lambda s: beam.io.Read(beam.io.BigQuerySource(query=s)))
)
with beam.Pipeline(options=pipeline_options) as p:
transform = (p | 'BQ' >> ReadFromBigQuery()
结果(为什么这样?):
<Read(PTransform) label=[Read]>
正确的结果应该是这样的:
{u'Field1': u'string', u'Field2': Bool}
解决方案
筹备中:
| 'BQ' >> beam.Map(parse_method_BQ))
函数(使用 BigQuery 0.25 API 进行数据流)
def parse_method_BQ(input):
client = bigquery.Client()
QUERY = 'SELECT field1 FROM table WHERE field1=\'%s\' AND field2=True' % (input['field1'])
client.use_legacy_sql = False
query_job = client.run_async_query(query=QUERY ,job_name='temp-query-job_{}'.format(uuid.uuid4())) # API request
query_job.begin()
while True:
query_job.reload() # Refreshes the state via a GET request.
if query_job.state == 'DONE':
if query_job.error_result:
raise RuntimeError(query_job.errors)
rows = query_job.results().fetch_data()
for row in rows:
if not (row[0] is None):
return input
time.sleep(1)
你可以read the whole table or use a string query。
我了解到您将根据需要使用parse_methodBQ方法自定义查询。作为此方法 returns 查询,您可以使用 BigQuerySource 调用它。这些行在字典中。
| 'QueryTable' >> beam.Map(beam.io.BigQuerySource(parse_methodBQ))
# Each row is a dictionary where the keys are the BigQuery columns
| 'Read' >> beam.Map(lambda s: s['data'])
此外,您可以避免自定义查询并使用 filter method
关于辅助输入,请查看食谱中的 this 示例以更好地了解如何使用它们。
我想在管道中做的事情:
- 从 pub/sub 读取(完成)
- 将此数据转换为字典(完成)
- 从字典中取指定键的值(完成)
运行 来自 BigQuery 的 parametrized/dynamic 查询,其中 where 部分应该是这样的:
SELECT field1 FROM Table where field2 = @valueFromP/S
流水线
| 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription='')
| 'String to dictionary' >> beam.Map(lambda s:data_ingestion.parse_method(s))
| 'BigQuery' >> <Here is where I'm not sure how to do it>
从 BQ 读取的正常方式如下:
| 'Read' >> beam.io.Read(beam.io.BigQuerySource(
query="SELECT field1 FROM table where field2='string'", use_standard_sql=True))
我读过有关参数化的内容 queries 但我不确定这是否适用于 apache beam。
可以使用侧输入来完成吗?
哪种方法最好?
我尝试过的:
def parse_methodBQ(input):
query=''SELECT field1 FROM table WHERE field1=\'%s\' AND field2=True' % (input['field1'])'
return query
class ReadFromBigQuery(beam.PTransform):
def expand(self, pcoll):
return (
pcoll
| 'FormatQuery' >> beam.Map(parse_methodBQ)
| 'Read' >> beam.Map(lambda s: beam.io.Read(beam.io.BigQuerySource(query=s)))
)
with beam.Pipeline(options=pipeline_options) as p:
transform = (p | 'BQ' >> ReadFromBigQuery()
结果(为什么这样?):
<Read(PTransform) label=[Read]>
正确的结果应该是这样的:
{u'Field1': u'string', u'Field2': Bool}
解决方案
筹备中:
| 'BQ' >> beam.Map(parse_method_BQ))
函数(使用 BigQuery 0.25 API 进行数据流)
def parse_method_BQ(input):
client = bigquery.Client()
QUERY = 'SELECT field1 FROM table WHERE field1=\'%s\' AND field2=True' % (input['field1'])
client.use_legacy_sql = False
query_job = client.run_async_query(query=QUERY ,job_name='temp-query-job_{}'.format(uuid.uuid4())) # API request
query_job.begin()
while True:
query_job.reload() # Refreshes the state via a GET request.
if query_job.state == 'DONE':
if query_job.error_result:
raise RuntimeError(query_job.errors)
rows = query_job.results().fetch_data()
for row in rows:
if not (row[0] is None):
return input
time.sleep(1)
你可以read the whole table or use a string query。
我了解到您将根据需要使用parse_methodBQ方法自定义查询。作为此方法 returns 查询,您可以使用 BigQuerySource 调用它。这些行在字典中。
| 'QueryTable' >> beam.Map(beam.io.BigQuerySource(parse_methodBQ))
# Each row is a dictionary where the keys are the BigQuery columns
| 'Read' >> beam.Map(lambda s: s['data'])
此外,您可以避免自定义查询并使用 filter method
关于辅助输入,请查看食谱中的 this 示例以更好地了解如何使用它们。