Beam - 从 Bigquery 中过滤掉记录
Beam - Filter out Records from Bigquery
我是 Apache Beam 的新手,想完成以下三项任务:
- 读取表中排名前 30 的商品(item)
- 读取表中排名前 30 的商店(store)
- 从 BigQuery 中选取所需的列,并对 Items 和 Stores 两列应用过滤器
我有以下代码来执行管道
# Question code as posted: it builds two top-30 lookups and then tries to
# filter the full sales table by them.
with beam.Pipeline(options=pipeline_args) as p:
# Read the dataset from BigQuery.
# Top 30 item numbers by row count, gathered into one list element.
query_top_30_items = (
p
| 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, COUNT(item_number) AS freq_count FROM
[bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number
ORDER BY freq_count DESC
LIMIT 30"""
)
| 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
| 'ItemNumberAsList' >> beam.combiners.ToList()
)
# Top 30 store numbers by row count, gathered into one list element.
query_top_30_stores = (
p
|
'GetTopStores' >> beam.io.ReadFromBigQuery(
query = """SELECT store_number, COUNT(store_number) AS store_count
FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
store_number ORDER BY store_count DESC LIMIT 30"""
)
| 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
| 'StoreValuesAsList' >> beam.combiners.ToList()
)
# NOTE(review): ReadFromBigQuery is applied to a tuple of PCollections here
# instead of the pipeline object `p`; this matches the reported
# AttributeError: 'tuple' object has no attribute 'pipeline'.
query_whole_table = (
(query_top_30_items, query_top_30_stores)
|'ReadTable' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, store_number, bottles_sold,
state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
# NOTE(review): these lambdas test membership against the PCollection
# variables themselves, not against their contents -- presumably the values
# should be supplied as side inputs; confirm.
| 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
| 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
)
我已附上 Traceback 以供参考。我该如何解决这个错误?
Traceback (most recent call last):
  File "run.py", line 113, in <module>
    run()
  File "run.py", line 100, in run
    | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1881, in expand
    temp_location = pcoll.pipeline.options.view_as(
AttributeError: 'tuple' object has no attribute 'pipeline'
由于我是 Beam 的新手,所以代码没有那么优化。如果我可以进一步优化此代码,请告诉我。
感谢您的宝贵时间和帮助!
在管道中,把筛选条件直接套在函数上是行不通的。您有 2 个选项:
- 在管道内部应用过滤条件。
- 在 BigQuery SQL 查询中应用过滤条件。
把过滤条件加在函数上,对于该函数以及返回到的调用方来说含义都不明确。因此,请修改您的代码,在上面列出的两个位置之一应用过滤条件。
beam.io.ReadFromBigQuery 必须位于管道的根部,并以管道对象(而不是 PCollection 或 PCollection 的元组)作为输入,因此才会出现这个错误。
正如其他答案所提到的,您可以尝试将整个内容编写为单个 BigQuery 查询。否则,您可以在读取后使用 side inputs 进行过滤,例如
# Answer code: read the table from the pipeline root, then filter it using the
# two top-30 PCollections as side inputs.
with beam.Pipeline(options=pipeline_args) as p:
# Read the dataset from BigQuery.
# The two top-30 queries are elided ("...") in this snippet.
query_top_30_items = ...
query_top_30_stores = ...
# ReadFromBigQuery is applied to the pipeline object `p` itself.
sales = p |'ReadTable' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, store_number, bottles_sold,
state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
# Each Filter lambda takes the row plus one extra keyword argument that is
# bound to a side input built from the corresponding top-30 PCollection.
# NOTE(review): AsList exposes the elements of the side PCollection as a
# list. If the elided queries still end with combiners.ToList() (a single
# list element), the membership test would run against a list of lists --
# confirm, and in that case consider AsSingleton or dropping ToList.
filtered = (
sales
| 'FilterByItems' >> beam.Filter(
lambda row, items_side_input: row['item_number'] in items_side_input,
items_side_input=beam.pvalue.AsList(query_top_30_items))
| 'FilterByStore' >> beam.Filter(
lambda row, stores_side_input: row['store_number'] in stores_side_input,
stores_side_input=beam.pvalue.AsList(query_top_30_stores))
)
我是 Apache Beam 的新手,想完成以下三项任务:
- 读取表中排名前 30 的商品(item)
- 读取表中排名前 30 的商店(store)
- 从 BigQuery 中选取所需的列,并对 Items 和 Stores 两列应用过滤器
我有以下代码来执行管道
# Question code as posted: it builds two top-30 lookups and then tries to
# filter the full sales table by them.
with beam.Pipeline(options=pipeline_args) as p:
# Read the dataset from BigQuery.
# Top 30 item numbers by row count, gathered into one list element.
query_top_30_items = (
p
| 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, COUNT(item_number) AS freq_count FROM
[bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number
ORDER BY freq_count DESC
LIMIT 30"""
)
| 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
| 'ItemNumberAsList' >> beam.combiners.ToList()
)
# Top 30 store numbers by row count, gathered into one list element.
query_top_30_stores = (
p
|
'GetTopStores' >> beam.io.ReadFromBigQuery(
query = """SELECT store_number, COUNT(store_number) AS store_count
FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
store_number ORDER BY store_count DESC LIMIT 30"""
)
| 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
| 'StoreValuesAsList' >> beam.combiners.ToList()
)
# NOTE(review): ReadFromBigQuery is applied to a tuple of PCollections here
# instead of the pipeline object `p`; this matches the reported
# AttributeError: 'tuple' object has no attribute 'pipeline'.
query_whole_table = (
(query_top_30_items, query_top_30_stores)
|'ReadTable' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, store_number, bottles_sold,
state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
# NOTE(review): these lambdas test membership against the PCollection
# variables themselves, not against their contents -- presumably the values
# should be supplied as side inputs; confirm.
| 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
| 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
)
我已附上 Traceback 以供参考。我该如何解决这个错误?
Traceback (most recent call last):
  File "run.py", line 113, in <module>
    run()
  File "run.py", line 100, in run
    | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1881, in expand
    temp_location = pcoll.pipeline.options.view_as(
AttributeError: 'tuple' object has no attribute 'pipeline'
由于我是 Beam 的新手,所以代码没有那么优化。如果我可以进一步优化此代码,请告诉我。
感谢您的宝贵时间和帮助!
在管道中,把筛选条件直接套在函数上是行不通的。您有 2 个选项:
- 在管道内部应用过滤条件。
- 在 BigQuery SQL 查询中应用过滤条件。
把过滤条件加在函数上,对于该函数以及返回到的调用方来说含义都不明确。因此,请修改您的代码,在上面列出的两个位置之一应用过滤条件。
beam.io.ReadFromBigQuery 必须位于管道的根部,并以管道对象(而不是 PCollection 或 PCollection 的元组)作为输入,因此才会出现这个错误。
正如其他答案所提到的,您可以尝试将整个内容编写为单个 BigQuery 查询。否则,您可以在读取后使用 side inputs 进行过滤,例如
# Answer code: read the table from the pipeline root, then filter it using the
# two top-30 PCollections as side inputs.
with beam.Pipeline(options=pipeline_args) as p:
# Read the dataset from BigQuery.
# The two top-30 queries are elided ("...") in this snippet.
query_top_30_items = ...
query_top_30_stores = ...
# ReadFromBigQuery is applied to the pipeline object `p` itself.
sales = p |'ReadTable' >> beam.io.ReadFromBigQuery(
query="""SELECT item_number, store_number, bottles_sold,
state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
# Each Filter lambda takes the row plus one extra keyword argument that is
# bound to a side input built from the corresponding top-30 PCollection.
# NOTE(review): AsList exposes the elements of the side PCollection as a
# list. If the elided queries still end with combiners.ToList() (a single
# list element), the membership test would run against a list of lists --
# confirm, and in that case consider AsSingleton or dropping ToList.
filtered = (
sales
| 'FilterByItems' >> beam.Filter(
lambda row, items_side_input: row['item_number'] in items_side_input,
items_side_input=beam.pvalue.AsList(query_top_30_items))
| 'FilterByStore' >> beam.Filter(
lambda row, stores_side_input: row['store_number'] in stores_side_input,
stores_side_input=beam.pvalue.AsList(query_top_30_stores))
)