python 列表到数据流的字典
python list to dictionary for dataflow
我正在尝试将 JSON 文件转换成字典并应用 key/value 对,这样我就可以使用 groupbykey() 基本上删除 key/value 对。
这是文件的原始内容:
{"tax_pd":"200003","ein":"720378282"}
{"tax_pd":"200012","ein":"274027765"}
{"tax_pd":"200012","ein":"042746989"}
{"tax_pd":"200012","ein":"205993971"}
我是这样格式化的:
(u'201208', u'010620100')
(u'201208', u'860785769')
(u'201208', u'371650138')
(u'201208', u'237253410')
我想将它们变成 key/value 对,这样我就可以在我的数据流管道中应用 GroupByKey。我想我需要先把它变成字典吗?
我是 python 和 google 云应用程序的新手,一些帮助会很棒!
编辑:代码片段
with beam.Pipeline(options=pipeline_options) as p:
(p
| 'ReadInputText' >> beam.io.ReadFromText(known_args.input)
| 'YieldWords' >> beam.ParDo(ExtractWordsFn())
# | 'GroupByKey' >> beam.GroupByKey()
| 'WriteInputText' >> beam.io.WriteToText(known_args.output))
class ExtractWordsFn(beam.DoFn):
def process(self, element):
words = re.findall(r'[0-9]+', element)
yield tuple(words)
一个快速的纯Python解决方案是:
import json
with open('path/to/my/file.json','rb') as fh:
lines = [json.loads(l) for l in fh.readlines()]
# [{'tax_pd': '200003', 'ein': '720378282'}, {'tax_pd': '200012', 'ein': '274027765'}, {'tax_pd': '200012', 'ein': '042746989'}, {'tax_pd': '200012', 'ein': '205993971'}]
查看您的数据,您没有通过 tax_pd
和 ein
执行 key:value 的唯一键。假设会发生碰撞,您可以执行以下操作:
myresults = {}
for line in lines:
# I'm assuming we want to use tax_pd as the key, and ein as the value, but this can be extended to other keys
# This will return None if the tax_pd is not already found
if not myresults.get(line.get('tax_pd')):
myresults[line.get('tax_pd')] = [line.get('ein')]
else:
myresults[line.get('tax_pd')] = list(set([line.get('ein'), *myresults[line.get('tax_pd')]))
#results
#{'200003': ['720378282'], '200012': ['205993971', '042746989', '274027765']}
这样你就有了唯一的键,以及对应的唯一 ein
值的列表。不完全确定这是否是您想要的。 set
会自动去重列表,wrapping list
重新转换数据类型
然后您可以通过 tax_id
显式查找:
myresults.get('200012')
# ['205993971', '042746989', '274027765']
编辑:要从云存储中读取,代码片段 here 翻译后更易于使用:
with gcs.open(filename) as fh:
lines = fh.read().split('\n')
您可以使用他们的 api 文档
设置您的 gcs 对象
我正在尝试将 JSON 文件转换成字典并应用 key/value 对,这样我就可以使用 groupbykey() 基本上删除 key/value 对。
这是文件的原始内容:
{"tax_pd":"200003","ein":"720378282"}
{"tax_pd":"200012","ein":"274027765"}
{"tax_pd":"200012","ein":"042746989"}
{"tax_pd":"200012","ein":"205993971"}
我是这样格式化的:
(u'201208', u'010620100')
(u'201208', u'860785769')
(u'201208', u'371650138')
(u'201208', u'237253410')
我想将它们变成 key/value 对,这样我就可以在我的数据流管道中应用 GroupByKey。我想我需要先把它变成字典吗?
我是 python 和 google 云应用程序的新手,一些帮助会很棒!
编辑:代码片段
with beam.Pipeline(options=pipeline_options) as p:
(p
| 'ReadInputText' >> beam.io.ReadFromText(known_args.input)
| 'YieldWords' >> beam.ParDo(ExtractWordsFn())
# | 'GroupByKey' >> beam.GroupByKey()
| 'WriteInputText' >> beam.io.WriteToText(known_args.output))
class ExtractWordsFn(beam.DoFn):
def process(self, element):
words = re.findall(r'[0-9]+', element)
yield tuple(words)
一个快速的纯Python解决方案是:
import json
with open('path/to/my/file.json','rb') as fh:
lines = [json.loads(l) for l in fh.readlines()]
# [{'tax_pd': '200003', 'ein': '720378282'}, {'tax_pd': '200012', 'ein': '274027765'}, {'tax_pd': '200012', 'ein': '042746989'}, {'tax_pd': '200012', 'ein': '205993971'}]
查看您的数据,您没有通过 tax_pd
和 ein
执行 key:value 的唯一键。假设会发生碰撞,您可以执行以下操作:
myresults = {}
for line in lines:
# I'm assuming we want to use tax_pd as the key, and ein as the value, but this can be extended to other keys
# This will return None if the tax_pd is not already found
if not myresults.get(line.get('tax_pd')):
myresults[line.get('tax_pd')] = [line.get('ein')]
else:
myresults[line.get('tax_pd')] = list(set([line.get('ein'), *myresults[line.get('tax_pd')]))
#results
#{'200003': ['720378282'], '200012': ['205993971', '042746989', '274027765']}
这样你就有了唯一的键,以及对应的唯一 ein
值的列表。不完全确定这是否是您想要的。 set
会自动去重列表,wrapping list
重新转换数据类型
然后您可以通过 tax_id
显式查找:
myresults.get('200012')
# ['205993971', '042746989', '274027765']
编辑:要从云存储中读取,代码片段 here 翻译后更易于使用:
with gcs.open(filename) as fh:
lines = fh.read().split('\n')
您可以使用他们的 api 文档
设置您的 gcs 对象