Beam - Filter out Records from Bigquery

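I am new to Apache Beam, and I want to accomplish three tasks:

  1. Read the top 30 items from the table.
  2. Read the top 30 stores from the table.
  3. Select the required columns from BigQuery and apply a filter on the item and store columns.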

I have the following code to run the pipeline:

with beam.Pipeline(options=pipeline_args) as p:
        #read the dataset from bigquery
        query_top_30_items = (
            p 
            | 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
                query="""SELECT item_number, COUNT(item_number) AS freq_count FROM 
                [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number 
                ORDER BY freq_count DESC
                LIMIT 30"""
            )
            | 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
            | 'ItemNumberAsList' >> beam.combiners.ToList()
        )


        query_top_30_stores = (
            p
            |
            'GetTopStores' >> beam.io.ReadFromBigQuery(
                query = """SELECT store_number, COUNT(store_number) AS store_count
                 FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
                 store_number ORDER BY store_count DESC LIMIT 30"""
            )
            | 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
            | 'StoreValuesAsList' >> beam.combiners.ToList()
        )

        query_whole_table = (
            (query_top_30_items, query_top_30_stores)
            |'ReadTable' >> beam.io.ReadFromBigQuery(
                query="""SELECT item_number, store_number, bottles_sold,
                    state_bottle_retail  FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
            | 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
            | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
        )

I have attached the traceback for reference. How can I resolve this error?

Traceback (most recent call last):
  File "run.py", line 113, in <module>
    run()
  File "run.py", line 100, in run
    | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1881, in expand
    temp_location = pcoll.pipeline.options.view_as(
AttributeError: 'tuple' object has no attribute 'pipeline'

Since I am new to Beam, the code is not very optimized. Please let me know if I can optimize it further.

Thanks for your time and help!

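Applying a filter condition over a function will not work in the pipeline. You have 2 options:

  1. Apply the filter condition within the pipeline.
  2. Apply the filter condition in the BigQuery SQL.

A filter condition over a function is ambiguous between the function itself and the value that calling the function returns. So modify your code to apply the filter condition in one of the 2 places listed above.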
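To illustrate the second option, here is a minimal sketch that folds both top-30 lookups and the filter into one query. It assumes BigQuery standard SQL (backtick-quoted table names) rather than the legacy bracket syntax used in the question, and the helper name `build_filtered_sales_query` is made up for this example.

```python
def build_filtered_sales_query(table, top_n=30):
    """Build one standard-SQL query that keeps only sales rows whose item
    and store are both among the top_n most frequent ones.

    The function name and its parameters are illustrative assumptions.
    """
    limit = int(top_n)
    return f"""
WITH top_items AS (
  SELECT item_number
  FROM `{table}`
  GROUP BY item_number
  ORDER BY COUNT(item_number) DESC
  LIMIT {limit}
),
top_stores AS (
  SELECT store_number
  FROM `{table}`
  GROUP BY store_number
  ORDER BY COUNT(store_number) DESC
  LIMIT {limit}
)
SELECT item_number, store_number, bottles_sold, state_bottle_retail
FROM `{table}`
WHERE item_number IN (SELECT item_number FROM top_items)
  AND store_number IN (SELECT store_number FROM top_stores)
"""


# Table name taken from the question.
QUERY = build_filtered_sales_query(
    'bigquery-public-data.iowa_liquor_sales.sales')
```

The resulting string could then be passed to a single read at the root of the pipeline, for example `p | beam.io.ReadFromBigQuery(query=QUERY, use_standard_sql=True)`, so that no filtering step is needed in Beam at all.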

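beam.io.ReadFromBigQuery must be at the root of your pipeline, and it takes the pipeline object (not a PCollection or a tuple of PCollections) as input. That is why you get the error.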

As the other answer mentions, you could try writing the whole thing as a single BigQuery query. Otherwise, you can filter with side inputs after the read, e.g.

with beam.Pipeline(options=pipeline_args) as p:
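    # Read the dataset from BigQuery. Each of the two PCollections below should
    # hold individual values (no final ToList() step), because AsList gathers
    # the elements into a list itself.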
    query_top_30_items = ...

    query_top_30_stores = ...

    sales = p | 'ReadTable' >> beam.io.ReadFromBigQuery(
        query="""SELECT item_number, store_number, bottles_sold,
            state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")

    filtered = (
        sales
        | 'FilterByItems' >> beam.Filter(
            lambda row, items_side_input: row['item_number'] in items_side_input,
            items_side_input=beam.pvalue.AsList(query_top_30_items))
        | 'FilterByStore' >> beam.Filter(
            lambda row, stores_side_input: row['store_number'] in stores_side_input,
            stores_side_input=beam.pvalue.AsList(query_top_30_stores))
    )