Compare 2 text files when the key columns are in the middle using Apache Beam - Python

I need to compare 2 text files and then load the result into BQ. Suppose I have an emp text file (enum, ename, edept, esal) containing the data below; the key is in the middle of the file (index = 2):

100,abc,d10,7000
120,xyz,d20,5000

Another file, dept (dnum, dname), contains the following data:

d10,IT
d20,engineering

Given how Beam works, I believe the key should be placed in the first column so it can be compared with the other file. I was able to move it to the first column, but I could not merge the two. Here is my code:

class splitrow(beam.DoFn):
    def process(self, lines):
        return [lines.split(',')]

class formatinput(beam.DoFn):
    def process(self, emprow):
        (key,value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
        return [(key,value)]

class findkv(beam.DoFn):
    def process(self, deptrow):
        (key,value) = (deptrow)
        return [(key,value)]

# pipelineOptions
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

#Dataflow pipelines
input1_emp_collection = (
    p
    | "ReadEMPfile" >> beam.io.ReadFromText('gs://pybeam_bucket/emp.txt')
    | "SplitEMPFile" >> beam.ParDo(splitrow())
    | "FormatEMPFile" >> beam.ParDo(formatinput())
    | "Print1" >> beam.Map(print)                
)
input2_dept_collection = (
    p
    | "ReadDEPTfile" >> beam.io.ReadFromText('gs://pybeam_bucket/dept.txt')
    | "SplitDEPTFile" >> beam.ParDo(splitrow())
    | "FormatDEPTFile" >> beam.ParDo(findkv())
    | "Print2" >> beam.Map(print)                
)

result = (({
      'emp': input1_emp_collection, 'dept': input2_dept_collection
  })
  | "Merge" >> beam.CoGroupByKey()
  | beam.Map(print)
  )

# Run a pipeline
if __name__ == "__main__":
    p.run()

When I run the code I get the following error:

TypeError: cannot unpack non-iterable NoneType object [while running 'Merge/pair_with_emp']

Please help me compare the two text files when one of the keys is in the middle of the input file, and load the matched records into BQ. I am also new to Apache Beam. Thanks for your support.

The problem is with `| "Print1" >> beam.Map(print)` and `| "Print2" >> beam.Map(print)`. When you add these lines at the end of each pipeline, you are sending `None` into the `CoGroupByKey()` step. You have to remove them.
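To see why, note that `print` returns `None`, so a `Map(print)` step replaces every element of the collection with `None`. A minimal plain-Python sketch of the same effect (no Beam required):

```python
# Rows shaped like the (key, value) pairs the pipeline produces.
rows = [("d10", ("100", "abc", "7000")), ("d20", ("120", "xyz", "5000"))]

# Mapping print over the rows prints each one as a side effect,
# but the mapped result is a list of Nones.
mapped = [print(row) for row in rows]

# Every element is now None -- this is what CoGroupByKey received,
# and trying to unpack None as a (key, value) pair raises the
# "cannot unpack non-iterable NoneType object" TypeError above.
print(mapped)
```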

import apache_beam as beam 
from apache_beam.options.pipeline_options import PipelineOptions


class splitrow(beam.DoFn):
    def process(self, lines):
        return [lines.split(',')]

class formatinput(beam.DoFn):
    def process(self, emprow):
        (key,value) = (emprow[2], (emprow[0], emprow[1], emprow[3]))
        return [(key,value)]

class findkv(beam.DoFn):
    def process(self, deptrow):
        (key,value) = (deptrow)
        return [(key,value)]


# pipelineOptions
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

#Dataflow pipelines
input1_emp_collection = (
    p
    | "ReadEMPfile" >> beam.io.ReadFromText('gs://<bucket>/emp.txt')
    | "SplitEMPFile" >> beam.ParDo(splitrow())
    | "FormatEMPFile" >> beam.ParDo(formatinput())       
)
input2_dept_collection = (
    p
    | "ReadDEPTfile" >> beam.io.ReadFromText('gs://<bucket>/dept.txt')
    | "SplitDEPTFile" >> beam.ParDo(splitrow())
    | "FormatDEPTFile" >> beam.ParDo(findkv())                
)

# Debug line that shows the Nones (uncomment after re-adding a Map(print) step)
#result = input1_emp_collection | beam.Map(print)

result = (({'emp': input1_emp_collection, 'dept': input2_dept_collection})
    | "Merge" >> beam.CoGroupByKey()
    | beam.Map(print)
)


# Run a pipeline
if __name__ == "__main__":
    p.run()

Result:

python teste2.py 
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
('d10', {'emp': [('100', 'abc', '7000')], 'dept': ['IT']})
('d20', {'emp': [('120', 'xyz', '5000')], 'dept': ['engineering']})
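To get from the joined output to BigQuery, one option (a sketch, not part of the original answer) is to flatten each `CoGroupByKey` result into row dictionaries and hand those to `beam.io.WriteToBigQuery`. The helper below is plain Python, so it can be dropped into a `beam.FlatMap`; the output column names are assumptions based on the file layouts above:

```python
def to_bq_rows(joined):
    """Flatten one CoGroupByKey result into BigQuery-style row dicts.

    `joined` looks like ('d10', {'emp': [('100','abc','7000')], 'dept': ['IT']}).
    Only keys present in BOTH files produce rows (an inner join), which
    matches the goal of loading matched records.
    """
    dept_key, groups = joined
    for enum, ename, esal in groups['emp']:
        for dname in groups['dept']:
            yield {
                'enum': enum,
                'ename': ename,
                'edept': dept_key,
                'esal': esal,
                'dname': dname,
            }

# Example with the 'd10' group from the output above:
rows = list(to_bq_rows(('d10', {'emp': [('100', 'abc', '7000')], 'dept': ['IT']})))
print(rows)
```

In the pipeline this would replace the final `beam.Map(print)` with something like `result | beam.FlatMap(to_bq_rows) | beam.io.WriteToBigQuery(...)`, where the table reference and schema arguments are yours to fill in.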