Apache Beam Python:使用 ParDo class 返回条件语句

Apache Beam Python: returning conditional statement using ParDo class

我想检查一下,我们在 apache beam 管道中读取的 CSV 文件是否满足我期望的格式[例如:字段检查、类型检查、空值检查等] , 在执行任何转换之前。

在管道外对每个文件执行这些检查将消除并行性的概念,所以我只想知道是否可以在管道内执行它。

代码的示例:

import apache_beam as beam

branched=beam.Pipeline()

class f_c(beam.DoFn):
    def process(self, element):
        if element == feld:
            return True
        else:
            return False

input_collection = ( 
branched 
    | 'Read from text file' >> beam.io.ReadFromText("gs://***.csv")
    | 'Split rows' >> beam.Map(lambda line: line.split(',')))

field_check=(input_collection
    | 'field function returning True or False' >> beam.ParDo(f_c())
    | beam.io.WriteToText('gs://***/op'))

branched.run().wait_unitl_finish

我只用两条规则复制了一个小问题实例。 如果第 3 列的长度大于 10 且第 4 列的值大于 500,那么该记录是好的,否则就是坏的记录。

    from apache_beam.io.textio import WriteToText
    import apache_beam as beam
    
    class FilterFn(beam.DoFn):
      """
      The rules I have assumed is to just perform a length check on the third column and 
      value check on forth column
      if(length of 3th column) >10 and (value of 4th column) is >500 then that record is good
      """
      def process(self, text):
    #------------------ -----isGoodRow BEGINS-------------------------------
        def isGoodRow(a_list):
          if( (len(a_list[2]) > 10) and (int(a_list[3]) >100) ):
            return True
          else:
            return False
    #------------------- ----isGoodRow ENDS-------------------------------
        a_list = []
        a_list = text.split(",") # this list contains all the column for a periticuar i/p Line
        bool_result = isGoodRow(a_list)
        if(bool_result == True):
          yield beam.TaggedOutput('good', text)
        else:
          yield beam.TaggedOutput('bad', text)
    
    with beam.Pipeline() as pipeline:
      split_me = (
          pipeline
          | 'Read from text file' >> beam.io.ReadFromText("/content/sample.txt")
          | 'Split words and Remove N/A' >> beam.ParDo(FilterFn()).with_outputs('good','bad')
          )
      good_collection = (
          split_me.good 
          |"write good o/p" >> beam.io.WriteToText(file_path_prefix='/content/good',file_name_suffix='.txt')
          ) 
      bad_collection = (
          split_me.bad 
          |"write bad o/p" >> beam.io.WriteToText(file_path_prefix='/content/bad',file_name_suffix='.txt')
          )

我写了一个 ParDo FilterFn 标签输出基于 isGoodRow 函数。您可以根据您的要求重写逻辑。空检查,数据验证等。 另一种选择是对这个用例使用 Partition Function

我在这个例子中使用的sample.txt:

1,Foo,FooBarFooBarFooBar,1000
2,Foo,Bar,10
3,Foo,FooBarFooBarFooBar,900
4,Foo,FooBar,800

good.txt

的输出
1,Foo,FooBarFooBarFooBar,1000
3,Foo,FooBarFooBarFooBar,900

bad.txt

的输出
2,Foo,Bar,10
4,Foo,FooBar,800