Apache Beam Python: returning a conditional statement using the ParDo class
Before performing any transformations, I want to check whether the CSV files we read in an Apache Beam pipeline match the format I expect (e.g. field checks, type checks, null checks, etc.).
Performing these checks on each file outside the pipeline would defeat the point of parallelism, so I just want to know whether it can be done inside the pipeline.
Sample code:
import apache_beam as beam

branched = beam.Pipeline()

class f_c(beam.DoFn):
    def process(self, element):
        if element == feld:
            return True
        else:
            return False

input_collection = (
    branched
    | 'Read from text file' >> beam.io.ReadFromText("gs://***.csv")
    | 'Split rows' >> beam.Map(lambda line: line.split(',')))

field_check = (
    input_collection
    | 'field function returning True or False' >> beam.ParDo(f_c())
    | beam.io.WriteToText('gs://***/op'))

branched.run().wait_until_finish()
I reproduced a small instance of the problem with just two rules:
if the length of the 3rd column is greater than 10 and the value of the 4th column is greater than 500, the record is good; otherwise it is a bad record.
from apache_beam.io.textio import WriteToText
import apache_beam as beam


class FilterFn(beam.DoFn):
    """
    The assumed rules are a length check on the third column and a
    value check on the fourth column:
    if (length of 3rd column) > 10 and (value of 4th column) > 500,
    then that record is good.
    """
    def process(self, text):
        def isGoodRow(a_list):
            return len(a_list[2]) > 10 and int(a_list[3]) > 500

        a_list = text.split(",")  # all the columns of a particular input line
        if isGoodRow(a_list):
            yield beam.TaggedOutput('good', text)
        else:
            yield beam.TaggedOutput('bad', text)


with beam.Pipeline() as pipeline:
    split_me = (
        pipeline
        | 'Read from text file' >> beam.io.ReadFromText("/content/sample.txt")
        | 'Split good and bad rows' >> beam.ParDo(FilterFn()).with_outputs('good', 'bad')
    )
    good_collection = (
        split_me.good
        | "write good o/p" >> beam.io.WriteToText(file_path_prefix='/content/good', file_name_suffix='.txt')
    )
    bad_collection = (
        split_me.bad
        | "write bad o/p" >> beam.io.WriteToText(file_path_prefix='/content/bad', file_name_suffix='.txt')
    )
I wrote a ParDo, FilterFn, that tags its output based on the isGoodRow function. You can rewrite the logic to suit your requirements: null checks, data validation, and so on.
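As a minimal sketch of such a rewrite, assuming a hypothetical rule set (exactly four fields, no empty fields, and an integer in the fourth column), a stricter validator could replace isGoodRow inside the same FilterFn:

```python
# Hypothetical stricter row validator; the specific rules below are
# assumptions for illustration, not part of the original answer.
def is_valid_row(cols):
    if len(cols) != 4:                       # field-count check
        return False
    if any(c.strip() == '' for c in cols):   # null/empty check
        return False
    try:
        int(cols[3])                         # type check on the numeric column
    except ValueError:
        return False
    return True
```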
Another option is to use a Partition function for this use case.
The sample.txt I used in this example:
1,Foo,FooBarFooBarFooBar,1000
2,Foo,Bar,10
3,Foo,FooBarFooBarFooBar,900
4,Foo,FooBar,800
Output of good.txt:
1,Foo,FooBarFooBarFooBar,1000
3,Foo,FooBarFooBarFooBar,900
Output of bad.txt:
2,Foo,Bar,10
4,Foo,FooBar,800