Apache Beam:如何在使用重复数据删除功能时解决 "ParDo requires a deterministic key coder in order to use state and timers"

Apache Beam: How to solve "ParDo requires a deterministic key coder in order to use state and timers" while using Deduplication function

我正在尝试使用 Apache Beam 的重复数据删除功能对来自 Google Cloud Pubsub 的输入消息进行重复数据删除。但是,在创建 KV<String, MyModel> 对并将其传递给 Deduplicate 转换后,我 运行 出错了。

错误:

ParDo requires a deterministic key coder in order to use state and timers

代码:

PCollection<KV<String, MyModel>> deduplicatedEvents =
    messages
        .apply(
            "CreateKVPairs",
            ParDo.of(
                new DoFn<MyModel, KV<String, MyModel>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().getUniqueKey(),c.element()));
                  }
                }))
        .apply(
            "Deduplicate",
            Deduplicate.<KV<String, MyModel>>values());

我应该如何创建可以 encode/decode 字符串作为键的确定性编码器,以使其工作?

任何意见都会很有帮助。

Deduplicate 转换的工作原理是将整个元素放入键中,然后执行键分组操作(在本例中为有状态 ParDo)。因为 Beam 是 language-independent,按键分组是使用元素的编码形式完成的。编码为相同字节的两个元素“相等”,而编码为不同字节的两个元素“不相等”。

确定性编码器 是关于语言中的相等性(如 Java)如何与 Beam 相等性相关的概念。这意味着如果两个 Java 对象根据 Java equals() 相等,那么它们必须具有相同的编码字节。对于字符串、数字、数组等简单数据,这很容易。思考是什么让编码器 non-deterministic 是有帮助的。例如,当对两个 Map 实例进行编码时,它们可能在 Java 级别是 equals(),但是 key-value 对以不同的顺序编码,这使得它们对于 Beam 是不相等的。

如果您有一个用于 MyModel 的不确定编码器,那么 Deduplicate 将无法正常工作,您最终会得到重复项,因为 Beam 认为不同编码的对象是不相等的。

自动获得高质量确定性编码器的最简单方法可能是利用 Beam 的模式推理:https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types。您将需要确保所有字段也可以确定地编码。