Compare 2 text files when the key columns are in the middle using Apache Beam - Python
I need to compare 2 text files and then load the result into BQ.
Suppose I have an emp text file (enum, ename, edept, esal) containing the data below. The key is in the middle of the record (index = 2):
100,abc,d10,7000
120,xyz,d20,5000
Another file, dept (dnum, dname), contains the following data:
d10,IT
d20,engineering
Based on Beam's behavior, I believe the key should be in the first column in order to join with the other file. I was able to move it to the first column, but I can't merge. Here is my code:
class splitrow(beam.DoFn):
    def process(self, lines):
        return [lines.split(',')]

class formatinput(beam.DoFn):
    def process(self, emprow):
        (key, value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
        return [(key, value)]

class findkv(beam.DoFn):
    def process(self, deptrow):
        (key, value) = deptrow
        return [(key, value)]
# pipelineOptions
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

# Dataflow pipelines
input1_emp_collection = (
    p
    | "ReadEMPfile" >> beam.io.ReadFromText('gs://pybeam_bucket/emp.txt')
    | "SplitEMPFile" >> beam.ParDo(splitrow())
    | "FormatEMPFile" >> beam.ParDo(formatinput())
    | "Print1" >> beam.Map(print)
)

input2_dept_collection = (
    p
    | "ReadDEPTfile" >> beam.io.ReadFromText('gs://pybeam_bucket/dept.txt')
    | "SplitDEPTFile" >> beam.ParDo(splitrow())
    | "FormatDEPTFile" >> beam.ParDo(findkv())
    | "Print2" >> beam.Map(print)
)

result = (
    {'emp': input1_emp_collection, 'dept': input2_dept_collection}
    | "Merge" >> beam.CoGroupByKey()
    | beam.Map(print)
)

# Run the pipeline
if __name__ == "__main__":
    p.run()
When I run the code, I get the following error:
TypeError: cannot unpack non-iterable NoneType object [while running 'Merge/pair_with_emp']
Help me compare two text files when the key is in the middle of one of the input files, and load the matched records into BQ. I'm also new to Apache Beam.
Thanks for your support.
The problem is with | "Print1" >> beam.Map(print) and | "Print2" >> beam.Map(print). When you add these lines at the end of the pipelines, you are sending None to CoGroupByKey(). You have to remove them.
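To see why, recall that Python's print returns None, and beam.Map(f) replaces each element with f(element); a trailing Map(print) therefore turns every (key, value) pair into None before CoGroupByKey tries to unpack it. A plain-Python analogy of what happens to the elements:

```python
rows = [('d10', ('100', 'abc', '7000'))]

# map(print, ...) prints each element, but collects print's return value: None
mapped = list(map(print, rows))
print(mapped)  # [None]

# CoGroupByKey then effectively does `key, value = element` on each element,
# which fails with "cannot unpack non-iterable NoneType object"
```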
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class splitrow(beam.DoFn):
    def process(self, lines):
        return [lines.split(',')]

class formatinput(beam.DoFn):
    def process(self, emprow):
        (key, value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
        return [(key, value)]

class findkv(beam.DoFn):
    def process(self, deptrow):
        (key, value) = deptrow
        return [(key, value)]

# pipelineOptions
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

# Dataflow pipelines
input1_emp_collection = (
    p
    | "ReadEMPfile" >> beam.io.ReadFromText('gs://<bucket>/emp.txt')
    | "SplitEMPFile" >> beam.ParDo(splitrow())
    | "FormatEMPFile" >> beam.ParDo(formatinput())
)

input2_dept_collection = (
    p
    | "ReadDEPTfile" >> beam.io.ReadFromText('gs://<bucket>/dept.txt')
    | "SplitDEPTFile" >> beam.ParDo(splitrow())
    | "FormatDEPTFile" >> beam.ParDo(findkv())
)

# Debug line that shows the Nones
# result = input1_emp_collection | beam.Map(print)

result = (
    {'emp': input1_emp_collection, 'dept': input2_dept_collection}
    | "Merge" >> beam.CoGroupByKey()
    | beam.Map(print)
)

# Run the pipeline
if __name__ == "__main__":
    p.run()
Result:
python teste2.py
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
('d10', {'emp': [('100', 'abc', '7000')], 'dept': ['IT']})
('d20', {'emp': [('120', 'xyz', '5000')], 'dept': ['engineering']})
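To finish the original goal of loading the matched records into BQ, the CoGroupByKey output can be flattened into row dictionaries and written with beam.io.WriteToBigQuery. A minimal sketch, assuming a hypothetical project:dataset.emp_dept table and STRING columns (adjust the table spec and schema to your project):

```python
def to_bq_rows(element):
    """Flatten one CoGroupByKey result into BigQuery row dicts (inner join)."""
    # element: (dept_key, {'emp': [(enum, ename, esal), ...], 'dept': [dname, ...]})
    key, grouped = element
    for enum, ename, esal in grouped['emp']:
        for dname in grouped['dept']:
            yield {'enum': enum, 'ename': ename, 'edept': key,
                   'esal': esal, 'dname': dname}

# In the pipeline, replace the final `beam.Map(print)` with something like:
#   | "ToBQRows" >> beam.FlatMap(to_bq_rows)
#   | "WriteToBQ" >> beam.io.WriteToBigQuery(
#         'project:dataset.emp_dept',  # hypothetical table
#         schema='enum:STRING,ename:STRING,edept:STRING,esal:STRING,dname:STRING',
#         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
#         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

# Quick local check against the CoGroupByKey output shown above:
sample = ('d10', {'emp': [('100', 'abc', '7000')], 'dept': ['IT']})
print(list(to_bq_rows(sample)))
```

Records with no match on the other side simply produce an empty 'emp' or 'dept' list, so the nested loop above skips them, which gives inner-join semantics.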