为 apache_beam Python SDK 版本 > 2.24 实施自定义编码器
Implement a custom coder for apache_beam Python SDK version > 2.24
我一直在为 python 使用 apache_beam sdk 处理我的数据工程。我用的是2.24版本。我在将 apache_beam 版本升级到 2.31 时创建的自定义编码器 class 有一些问题。自定义编码器 class 名称是 IgnoreUnicode。
所以,这是我的管道代码:
branchessap_data = (p | 'ReadData branchessap' >> beam.io.ReadFromText(branchessap, skip_header_lines =1, coder=IgnoreUnicode())
| 'SplitData branchessap' >> beam.Map(lambda x: x.split('|'))
| 'FormatToDict branchessap' >> beam.Map(lambda x: {"branch_id": x[0], "branch_sap": x[1], "branch_name": x[2], "branch_profile": x[3]})
| 'ChangeDataType branchessap' >> beam.Map(convert_types_branchessap)
| 'DELETE UNWANTED DATA BRANCHESSAP' >> beam.Map(del_unwanted_cols_branchessap)
)
这是 IgnoreUnicode class 我用来覆盖来自 apache_beam:
的默认编码器
# CLASS CHANGE FRENCH CHARACTERS
class IgnoreUnicode(Coder):
def encode(self, value):
return value.encode('utf-8','ignore')
def decode(self, value):
return value.decode('utf-8','ignore')
def is_deterministic(self):
return True
这些代码适用于 apache_beam 2.24 版。但是,如果我将它升级到 2.24 以上的版本,它会给我这样的错误(在这种情况下我使用的是 2.31 版):
对于如何在 2.24 以上的版本中实现自定义编码器,是否有任何替代解决方案?
看起来这是重组源代码的方式和在 __main__
中定义 PCoder 的不幸组合。我建议使用以下两种解决方法之一:
(1) 将 IgnoreUnicode
的定义移动到导入的正确模块而不是 __main__
,或
(2) 使用 BytesCoder 读取文件,然后使用
`beam.Map(lambda line: line.decode('utf-8','ignore'))`.
(就个人而言,我更喜欢后者,因为最好不要让编码人员改变数据。)
我一直在为 python 使用 apache_beam sdk 处理我的数据工程。我用的是2.24版本。我在将 apache_beam 版本升级到 2.31 时创建的自定义编码器 class 有一些问题。自定义编码器 class 名称是 IgnoreUnicode。 所以,这是我的管道代码:
branchessap_data = (p | 'ReadData branchessap' >> beam.io.ReadFromText(branchessap, skip_header_lines =1, coder=IgnoreUnicode())
| 'SplitData branchessap' >> beam.Map(lambda x: x.split('|'))
| 'FormatToDict branchessap' >> beam.Map(lambda x: {"branch_id": x[0], "branch_sap": x[1], "branch_name": x[2], "branch_profile": x[3]})
| 'ChangeDataType branchessap' >> beam.Map(convert_types_branchessap)
| 'DELETE UNWANTED DATA BRANCHESSAP' >> beam.Map(del_unwanted_cols_branchessap)
)
这是 IgnoreUnicode class 我用来覆盖来自 apache_beam:
的默认编码器# CLASS CHANGE FRENCH CHARACTERS
class IgnoreUnicode(Coder):
def encode(self, value):
return value.encode('utf-8','ignore')
def decode(self, value):
return value.decode('utf-8','ignore')
def is_deterministic(self):
return True
这些代码适用于 apache_beam 2.24 版。但是,如果我将它升级到 2.24 以上的版本,它会给我这样的错误(在这种情况下我使用的是 2.31 版):
对于如何在 2.24 以上的版本中实现自定义编码器,是否有任何替代解决方案?
看起来这是重组源代码的方式和在 __main__
中定义 PCoder 的不幸组合。我建议使用以下两种解决方法之一:
(1) 将 IgnoreUnicode
的定义移动到导入的正确模块而不是 __main__
,或
(2) 使用 BytesCoder 读取文件,然后使用
`beam.Map(lambda line: line.decode('utf-8','ignore'))`.
(就个人而言,我更喜欢后者,因为最好不要让编码人员改变数据。)