Apache Beam:如何在使用重复数据删除功能时解决 "ParDo requires a deterministic key coder in order to use state and timers"
Apache Beam: How to solve "ParDo requires a deterministic key coder in order to use state and timers" while using Deduplication function
我正在尝试使用 Apache Beam 的重复数据删除功能对来自 Google Cloud Pubsub 的输入消息进行重复数据删除。但是,在创建 KV<String, MyModel>
对并将其传递给 Deduplicate
转换后,我 运行 出错了。
错误:
ParDo requires a deterministic key coder in order to use state and timers
代码:
PCollection<KV<String, MyModel>> deduplicatedEvents =
messages
.apply(
"CreateKVPairs",
ParDo.of(
new DoFn<MyModel, KV<String, MyModel>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().getUniqueKey(),c.element()));
}
}))
.apply(
"Deduplicate",
Deduplicate.<KV<String, MyModel>>values());
我应该如何创建可以 encode/decode 字符串作为键的确定性编码器,以使其工作?
任何意见都会很有帮助。
Deduplicate
转换的工作原理是将整个元素放入键中,然后执行键分组操作(在本例中为有状态 ParDo)。因为 Beam 是 language-independent,按键分组是使用元素的编码形式完成的。编码为相同字节的两个元素“相等”,而编码为不同字节的两个元素“不相等”。
确定性编码器 是关于语言中的相等性(如 Java)如何与 Beam 相等性相关的概念。这意味着如果两个 Java 对象根据 Java equals()
相等,那么它们必须具有相同的编码字节。对于字符串、数字、数组等简单数据,这很容易。思考是什么让编码器 non-deterministic 是有帮助的。例如,当对两个 Map
实例进行编码时,它们可能在 Java 级别是 equals()
,但是 key-value 对以不同的顺序编码,这使得它们对于 Beam 是不相等的。
如果您有一个用于 MyModel
的不确定编码器,那么 Deduplicate
将无法正常工作,您最终会得到重复项,因为 Beam 认为不同编码的对象是不相等的。
自动获得高质量确定性编码器的最简单方法可能是利用 Beam 的模式推理:https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types。您将需要确保所有字段也可以确定地编码。
我正在尝试使用 Apache Beam 的重复数据删除功能对来自 Google Cloud Pubsub 的输入消息进行重复数据删除。但是,在创建 KV<String, MyModel>
对并将其传递给 Deduplicate
转换后,我 运行 出错了。
错误:
ParDo requires a deterministic key coder in order to use state and timers
代码:
PCollection<KV<String, MyModel>> deduplicatedEvents =
messages
.apply(
"CreateKVPairs",
ParDo.of(
new DoFn<MyModel, KV<String, MyModel>>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of(c.element().getUniqueKey(),c.element()));
}
}))
.apply(
"Deduplicate",
Deduplicate.<KV<String, MyModel>>values());
我应该如何创建可以 encode/decode 字符串作为键的确定性编码器,以使其工作?
任何意见都会很有帮助。
Deduplicate
转换的工作原理是将整个元素放入键中,然后执行键分组操作(在本例中为有状态 ParDo)。因为 Beam 是 language-independent,按键分组是使用元素的编码形式完成的。编码为相同字节的两个元素“相等”,而编码为不同字节的两个元素“不相等”。
确定性编码器 是关于语言中的相等性(如 Java)如何与 Beam 相等性相关的概念。这意味着如果两个 Java 对象根据 Java equals()
相等,那么它们必须具有相同的编码字节。对于字符串、数字、数组等简单数据,这很容易。思考是什么让编码器 non-deterministic 是有帮助的。例如,当对两个 Map
实例进行编码时,它们可能在 Java 级别是 equals()
,但是 key-value 对以不同的顺序编码,这使得它们对于 Beam 是不相等的。
如果您有一个用于 MyModel
的不确定编码器,那么 Deduplicate
将无法正常工作,您最终会得到重复项,因为 Beam 认为不同编码的对象是不相等的。
自动获得高质量确定性编码器的最简单方法可能是利用 Beam 的模式推理:https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types。您将需要确保所有字段也可以确定地编码。