Apache Beam - JSON 分组

Apache Beam - JSON grouping

我是 python3 的 apache beam 新手,我必须用它构建特定的管道,但我不知道如何执行最后一步。

我已经转换并清理了每行 JSON 个元素,我希望它们按键分组,所有我想存储在其中的元素(其余被删除)。

例如行

{"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True}
{"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}

要转换为单个 JSON 对象:

{"Mark" : [{"age":23, "transaction_no": "001", "price":59.99}, {"age":23, "transaction_no": "002", "price":10.00}

列表中的元素将只是我选择的元素(一些例如标志被删除)

在 Apache Beam 中进行此类分组的最有效方法是什么?

感谢任何帮助!!!

这是您需要的示例代码。根据我的理解,您只需要映射器每次都 return json 而不是字典列表。

所以在这种情况下,当您只需要一个 json 时,您可以按如下方式编写映射器。

import apache_beam as beam


def  map_as_json(item,key_col,cols_to_exclude):
    row = {
        item[key_col]: [
        {
            key: val for  key,val in item.items() if key not in cols_to_exclude and key not in key_col
        }
    ] 
    }
    return row
    
with beam.Pipeline() as p:
    group_stocks_by_date_name = (
        p
        | 'create'>>beam.Create(
            [
                {"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True},
                {"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}
            ]
        )
        | 'selective details'>> beam.Map(map_as_json,key_col='Name',cols_to_exclude=['someflag'])
        | 'print'>>beam.Map(print)
    )

如果有帮助,请标记为答案。