Apache Beam - JSON 分组
Apache Beam - JSON grouping
我是 python3 的 apache beam 新手,我必须用它构建特定的管道,但我不知道如何执行最后一步。
我已经转换并清理了每行 JSON 个元素,我希望它们按键分组,所有我想存储在其中的元素(其余被删除)。
例如行
{"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True}
{"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}
要转换为单个 JSON 对象:
{"Mark" : [{"age":23, "transaction_no": "001", "price":59.99}, {"age":23, "transaction_no": "002", "price":10.00}
列表中的元素将只是我选择的元素(一些例如标志被删除)
在 Apache Beam 中进行此类分组的最有效方法是什么?
感谢任何帮助!!!
这是您需要的示例代码。根据我的理解,您只需要映射器每次都 return json 而不是字典列表。
所以在这种情况下,当您只需要一个 json 时,您可以按如下方式编写映射器。
import apache_beam as beam
def map_as_json(item,key_col,cols_to_exclude):
row = {
item[key_col]: [
{
key: val for key,val in item.items() if key not in cols_to_exclude and key not in key_col
}
]
}
return row
with beam.Pipeline() as p:
group_stocks_by_date_name = (
p
| 'create'>>beam.Create(
[
{"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True},
{"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}
]
)
| 'selective details'>> beam.Map(map_as_json,key_col='Name',cols_to_exclude=['someflag'])
| 'print'>>beam.Map(print)
)
如果有帮助,请标记为答案。
我是 python3 的 apache beam 新手,我必须用它构建特定的管道,但我不知道如何执行最后一步。
我已经转换并清理了每行 JSON 个元素,我希望它们按键分组,所有我想存储在其中的元素(其余被删除)。
例如行
{"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True}
{"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}
要转换为单个 JSON 对象:
{"Mark" : [{"age":23, "transaction_no": "001", "price":59.99}, {"age":23, "transaction_no": "002", "price":10.00}
列表中的元素将只是我选择的元素(一些例如标志被删除)
在 Apache Beam 中进行此类分组的最有效方法是什么?
感谢任何帮助!!!
这是您需要的示例代码。根据我的理解,您只需要映射器每次都 return json 而不是字典列表。
所以在这种情况下,当您只需要一个 json 时,您可以按如下方式编写映射器。
import apache_beam as beam
def map_as_json(item,key_col,cols_to_exclude):
row = {
item[key_col]: [
{
key: val for key,val in item.items() if key not in cols_to_exclude and key not in key_col
}
]
}
return row
with beam.Pipeline() as p:
group_stocks_by_date_name = (
p
| 'create'>>beam.Create(
[
{"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True},
{"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}
]
)
| 'selective details'>> beam.Map(map_as_json,key_col='Name',cols_to_exclude=['someflag'])
| 'print'>>beam.Map(print)
)
如果有帮助,请标记为答案。